Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
41 : };
42 : use crate::tenant::{PageReconstructError, Timeline};
43 : use crate::virtual_file::{self, VirtualFile};
44 : use crate::{walrecord, TEMP_FILE_SUFFIX};
45 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::BytesMut;
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use futures::StreamExt;
50 : use itertools::Itertools;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::models::LayerAccessKind;
53 : use pageserver_api::shard::TenantShardId;
54 : use rand::{distributions::Alphanumeric, Rng};
55 : use serde::{Deserialize, Serialize};
56 : use std::fs::File;
57 : use std::io::SeekFrom;
58 : use std::ops::Range;
59 : use std::os::unix::fs::FileExt;
60 : use std::str::FromStr;
61 : use std::sync::Arc;
62 : use tokio::sync::OnceCell;
63 : use tracing::*;
64 :
65 : use utils::{
66 : bin_ser::BeSer,
67 : id::{TenantId, TimelineId},
68 : lsn::Lsn,
69 : };
70 :
71 : use super::{
72 : AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
73 : ValuesReconstructState,
74 : };
75 :
76 : ///
77 : /// Header stored at the beginning of the file (block 0).
78 : ///
79 : /// After this comes the 'values' part, starting on block 1. After that,
80 : /// the 'index' starts at the block indicated by 'index_start_blk'.
81 : ///
/// NOTE(review): the struct is written with `Summary::ser_into` and read back
/// with `Summary::des_prefix`, so the field order presumably is part of the
/// on-disk format -- confirm before reordering or adding fields.
82 818 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
83 : pub struct Summary {
84 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
85 : pub magic: u16,
/// Storage format version; writers in this file set it to STORAGE_FORMAT_VERSION.
86 : pub format_version: u16,
87 :
/// Tenant the layer belongs to.
88 : pub tenant_id: TenantId,
/// Timeline the layer was written for.
89 : pub timeline_id: TimelineId,
/// Range of keys covered by the layer.
90 : pub key_range: Range<Key>,
/// Range of LSNs covered by the layer.
91 : pub lsn_range: Range<Lsn>,
92 :
93 : /// Block number where the 'index' part of the file begins.
94 : pub index_start_blk: u32,
95 : /// Block within the 'index', where the B-tree root page is stored
96 : pub index_root_blk: u32,
97 : }
98 :
99 : impl From<&DeltaLayer> for Summary {
100 0 : fn from(layer: &DeltaLayer) -> Self {
101 0 : Self::expected(
102 0 : layer.desc.tenant_shard_id.tenant_id,
103 0 : layer.desc.timeline_id,
104 0 : layer.desc.key_range.clone(),
105 0 : layer.desc.lsn_range.clone(),
106 0 : )
107 0 : }
108 : }
109 :
110 : impl Summary {
111 818 : pub(super) fn expected(
112 818 : tenant_id: TenantId,
113 818 : timeline_id: TimelineId,
114 818 : keys: Range<Key>,
115 818 : lsns: Range<Lsn>,
116 818 : ) -> Self {
117 818 : Self {
118 818 : magic: DELTA_FILE_MAGIC,
119 818 : format_version: STORAGE_FORMAT_VERSION,
120 818 :
121 818 : tenant_id,
122 818 : timeline_id,
123 818 : key_range: keys,
124 818 : lsn_range: lsns,
125 818 :
126 818 : index_start_blk: 0,
127 818 : index_root_blk: 0,
128 818 : }
129 818 : }
130 : }
131 :
132 : // Flag bit (lowest bit of a `BlobRef`) indicating that this version initializes the page
133 : const WILL_INIT: u64 = 1;
134 :
135 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
136 : /// offset, and for WAL records it also contains `will_init` flag. The flag
137 : /// helps to determine the range of records that needs to be applied, without
138 : /// reading/deserializing records themselves.
139 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
140 : pub struct BlobRef(pub u64);
141 :
142 : impl BlobRef {
143 215835 : pub fn will_init(&self) -> bool {
144 215835 : (self.0 & WILL_INIT) != 0
145 215835 : }
146 :
147 4361506 : pub fn pos(&self) -> u64 {
148 4361506 : self.0 >> 1
149 4361506 : }
150 :
151 6404724 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
152 6404724 : let mut blob_ref = pos << 1;
153 6404724 : if will_init {
154 6403498 : blob_ref |= WILL_INIT;
155 6403498 : }
156 6404724 : BlobRef(blob_ref)
157 6404724 : }
158 : }
159 :
160 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
161 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
162 :
163 : /// This is the key of the B-tree index stored in the delta layer. It consists
164 : /// of the serialized representation of a Key and LSN.
165 : impl DeltaKey {
166 2042158 : fn from_slice(buf: &[u8]) -> Self {
167 2042158 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
168 2042158 : bytes.copy_from_slice(buf);
169 2042158 : DeltaKey(bytes)
170 2042158 : }
171 :
172 6677360 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
173 6677360 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
174 6677360 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
175 6677360 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
176 6677360 : DeltaKey(bytes)
177 6677360 : }
178 :
179 2042158 : fn key(&self) -> Key {
180 2042158 : Key::from_slice(&self.0)
181 2042158 : }
182 :
183 2042158 : fn lsn(&self) -> Lsn {
184 2042158 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
185 2042158 : }
186 :
187 277290 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
188 277290 : let mut lsn_buf = [0u8; 8];
189 277290 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
190 277290 : Lsn(u64::from_be_bytes(lsn_buf))
191 277290 : }
192 : }
193 :
194 : /// This is used only from `pagectl`. Within pageserver, all layers are
195 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
196 : pub struct DeltaLayer {
// Location of the layer file, as passed to `new_for_path`.
197 : path: Utf8PathBuf,
// Layer descriptor; `new_for_path` builds it from the summary stored in the file.
198 : pub desc: PersistentLayerDesc,
// Updated by `load` on every access.
199 : access_stats: LayerAccessStats,
// Lazily initialized by `load`; starts out empty.
200 : inner: OnceCell<Arc<DeltaLayerInner>>,
201 : }
202 :
203 : impl std::fmt::Debug for DeltaLayer {
204 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 0 : use super::RangeDisplayDebug;
206 0 :
207 0 : f.debug_struct("DeltaLayer")
208 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
209 0 : .field("lsn_range", &self.desc.lsn_range)
210 0 : .field("file_size", &self.desc.file_size)
211 0 : .field("inner", &self.inner)
212 0 : .finish()
213 0 : }
214 : }
215 :
216 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
217 : /// file.
218 : pub struct DeltaLayerInner {
219 : // values copied from summary
// Block number where the index part of the file begins.
220 : index_start_blk: u32,
// Block within the index where the B-tree root page is stored.
221 : index_root_blk: u32,
// LSN range recorded in the file's summary.
222 : lsn_range: Range<Lsn>,
223 :
// The opened layer file.
224 : file: VirtualFile,
// Id obtained from `page_cache::next_file_id()` when the file was opened.
225 : file_id: FileId,
226 :
// Size limit for vectored reads. `get_values_reconstruct_data` panics if this is `None`.
227 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
228 : }
229 :
230 : impl std::fmt::Debug for DeltaLayerInner {
231 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 0 : f.debug_struct("DeltaLayerInner")
233 0 : .field("index_start_blk", &self.index_start_blk)
234 0 : .field("index_root_blk", &self.index_root_blk)
235 0 : .finish()
236 0 : }
237 : }
238 :
239 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
240 : impl std::fmt::Display for DeltaLayer {
241 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 0 : write!(f, "{}", self.layer_desc().short_id())
243 0 : }
244 : }
245 :
246 : impl AsLayerDesc for DeltaLayer {
247 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
248 0 : &self.desc
249 0 : }
250 : }
251 :
252 : impl DeltaLayer {
253 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
254 0 : self.desc.dump();
255 0 :
256 0 : if !verbose {
257 0 : return Ok(());
258 0 : }
259 :
260 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
261 :
262 0 : inner.dump(ctx).await
263 0 : }
264 :
265 1062 : fn temp_path_for(
266 1062 : conf: &PageServerConf,
267 1062 : tenant_shard_id: &TenantShardId,
268 1062 : timeline_id: &TimelineId,
269 1062 : key_start: Key,
270 1062 : lsn_range: &Range<Lsn>,
271 1062 : ) -> Utf8PathBuf {
272 1062 : let rand_string: String = rand::thread_rng()
273 1062 : .sample_iter(&Alphanumeric)
274 1062 : .take(8)
275 1062 : .map(char::from)
276 1062 : .collect();
277 1062 :
278 1062 : conf.timeline_path(tenant_shard_id, timeline_id)
279 1062 : .join(format!(
280 1062 : "{}-XXX__{:016X}-{:016X}.{}.{}",
281 1062 : key_start,
282 1062 : u64::from(lsn_range.start),
283 1062 : u64::from(lsn_range.end),
284 1062 : rand_string,
285 1062 : TEMP_FILE_SUFFIX,
286 1062 : ))
287 1062 : }
288 :
289 : ///
290 : /// Open the underlying file and read the metadata into memory, if it's
291 : /// not loaded already.
292 : ///
293 0 : async fn load(
294 0 : &self,
295 0 : access_kind: LayerAccessKind,
296 0 : ctx: &RequestContext,
297 0 : ) -> Result<&Arc<DeltaLayerInner>> {
298 0 : self.access_stats.record_access(access_kind, ctx);
299 0 : // Quick exit if already loaded
300 0 : self.inner
301 0 : .get_or_try_init(|| self.load_inner(ctx))
302 0 : .await
303 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
304 0 : }
305 :
306 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
307 0 : let path = self.path();
308 :
309 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx)
310 0 : .await
311 0 : .and_then(|res| res)?;
312 :
313 : // not production code
314 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
315 0 : let expected_layer_name = self.layer_desc().layer_name();
316 0 :
317 0 : if actual_layer_name != expected_layer_name {
318 0 : println!("warning: filename does not match what is expected from in-file summary");
319 0 : println!("actual: {:?}", actual_layer_name.to_string());
320 0 : println!("expected: {:?}", expected_layer_name.to_string());
321 0 : }
322 :
323 0 : Ok(Arc::new(loaded))
324 0 : }
325 :
326 : /// Create a DeltaLayer struct representing an existing file on disk.
327 : ///
328 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
329 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
330 0 : let mut summary_buf = vec![0; PAGE_SZ];
331 0 : file.read_exact_at(&mut summary_buf, 0)?;
332 0 : let summary = Summary::des_prefix(&summary_buf)?;
333 :
334 0 : let metadata = file
335 0 : .metadata()
336 0 : .context("get file metadata to determine size")?;
337 :
338 : // This function is never used for constructing layers in a running pageserver,
339 : // so it does not need an accurate TenantShardId.
340 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
341 0 :
342 0 : Ok(DeltaLayer {
343 0 : path: path.to_path_buf(),
344 0 : desc: PersistentLayerDesc::new_delta(
345 0 : tenant_shard_id,
346 0 : summary.timeline_id,
347 0 : summary.key_range,
348 0 : summary.lsn_range,
349 0 : metadata.len(),
350 0 : ),
351 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
352 0 : inner: OnceCell::new(),
353 0 : })
354 0 : }
355 :
356 : /// Path to the layer file in pageserver workdir.
357 0 : fn path(&self) -> Utf8PathBuf {
358 0 : self.path.clone()
359 0 : }
360 : }
361 :
362 : /// A builder object for constructing a new delta layer. This is the
363 : /// implementation behind the public `DeltaLayerWriter` wrapper.
364 : /// Usage:
365 : ///
366 : /// 1. Create the DeltaLayerWriterInner by calling DeltaLayerWriterInner::new(...)
367 : ///
368 : /// 2. Write the contents by calling `put_value` for every page
369 : /// version to store in the layer.
370 : ///
371 : /// 3. Call `finish`.
372 : ///
373 : struct DeltaLayerWriterInner {
374 : conf: &'static PageServerConf,
// Temporary path the file is created at; handed to `Layer::finish_creating` in `finish`.
375 : pub path: Utf8PathBuf,
376 : timeline_id: TimelineId,
377 : tenant_shard_id: TenantShardId,
378 :
// First key of the layer; the end key is only supplied to `finish`.
379 : key_start: Key,
380 : lsn_range: Range<Lsn>,
381 :
// B-tree index under construction; written out after the values in `finish`.
382 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
383 :
// Writer for the 'values' part; positioned after the header block by `new`.
384 : blob_writer: BlobWriter<true>,
385 : }
386 :
387 : impl DeltaLayerWriterInner {
388 : ///
389 : /// Start building a new delta layer.
390 : ///
391 1062 : async fn new(
392 1062 : conf: &'static PageServerConf,
393 1062 : timeline_id: TimelineId,
394 1062 : tenant_shard_id: TenantShardId,
395 1062 : key_start: Key,
396 1062 : lsn_range: Range<Lsn>,
397 1062 : ) -> anyhow::Result<Self> {
398 1062 : // Create the file initially with a temporary filename. We don't know
399 1062 : // the end key yet, so we cannot form the final filename yet. We will
400 1062 : // rename it when we're done.
401 1062 : //
402 1062 : // Note: This overwrites any existing file. There shouldn't be any.
403 1062 : // FIXME: throw an error instead?
404 1062 : let path =
405 1062 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
406 :
407 1062 : let mut file = VirtualFile::create(&path).await?;
408 : // make room for the header block
409 1062 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
410 1062 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
411 1062 :
412 1062 : // Initialize the b-tree index builder
413 1062 : let block_buf = BlockBuf::new();
414 1062 : let tree_builder = DiskBtreeBuilder::new(block_buf);
415 1062 :
416 1062 : Ok(Self {
417 1062 : conf,
418 1062 : path,
419 1062 : timeline_id,
420 1062 : tenant_shard_id,
421 1062 : key_start,
422 1062 : lsn_range,
423 1062 : tree: tree_builder,
424 1062 : blob_writer,
425 1062 : })
426 1062 : }
427 :
428 : ///
429 : /// Append a key-value pair to the file.
430 : ///
431 : /// The values must be appended in key, lsn order.
432 : ///
433 2041998 : async fn put_value(
434 2041998 : &mut self,
435 2041998 : key: Key,
436 2041998 : lsn: Lsn,
437 2041998 : val: Value,
438 2041998 : ctx: &RequestContext,
439 2041998 : ) -> anyhow::Result<()> {
440 2041998 : let (_, res) = self
441 2041998 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
442 1555 : .await;
443 2041998 : res
444 2041998 : }
445 :
446 6404620 : async fn put_value_bytes(
447 6404620 : &mut self,
448 6404620 : key: Key,
449 6404620 : lsn: Lsn,
450 6404620 : val: Vec<u8>,
451 6404620 : will_init: bool,
452 6404620 : ctx: &RequestContext,
453 6404620 : ) -> (Vec<u8>, anyhow::Result<()>) {
454 6404620 : assert!(self.lsn_range.start <= lsn);
455 6404620 : let (val, res) = self.blob_writer.write_blob(val, ctx).await;
456 6404620 : let off = match res {
457 6404620 : Ok(off) => off,
458 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
459 : };
460 :
461 6404620 : let blob_ref = BlobRef::new(off, will_init);
462 6404620 :
463 6404620 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
464 6404620 : let res = self.tree.append(&delta_key.0, blob_ref.0);
465 6404620 : (val, res.map_err(|e| anyhow::anyhow!(e)))
466 6404620 : }
467 :
468 2021974 : fn size(&self) -> u64 {
469 2021974 : self.blob_writer.size() + self.tree.borrow_writer().size()
470 2021974 : }
471 :
472 : ///
473 : /// Finish writing the delta layer.
474 : ///
475 1062 : async fn finish(
476 1062 : self,
477 1062 : key_end: Key,
478 1062 : timeline: &Arc<Timeline>,
479 1062 : ctx: &RequestContext,
480 1062 : ) -> anyhow::Result<ResidentLayer> {
481 1062 : let index_start_blk =
482 1062 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
483 :
484 1062 : let mut file = self.blob_writer.into_inner(ctx).await?;
485 :
486 : // Write out the index
487 1062 : let (index_root_blk, block_buf) = self.tree.finish()?;
488 1062 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
489 0 : .await?;
490 14226 : for buf in block_buf.blocks {
491 13164 : let (_buf, res) = file.write_all(buf, ctx).await;
492 13164 : res?;
493 : }
494 1062 : assert!(self.lsn_range.start < self.lsn_range.end);
495 : // Fill in the summary on blk 0
496 1062 : let summary = Summary {
497 1062 : magic: DELTA_FILE_MAGIC,
498 1062 : format_version: STORAGE_FORMAT_VERSION,
499 1062 : tenant_id: self.tenant_shard_id.tenant_id,
500 1062 : timeline_id: self.timeline_id,
501 1062 : key_range: self.key_start..key_end,
502 1062 : lsn_range: self.lsn_range.clone(),
503 1062 : index_start_blk,
504 1062 : index_root_blk,
505 1062 : };
506 1062 :
507 1062 : let mut buf = Vec::with_capacity(PAGE_SZ);
508 1062 : // TODO: could use smallvec here but it's a pain with Slice<T>
509 1062 : Summary::ser_into(&summary, &mut buf)?;
510 1062 : file.seek(SeekFrom::Start(0)).await?;
511 1062 : let (_buf, res) = file.write_all(buf, ctx).await;
512 1062 : res?;
513 :
514 1062 : let metadata = file
515 1062 : .metadata()
516 536 : .await
517 1062 : .context("get file metadata to determine size")?;
518 :
519 : // 5GB limit for objects without multipart upload (which we don't want to use)
520 : // Make it a little bit below to account for differing GB units
521 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
522 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
523 1062 : ensure!(
524 1062 : metadata.len() <= S3_UPLOAD_LIMIT,
525 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
526 0 : file.path,
527 0 : metadata.len()
528 : );
529 :
530 : // Note: Because we opened the file in write-only mode, we cannot
531 : // reuse the same VirtualFile for reading later. That's why we don't
532 : // set inner.file here. The first read will have to re-open it.
533 :
534 1062 : let desc = PersistentLayerDesc::new_delta(
535 1062 : self.tenant_shard_id,
536 1062 : self.timeline_id,
537 1062 : self.key_start..key_end,
538 1062 : self.lsn_range.clone(),
539 1062 : metadata.len(),
540 1062 : );
541 1062 :
542 1062 : // fsync the file
543 1062 : file.sync_all().await?;
544 :
545 1062 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
546 :
547 1062 : trace!("created delta layer {}", layer.local_path());
548 :
549 1062 : Ok(layer)
550 1062 : }
551 : }
552 :
553 : /// A builder object for constructing a new delta layer.
554 : ///
555 : /// Usage:
556 : ///
557 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
558 : ///
559 : /// 2. Write the contents by calling `put_value` for every page
560 : /// version to store in the layer.
561 : ///
562 : /// 3. Call `finish`.
563 : ///
564 : /// # Note
565 : ///
566 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
567 : /// possible for the writer to be dropped before `finish` is actually called. This
568 : /// could leave stray temporary files in the directory, filling up the file system.
569 : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
570 : /// implementation that cleans up the temporary file on failure. It's not
571 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
572 : /// out some fields, making it impossible to implement `Drop`.
573 : ///
574 : #[must_use]
575 : pub struct DeltaLayerWriter {
// `Some` until `finish` takes it; `Drop` removes the file only if it is still `Some`.
576 : inner: Option<DeltaLayerWriterInner>,
577 : }
578 :
579 : impl DeltaLayerWriter {
580 : ///
581 : /// Start building a new delta layer.
582 : ///
583 1062 : pub async fn new(
584 1062 : conf: &'static PageServerConf,
585 1062 : timeline_id: TimelineId,
586 1062 : tenant_shard_id: TenantShardId,
587 1062 : key_start: Key,
588 1062 : lsn_range: Range<Lsn>,
589 1062 : ) -> anyhow::Result<Self> {
590 1062 : Ok(Self {
591 1062 : inner: Some(
592 1062 : DeltaLayerWriterInner::new(
593 1062 : conf,
594 1062 : timeline_id,
595 1062 : tenant_shard_id,
596 1062 : key_start,
597 1062 : lsn_range,
598 1062 : )
599 542 : .await?,
600 : ),
601 : })
602 1062 : }
603 :
604 : ///
605 : /// Append a key-value pair to the file.
606 : ///
607 : /// The values must be appended in key, lsn order.
608 : ///
609 2041998 : pub async fn put_value(
610 2041998 : &mut self,
611 2041998 : key: Key,
612 2041998 : lsn: Lsn,
613 2041998 : val: Value,
614 2041998 : ctx: &RequestContext,
615 2041998 : ) -> anyhow::Result<()> {
616 2041998 : self.inner
617 2041998 : .as_mut()
618 2041998 : .unwrap()
619 2041998 : .put_value(key, lsn, val, ctx)
620 1555 : .await
621 2041998 : }
622 :
623 4362622 : pub async fn put_value_bytes(
624 4362622 : &mut self,
625 4362622 : key: Key,
626 4362622 : lsn: Lsn,
627 4362622 : val: Vec<u8>,
628 4362622 : will_init: bool,
629 4362622 : ctx: &RequestContext,
630 4362622 : ) -> (Vec<u8>, anyhow::Result<()>) {
631 4362622 : self.inner
632 4362622 : .as_mut()
633 4362622 : .unwrap()
634 4362622 : .put_value_bytes(key, lsn, val, will_init, ctx)
635 2936 : .await
636 4362622 : }
637 :
638 2021974 : pub fn size(&self) -> u64 {
639 2021974 : self.inner.as_ref().unwrap().size()
640 2021974 : }
641 :
642 : ///
643 : /// Finish writing the delta layer.
644 : ///
645 1062 : pub(crate) async fn finish(
646 1062 : mut self,
647 1062 : key_end: Key,
648 1062 : timeline: &Arc<Timeline>,
649 1062 : ctx: &RequestContext,
650 1062 : ) -> anyhow::Result<ResidentLayer> {
651 1062 : let inner = self.inner.take().unwrap();
652 1062 : let temp_path = inner.path.clone();
653 8796 : let result = inner.finish(key_end, timeline, ctx).await;
654 : // The delta layer files can sometimes be really large. Clean them up.
655 1062 : if result.is_err() {
656 0 : tracing::warn!(
657 0 : "Cleaning up temporary delta file {temp_path} after error during writing"
658 : );
659 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
660 0 : tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
661 0 : }
662 1062 : }
663 1062 : result
664 1062 : }
665 : }
666 :
667 : impl Drop for DeltaLayerWriter {
668 1062 : fn drop(&mut self) {
669 1062 : if let Some(inner) = self.inner.take() {
670 0 : // We want to remove the virtual file here, so it's fine to not
671 0 : // having completely flushed unwritten data.
672 0 : let vfile = inner.blob_writer.into_inner_no_flush();
673 0 : vfile.remove();
674 1062 : }
675 1062 : }
676 : }
677 :
/// Error returned by `DeltaLayer::rewrite_summary`.
678 0 : #[derive(thiserror::Error, Debug)]
679 : pub enum RewriteSummaryError {
/// The summary read from the file does not carry `DELTA_FILE_MAGIC`.
680 : #[error("magic mismatch")]
681 : MagicMismatch,
/// Any other failure (opening, reading, (de)serializing or writing).
682 : #[error(transparent)]
683 : Other(#[from] anyhow::Error),
684 : }
685 :
686 : impl From<std::io::Error> for RewriteSummaryError {
687 0 : fn from(e: std::io::Error) -> Self {
688 0 : Self::Other(anyhow::anyhow!(e))
689 0 : }
690 : }
691 :
692 : impl DeltaLayer {
693 0 : pub async fn rewrite_summary<F>(
694 0 : path: &Utf8Path,
695 0 : rewrite: F,
696 0 : ctx: &RequestContext,
697 0 : ) -> Result<(), RewriteSummaryError>
698 0 : where
699 0 : F: Fn(Summary) -> Summary,
700 0 : {
701 0 : let mut file = VirtualFile::open_with_options(
702 0 : path,
703 0 : virtual_file::OpenOptions::new().read(true).write(true),
704 0 : )
705 0 : .await
706 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
707 0 : let file_id = page_cache::next_file_id();
708 0 : let block_reader = FileBlockReader::new(&file, file_id);
709 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
710 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
711 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
712 0 : return Err(RewriteSummaryError::MagicMismatch);
713 0 : }
714 0 :
715 0 : let new_summary = rewrite(actual_summary);
716 0 :
717 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
718 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
719 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
720 0 : file.seek(SeekFrom::Start(0)).await?;
721 0 : let (_buf, res) = file.write_all(buf, ctx).await;
722 0 : res?;
723 0 : Ok(())
724 0 : }
725 : }
726 :
727 : impl DeltaLayerInner {
728 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
729 : /// - inner has the success or transient failure
730 : /// - outer has the permanent failure
731 818 : pub(super) async fn load(
732 818 : path: &Utf8Path,
733 818 : summary: Option<Summary>,
734 818 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
735 818 : ctx: &RequestContext,
736 818 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
737 818 : let file = match VirtualFile::open(path).await {
738 818 : Ok(file) => file,
739 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
740 : };
741 818 : let file_id = page_cache::next_file_id();
742 818 :
743 818 : let block_reader = FileBlockReader::new(&file, file_id);
744 :
745 818 : let summary_blk = match block_reader.read_blk(0, ctx).await {
746 818 : Ok(blk) => blk,
747 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
748 : };
749 :
750 : // TODO: this should be an assertion instead; see ImageLayerInner::load
751 818 : let actual_summary =
752 818 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
753 :
754 818 : if let Some(mut expected_summary) = summary {
755 : // production code path
756 818 : expected_summary.index_start_blk = actual_summary.index_start_blk;
757 818 : expected_summary.index_root_blk = actual_summary.index_root_blk;
758 818 : // mask out the timeline_id, but still require the layers to be from the same tenant
759 818 : expected_summary.timeline_id = actual_summary.timeline_id;
760 818 :
761 818 : if actual_summary != expected_summary {
762 0 : bail!(
763 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
764 0 : actual_summary,
765 0 : expected_summary
766 0 : );
767 818 : }
768 0 : }
769 :
770 818 : Ok(Ok(DeltaLayerInner {
771 818 : file,
772 818 : file_id,
773 818 : index_start_blk: actual_summary.index_start_blk,
774 818 : index_root_blk: actual_summary.index_root_blk,
775 818 : lsn_range: actual_summary.lsn_range,
776 818 : max_vectored_read_bytes,
777 818 : }))
778 818 : }
779 :
/// Collects from this layer the data needed to reconstruct `key` within `lsn_range`.
///
/// First the index is visited to gather the offsets of the relevant entries,
/// then the values are read and stored in `reconstruct_state`: a page image
/// goes into `img`, WAL records are pushed onto `records`.
///
/// Returns `ValueReconstructResult::Complete` once a page image or a WAL
/// record with `will_init` has been read, otherwise
/// `ValueReconstructResult::Continue`.
///
/// Fails if the index visit, a blob read or a value deserialization fails.
780 208782 : pub(super) async fn get_value_reconstruct_data(
781 208782 : &self,
782 208782 : key: Key,
783 208782 : lsn_range: Range<Lsn>,
784 208782 : reconstruct_state: &mut ValueReconstructState,
785 208782 : ctx: &RequestContext,
786 208782 : ) -> anyhow::Result<ValueReconstructResult> {
787 208782 : let mut need_image = true;
788 208782 : // Scan the page versions backwards, starting from the last LSN in `lsn_range`.
789 208782 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
790 208782 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
791 208782 : self.index_start_blk,
792 208782 : self.index_root_blk,
793 208782 : &block_reader,
794 208782 : );
// `lsn_range.end` is exclusive, so the search starts at `end - 1`.
795 208782 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
796 208782 :
797 208782 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
798 208782 :
// The closure returns `false` to stop the visit: when an entry for a
// different key is reached, when the entry's LSN is below
// `lsn_range.start`, or right after recording an entry whose `will_init`
// flag is set.
799 208782 : tree_reader
800 208782 : .visit(
801 208782 : &search_key.0,
802 208782 : VisitDirection::Backwards,
803 208782 : |key, value| {
804 203278 : let blob_ref = BlobRef(value);
805 203278 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
806 77263 : return false;
807 126015 : }
808 126015 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
809 126015 : if entry_lsn < lsn_range.start {
810 0 : return false;
811 126015 : }
812 126015 : offsets.push((entry_lsn, blob_ref.pos()));
813 126015 :
814 126015 : !blob_ref.will_init()
815 208782 : },
816 208782 : &RequestContextBuilder::extend(ctx)
817 208782 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
818 208782 : .build(),
819 208782 : )
820 20340 : .await?;
821 :
// From here on `ctx` is shadowed by a context tagged for value reads.
822 208782 : let ctx = &RequestContextBuilder::extend(ctx)
823 208782 : .page_content_kind(PageContentKind::DeltaLayerValue)
824 208782 : .build();
825 208782 :
826 208782 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
827 208782 : let cursor = block_reader.block_cursor();
828 208782 : let mut buf = Vec::new();
829 208782 : for (entry_lsn, pos) in offsets {
830 126015 : cursor
831 126015 : .read_blob_into_buf(pos, &mut buf, ctx)
832 8094 : .await
833 126015 : .with_context(|| {
834 0 : format!("Failed to read blob from virtual file {}", self.file.path)
835 126015 : })?;
836 126015 : let val = Value::des(&buf).with_context(|| {
837 0 : format!(
838 0 : "Failed to deserialize file blob from virtual file {}",
839 0 : self.file.path
840 0 : )
841 126015 : })?;
842 126015 : match val {
// A page image needs nothing older, so the scan stops here.
843 126015 : Value::Image(img) => {
844 126015 : reconstruct_state.img = Some((entry_lsn, img));
845 126015 : need_image = false;
846 126015 : break;
847 : }
848 0 : Value::WalRecord(rec) => {
849 0 : let will_init = rec.will_init();
850 0 : reconstruct_state.records.push((entry_lsn, rec));
851 0 : if will_init {
852 : // This WAL record initializes the page, so no need to go further back
853 0 : need_image = false;
854 0 : break;
855 0 : }
856 : }
857 : }
858 : }
859 :
860 : // If an older page image is needed to reconstruct the page, let the
861 : // caller know.
862 208782 : if need_image {
863 82767 : Ok(ValueReconstructResult::Continue)
864 : } else {
865 126015 : Ok(ValueReconstructResult::Complete)
866 : }
867 208782 : }
868 :
869 : // Look up the keys in the provided keyspace and update
870 : // the reconstruct state with whatever is found.
871 : //
872 : // If the key is cached, go no further than the cached Lsn.
873 : //
874 : // Currently, the index is visited for each range, but this
875 : // can be further optimised to visit the index only once.
876 146 : pub(super) async fn get_values_reconstruct_data(
877 146 : &self,
878 146 : keyspace: KeySpace,
879 146 : lsn_range: Range<Lsn>,
880 146 : reconstruct_state: &mut ValuesReconstructState,
881 146 : ctx: &RequestContext,
882 146 : ) -> Result<(), GetVectoredError> {
883 146 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
884 146 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
885 146 : self.index_start_blk,
886 146 : self.index_root_blk,
887 146 : block_reader,
888 146 : );
889 146 :
890 146 : let planner = VectoredReadPlanner::new(
891 146 : self.max_vectored_read_bytes
892 146 : .expect("Layer is loaded with max vectored bytes config")
893 146 : .0
894 146 : .into(),
895 146 : );
896 146 :
897 146 : let data_end_offset = self.index_start_offset();
898 :
899 146 : let reads = Self::plan_reads(
900 146 : &keyspace,
901 146 : lsn_range,
902 146 : data_end_offset,
903 146 : index_reader,
904 146 : planner,
905 146 : reconstruct_state,
906 146 : ctx,
907 146 : )
908 2014 : .await
909 146 : .map_err(GetVectoredError::Other)?;
910 :
911 146 : self.do_reads_and_update_state(reads, reconstruct_state)
912 5345 : .await;
913 :
914 146 : reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
915 146 :
916 146 : Ok(())
917 146 : }
918 :
919 348 : async fn plan_reads<Reader>(
920 348 : keyspace: &KeySpace,
921 348 : lsn_range: Range<Lsn>,
922 348 : data_end_offset: u64,
923 348 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
924 348 : mut planner: VectoredReadPlanner,
925 348 : reconstruct_state: &mut ValuesReconstructState,
926 348 : ctx: &RequestContext,
927 348 : ) -> anyhow::Result<Vec<VectoredRead>>
928 348 : where
929 348 : Reader: BlockReader,
930 348 : {
931 348 : let ctx = RequestContextBuilder::extend(ctx)
932 348 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
933 348 : .build();
934 :
935 63924 : for range in keyspace.ranges.iter() {
936 63924 : let mut range_end_handled = false;
937 63924 :
938 63924 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
939 63924 : let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
940 63924 : let mut index_stream = std::pin::pin!(index_stream);
941 :
942 151482 : while let Some(index_entry) = index_stream.next().await {
943 151275 : let (raw_key, value) = index_entry?;
944 151275 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
945 151275 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
946 151275 : let blob_ref = BlobRef(value);
947 151275 :
948 151275 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
949 151275 : assert!(key >= range.start);
950 :
951 151275 : let outside_lsn_range = !lsn_range.contains(&lsn);
952 151275 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
953 :
954 151275 : let flag = {
955 151275 : if outside_lsn_range || below_cached_lsn {
956 61455 : BlobFlag::Ignore
957 89820 : } else if blob_ref.will_init() {
958 32172 : BlobFlag::ReplaceAll
959 : } else {
960 : // Usual path: add blob to the read
961 57648 : BlobFlag::None
962 : }
963 : };
964 :
965 151275 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
966 63717 : planner.handle_range_end(blob_ref.pos());
967 63717 : range_end_handled = true;
968 63717 : break;
969 87558 : } else {
970 87558 : planner.handle(key, lsn, blob_ref.pos(), flag);
971 87558 : }
972 : }
973 :
974 63924 : if !range_end_handled {
975 207 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
976 207 : planner.handle_range_end(data_end_offset);
977 63717 : }
978 : }
979 :
980 348 : Ok(planner.finish())
981 348 : }
982 :
983 346 : fn get_min_read_buffer_size(
984 346 : planned_reads: &[VectoredRead],
985 346 : read_size_soft_max: usize,
986 346 : ) -> usize {
987 30180 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
988 21 : return read_size_soft_max;
989 : };
990 :
991 325 : let largest_read_size = largest_read.size();
992 325 : if largest_read_size > read_size_soft_max {
993 : // If the read is oversized, it should only contain one key.
994 200 : let offenders = largest_read
995 200 : .blobs_at
996 200 : .as_slice()
997 200 : .iter()
998 200 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
999 200 : .join(", ");
1000 200 : tracing::warn!(
1001 0 : "Oversized vectored read ({} > {}) for keys {}",
1002 : largest_read_size,
1003 : read_size_soft_max,
1004 : offenders
1005 : );
1006 125 : }
1007 :
1008 325 : largest_read_size
1009 346 : }
1010 :
1011 146 : async fn do_reads_and_update_state(
1012 146 : &self,
1013 146 : reads: Vec<VectoredRead>,
1014 146 : reconstruct_state: &mut ValuesReconstructState,
1015 146 : ) {
1016 146 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
1017 146 : let mut ignore_key_with_err = None;
1018 146 :
1019 146 : let max_vectored_read_bytes = self
1020 146 : .max_vectored_read_bytes
1021 146 : .expect("Layer is loaded with max vectored bytes config")
1022 146 : .0
1023 146 : .into();
1024 146 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1025 146 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1026 :
1027 : // Note that reads are processed in reverse order (from highest key+lsn).
1028 : // This is the order that `ReconstructState` requires such that it can
1029 : // track when a key is done.
1030 10456 : for read in reads.into_iter().rev() {
1031 10456 : let res = vectored_blob_reader
1032 10456 : .read_blobs(&read, buf.take().expect("Should have a buffer"))
1033 5345 : .await;
1034 :
1035 10456 : let blobs_buf = match res {
1036 10456 : Ok(blobs_buf) => blobs_buf,
1037 0 : Err(err) => {
1038 0 : let kind = err.kind();
1039 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1040 0 : reconstruct_state.on_key_error(
1041 0 : blob_meta.key,
1042 0 : PageReconstructError::from(anyhow!(
1043 0 : "Failed to read blobs from virtual file {}: {}",
1044 0 : self.file.path,
1045 0 : kind
1046 0 : )),
1047 0 : );
1048 0 : }
1049 :
1050 : // We have "lost" the buffer since the lower level IO api
1051 : // doesn't return the buffer on error. Allocate a new one.
1052 0 : buf = Some(BytesMut::with_capacity(buf_size));
1053 0 :
1054 0 : continue;
1055 : }
1056 : };
1057 :
1058 18388 : for meta in blobs_buf.blobs.iter().rev() {
1059 18388 : if Some(meta.meta.key) == ignore_key_with_err {
1060 0 : continue;
1061 18388 : }
1062 18388 :
1063 18388 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
1064 18388 : let value = match value {
1065 18388 : Ok(v) => v,
1066 0 : Err(e) => {
1067 0 : reconstruct_state.on_key_error(
1068 0 : meta.meta.key,
1069 0 : PageReconstructError::from(anyhow!(e).context(format!(
1070 0 : "Failed to deserialize blob from virtual file {}",
1071 0 : self.file.path,
1072 0 : ))),
1073 0 : );
1074 0 :
1075 0 : ignore_key_with_err = Some(meta.meta.key);
1076 0 : continue;
1077 : }
1078 : };
1079 :
1080 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1081 : // state, no further updates shall be made to it. The call below will
1082 : // panic if the invariant is violated.
1083 18388 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1084 : }
1085 :
1086 10456 : buf = Some(blobs_buf.buf);
1087 : }
1088 146 : }
1089 :
1090 266 : pub(super) async fn load_keys<'a>(
1091 266 : &'a self,
1092 266 : ctx: &RequestContext,
1093 266 : ) -> Result<Vec<DeltaEntry<'a>>> {
1094 266 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1095 266 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1096 266 : self.index_start_blk,
1097 266 : self.index_root_blk,
1098 266 : block_reader,
1099 266 : );
1100 266 :
1101 266 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1102 266 :
1103 266 : tree_reader
1104 266 : .visit(
1105 266 : &[0u8; DELTA_KEY_SIZE],
1106 266 : VisitDirection::Forwards,
1107 2042006 : |key, value| {
1108 2042006 : let delta_key = DeltaKey::from_slice(key);
1109 2042006 : let val_ref = ValueRef {
1110 2042006 : blob_ref: BlobRef(value),
1111 2042006 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
1112 2042006 : Adapter(self),
1113 2042006 : )),
1114 2042006 : };
1115 2042006 : let pos = BlobRef(value).pos();
1116 2042006 : if let Some(last) = all_keys.last_mut() {
1117 2041740 : // subtract offset of the current and last entries to get the size
1118 2041740 : // of the value associated with this (key, lsn) tuple
1119 2041740 : let first_pos = last.size;
1120 2041740 : last.size = pos - first_pos;
1121 2041740 : }
1122 2042006 : let entry = DeltaEntry {
1123 2042006 : key: delta_key.key(),
1124 2042006 : lsn: delta_key.lsn(),
1125 2042006 : size: pos,
1126 2042006 : val: val_ref,
1127 2042006 : };
1128 2042006 : all_keys.push(entry);
1129 2042006 : true
1130 2042006 : },
1131 266 : &RequestContextBuilder::extend(ctx)
1132 266 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1133 266 : .build(),
1134 266 : )
1135 2074 : .await?;
1136 266 : if let Some(last) = all_keys.last_mut() {
1137 266 : // Last key occupies all space till end of value storage,
1138 266 : // which corresponds to beginning of the index
1139 266 : last.size = self.index_start_offset() - last.size;
1140 266 : }
1141 266 : Ok(all_keys)
1142 266 : }
1143 :
1144 : /// Using the given writer, write out a version which has the earlier Lsns than `until`.
1145 : ///
1146 : /// Return the amount of key value records pushed to the writer.
1147 10 : pub(super) async fn copy_prefix(
1148 10 : &self,
1149 10 : writer: &mut DeltaLayerWriter,
1150 10 : until: Lsn,
1151 10 : ctx: &RequestContext,
1152 10 : ) -> anyhow::Result<usize> {
1153 10 : use crate::tenant::vectored_blob_io::{
1154 10 : BlobMeta, VectoredReadBuilder, VectoredReadExtended,
1155 10 : };
1156 10 : use futures::stream::TryStreamExt;
1157 10 :
1158 10 : #[derive(Debug)]
1159 10 : enum Item {
1160 10 : Actual(Key, Lsn, BlobRef),
1161 10 : Sentinel,
1162 10 : }
1163 10 :
1164 10 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1165 70 : fn from(value: Item) -> Self {
1166 70 : match value {
1167 60 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1168 10 : Item::Sentinel => None,
1169 10 : }
1170 70 : }
1171 10 : }
1172 10 :
1173 10 : impl Item {
1174 70 : fn offset(&self) -> Option<BlobRef> {
1175 70 : match self {
1176 60 : Item::Actual(_, _, blob) => Some(*blob),
1177 10 : Item::Sentinel => None,
1178 10 : }
1179 70 : }
1180 10 :
1181 70 : fn is_last(&self) -> bool {
1182 70 : matches!(self, Item::Sentinel)
1183 70 : }
1184 10 : }
1185 10 :
1186 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1187 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1188 10 : self.index_start_blk,
1189 10 : self.index_root_blk,
1190 10 : block_reader,
1191 10 : );
1192 10 :
1193 10 : let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1194 60 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1195 10 : // put in a sentinel value for getting the end offset for last item, and not having to
1196 10 : // repeat the whole read part
1197 10 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1198 10 : Item::Sentinel,
1199 10 : ))));
1200 10 : let mut stream = std::pin::pin!(stream);
1201 10 :
1202 10 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1203 10 :
1204 10 : let mut read_builder: Option<VectoredReadBuilder> = None;
1205 10 :
1206 10 : let max_read_size = self
1207 10 : .max_vectored_read_bytes
1208 10 : .map(|x| x.0.get())
1209 10 : .unwrap_or(8192);
1210 10 :
1211 10 : let mut buffer = Some(BytesMut::with_capacity(max_read_size));
1212 10 :
1213 10 : // FIXME: buffering of DeltaLayerWriter
1214 10 : let mut per_blob_copy = Vec::new();
1215 10 :
1216 10 : let mut records = 0;
1217 :
1218 80 : while let Some(item) = stream.try_next().await? {
1219 70 : tracing::debug!(?item, "popped");
1220 70 : let offset = item
1221 70 : .offset()
1222 70 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1223 :
1224 70 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1225 60 : let end_offset = offset;
1226 60 :
1227 60 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1228 : } else {
1229 10 : None
1230 : };
1231 :
1232 70 : let is_last = item.is_last();
1233 70 :
1234 70 : prev = Option::from(item);
1235 70 :
1236 70 : let actionable = actionable.filter(|x| x.0.lsn < until);
1237 :
1238 70 : let builder = if let Some((meta, offsets)) = actionable {
1239 : // extend or create a new builder
1240 32 : if read_builder
1241 32 : .as_mut()
1242 32 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1243 32 : .unwrap_or(VectoredReadExtended::No)
1244 32 : == VectoredReadExtended::Yes
1245 : {
1246 16 : None
1247 : } else {
1248 16 : read_builder.replace(VectoredReadBuilder::new(
1249 16 : offsets.start.pos(),
1250 16 : offsets.end.pos(),
1251 16 : meta,
1252 16 : max_read_size,
1253 16 : ))
1254 : }
1255 : } else {
1256 : // nothing to do, except perhaps flush any existing for the last element
1257 38 : None
1258 : };
1259 :
1260 : // flush the possible older builder and also the new one if the item was the last one
1261 70 : let builders = builder.into_iter();
1262 70 : let builders = if is_last {
1263 10 : builders.chain(read_builder.take())
1264 : } else {
1265 60 : builders.chain(None)
1266 : };
1267 :
1268 86 : for builder in builders {
1269 16 : let read = builder.build();
1270 16 :
1271 16 : let reader = VectoredBlobReader::new(&self.file);
1272 16 :
1273 16 : let mut buf = buffer.take().unwrap();
1274 16 :
1275 16 : buf.clear();
1276 16 : buf.reserve(read.size());
1277 16 : let res = reader.read_blobs(&read, buf).await?;
1278 :
1279 48 : for blob in res.blobs {
1280 32 : let key = blob.meta.key;
1281 32 : let lsn = blob.meta.lsn;
1282 32 : let data = &res.buf[blob.start..blob.end];
1283 32 :
1284 32 : #[cfg(debug_assertions)]
1285 32 : Value::des(data)
1286 32 : .with_context(|| {
1287 0 : format!(
1288 0 : "blob failed to deserialize for {}@{}, {}..{}: {:?}",
1289 0 : blob.meta.key,
1290 0 : blob.meta.lsn,
1291 0 : blob.start,
1292 0 : blob.end,
1293 0 : utils::Hex(data)
1294 0 : )
1295 32 : })
1296 32 : .unwrap();
1297 32 :
1298 32 : // is it an image or will_init walrecord?
1299 32 : // FIXME: this could be handled by threading the BlobRef to the
1300 32 : // VectoredReadBuilder
1301 32 : let will_init = crate::repository::ValueBytes::will_init(data)
1302 32 : .inspect_err(|_e| {
1303 0 : #[cfg(feature = "testing")]
1304 0 : tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1305 32 : })
1306 32 : .unwrap_or(false);
1307 32 :
1308 32 : per_blob_copy.clear();
1309 32 : per_blob_copy.extend_from_slice(data);
1310 :
1311 32 : let (tmp, res) = writer
1312 32 : .put_value_bytes(
1313 32 : key,
1314 32 : lsn,
1315 32 : std::mem::take(&mut per_blob_copy),
1316 32 : will_init,
1317 32 : ctx,
1318 32 : )
1319 4 : .await;
1320 32 : per_blob_copy = tmp;
1321 32 :
1322 32 : res?;
1323 :
1324 32 : records += 1;
1325 : }
1326 :
1327 16 : buffer = Some(res.buf);
1328 : }
1329 : }
1330 :
1331 10 : assert!(
1332 10 : read_builder.is_none(),
1333 0 : "with the sentinel above loop should had handled all"
1334 : );
1335 :
1336 10 : Ok(records)
1337 10 : }
1338 :
1339 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1340 4 : println!(
1341 4 : "index_start_blk: {}, root {}",
1342 4 : self.index_start_blk, self.index_root_blk
1343 4 : );
1344 4 :
1345 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1346 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1347 4 : self.index_start_blk,
1348 4 : self.index_root_blk,
1349 4 : block_reader,
1350 4 : );
1351 4 :
1352 4 : tree_reader.dump().await?;
1353 :
1354 4 : let keys = self.load_keys(ctx).await?;
1355 :
1356 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1357 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1358 8 : let val = Value::des(&buf)?;
1359 8 : let desc = match val {
1360 8 : Value::Image(img) => {
1361 8 : format!(" img {} bytes", img.len())
1362 : }
1363 0 : Value::WalRecord(rec) => {
1364 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1365 0 : format!(
1366 0 : " rec {} bytes will_init: {} {}",
1367 0 : buf.len(),
1368 0 : rec.will_init(),
1369 0 : wal_desc
1370 0 : )
1371 : }
1372 : };
1373 8 : Ok(desc)
1374 8 : }
1375 :
1376 12 : for entry in keys {
1377 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1378 8 : let desc = match dump_blob(&val, ctx).await {
1379 8 : Ok(desc) => desc,
1380 0 : Err(err) => {
1381 0 : format!("ERROR: {err}")
1382 : }
1383 : };
1384 8 : println!(" key {key} at {lsn}: {desc}");
1385 8 :
1386 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1387 8 : // of many other record types too, but these are particularly interesting, as
1388 8 : // have a lot of special processing for them in walingest.rs.
1389 8 : use pageserver_api::key::CHECKPOINT_KEY;
1390 8 : use postgres_ffi::CheckPoint;
1391 8 : if key == CHECKPOINT_KEY {
1392 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1393 0 : let val = Value::des(&buf)?;
1394 0 : match val {
1395 0 : Value::Image(img) => {
1396 0 : let checkpoint = CheckPoint::decode(&img)?;
1397 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1398 : }
1399 0 : Value::WalRecord(_rec) => {
1400 0 : println!(" unexpected walrecord value for checkpoint key");
1401 0 : }
1402 : }
1403 8 : }
1404 : }
1405 :
1406 4 : Ok(())
1407 4 : }
1408 :
1409 30 : fn stream_index_forwards<'a, R>(
1410 30 : &'a self,
1411 30 : reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
1412 30 : start: &'a [u8; DELTA_KEY_SIZE],
1413 30 : ctx: &'a RequestContext,
1414 30 : ) -> impl futures::stream::Stream<
1415 30 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1416 30 : > + 'a
1417 30 : where
1418 30 : R: BlockReader,
1419 30 : {
1420 30 : use futures::stream::TryStreamExt;
1421 30 : let stream = reader.get_stream_from(start, ctx);
1422 152 : stream.map_ok(|(key, value)| {
1423 152 : let key = DeltaKey::from_slice(&key);
1424 152 : let (key, lsn) = (key.key(), key.lsn());
1425 152 : let offset = BlobRef(value);
1426 152 :
1427 152 : (key, lsn, offset)
1428 152 : })
1429 30 : }
1430 :
1431 : /// The file offset to the first block of index.
1432 : ///
1433 : /// The file structure is summary, values, and index. We often need this for the size of last blob.
1434 482 : fn index_start_offset(&self) -> u64 {
1435 482 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1436 482 : let bref = BlobRef(offset);
1437 482 : tracing::debug!(
1438 : index_start_blk = self.index_start_blk,
1439 : offset,
1440 0 : pos = bref.pos(),
1441 0 : "index_start_offset"
1442 : );
1443 482 : offset
1444 482 : }
1445 : }
1446 :
1447 : /// One `(key, lsn)` entry of a delta layer together with a handle to its value, as produced by `load_keys`.
1448 : pub struct DeltaEntry<'a> {
/// Key this entry belongs to.
1449 : pub key: Key,
/// Lsn of this version of the key.
1450 : pub lsn: Lsn,
1451 : /// Size of the stored value: the distance from this value's offset to the next value's offset (or to the start of the index for the last entry).
1452 : pub size: u64,
1453 : /// Reference to the on-disk value; use [`ValueRef::load`] to read it.
1454 : pub val: ValueRef<'a>,
1455 : }
1456 :
1457 : /// Reference to an on-disk value: where the blob is, plus the cursor used to read it.
1458 : pub struct ValueRef<'a> {
/// Location of the value's blob, as stored in the index.
1459 : blob_ref: BlobRef,
/// Cursor over the layer file's blocks through which the blob is read.
1460 : reader: BlockCursor<'a>,
1461 : }
1462 :
1463 : impl<'a> ValueRef<'a> {
1464 : /// Loads the value from disk
1465 2041998 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1466 : // theoretically we *could* record an access time for each, but it does not really matter
1467 2041998 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1468 2041998 : let val = Value::des(&buf)?;
1469 2041998 : Ok(val)
1470 2041998 : }
1471 : }
1472 :
/// Wrapper around a handle to a [`DeltaLayerInner`] that offers block reads
/// (`read_blk`) over the layer's file; `load_keys` uses it to build the
/// `BlockCursor` stored in each [`ValueRef`].
1473 : pub(crate) struct Adapter<T>(T);
1474 :
1475 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1476 2060764 : pub(crate) async fn read_blk(
1477 2060764 : &self,
1478 2060764 : blknum: u32,
1479 2060764 : ctx: &RequestContext,
1480 2060764 : ) -> Result<BlockLease, std::io::Error> {
1481 2060764 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1482 2060764 : block_reader.read_blk(blknum, ctx).await
1483 2060764 : }
1484 : }
1485 :
/// Identity conversion, so that a plain `&DeltaLayerInner` satisfies the
/// `T: AsRef<DeltaLayerInner>` bound required by [`Adapter`].
1486 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1487 4121528 : fn as_ref(&self) -> &DeltaLayerInner {
1488 4121528 : self
1489 4121528 : }
1490 : }
1491 :
/// Exposes [`DeltaEntry`] through the `CompactionDeltaEntry` interface of
/// `pageserver_compaction`; every accessor returns the field of the same name.
1492 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
// Key of the entry.
1493 0 : fn key(&self) -> Key {
1494 0 : self.key
1495 0 : }
// Lsn of the entry.
1496 0 : fn lsn(&self) -> Lsn {
1497 0 : self.lsn
1498 0 : }
// Size of the entry's stored value.
1499 0 : fn size(&self) -> u64 {
1500 0 : self.size
1501 0 : }
1502 : }
1503 :
1504 : #[cfg(test)]
1505 : mod test {
1506 : use std::collections::BTreeMap;
1507 :
1508 : use itertools::MinMaxResult;
1509 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1510 : use rand::RngCore;
1511 :
1512 : use super::*;
1513 : use crate::{
1514 : context::DownloadBehavior,
1515 : task_mgr::TaskKind,
1516 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1517 : DEFAULT_PG_VERSION,
1518 : };
1519 :
1520 : /// Construct an index for a fictional delta layer and and then
1521 : /// traverse in order to plan vectored reads for a query. Finally,
1522 : /// verify that the traversal fed the right index key and value
1523 : /// pairs into the planner.
1524 : #[tokio::test]
1525 2 : async fn test_delta_layer_index_traversal() {
1526 2 : let base_key = Key {
1527 2 : field1: 0,
1528 2 : field2: 1663,
1529 2 : field3: 12972,
1530 2 : field4: 16396,
1531 2 : field5: 0,
1532 2 : field6: 246080,
1533 2 : };
1534 2 :
1535 2 : // Populate the index with some entries
1536 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1537 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1538 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1539 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1540 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1541 2 : ]);
1542 2 :
1543 2 : let mut disk = TestDisk::default();
1544 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1545 2 :
1546 2 : let mut disk_offset = 0;
1547 10 : for (key, lsns) in &entries {
1548 42 : for lsn in lsns {
1549 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1550 34 : let blob_ref = BlobRef::new(disk_offset, false);
1551 34 : writer
1552 34 : .append(&index_key.0, blob_ref.0)
1553 34 : .expect("In memory disk append should never fail");
1554 34 :
1555 34 : disk_offset += 1;
1556 34 : }
1557 2 : }
1558 2 :
1559 2 : // Prepare all the arguments for the call into `plan_reads` below
1560 2 : let (root_offset, _writer) = writer
1561 2 : .finish()
1562 2 : .expect("In memory disk finish should never fail");
1563 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1564 2 : let planner = VectoredReadPlanner::new(100);
1565 2 : let mut reconstruct_state = ValuesReconstructState::new();
1566 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1567 2 :
1568 2 : let keyspace = KeySpace {
1569 2 : ranges: vec![
1570 2 : base_key..base_key.add(3),
1571 2 : base_key.add(3)..base_key.add(100),
1572 2 : ],
1573 2 : };
1574 2 : let lsn_range = Lsn(2)..Lsn(40);
1575 2 :
1576 2 : // Plan and validate
1577 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1578 2 : &keyspace,
1579 2 : lsn_range.clone(),
1580 2 : disk_offset,
1581 2 : reader,
1582 2 : planner,
1583 2 : &mut reconstruct_state,
1584 2 : &ctx,
1585 2 : )
1586 2 : .await
1587 2 : .expect("Read planning should not fail");
1588 2 :
1589 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1590 2 : }
1591 :
1592 2 : fn validate(
1593 2 : keyspace: KeySpace,
1594 2 : lsn_range: Range<Lsn>,
1595 2 : vectored_reads: Vec<VectoredRead>,
1596 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1597 2 : ) {
1598 2 : #[derive(Debug, PartialEq, Eq)]
1599 2 : struct BlobSpec {
1600 2 : key: Key,
1601 2 : lsn: Lsn,
1602 2 : at: u64,
1603 2 : }
1604 2 :
1605 2 : let mut planned_blobs = Vec::new();
1606 8 : for read in vectored_reads {
1607 28 : for (at, meta) in read.blobs_at.as_slice() {
1608 28 : planned_blobs.push(BlobSpec {
1609 28 : key: meta.key,
1610 28 : lsn: meta.lsn,
1611 28 : at: *at,
1612 28 : });
1613 28 : }
1614 : }
1615 :
1616 2 : let mut expected_blobs = Vec::new();
1617 2 : let mut disk_offset = 0;
1618 10 : for (key, lsns) in index_entries {
1619 42 : for lsn in lsns {
1620 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1621 34 : let lsn_included = lsn_range.contains(&lsn);
1622 34 :
1623 34 : if key_included && lsn_included {
1624 28 : expected_blobs.push(BlobSpec {
1625 28 : key,
1626 28 : lsn,
1627 28 : at: disk_offset,
1628 28 : });
1629 28 : }
1630 :
1631 34 : disk_offset += 1;
1632 : }
1633 : }
1634 :
1635 2 : assert_eq!(planned_blobs, expected_blobs);
1636 2 : }
1637 :
// Tunables shared by the randomized vectored-read test and its data generators.
1638 : mod constants {
1639 : use utils::lsn::Lsn;
1640 :
1641 : /// Offset used by all lsns in this test
1642 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1643 : /// Number of unique keys included in the test data
1644 : pub(super) const KEY_COUNT: u8 = 60;
1645 : /// Exclusive upper bound on the number of different lsns generated for each key
1646 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1647 : /// Possible value sizes (in bytes) for each key, each paired with a probability weight
1648 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1649 : /// Weighted choice of whether there is a gap between the current key and the next one (a gap has probability 33.3%)
1650 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1651 : /// The minimum size of a key range in the generated reads (a range may be shorter when it is clamped to the end of the key range)
1652 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1653 : /// The number of ranges included in each vectored read
1654 : pub(super) const RANGES_COUNT: u8 = 2;
1655 : /// The number of vectored reads performed
1656 : pub(super) const READS_COUNT: u8 = 100;
1657 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1658 : /// with values larger than the limit
1659 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1660 : }
1661 :
/// One generated test record: random `value` bytes stored for `key` at `lsn`.
1662 : struct Entry {
1663 : key: Key,
1664 : lsn: Lsn,
1665 : value: Vec<u8>,
1666 : }
1667 :
1668 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1669 2 : let mut current_key = Key::MIN;
1670 2 :
1671 2 : let mut entries = Vec::new();
1672 122 : for _ in 0..constants::KEY_COUNT {
1673 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1674 120 : let mut lsns_iter =
1675 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1676 2260 : Some(Lsn(lsn.0 + 0x08))
1677 2260 : });
1678 120 : let mut lsns = Vec::new();
1679 2380 : while lsns.len() < count as usize {
1680 2260 : let take = rng.gen_bool(0.5);
1681 2260 : let lsn = lsns_iter.next().unwrap();
1682 2260 : if take {
1683 1112 : lsns.push(lsn);
1684 1148 : }
1685 : }
1686 :
1687 1232 : for lsn in lsns {
1688 1112 : let size = constants::VALUE_SIZES
1689 3336 : .choose_weighted(rng, |item| item.1)
1690 1112 : .unwrap()
1691 1112 : .0;
1692 1112 : let mut buf = vec![0; size];
1693 1112 : rng.fill_bytes(&mut buf);
1694 1112 :
1695 1112 : entries.push(Entry {
1696 1112 : key: current_key,
1697 1112 : lsn,
1698 1112 : value: buf,
1699 1112 : })
1700 : }
1701 :
1702 120 : let gap = constants::KEY_GAP_CHANGES
1703 240 : .choose_weighted(rng, |item| item.1)
1704 120 : .unwrap()
1705 120 : .0;
1706 120 : if gap {
1707 38 : current_key = current_key.add(2);
1708 82 : } else {
1709 82 : current_key = current_key.add(1);
1710 82 : }
1711 : }
1712 :
1713 2 : entries
1714 2 : }
1715 :
/// Summary of a generated data set, as computed by `get_entries_meta`.
1716 : struct EntriesMeta {
/// Half-open range spanning every generated key.
1717 : key_range: Range<Key>,
/// Half-open range spanning every generated lsn.
1718 : lsn_range: Range<Lsn>,
/// Lookup from `(key, lsn)` to the value bytes written for it.
1719 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1720 : }
1721 :
1722 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1723 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1724 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1725 0 : _ => panic!("More than one entry is always expected"),
1726 : };
1727 :
1728 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1729 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1730 0 : _ => panic!("More than one entry is always expected"),
1731 : };
1732 :
1733 2 : let mut index = BTreeMap::new();
1734 1112 : for entry in entries.iter() {
1735 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1736 1112 : }
1737 :
1738 2 : EntriesMeta {
1739 2 : key_range,
1740 2 : lsn_range,
1741 2 : index,
1742 2 : }
1743 2 : }
1744 :
1745 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1746 200 : let start = key_range.start.to_i128();
1747 200 : let end = key_range.end.to_i128();
1748 200 :
1749 200 : let mut keyspace = KeySpace::default();
1750 :
1751 600 : for _ in 0..constants::RANGES_COUNT {
1752 400 : let mut range: Option<Range<Key>> = Option::default();
1753 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1754 844 : let range_start = rng.gen_range(start..end);
1755 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1756 844 : if range_end_offset >= end {
1757 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1758 744 : } else {
1759 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1760 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1761 744 : }
1762 : }
1763 400 : keyspace.ranges.push(range.unwrap());
1764 : }
1765 :
1766 200 : keyspace
1767 200 : }
1768 :
/// End-to-end check of vectored reads against a real delta layer file:
/// random entries are written to a layer, then `READS_COUNT` random keyspaces
/// are planned with `plan_reads`, read back with `VectoredBlobReader`, and
/// every returned blob is compared to the bytes that were written for its
/// `(key, lsn)`.
1769 : #[tokio::test]
1770 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1771 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
1772 8 : let (tenant, ctx) = harness.load().await;
1773 2 :
1774 2 : let timeline_id = TimelineId::generate();
1775 2 : let timeline = tenant
1776 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1777 6 : .await?;
1778 2 :
1779 2 : tracing::info!("Generating test data ...");
1780 2 :
// Fixed seed: the generated data and the picked keyspaces are the same on every run.
1781 2 : let rng = &mut StdRng::seed_from_u64(0);
1782 2 : let entries = generate_entries(rng);
1783 2 : let entries_meta = get_entries_meta(&entries);
1784 2 :
1785 2 : tracing::info!("Done generating {} entries", entries.len());
1786 2 :
1787 2 : tracing::info!("Writing test data to delta layer ...");
1788 2 : let mut writer = DeltaLayerWriter::new(
1789 2 : harness.conf,
1790 2 : timeline_id,
1791 2 : harness.tenant_shard_id,
1792 2 : entries_meta.key_range.start,
1793 2 : entries_meta.lsn_range.clone(),
1794 2 : )
1795 2 : .await?;
1796 2 :
// Values are written as raw bytes, with `will_init` set to false for all of them.
1797 1114 : for entry in entries {
1798 1112 : let (_, res) = writer
1799 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx)
1800 215 : .await;
1801 1112 : res?;
1802 2 : }
1803 2 :
1804 2 : let resident = writer
1805 2 : .finish(entries_meta.key_range.end, &timeline, &ctx)
1806 5 : .await?;
1807 2 :
1808 2 : let inner = resident.as_delta(&ctx).await?;
1809 2 :
1810 2 : let file_size = inner.file.metadata().await?.len();
1811 2 : tracing::info!(
1812 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1813 2 : file_size
1814 2 : );
1815 2 :
1816 202 : for i in 0..constants::READS_COUNT {
1817 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1818 2 :
1819 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1820 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1821 200 : inner.index_start_blk,
1822 200 : inner.index_root_blk,
1823 200 : block_reader,
1824 200 : );
1825 200 :
1826 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1827 200 : let mut reconstruct_state = ValuesReconstructState::new();
1828 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
// Same computation as `DeltaLayerInner::index_start_offset`: the values end where the index starts.
1829 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1830 2 :
1831 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1832 200 : &keyspace,
1833 200 : entries_meta.lsn_range.clone(),
1834 200 : data_end_offset,
1835 200 : index_reader,
1836 200 : planner,
1837 200 : &mut reconstruct_state,
1838 200 : &ctx,
1839 200 : )
1840 4 : .await?;
1841 2 :
1842 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1843 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1844 200 : &vectored_reads,
1845 200 : constants::MAX_VECTORED_READ_BYTES,
1846 200 : );
1847 200 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1848 2 :
// The buffer is moved into each read and taken back from the result, so a single allocation is reused.
1849 19924 : for read in vectored_reads {
1850 19724 : let blobs_buf = vectored_blob_reader
1851 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"))
1852 10016 : .await?;
1853 57304 : for meta in blobs_buf.blobs.iter() {
1854 57304 : let value = &blobs_buf.buf[meta.start..meta.end];
1855 57304 : assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
1856 2 : }
1857 2 :
1858 19724 : buf = Some(blobs_buf.buf);
1859 2 : }
1860 2 : }
1861 2 :
1862 2 : Ok(())
1863 2 : }
1864 :
1865 : #[tokio::test]
1866 2 : async fn copy_delta_prefix_smoke() {
1867 2 : use crate::walrecord::NeonWalRecord;
1868 2 : use bytes::Bytes;
1869 2 :
1870 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
1871 8 : let (tenant, ctx) = h.load().await;
1872 2 : let ctx = &ctx;
1873 2 : let timeline = tenant
1874 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1875 6 : .await
1876 2 : .unwrap();
1877 2 :
1878 2 : let initdb_layer = timeline
1879 2 : .layers
1880 2 : .read()
1881 2 : .await
1882 2 : .likely_resident_layers()
1883 2 : .next()
1884 2 : .unwrap();
1885 2 :
1886 2 : {
1887 2 : let mut writer = timeline.writer().await;
1888 2 :
1889 2 : let data = [
1890 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
1891 2 : (
1892 2 : 0x30,
1893 2 : 12,
1894 2 : Value::WalRecord(NeonWalRecord::Postgres {
1895 2 : will_init: false,
1896 2 : rec: Bytes::from_static(b"1"),
1897 2 : }),
1898 2 : ),
1899 2 : (
1900 2 : 0x40,
1901 2 : 12,
1902 2 : Value::WalRecord(NeonWalRecord::Postgres {
1903 2 : will_init: true,
1904 2 : rec: Bytes::from_static(b"2"),
1905 2 : }),
1906 2 : ),
1907 2 : // build an oversized value so we cannot extend and existing read over
1908 2 : // this
1909 2 : (
1910 2 : 0x50,
1911 2 : 12,
1912 2 : Value::WalRecord(NeonWalRecord::Postgres {
1913 2 : will_init: true,
1914 2 : rec: {
1915 2 : let mut buf =
1916 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
1917 2 : buf.iter_mut()
1918 2 : .enumerate()
1919 264192 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
1920 2 : Bytes::from(buf)
1921 2 : },
1922 2 : }),
1923 2 : ),
1924 2 : // because the oversized read cannot be extended further, we are sure to exercise the
1925 2 : // builder created on the last round with this:
1926 2 : (
1927 2 : 0x60,
1928 2 : 12,
1929 2 : Value::WalRecord(NeonWalRecord::Postgres {
1930 2 : will_init: true,
1931 2 : rec: Bytes::from_static(b"3"),
1932 2 : }),
1933 2 : ),
1934 2 : (
1935 2 : 0x60,
1936 2 : 9,
1937 2 : Value::Image(Bytes::from_static(b"something for a different key")),
1938 2 : ),
1939 2 : ];
1940 2 :
1941 2 : let mut last_lsn = None;
1942 2 :
1943 14 : for (lsn, key, value) in data {
1944 12 : let key = Key::from_i128(key);
1945 12 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
1946 12 : last_lsn = Some(lsn);
1947 2 : }
1948 2 :
1949 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
1950 2 : }
1951 2 : timeline.freeze_and_flush().await.unwrap();
1952 2 :
1953 2 : let new_layer = timeline
1954 2 : .layers
1955 2 : .read()
1956 2 : .await
1957 2 : .likely_resident_layers()
1958 4 : .find(|x| x != &initdb_layer)
1959 2 : .unwrap();
1960 2 :
1961 2 : // create a copy for the timeline, so we don't overwrite the file
1962 2 : let branch = tenant
1963 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
1964 2 : .await
1965 2 : .unwrap();
1966 2 :
1967 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
1968 2 :
1969 2 : // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
1970 2 : // a single key
1971 2 :
1972 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
1973 10 : let truncate_at = Lsn(truncate_at);
1974 2 :
1975 10 : let mut writer = DeltaLayerWriter::new(
1976 10 : tenant.conf,
1977 10 : branch.timeline_id,
1978 10 : tenant.tenant_shard_id,
1979 10 : Key::MIN,
1980 10 : Lsn(0x11)..truncate_at,
1981 10 : )
1982 5 : .await
1983 10 : .unwrap();
1984 2 :
1985 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
1986 10 :
1987 10 : new_layer
1988 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
1989 15 : .await
1990 10 : .unwrap();
1991 2 :
1992 24 : let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
1993 10 :
1994 11 : copied_layer.as_delta(ctx).await.unwrap();
1995 10 :
1996 10 : assert_keys_and_values_eq(
1997 10 : new_layer.as_delta(ctx).await.unwrap(),
1998 10 : copied_layer.as_delta(ctx).await.unwrap(),
1999 10 : truncate_at,
2000 10 : ctx,
2001 2 : )
2002 52 : .await;
2003 2 : }
2004 2 : }
2005 :
2006 10 : async fn assert_keys_and_values_eq(
2007 10 : source: &DeltaLayerInner,
2008 10 : truncated: &DeltaLayerInner,
2009 10 : truncated_at: Lsn,
2010 10 : ctx: &RequestContext,
2011 10 : ) {
2012 10 : use futures::future::ready;
2013 10 : use futures::stream::TryStreamExt;
2014 10 :
2015 10 : let start_key = [0u8; DELTA_KEY_SIZE];
2016 10 :
2017 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2018 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2019 10 : source.index_start_blk,
2020 10 : source.index_root_blk,
2021 10 : &source_reader,
2022 10 : );
2023 10 : let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
2024 60 : let source_stream = source_stream.filter(|res| match res {
2025 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2026 0 : _ => ready(true),
2027 60 : });
2028 10 : let mut source_stream = std::pin::pin!(source_stream);
2029 10 :
2030 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2031 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2032 10 : truncated.index_start_blk,
2033 10 : truncated.index_root_blk,
2034 10 : &truncated_reader,
2035 10 : );
2036 10 : let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
2037 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2038 10 :
2039 10 : let mut scratch_left = Vec::new();
2040 10 : let mut scratch_right = Vec::new();
2041 :
2042 : loop {
2043 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2044 42 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2045 42 :
2046 42 : if src.is_none() {
2047 10 : assert!(truncated.is_none());
2048 10 : break;
2049 32 : }
2050 32 :
2051 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2052 32 :
2053 32 : // because we've filtered the source with Lsn, we should always have the same keys from both.
2054 32 : assert_eq!(src.0, truncated.0);
2055 32 : assert_eq!(src.1, truncated.1);
2056 :
2057 : // if this is needed for something else, just drop this assert.
2058 32 : assert!(
2059 32 : src.2.pos() >= truncated.2.pos(),
2060 0 : "value position should not go backwards {} vs. {}",
2061 0 : src.2.pos(),
2062 0 : truncated.2.pos()
2063 : );
2064 :
2065 32 : scratch_left.clear();
2066 32 : let src_cursor = source_reader.block_cursor();
2067 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2068 32 : scratch_right.clear();
2069 32 : let trunc_cursor = truncated_reader.block_cursor();
2070 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2071 :
2072 32 : tokio::try_join!(left, right).unwrap();
2073 32 :
2074 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2075 : }
2076 10 : }
2077 : }
|