Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new
6 : //! relation is created, there is no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in the `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use std::collections::{HashMap, VecDeque};
31 : use std::fs::File;
32 : use std::ops::Range;
33 : use std::os::unix::fs::FileExt;
34 : use std::str::FromStr;
35 : use std::sync::Arc;
36 : use std::sync::atomic::AtomicU64;
37 :
38 : use anyhow::{Context, Result, bail, ensure};
39 : use camino::{Utf8Path, Utf8PathBuf};
40 : use futures::StreamExt;
41 : use itertools::Itertools;
42 : use pageserver_api::config::MaxVectoredReadBytes;
43 : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
44 : use pageserver_api::keyspace::KeySpace;
45 : use pageserver_api::models::ImageCompressionAlgorithm;
46 : use pageserver_api::shard::TenantShardId;
47 : use pageserver_api::value::Value;
48 : use serde::{Deserialize, Serialize};
49 : use tokio::sync::OnceCell;
50 : use tokio_epoll_uring::IoBuf;
51 : use tokio_util::sync::CancellationToken;
52 : use tracing::*;
53 : use utils::bin_ser::BeSer;
54 : use utils::bin_ser::SerializeError;
55 : use utils::id::{TenantId, TimelineId};
56 : use utils::lsn::Lsn;
57 :
58 : use super::{
59 : AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
60 : ValuesReconstructState,
61 : };
62 : use crate::config::PageServerConf;
63 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
64 : use crate::page_cache::{self, FileId, PAGE_SZ};
65 : use crate::tenant::blob_io::BlobWriter;
66 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
67 : use crate::tenant::disk_btree::{
68 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
69 : };
70 : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
71 : use crate::tenant::timeline::GetVectoredError;
72 : use crate::tenant::vectored_blob_io::{
73 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
74 : VectoredReadPlanner,
75 : };
76 : use crate::virtual_file::TempVirtualFile;
77 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
78 : use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
79 : use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
80 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
81 :
82 : ///
83 : /// Header stored at the beginning of the file.
84 : ///
85 : /// After this comes the 'values' part, starting on block 1. After that,
86 : /// the 'index' starts at the block indicated by 'index_start_blk'.
87 : ///
88 0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
89 : pub struct Summary {
90 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
91 : pub magic: u16,
92 : pub format_version: u16,
93 :
94 : pub tenant_id: TenantId,
95 : pub timeline_id: TimelineId,
96 : pub key_range: Range<Key>,
97 : pub lsn_range: Range<Lsn>,
98 :
99 : /// Block number where the 'index' part of the file begins.
100 : pub index_start_blk: u32,
101 : /// Block within the 'index', where the B-tree root page is stored
102 : pub index_root_blk: u32,
103 : }
104 :
105 : impl From<&DeltaLayer> for Summary {
106 0 : fn from(layer: &DeltaLayer) -> Self {
107 0 : Self::expected(
108 0 : layer.desc.tenant_shard_id.tenant_id,
109 0 : layer.desc.timeline_id,
110 0 : layer.desc.key_range.clone(),
111 0 : layer.desc.lsn_range.clone(),
112 0 : )
113 0 : }
114 : }
115 :
116 : impl Summary {
117 : /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
118 8700 : pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
119 8700 : let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
120 8700 : Self::ser_into(self, &mut buf)?;
121 : // Pad zeroes to the buffer so the length is a multiple of the alignment.
122 8700 : buf.extend_with(0, buf.capacity() - buf.len());
123 8700 : Ok(buf.freeze())
124 8700 : }
125 :
126 6612 : pub(super) fn expected(
127 6612 : tenant_id: TenantId,
128 6612 : timeline_id: TimelineId,
129 6612 : keys: Range<Key>,
130 6612 : lsns: Range<Lsn>,
131 6612 : ) -> Self {
132 6612 : Self {
133 6612 : magic: DELTA_FILE_MAGIC,
134 6612 : format_version: STORAGE_FORMAT_VERSION,
135 6612 :
136 6612 : tenant_id,
137 6612 : timeline_id,
138 6612 : key_range: keys,
139 6612 : lsn_range: lsns,
140 6612 :
141 6612 : index_start_blk: 0,
142 6612 : index_root_blk: 0,
143 6612 : }
144 6612 : }
145 : }
146 :
147 : // Flag indicating that this version initializes the page
148 : const WILL_INIT: u64 = 1;
149 :
150 : /// A reference to a BLOB stored in a layer file.
151 : ///
152 : /// The reference contains the BLOB offset and, for WAL records, the
153 : /// `will_init` flag. The flag helps determine the range of records that
154 : /// need to be applied without reading/deserializing the records themselves.
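///
/// A round-trip sketch of the bit packing (low bit = `will_init`, upper bits = the offset):
///
/// ```text
/// let r = BlobRef::new(4096, true);
/// assert_eq!(r.pos(), 4096);
/// assert!(r.will_init());
/// ```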
155 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
156 : pub struct BlobRef(pub u64);
157 :
158 : impl BlobRef {
159 28825199 : pub fn will_init(&self) -> bool {
160 28825199 : (self.0 & WILL_INIT) != 0
161 28825199 : }
162 :
163 46431473 : pub fn pos(&self) -> u64 {
164 46431473 : self.0 >> 1
165 46431473 : }
166 :
167 38952492 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
168 38952492 : let mut blob_ref = pos << 1;
169 38952492 : if will_init {
170 38820996 : blob_ref |= WILL_INIT;
171 38820996 : }
172 38952492 : BlobRef(blob_ref)
173 38952492 : }
174 : }
175 :
176 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
177 : /// The key of the B-tree index stored in the delta layer: the serialized Key
178 : /// followed by the big-endian LSN, so byte-wise order equals (Key, LSN) order.
179 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
180 :
181 : impl DeltaKey {
182 12385188 : fn from_slice(buf: &[u8]) -> Self {
183 12385188 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
184 12385188 : bytes.copy_from_slice(buf);
185 12385188 : DeltaKey(bytes)
186 12385188 : }
187 :
188 40398511 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
189 40398511 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
190 40398511 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
191 40398511 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
192 40398511 : DeltaKey(bytes)
193 40398511 : }
194 :
195 12385188 : fn key(&self) -> Key {
196 12385188 : Key::from_slice(&self.0)
197 12385188 : }
198 :
199 12385188 : fn lsn(&self) -> Lsn {
200 12385188 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
201 12385188 : }
202 :
203 34045925 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
204 34045925 : let mut lsn_buf = [0u8; 8];
205 34045925 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
206 34045925 : Lsn(u64::from_be_bytes(lsn_buf))
207 34045925 : }
208 : }
209 :
210 : /// This is used only from `pagectl`. Within pageserver, all layers are
211 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
212 : pub struct DeltaLayer {
213 : path: Utf8PathBuf,
214 : pub desc: PersistentLayerDesc,
215 : inner: OnceCell<Arc<DeltaLayerInner>>,
216 : }
217 :
218 : impl std::fmt::Debug for DeltaLayer {
219 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 : use super::RangeDisplayDebug;
221 :
222 0 : f.debug_struct("DeltaLayer")
223 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
224 0 : .field("lsn_range", &self.desc.lsn_range)
225 0 : .field("file_size", &self.desc.file_size)
226 0 : .field("inner", &self.inner)
227 0 : .finish()
228 0 : }
229 : }
230 :
231 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
232 : /// file.
233 : pub struct DeltaLayerInner {
234 : // values copied from summary
235 : index_start_blk: u32,
236 : index_root_blk: u32,
237 :
238 : file: Arc<VirtualFile>,
239 : file_id: FileId,
240 :
241 : layer_key_range: Range<Key>,
242 : layer_lsn_range: Range<Lsn>,
243 :
244 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
245 : }
246 :
247 : impl DeltaLayerInner {
248 0 : pub(crate) fn layer_dbg_info(&self) -> String {
249 0 : format!(
250 0 : "delta {}..{} {}..{}",
251 0 : self.key_range().start,
252 0 : self.key_range().end,
253 0 : self.lsn_range().start,
254 0 : self.lsn_range().end
255 0 : )
256 0 : }
257 : }
258 :
259 : impl std::fmt::Debug for DeltaLayerInner {
260 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 0 : f.debug_struct("DeltaLayerInner")
262 0 : .field("index_start_blk", &self.index_start_blk)
263 0 : .field("index_root_blk", &self.index_root_blk)
264 0 : .finish()
265 0 : }
266 : }
267 :
268 : /// Boilerplate to implement the Layer trait; always use `layer_desc` for persistent layers.
269 : impl std::fmt::Display for DeltaLayer {
270 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 0 : write!(f, "{}", self.layer_desc().short_id())
272 0 : }
273 : }
274 :
275 : impl AsLayerDesc for DeltaLayer {
276 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
277 0 : &self.desc
278 0 : }
279 : }
280 :
281 : impl DeltaLayer {
282 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
283 0 : self.desc.dump();
284 0 :
285 0 : if !verbose {
286 0 : return Ok(());
287 0 : }
288 :
289 0 : let inner = self.load(ctx).await?;
290 :
291 0 : inner.dump(ctx).await
292 0 : }
293 :
294 8856 : fn temp_path_for(
295 8856 : conf: &PageServerConf,
296 8856 : tenant_shard_id: &TenantShardId,
297 8856 : timeline_id: &TimelineId,
298 8856 : key_start: Key,
299 8856 : lsn_range: &Range<Lsn>,
300 8856 : ) -> Utf8PathBuf {
301 : // TempVirtualFile requires us to never reuse a filename while an old
302 : // instance of TempVirtualFile created with that filename has not finished dropping.
303 : // So, we use a monotonic counter to disambiguate the filenames.
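// The resulting name, per the format string below, has the shape:
//   <key start>-XXX__<start LSN>-<end LSN>.<counter hex>.<TEMP_FILE_SUFFIX>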
304 : static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
305 8856 : let filename_disambiguator =
306 8856 : NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
307 8856 :
308 8856 : conf.timeline_path(tenant_shard_id, timeline_id)
309 8856 : .join(format!(
310 8856 : "{}-XXX__{:016X}-{:016X}.{:x}.{}",
311 8856 : key_start,
312 8856 : u64::from(lsn_range.start),
313 8856 : u64::from(lsn_range.end),
314 8856 : filename_disambiguator,
315 8856 : TEMP_FILE_SUFFIX,
316 8856 : ))
317 8856 : }
318 :
319 : ///
320 : /// Open the underlying file and read the metadata into memory, if it's
321 : /// not loaded already.
322 : ///
323 0 : async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
324 0 : // Quick exit if already loaded
325 0 : self.inner
326 0 : .get_or_try_init(|| self.load_inner(ctx))
327 0 : .await
328 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
329 0 : }
330 :
331 0 : async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
332 0 : let path = self.path();
333 :
334 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
335 :
336 : // not production code
337 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
338 0 : let expected_layer_name = self.layer_desc().layer_name();
339 0 :
340 0 : if actual_layer_name != expected_layer_name {
341 0 : println!("warning: filename does not match what is expected from in-file summary");
342 0 : println!("actual: {:?}", actual_layer_name.to_string());
343 0 : println!("expected: {:?}", expected_layer_name.to_string());
344 0 : }
345 :
346 0 : Ok(Arc::new(loaded))
347 0 : }
348 :
349 : /// Create a DeltaLayer struct representing an existing file on disk.
350 : ///
351 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
352 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
353 0 : let mut summary_buf = vec![0; PAGE_SZ];
354 0 : file.read_exact_at(&mut summary_buf, 0)?;
355 0 : let summary = Summary::des_prefix(&summary_buf)?;
356 :
357 0 : let metadata = file
358 0 : .metadata()
359 0 : .context("get file metadata to determine size")?;
360 :
361 : // This function is never used for constructing layers in a running pageserver,
362 : // so it does not need an accurate TenantShardId.
363 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
364 0 :
365 0 : Ok(DeltaLayer {
366 0 : path: path.to_path_buf(),
367 0 : desc: PersistentLayerDesc::new_delta(
368 0 : tenant_shard_id,
369 0 : summary.timeline_id,
370 0 : summary.key_range,
371 0 : summary.lsn_range,
372 0 : metadata.len(),
373 0 : ),
374 0 : inner: OnceCell::new(),
375 0 : })
376 0 : }
377 :
378 : /// Path to the layer file in pageserver workdir.
379 0 : fn path(&self) -> Utf8PathBuf {
380 0 : self.path.clone()
381 0 : }
382 : }
383 :
384 : /// A builder object for constructing a new delta layer.
385 : ///
386 : /// Usage:
387 : ///
388 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
389 : ///
390 : /// 2. Write the contents by calling `put_value` for every page
391 : /// version to store in the layer.
392 : ///
393 : /// 3. Call `finish`.
394 : ///
395 : struct DeltaLayerWriterInner {
396 : pub path: Utf8PathBuf,
397 : timeline_id: TimelineId,
398 : tenant_shard_id: TenantShardId,
399 :
400 : key_start: Key,
401 : lsn_range: Range<Lsn>,
402 :
403 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
404 :
405 : blob_writer: BlobWriter<TempVirtualFile>,
406 :
407 : // Number of (key, lsn) entries in the layer.
408 : num_keys: usize,
409 : }
410 :
411 : impl DeltaLayerWriterInner {
412 : ///
413 : /// Start building a new delta layer.
414 : ///
415 : #[allow(clippy::too_many_arguments)]
416 8856 : async fn new(
417 8856 : conf: &'static PageServerConf,
418 8856 : timeline_id: TimelineId,
419 8856 : tenant_shard_id: TenantShardId,
420 8856 : key_start: Key,
421 8856 : lsn_range: Range<Lsn>,
422 8856 : gate: &utils::sync::gate::Gate,
423 8856 : cancel: CancellationToken,
424 8856 : ctx: &RequestContext,
425 8856 : ) -> anyhow::Result<Self> {
426 8856 : // Create the file initially with a temporary filename. We don't know
427 8856 : // the end key yet, so we cannot form the final filename yet. We will
428 8856 : // rename it when we're done.
429 8856 : let path =
430 8856 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
431 8856 : let file = TempVirtualFile::new(
432 8856 : VirtualFile::open_with_options_v2(
433 8856 : &path,
434 8856 : virtual_file::OpenOptions::new()
435 8856 : .create_new(true)
436 8856 : .write(true),
437 8856 : ctx,
438 8856 : )
439 8856 : .await?,
440 8856 : gate.enter()?,
441 : );
442 :
443 : // Start at PAGE_SZ, make room for the header block
444 8856 : let blob_writer = BlobWriter::new(
445 8856 : file,
446 8856 : PAGE_SZ as u64,
447 8856 : gate,
448 8856 : cancel,
449 8856 : ctx,
450 8856 : info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
451 0 : )?;
452 :
453 : // Initialize the b-tree index builder
454 8856 : let block_buf = BlockBuf::new();
455 8856 : let tree_builder = DiskBtreeBuilder::new(block_buf);
456 8856 :
457 8856 : Ok(Self {
458 8856 : path,
459 8856 : timeline_id,
460 8856 : tenant_shard_id,
461 8856 : key_start,
462 8856 : lsn_range,
463 8856 : tree: tree_builder,
464 8856 : blob_writer,
465 8856 : num_keys: 0,
466 8856 : })
467 8856 : }
468 :
469 : ///
470 : /// Append a key-value pair to the file.
471 : ///
472 : /// The values must be appended in key, lsn order.
473 : ///
474 12631908 : async fn put_value(
475 12631908 : &mut self,
476 12631908 : key: Key,
477 12631908 : lsn: Lsn,
478 12631908 : val: Value,
479 12631908 : ctx: &RequestContext,
480 12631908 : ) -> anyhow::Result<()> {
481 12631908 : let (_, res) = self
482 12631908 : .put_value_bytes(
483 12631908 : key,
484 12631908 : lsn,
485 12631908 : Value::ser(&val)?.slice_len(),
486 12631908 : val.will_init(),
487 12631908 : ctx,
488 12631908 : )
489 12631908 : .await;
490 12631908 : res
491 12631908 : }
492 :
493 38951868 : async fn put_value_bytes<Buf>(
494 38951868 : &mut self,
495 38951868 : key: Key,
496 38951868 : lsn: Lsn,
497 38951868 : val: FullSlice<Buf>,
498 38951868 : will_init: bool,
499 38951868 : ctx: &RequestContext,
500 38951868 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
501 38951868 : where
502 38951868 : Buf: IoBuf + Send,
503 38951868 : {
504 38951868 : assert!(
505 38951868 : self.lsn_range.start <= lsn,
506 0 : "lsn_start={}, lsn={}",
507 : self.lsn_range.start,
508 : lsn
509 : );
510 : // We don't want to use compression in delta layer creation
511 38951868 : let compression = ImageCompressionAlgorithm::Disabled;
512 38951868 : let (val, res) = self
513 38951868 : .blob_writer
514 38951868 : .write_blob_maybe_compressed(val, ctx, compression)
515 38951868 : .await;
516 38951868 : let off = match res {
517 38951868 : Ok((off, _)) => off,
518 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
519 : };
520 :
521 38951868 : let blob_ref = BlobRef::new(off, will_init);
522 38951868 :
523 38951868 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
524 38951868 : let res = self.tree.append(&delta_key.0, blob_ref.0);
525 38951868 :
526 38951868 : self.num_keys += 1;
527 38951868 :
528 38951868 : (val, res.map_err(|e| anyhow::anyhow!(e)))
529 38951868 : }
530 :
531 12143832 : fn size(&self) -> u64 {
532 12143832 : self.blob_writer.size() + self.tree.borrow_writer().size()
533 12143832 : }
534 :
535 : ///
536 : /// Finish writing the delta layer.
537 : ///
538 8700 : async fn finish(
539 8700 : self,
540 8700 : key_end: Key,
541 8700 : ctx: &RequestContext,
542 8700 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
543 8700 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
544 :
545 8700 : let file = self
546 8700 : .blob_writer
547 8700 : .shutdown(
548 8700 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
549 8700 : ctx,
550 8700 : )
551 8700 : .await?;
552 :
553 : // Write out the index
554 8700 : let (index_root_blk, block_buf) = self.tree.finish()?;
555 8700 : let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
556 :
557 : // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
558 : // Should we just replace BlockBuf::blocks with one big buffer?
559 90956 : for buf in block_buf.blocks {
560 82256 : let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
561 82256 : res?;
562 82256 : offset += PAGE_SZ as u64;
563 : }
564 8700 : assert!(self.lsn_range.start < self.lsn_range.end);
565 : // Fill in the summary on blk 0
566 8700 : let summary = Summary {
567 8700 : magic: DELTA_FILE_MAGIC,
568 8700 : format_version: STORAGE_FORMAT_VERSION,
569 8700 : tenant_id: self.tenant_shard_id.tenant_id,
570 8700 : timeline_id: self.timeline_id,
571 8700 : key_range: self.key_start..key_end,
572 8700 : lsn_range: self.lsn_range.clone(),
573 8700 : index_start_blk,
574 8700 : index_root_blk,
575 8700 : };
576 :
577 : // Writes summary at the first block (offset 0).
578 8700 : let buf = summary.ser_into_page()?;
579 8700 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
580 8700 : res?;
581 :
582 8700 : let metadata = file
583 8700 : .metadata()
584 8700 : .await
585 8700 : .context("get file metadata to determine size")?;
586 :
587 : // 5GB limit for objects without multipart upload (which we don't want to use)
588 : // Make it a little bit below to account for differing GB units
589 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
590 8700 : ensure!(
591 8700 : metadata.len() <= S3_UPLOAD_LIMIT,
592 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
593 0 : file.path(),
594 0 : metadata.len()
595 : );
596 :
597 : // Note: Because we opened the file in write-only mode, we cannot
598 : // reuse the same VirtualFile for reading later. That's why we don't
599 : // set inner.file here. The first read will have to re-open it.
600 :
601 8700 : let desc = PersistentLayerDesc::new_delta(
602 8700 : self.tenant_shard_id,
603 8700 : self.timeline_id,
604 8700 : self.key_start..key_end,
605 8700 : self.lsn_range.clone(),
606 8700 : metadata.len(),
607 8700 : );
608 8700 :
609 8700 : // fsync the file
610 8700 : file.sync_all()
611 8700 : .await
612 8700 : .maybe_fatal_err("delta_layer sync_all")?;
613 :
614 8700 : trace!("created delta layer {}", self.path);
615 :
616 : // The gate guard stored in the temp file is dropped. Callers (e.g. the flush loop or compaction)
617 : // also keep the gate open, so that it's safe for them to rename the file to its final destination.
618 8700 : file.disarm_into_inner();
619 8700 :
620 8700 : Ok((desc, self.path))
621 8700 : }
622 : }
623 :
624 : /// A builder object for constructing a new delta layer.
625 : ///
626 : /// Usage:
627 : ///
628 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
629 : ///
630 : /// 2. Write the contents by calling `put_value` for every page
631 : /// version to store in the layer.
632 : ///
633 : /// 3. Call `finish`.
634 : ///
635 : /// # Note
636 : ///
637 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
638 : /// possible for the writer to be dropped before `finish` is ever called, which
639 : /// could leave stray temporary files in the directory, exhausting file system space.
640 : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
641 : /// implementation that cleans up the temporary file on failure. It's not
642 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
643 : /// out some fields, making it impossible to implement `Drop`.
644 : ///
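///
/// A minimal usage sketch (error handling elided; see `new` for parameter details):
///
/// ```text
/// let mut writer = DeltaLayerWriter::new(conf, timeline_id, tenant_shard_id,
///                                        key_start, lsn_range, gate, cancel, ctx).await?;
/// writer.put_value(key, lsn, value, ctx).await?; // repeat in (key, lsn) order
/// let (desc, path) = writer.finish(key_end, ctx).await?;
/// ```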
645 : #[must_use]
646 : pub struct DeltaLayerWriter {
647 : inner: Option<DeltaLayerWriterInner>,
648 : }
649 :
650 : impl DeltaLayerWriter {
651 : ///
652 : /// Start building a new delta layer.
653 : ///
654 : #[allow(clippy::too_many_arguments)]
655 8856 : pub async fn new(
656 8856 : conf: &'static PageServerConf,
657 8856 : timeline_id: TimelineId,
658 8856 : tenant_shard_id: TenantShardId,
659 8856 : key_start: Key,
660 8856 : lsn_range: Range<Lsn>,
661 8856 : gate: &utils::sync::gate::Gate,
662 8856 : cancel: CancellationToken,
663 8856 : ctx: &RequestContext,
664 8856 : ) -> anyhow::Result<Self> {
665 8856 : Ok(Self {
666 8856 : inner: Some(
667 8856 : DeltaLayerWriterInner::new(
668 8856 : conf,
669 8856 : timeline_id,
670 8856 : tenant_shard_id,
671 8856 : key_start,
672 8856 : lsn_range,
673 8856 : gate,
674 8856 : cancel,
675 8856 : ctx,
676 8856 : )
677 8856 : .await?,
678 : ),
679 : })
680 8856 : }
681 :
682 0 : pub fn is_empty(&self) -> bool {
683 0 : self.inner.as_ref().unwrap().num_keys == 0
684 0 : }
685 :
686 : ///
687 : /// Append a key-value pair to the file.
688 : ///
689 : /// The values must be appended in key, lsn order.
690 : ///
691 12631908 : pub async fn put_value(
692 12631908 : &mut self,
693 12631908 : key: Key,
694 12631908 : lsn: Lsn,
695 12631908 : val: Value,
696 12631908 : ctx: &RequestContext,
697 12631908 : ) -> anyhow::Result<()> {
698 12631908 : self.inner
699 12631908 : .as_mut()
700 12631908 : .unwrap()
701 12631908 : .put_value(key, lsn, val, ctx)
702 12631908 : .await
703 12631908 : }
704 :
705 26319960 : pub async fn put_value_bytes<Buf>(
706 26319960 : &mut self,
707 26319960 : key: Key,
708 26319960 : lsn: Lsn,
709 26319960 : val: FullSlice<Buf>,
710 26319960 : will_init: bool,
711 26319960 : ctx: &RequestContext,
712 26319960 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
713 26319960 : where
714 26319960 : Buf: IoBuf + Send,
715 26319960 : {
716 26319960 : self.inner
717 26319960 : .as_mut()
718 26319960 : .unwrap()
719 26319960 : .put_value_bytes(key, lsn, val, will_init, ctx)
720 26319960 : .await
721 26319960 : }
722 :
723 12143832 : pub fn size(&self) -> u64 {
724 12143832 : self.inner.as_ref().unwrap().size()
725 12143832 : }
726 :
727 : ///
728 : /// Finish writing the delta layer.
729 : ///
730 8700 : pub(crate) async fn finish(
731 8700 : mut self,
732 8700 : key_end: Key,
733 8700 : ctx: &RequestContext,
734 8700 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
735 8700 : self.inner.take().unwrap().finish(key_end, ctx).await
736 8700 : }
737 :
738 73524 : pub(crate) fn num_keys(&self) -> usize {
739 73524 : self.inner.as_ref().unwrap().num_keys
740 73524 : }
741 :
742 90840 : pub(crate) fn estimated_size(&self) -> u64 {
743 90840 : let inner = self.inner.as_ref().unwrap();
744 90840 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
745 90840 : }
746 : }
747 :
748 : #[derive(thiserror::Error, Debug)]
749 : pub enum RewriteSummaryError {
750 : #[error("magic mismatch")]
751 : MagicMismatch,
752 : #[error(transparent)]
753 : Other(#[from] anyhow::Error),
754 : }
755 :
756 : impl From<std::io::Error> for RewriteSummaryError {
757 0 : fn from(e: std::io::Error) -> Self {
758 0 : Self::Other(anyhow::anyhow!(e))
759 0 : }
760 : }
761 :
762 : impl DeltaLayer {
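/// Rewrites the summary block (block 0) of an existing layer file in place.
///
/// A hypothetical use, re-stamping the tenant id (all `Summary` fields are public):
///
/// ```text
/// DeltaLayer::rewrite_summary(&path, |s| Summary { tenant_id: new_tenant, ..s }, &ctx).await?;
/// ```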
763 0 : pub async fn rewrite_summary<F>(
764 0 : path: &Utf8Path,
765 0 : rewrite: F,
766 0 : ctx: &RequestContext,
767 0 : ) -> Result<(), RewriteSummaryError>
768 0 : where
769 0 : F: Fn(Summary) -> Summary,
770 0 : {
771 0 : let file = VirtualFile::open_with_options_v2(
772 0 : path,
773 0 : virtual_file::OpenOptions::new().read(true).write(true),
774 0 : ctx,
775 0 : )
776 0 : .await
777 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
778 0 : let file_id = page_cache::next_file_id();
779 0 : let block_reader = FileBlockReader::new(&file, file_id);
780 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
781 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
782 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
783 0 : return Err(RewriteSummaryError::MagicMismatch);
784 0 : }
785 0 :
786 0 : let new_summary = rewrite(actual_summary);
787 :
788 0 : let buf = new_summary.ser_into_page().context("serialize")?;
789 0 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
790 0 : res?;
791 0 : Ok(())
792 0 : }
793 : }
794 :
795 : impl DeltaLayerInner {
796 6240 : pub(crate) fn key_range(&self) -> &Range<Key> {
797 6240 : &self.layer_key_range
798 6240 : }
799 :
800 6240 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
801 6240 : &self.layer_lsn_range
802 6240 : }
803 :
804 6612 : pub(super) async fn load(
805 6612 : path: &Utf8Path,
806 6612 : summary: Option<Summary>,
807 6612 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
808 6612 : ctx: &RequestContext,
809 6612 : ) -> anyhow::Result<Self> {
810 6612 : let file = Arc::new(
811 6612 : VirtualFile::open_v2(path, ctx)
812 6612 : .await
813 6612 : .context("open layer file")?,
814 : );
815 :
816 6612 : let file_id = page_cache::next_file_id();
817 6612 :
818 6612 : let block_reader = FileBlockReader::new(&file, file_id);
819 :
820 6612 : let summary_blk = block_reader
821 6612 : .read_blk(0, ctx)
822 6612 : .await
823 6612 : .context("read first block")?;
824 :
825 : // TODO: this should be an assertion instead; see ImageLayerInner::load
826 6612 : let actual_summary =
827 6612 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
828 :
829 6612 : if let Some(mut expected_summary) = summary {
830 : // production code path
831 6612 : expected_summary.index_start_blk = actual_summary.index_start_blk;
832 6612 : expected_summary.index_root_blk = actual_summary.index_root_blk;
833 6612 : // mask out the timeline_id, but still require the layers to be from the same tenant
834 6612 : expected_summary.timeline_id = actual_summary.timeline_id;
835 6612 :
836 6612 : if actual_summary != expected_summary {
837 0 : bail!(
838 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
839 0 : actual_summary,
840 0 : expected_summary
841 0 : );
842 6612 : }
843 0 : }
844 :
845 6612 : Ok(DeltaLayerInner {
846 6612 : file,
847 6612 : file_id,
848 6612 : index_start_blk: actual_summary.index_start_blk,
849 6612 : index_root_blk: actual_summary.index_root_blk,
850 6612 : max_vectored_read_bytes,
851 6612 : layer_key_range: actual_summary.key_range,
852 6612 : layer_lsn_range: actual_summary.lsn_range,
853 6612 : })
854 6612 : }
855 :
856 : // Look up the keys in the provided keyspace and update
857 : // the reconstruct state with whatever is found.
858 : //
859 : // Currently, the index is visited for each range, but this
860 : // can be further optimised to visit the index only once.
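//
// For each index entry within the requested key range, `plan_reads` classifies
// the blob as BlobFlag::Ignore (outside the LSN range), BlobFlag::ReplaceAll
// (a will_init record or image, making older versions unnecessary), or
// BlobFlag::None (a regular delta that is added to the read).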
861 1414507 : pub(super) async fn get_values_reconstruct_data(
862 1414507 : &self,
863 1414507 : this: ResidentLayer,
864 1414507 : keyspace: KeySpace,
865 1414507 : lsn_range: Range<Lsn>,
866 1414507 : reconstruct_state: &mut ValuesReconstructState,
867 1414507 : ctx: &RequestContext,
868 1414507 : ) -> Result<(), GetVectoredError> {
869 1414507 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
870 1414507 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
871 1414507 : self.index_start_blk,
872 1414507 : self.index_root_blk,
873 1414507 : block_reader,
874 1414507 : );
875 1414507 :
876 1414507 : let planner = VectoredReadPlanner::new(
877 1414507 : self.max_vectored_read_bytes
878 1414507 : .expect("Layer is loaded with max vectored bytes config")
879 1414507 : .0
880 1414507 : .into(),
881 1414507 : );
882 1414507 :
883 1414507 : let data_end_offset = self.index_start_offset();
884 :
885 1414507 : let reads = Self::plan_reads(
886 1414507 : &keyspace,
887 1414507 : lsn_range.clone(),
888 1414507 : data_end_offset,
889 1414507 : index_reader,
890 1414507 : planner,
891 1414507 : ctx,
892 1414507 : )
893 1414507 : .await
894 1414507 : .map_err(GetVectoredError::Other)?;
895 :
896 1414507 : self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
897 1414507 : .await;
898 :
899 1414507 : Ok(())
900 1414507 : }
901 :
902 1415719 : async fn plan_reads<Reader>(
903 1415719 : keyspace: &KeySpace,
904 1415719 : lsn_range: Range<Lsn>,
905 1415719 : data_end_offset: u64,
906 1415719 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
907 1415719 : mut planner: VectoredReadPlanner,
908 1415719 : ctx: &RequestContext,
909 1415719 : ) -> anyhow::Result<Vec<VectoredRead>>
910 1415719 : where
911 1415719 : Reader: BlockReader + Clone,
912 1415719 : {
913 1415719 : let ctx = RequestContextBuilder::from(ctx)
914 1415719 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
915 1415719 : .attached_child();
916 :
917 1446439 : for range in keyspace.ranges.iter() {
918 1446439 : let mut range_end_handled = false;
919 1446439 :
920 1446439 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
921 1446439 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
922 1446439 : let mut index_stream = std::pin::pin!(index_stream);
923 :
924 21503671 : while let Some(index_entry) = index_stream.next().await {
925 21451433 : let (raw_key, value) = index_entry?;
926 21451433 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
927 21451433 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
928 21451433 : let blob_ref = BlobRef(value);
929 21451433 :
930 21451433 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
931 21451433 : assert!(key >= range.start);
932 :
933 21451433 : let outside_lsn_range = !lsn_range.contains(&lsn);
934 :
935 21451433 : let flag = {
936 21451433 : if outside_lsn_range {
937 5220726 : BlobFlag::Ignore
938 16230707 : } else if blob_ref.will_init() {
939 3286991 : BlobFlag::ReplaceAll
940 : } else {
941 : // Usual path: add blob to the read
942 12943716 : BlobFlag::None
943 : }
944 : };
945 :
946 21451433 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
947 1394201 : planner.handle_range_end(blob_ref.pos());
948 1394201 : range_end_handled = true;
949 1394201 : break;
950 20057232 : } else {
951 20057232 : planner.handle(key, lsn, blob_ref.pos(), flag);
952 20057232 : }
953 : }
954 :
955 1446439 : if !range_end_handled {
956 52238 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
957 52238 : planner.handle_range_end(data_end_offset);
958 1394201 : }
959 : }
960 :
961 1415719 : Ok(planner.finish())
962 1415719 : }
963 :
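/// Returns the buffer size needed for the largest planned read: the soft max
/// if there are no reads, otherwise the size of the largest read (which may
/// exceed the soft max for oversized single-key values, warned about below).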
964 1415707 : fn get_min_read_buffer_size(
965 1415707 : planned_reads: &[VectoredRead],
966 1415707 : read_size_soft_max: usize,
967 1415707 : ) -> usize {
968 1415707 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
969 518678 : return read_size_soft_max;
970 : };
971 :
972 897029 : let largest_read_size = largest_read.size();
973 897029 : if largest_read_size > read_size_soft_max {
974 : // If the read is oversized, it should only contain one key.
975 1200 : let offenders = largest_read
976 1200 : .blobs_at
977 1200 : .as_slice()
978 1200 : .iter()
979 1200 : .filter_map(|(_, blob_meta)| {
980 1200 : if blob_meta.key.is_rel_dir_key()
981 1200 : || blob_meta.key == DBDIR_KEY
982 1200 : || blob_meta.key.is_aux_file_key()
983 : {
984 : // The size of values for these keys is unbounded and can
985 : // grow very large in pathological cases.
986 0 : None
987 : } else {
988 1200 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
989 : }
990 1200 : })
991 1200 : .join(", ");
992 1200 :
993 1200 : if !offenders.is_empty() {
994 1200 : tracing::warn!(
995 0 : "Oversized vectored read ({} > {}) for keys {}",
996 : largest_read_size,
997 : read_size_soft_max,
998 : offenders
999 : );
1000 0 : }
1001 895829 : }
1002 :
1003 897029 : largest_read_size
1004 1415707 : }
1005 :
1006 1414507 : async fn do_reads_and_update_state(
1007 1414507 : &self,
1008 1414507 : this: ResidentLayer,
1009 1414507 : reads: Vec<VectoredRead>,
1010 1414507 : reconstruct_state: &mut ValuesReconstructState,
1011 1414507 : ctx: &RequestContext,
1012 1414507 : ) {
1013 1414507 : let max_vectored_read_bytes = self
1014 1414507 : .max_vectored_read_bytes
1015 1414507 : .expect("Layer is loaded with max vectored bytes config")
1016 1414507 : .0
1017 1414507 : .into();
1018 1414507 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1019 :
1020 : // Note that reads are processed in reverse order (from highest key+lsn).
1021 : // This is the order that `ReconstructState` requires such that it can
1022 : // track when a key is done.
1023 1414507 : for read in reads.into_iter().rev() {
1024 1021341 : let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
1025 9441870 : for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
1026 9441870 : let io = reconstruct_state.update_key(
1027 9441870 : &blob_meta.key,
1028 9441870 : blob_meta.lsn,
1029 9441870 : blob_meta.will_init,
1030 9441870 : );
1031 9441870 : ios.insert((blob_meta.key, blob_meta.lsn), io);
1032 9441870 : }
1033 :
1034 1021341 : let read_extend_residency = this.clone();
1035 1021341 : let read_from = self.file.clone();
1036 1021341 : let read_ctx = ctx.attached_child();
1037 1021341 : reconstruct_state
1038 1021341 : .spawn_io(async move {
1039 1021341 : let vectored_blob_reader = VectoredBlobReader::new(&read_from);
1040 1021341 : let buf = IoBufferMut::with_capacity(buf_size);
1041 :
1042 1021341 : let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
1043 1021341 : match res {
1044 1021341 : Ok(blobs_buf) => {
1045 1021341 : let view = BufView::new_slice(&blobs_buf.buf);
1046 9441870 : for meta in blobs_buf.blobs.iter().rev() {
1047 9441870 : let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
1048 :
1049 9441870 : let blob_read = meta.read(&view).await;
1050 9441870 : let blob_read = match blob_read {
1051 9441870 : Ok(buf) => buf,
1052 0 : Err(e) => {
1053 0 : io.complete(Err(e));
1054 0 : continue;
1055 : }
1056 : };
1057 :
1058 9441870 : io.complete(Ok(OnDiskValue::WalRecordOrImage(
1059 9441870 : blob_read.into_bytes(),
1060 9441870 : )));
1061 : }
1062 :
1063 1021341 : assert!(ios.is_empty());
1064 : }
1065 0 : Err(err) => {
1066 0 : for (_, sender) in ios {
1067 0 : sender.complete(Err(std::io::Error::new(
1068 0 : err.kind(),
1069 0 : "vec read failed",
1070 0 : )));
1071 0 : }
1072 : }
1073 : }
1074 :
1075 : // keep layer resident until this IO is done; this spawned IO future generally outlives the
1076 : // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
1077 1021341 : drop(read_extend_residency);
1078 1021341 : })
1079 1021341 : .await;
1080 : }
1081 1414507 : }
1082 :
1083 2436 : pub(crate) async fn index_entries<'a>(
1084 2436 : &'a self,
1085 2436 : ctx: &RequestContext,
1086 2436 : ) -> Result<Vec<DeltaEntry<'a>>> {
1087 2436 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1088 2436 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1089 2436 : self.index_start_blk,
1090 2436 : self.index_root_blk,
1091 2436 : block_reader,
1092 2436 : );
1093 2436 :
1094 2436 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1095 2436 :
1096 2436 : tree_reader
1097 2436 : .visit(
1098 2436 : &[0u8; DELTA_KEY_SIZE],
1099 2436 : VisitDirection::Forwards,
1100 12384276 : |key, value| {
1101 12384276 : let delta_key = DeltaKey::from_slice(key);
1102 12384276 : let val_ref = ValueRef {
1103 12384276 : blob_ref: BlobRef(value),
1104 12384276 : layer: self,
1105 12384276 : };
1106 12384276 : let pos = BlobRef(value).pos();
1107 12384276 : if let Some(last) = all_keys.last_mut() {
1108 12381840 : // subtract offset of the current and last entries to get the size
1109 12381840 : // of the value associated with this (key, lsn) tuple
1110 12381840 : let first_pos = last.size;
1111 12381840 : last.size = pos - first_pos;
1112 12381840 : }
1113 12384276 : let entry = DeltaEntry {
1114 12384276 : key: delta_key.key(),
1115 12384276 : lsn: delta_key.lsn(),
1116 12384276 : size: pos,
1117 12384276 : val: val_ref,
1118 12384276 : };
1119 12384276 : all_keys.push(entry);
1120 12384276 : true
1121 12384276 : },
1122 2436 : &RequestContextBuilder::from(ctx)
1123 2436 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1124 2436 : .attached_child(),
1125 2436 : )
1126 2436 : .await?;
1127 2436 : if let Some(last) = all_keys.last_mut() {
1128 2436 : // The last key occupies all space till the end of value storage,
1129 2436 : // which corresponds to the beginning of the index
1130 2436 : last.size = self.index_start_offset() - last.size;
1131 2436 : }
1132 2436 : Ok(all_keys)
1133 2436 : }
1134 :
1135 : /// Using the given writer, write out a version containing only the LSNs earlier than `until`.
1136 : ///
1137 : /// Returns the number of key-value records pushed to the writer.
1138 60 : pub(super) async fn copy_prefix(
1139 60 : &self,
1140 60 : writer: &mut DeltaLayerWriter,
1141 60 : until: Lsn,
1142 60 : ctx: &RequestContext,
1143 60 : ) -> anyhow::Result<usize> {
1144 : use futures::stream::TryStreamExt;
1145 :
1146 : use crate::tenant::vectored_blob_io::{
1147 : BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
1148 : };
1149 :
1150 : #[derive(Debug)]
1151 : enum Item {
1152 : Actual(Key, Lsn, BlobRef),
1153 : Sentinel,
1154 : }
1155 :
1156 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1157 420 : fn from(value: Item) -> Self {
1158 420 : match value {
1159 360 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1160 60 : Item::Sentinel => None,
1161 : }
1162 420 : }
1163 : }
1164 :
1165 : impl Item {
1166 420 : fn offset(&self) -> Option<BlobRef> {
1167 420 : match self {
1168 360 : Item::Actual(_, _, blob) => Some(*blob),
1169 60 : Item::Sentinel => None,
1170 : }
1171 420 : }
1172 :
1173 420 : fn is_last(&self) -> bool {
1174 420 : matches!(self, Item::Sentinel)
1175 420 : }
1176 : }
1177 :
1178 60 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1179 60 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1180 60 : self.index_start_blk,
1181 60 : self.index_root_blk,
1182 60 : block_reader,
1183 60 : );
1184 60 :
1185 60 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1186 360 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1187 60 : // append a sentinel value so we get the end offset of the last item without
1188 60 : // having to repeat the whole read logic
1189 60 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1190 60 : Item::Sentinel,
1191 60 : ))));
1192 60 : let mut stream = std::pin::pin!(stream);
1193 60 :
1194 60 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1195 60 :
1196 60 : let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
1197 60 :
1198 60 : let max_read_size = self
1199 60 : .max_vectored_read_bytes
1200 60 : .map(|x| x.0.get())
1201 60 : .unwrap_or(8192);
1202 60 :
1203 60 : let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
1204 60 :
1205 60 : // FIXME: buffering of DeltaLayerWriter
1206 60 : let mut per_blob_copy = Vec::new();
1207 60 :
1208 60 : let mut records = 0;
1209 :
1210 480 : while let Some(item) = stream.try_next().await? {
1211 420 : tracing::debug!(?item, "popped");
1212 420 : let offset = item
1213 420 : .offset()
1214 420 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1215 :
1216 420 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1217 360 : let end_offset = offset;
1218 360 :
1219 360 : Some((
1220 360 : BlobMeta {
1221 360 : key,
1222 360 : lsn,
1223 360 : will_init: false,
1224 360 : },
1225 360 : start_offset..end_offset,
1226 360 : ))
1227 : } else {
1228 60 : None
1229 : };
1230 :
1231 420 : let is_last = item.is_last();
1232 420 :
1233 420 : prev = Option::from(item);
1234 420 :
1235 420 : let actionable = actionable.filter(|x| x.0.lsn < until);
1236 :
1237 420 : let builder = if let Some((meta, offsets)) = actionable {
1238 : // extend or create a new builder
1239 192 : if read_builder
1240 192 : .as_mut()
1241 192 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1242 192 : .unwrap_or(VectoredReadExtended::No)
1243 192 : == VectoredReadExtended::Yes
1244 : {
1245 96 : None
1246 : } else {
1247 96 : read_builder.replace(ChunkedVectoredReadBuilder::new(
1248 96 : offsets.start.pos(),
1249 96 : offsets.end.pos(),
1250 96 : meta,
1251 96 : max_read_size,
1252 96 : ))
1253 : }
1254 : } else {
1255 : // nothing to do, except perhaps flush any existing for the last element
1256 228 : None
1257 : };
1258 :
1259 : // flush the possible older builder and also the new one if the item was the last one
1260 420 : let builders = builder.into_iter();
1261 420 : let builders = if is_last {
1262 60 : builders.chain(read_builder.take())
1263 : } else {
1264 360 : builders.chain(None)
1265 : };
1266 :
1267 516 : for builder in builders {
1268 96 : let read = builder.build();
1269 96 :
1270 96 : let reader = VectoredBlobReader::new(&self.file);
1271 96 :
1272 96 : let mut buf = buffer.take().unwrap();
1273 96 :
1274 96 : buf.clear();
1275 96 : buf.reserve(read.size());
1276 96 : let res = reader.read_blobs(&read, buf, ctx).await?;
1277 :
1278 96 : let view = BufView::new_slice(&res.buf);
1279 :
1280 288 : for blob in res.blobs {
1281 192 : let key = blob.meta.key;
1282 192 : let lsn = blob.meta.lsn;
1283 :
1284 192 : let data = blob.read(&view).await?;
1285 :
1286 : #[cfg(debug_assertions)]
1287 192 : Value::des(&data)
1288 192 : .with_context(|| {
1289 0 : format!(
1290 0 : "blob failed to deserialize for {}: {:?}",
1291 0 : blob,
1292 0 : utils::Hex(&data)
1293 0 : )
1294 192 : })
1295 192 : .unwrap();
1296 192 :
1297 192 : // is it an image or will_init walrecord?
1298 192 : // FIXME: this could be handled by threading the BlobRef to the
1299 192 : // VectoredReadBuilder
1300 192 : let will_init = pageserver_api::value::ValueBytes::will_init(&data)
1301 192 : .inspect_err(|_e| {
1302 0 : #[cfg(feature = "testing")]
1303 0 : tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1304 192 : })
1305 192 : .unwrap_or(false);
1306 192 :
1307 192 : per_blob_copy.clear();
1308 192 : per_blob_copy.extend_from_slice(&data);
1309 :
1310 192 : let (tmp, res) = writer
1311 192 : .put_value_bytes(
1312 192 : key,
1313 192 : lsn,
1314 192 : std::mem::take(&mut per_blob_copy).slice_len(),
1315 192 : will_init,
1316 192 : ctx,
1317 192 : )
1318 192 : .await;
1319 192 : per_blob_copy = tmp.into_raw_slice().into_inner();
1320 192 :
1321 192 : res?;
1322 :
1323 192 : records += 1;
1324 : }
1325 :
1326 96 : buffer = Some(res.buf);
1327 : }
1328 : }
1329 :
1330 60 : assert!(
1331 60 : read_builder.is_none(),
1332 0 : "with the sentinel above loop should had handled all"
1333 : );
1334 :
1335 60 : Ok(records)
1336 60 : }
1337 :
1338 24 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1339 24 : println!(
1340 24 : "index_start_blk: {}, root {}",
1341 24 : self.index_start_blk, self.index_root_blk
1342 24 : );
1343 24 :
1344 24 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1345 24 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1346 24 : self.index_start_blk,
1347 24 : self.index_root_blk,
1348 24 : block_reader,
1349 24 : );
1350 24 :
1351 24 : tree_reader.dump(ctx).await?;
1352 :
1353 24 : let keys = self.index_entries(ctx).await?;
1354 :
1355 48 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1356 48 : let buf = val.load_raw(ctx).await?;
1357 48 : let val = Value::des(&buf)?;
1358 48 : let desc = match val {
1359 48 : Value::Image(img) => {
1360 48 : format!(" img {} bytes", img.len())
1361 : }
1362 0 : Value::WalRecord(rec) => {
1363 0 : let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
1364 0 : format!(
1365 0 : " rec {} bytes will_init: {} {}",
1366 0 : buf.len(),
1367 0 : rec.will_init(),
1368 0 : wal_desc
1369 0 : )
1370 : }
1371 : };
1372 48 : Ok(desc)
1373 48 : }
1374 :
1375 72 : for entry in keys {
1376 48 : let DeltaEntry { key, lsn, val, .. } = entry;
1377 48 : let desc = match dump_blob(&val, ctx).await {
1378 48 : Ok(desc) => desc,
1379 0 : Err(err) => {
1380 0 : format!("ERROR: {err}")
1381 : }
1382 : };
1383 48 : println!(" key {key} at {lsn}: {desc}");
1384 :
1385 : // Print more details about CHECKPOINT records. Would be nice to print details
1386 : // of many other record types too, but these are particularly interesting, as
1387 : // have a lot of special processing for them in walingest.rs.
1388 24 : use pageserver_api::key::CHECKPOINT_KEY;
1389 24 : use postgres_ffi::CheckPoint;
1390 48 : if key == CHECKPOINT_KEY {
1391 0 : let val = val.load(ctx).await?;
1392 0 : match val {
1393 0 : Value::Image(img) => {
1394 0 : let checkpoint = CheckPoint::decode(&img)?;
1395 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1396 : }
1397 0 : Value::WalRecord(_rec) => {
1398 0 : println!(" unexpected walrecord value for checkpoint key");
1399 0 : }
1400 : }
1401 48 : }
1402 : }
1403 :
1404 24 : Ok(())
1405 24 : }
1406 :
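/// Streams `(Key, Lsn, BlobRef)` tuples from the index in forward order, starting at `start`.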
1407 180 : fn stream_index_forwards<'a, R>(
1408 180 : &'a self,
1409 180 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1410 180 : start: &'a [u8; DELTA_KEY_SIZE],
1411 180 : ctx: &'a RequestContext,
1412 180 : ) -> impl futures::stream::Stream<
1413 180 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1414 180 : > + 'a
1415 180 : where
1416 180 : R: BlockReader + 'a,
1417 180 : {
1418 : use futures::stream::TryStreamExt;
1419 180 : let stream = reader.into_stream(start, ctx);
1420 912 : stream.map_ok(|(key, value)| {
1421 912 : let key = DeltaKey::from_slice(&key);
1422 912 : let (key, lsn) = (key.key(), key.lsn());
1423 912 : let offset = BlobRef(value);
1424 912 :
1425 912 : (key, lsn, offset)
1426 912 : })
1427 180 : }
1428 :
1429 : /// The file offset of the first block of the index.
1430 : ///
1431 : /// The file structure is summary, values, and index. We often need this offset to compute the size of the last blob.
1432 1420651 : fn index_start_offset(&self) -> u64 {
1433 1420651 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1434 1420651 : let bref = BlobRef(offset);
1435 1420651 : tracing::debug!(
1436 : index_start_blk = self.index_start_blk,
1437 : offset,
1438 0 : pos = bref.pos(),
1439 0 : "index_start_offset"
1440 : );
1441 1420651 : offset
1442 1420651 : }
1443 :
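/// Returns an iterator over all (key, lsn, value) tuples in the layer, using default read sizing.
///
/// A minimal consumption sketch (`inner` being a `DeltaLayerInner`):
///
/// ```text
/// let mut it = inner.iter(ctx);
/// while let Some((key, lsn, value)) = it.next().await? {
///     // entries arrive in (key, lsn) order
/// }
/// ```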
1444 336 : pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
1445 336 : self.iter_with_options(
1446 336 : ctx,
1447 336 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1448 336 : 1024, // The default value. Unit tests might use a different value
1449 336 : )
1450 336 : }
1451 :
1452 3456 : pub fn iter_with_options<'a>(
1453 3456 : &'a self,
1454 3456 : ctx: &'a RequestContext,
1455 3456 : max_read_size: u64,
1456 3456 : max_batch_size: usize,
1457 3456 : ) -> DeltaLayerIterator<'a> {
1458 3456 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1459 3456 : let tree_reader =
1460 3456 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1461 3456 : DeltaLayerIterator {
1462 3456 : delta_layer: self,
1463 3456 : ctx,
1464 3456 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1465 3456 : key_values_batch: std::collections::VecDeque::new(),
1466 3456 : is_end: false,
1467 3456 : planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
1468 3456 : }
1469 3456 : }
1470 :
1471 : /// NB: not super efficient, but not terrible either. Should probably be an iterator.
1472 : //
1473 : // We're reusing the index traversal logic in plan_reads; it would be nice to
1474 : // factor that out.
1475 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
1476 0 : self.index_entries(ctx)
1477 0 : .await
1478 0 : .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
1479 0 : }
1480 : }
1481 :
1482 : /// A set of data associated with a delta layer key and its value
1483 : pub struct DeltaEntry<'a> {
1484 : pub key: Key,
1485 : pub lsn: Lsn,
1486 : /// Size of the stored value
1487 : pub size: u64,
1488 : /// Reference to the on-disk value
1489 : pub val: ValueRef<'a>,
1490 : }
1491 :
1492 : /// Reference to an on-disk value
1493 : pub struct ValueRef<'a> {
1494 : blob_ref: BlobRef,
1495 : layer: &'a DeltaLayerInner,
1496 : }
1497 :
1498 : impl ValueRef<'_> {
1499 : /// Loads the value from disk
1500 0 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1501 0 : let buf = self.load_raw(ctx).await?;
1502 0 : let val = Value::des(&buf)?;
1503 0 : Ok(val)
1504 0 : }
1505 :
1506 48 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1507 48 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1508 48 : self.layer,
1509 48 : )));
1510 48 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1511 48 : Ok(buf)
1512 48 : }
1513 : }
1514 :
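/// Adapts a [`DeltaLayerInner`] so a [`BlockCursor`] can read blocks from its underlying file.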
1515 : pub(crate) struct Adapter<T>(T);
1516 :
1517 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1518 48 : pub(crate) async fn read_blk(
1519 48 : &self,
1520 48 : blknum: u32,
1521 48 : ctx: &RequestContext,
1522 48 : ) -> Result<BlockLease, std::io::Error> {
1523 48 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1524 48 : block_reader.read_blk(blknum, ctx).await
1525 48 : }
1526 : }
1527 :
1528 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1529 96 : fn as_ref(&self) -> &DeltaLayerInner {
1530 96 : self
1531 96 : }
1532 : }
1533 :
1534 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1535 0 : fn key(&self) -> Key {
1536 0 : self.key
1537 0 : }
1538 0 : fn lsn(&self) -> Lsn {
1539 0 : self.lsn
1540 0 : }
1541 0 : fn size(&self) -> u64 {
1542 0 : self.size
1543 0 : }
1544 : }
1545 :
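/// A streaming iterator over all (key, lsn, value) tuples of a delta layer,
/// batching blob reads via a [`StreamingVectoredReadPlanner`].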
1546 : pub struct DeltaLayerIterator<'a> {
1547 : delta_layer: &'a DeltaLayerInner,
1548 : ctx: &'a RequestContext,
1549 : planner: StreamingVectoredReadPlanner,
1550 : index_iter: DiskBtreeIterator<'a>,
1551 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1552 : is_end: bool,
1553 : }
1554 :
1555 : impl DeltaLayerIterator<'_> {
1556 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1557 0 : self.delta_layer.layer_dbg_info()
1558 0 : }
1559 :
1560 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1561 128436 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1562 128436 : assert!(self.key_values_batch.is_empty());
1563 128436 : assert!(!self.is_end);
1564 :
1565 128436 : let plan = loop {
1566 12597780 : if let Some(res) = self.index_iter.next().await {
1567 12594492 : let (raw_key, value) = res?;
1568 12594492 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1569 12594492 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1570 12594492 : let blob_ref = BlobRef(value);
1571 12594492 : let offset = blob_ref.pos();
1572 125148 : if let Some(batch_plan) =
1573 12594492 : self.planner.handle(key, lsn, offset, blob_ref.will_init())
1574 : {
1575 125148 : break batch_plan;
1576 12469344 : }
1577 : } else {
1578 3288 : self.is_end = true;
1579 3288 : let data_end_offset = self.delta_layer.index_start_offset();
1580 3288 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1581 3288 : break item;
1582 : } else {
1583 0 : return Ok(()); // TODO: test empty iterator
1584 : }
1585 : }
1586 : };
1587 128436 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1588 128436 : let mut next_batch = std::collections::VecDeque::new();
1589 128436 : let buf_size = plan.size();
1590 128436 : let buf = IoBufferMut::with_capacity(buf_size);
1591 128436 : let blobs_buf = vectored_blob_reader
1592 128436 : .read_blobs(&plan, buf, self.ctx)
1593 128436 : .await?;
1594 128436 : let view = BufView::new_slice(&blobs_buf.buf);
1595 12594324 : for meta in blobs_buf.blobs.iter() {
1596 12594324 : let blob_read = meta.read(&view).await?;
1597 12594324 : let value = Value::des(&blob_read)?;
1598 :
1599 12594324 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1600 : }
1601 128436 : self.key_values_batch = next_batch;
1602 128436 : Ok(())
1603 128436 : }
1604 :
1605 12599052 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1606 12599052 : if self.key_values_batch.is_empty() {
1607 134304 : if self.is_end {
1608 6372 : return Ok(None);
1609 127932 : }
1610 127932 : self.next_batch().await?;
1611 12464748 : }
1612 12592680 : Ok(Some(
1613 12592680 : self.key_values_batch
1614 12592680 : .pop_front()
1615 12592680 : .expect("should not be empty"),
1616 12592680 : ))
1617 12599052 : }
1618 : }
1619 :
1620 : #[cfg(test)]
1621 : pub(crate) mod test {
1622 : use std::collections::BTreeMap;
1623 :
1624 : use bytes::Bytes;
1625 : use itertools::MinMaxResult;
1626 : use pageserver_api::value::Value;
1627 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1628 : use rand::{Rng, RngCore};
1629 :
1630 : use super::*;
1631 : use crate::DEFAULT_PG_VERSION;
1632 : use crate::context::DownloadBehavior;
1633 : use crate::task_mgr::TaskKind;
1634 : use crate::tenant::disk_btree::tests::TestDisk;
1635 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
1636 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1637 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1638 : use crate::tenant::{TenantShard, Timeline};
1639 :
1640 : /// Construct an index for a fictional delta layer and then
1641 : /// traverse it to plan vectored reads for a query. Finally,
1642 : /// verify that the traversal fed the right index key and value
1643 : /// pairs into the planner.
1644 : #[tokio::test]
1645 12 : async fn test_delta_layer_index_traversal() {
1646 12 : let base_key = Key {
1647 12 : field1: 0,
1648 12 : field2: 1663,
1649 12 : field3: 12972,
1650 12 : field4: 16396,
1651 12 : field5: 0,
1652 12 : field6: 246080,
1653 12 : };
1654 12 :
1655 12 : // Populate the index with some entries
1656 12 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1657 12 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1658 12 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1659 12 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1660 12 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1661 12 : ]);
1662 12 :
1663 12 : let mut disk = TestDisk::default();
1664 12 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1665 12 :
1666 12 : let mut disk_offset = 0;
1667 60 : for (key, lsns) in &entries {
1668 252 : for lsn in lsns {
1669 204 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1670 204 : let blob_ref = BlobRef::new(disk_offset, false);
1671 204 : writer
1672 204 : .append(&index_key.0, blob_ref.0)
1673 204 : .expect("In memory disk append should never fail");
1674 204 :
1675 204 : disk_offset += 1;
1676 204 : }
1677 12 : }
1678 12 :
1679 12 : // Prepare all the arguments for the call into `plan_reads` below
1680 12 : let (root_offset, _writer) = writer
1681 12 : .finish()
1682 12 : .expect("In memory disk finish should never fail");
1683 12 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1684 12 : let planner = VectoredReadPlanner::new(100);
1685 12 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1686 12 :
1687 12 : let keyspace = KeySpace {
1688 12 : ranges: vec![
1689 12 : base_key..base_key.add(3),
1690 12 : base_key.add(3)..base_key.add(100),
1691 12 : ],
1692 12 : };
1693 12 : let lsn_range = Lsn(2)..Lsn(40);
1694 12 :
1695 12 : // Plan and validate
1696 12 : let vectored_reads = DeltaLayerInner::plan_reads(
1697 12 : &keyspace,
1698 12 : lsn_range.clone(),
1699 12 : disk_offset,
1700 12 : reader,
1701 12 : planner,
1702 12 : &ctx,
1703 12 : )
1704 12 : .await
1705 12 : .expect("Read planning should not fail");
1706 12 :
1707 12 : validate(keyspace, lsn_range, vectored_reads, entries);
1708 12 : }
1709 :
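 : /// Check that the planned reads hit exactly the index entries that fall
 : /// inside the queried keyspace and LSN range, at the expected offsets.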
1710 12 : fn validate(
1711 12 : keyspace: KeySpace,
1712 12 : lsn_range: Range<Lsn>,
1713 12 : vectored_reads: Vec<VectoredRead>,
1714 12 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1715 12 : ) {
1716 : #[derive(Debug, PartialEq, Eq)]
1717 : struct BlobSpec {
1718 : key: Key,
1719 : lsn: Lsn,
1720 : at: u64,
1721 : }
1722 :
1723 12 : let mut planned_blobs = Vec::new();
1724 180 : for read in vectored_reads {
1725 168 : for (at, meta) in read.blobs_at.as_slice() {
1726 168 : planned_blobs.push(BlobSpec {
1727 168 : key: meta.key,
1728 168 : lsn: meta.lsn,
1729 168 : at: *at,
1730 168 : });
1731 168 : }
1732 : }
1733 :
1734 12 : let mut expected_blobs = Vec::new();
1735 12 : let mut disk_offset = 0;
1736 60 : for (key, lsns) in index_entries {
1737 252 : for lsn in lsns {
1738 252 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1739 204 : let lsn_included = lsn_range.contains(&lsn);
1740 204 :
1741 204 : if key_included && lsn_included {
1742 168 : expected_blobs.push(BlobSpec {
1743 168 : key,
1744 168 : lsn,
1745 168 : at: disk_offset,
1746 168 : });
1747 168 : }
1748 :
1749 204 : disk_offset += 1;
1750 : }
1751 : }
1752 :
1753 12 : assert_eq!(planned_blobs, expected_blobs);
1754 12 : }
1755 :
1756 : mod constants {
1757 : use utils::lsn::Lsn;
1758 :
1759 : /// Offset used by all LSNs in this test
1760 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1761 : /// Number of unique keys included in the test data
1762 : pub(super) const KEY_COUNT: u8 = 60;
1763 : /// Max number of different LSNs for each key
1764 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1765 : /// Possible value sizes for each key along with a probability weight
1766 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1767 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1768 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1769 : /// The minimum size of a key range in all the generated reads
1770 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1771 : /// The number of ranges included in each vectored read
1772 : pub(super) const RANGES_COUNT: u8 = 2;
1773 : /// The number of vectored reads performed
1774 : pub(super) const READS_COUNT: u8 = 100;
1775 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1776 : /// with values larger than the limit
1777 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1778 : }
1779 :
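 : /// A single (key, LSN, value) tuple used to build test delta layers.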
1780 : struct Entry {
1781 : key: Key,
1782 : lsn: Lsn,
1783 : value: Vec<u8>,
1784 : }
1785 :
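 : /// Deterministically generate randomized test entries per the `constants`
 : /// module: each key gets a random subset of ascending LSNs with weighted-random
 : /// value sizes, and consecutive keys are occasionally separated by a gap.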
1786 12 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1787 12 : let mut current_key = Key::MIN;
1788 12 :
1789 12 : let mut entries = Vec::new();
1790 732 : for _ in 0..constants::KEY_COUNT {
1791 720 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1792 720 : let mut lsns_iter =
1793 13560 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1794 13560 : Some(Lsn(lsn.0 + 0x08))
1795 13560 : });
1796 720 : let mut lsns = Vec::new();
1797 14280 : while lsns.len() < count as usize {
1798 13560 : let take = rng.gen_bool(0.5);
1799 13560 : let lsn = lsns_iter.next().unwrap();
1800 13560 : if take {
1801 6672 : lsns.push(lsn);
1802 6888 : }
1803 : }
1804 :
1805 7392 : for lsn in lsns {
1806 6672 : let size = constants::VALUE_SIZES
1807 20016 : .choose_weighted(rng, |item| item.1)
1808 6672 : .unwrap()
1809 6672 : .0;
1810 6672 : let mut buf = vec![0; size];
1811 6672 : rng.fill_bytes(&mut buf);
1812 6672 :
1813 6672 : entries.push(Entry {
1814 6672 : key: current_key,
1815 6672 : lsn,
1816 6672 : value: buf,
1817 6672 : })
1818 : }
1819 :
1820 720 : let gap = constants::KEY_GAP_CHANGES
1821 1440 : .choose_weighted(rng, |item| item.1)
1822 720 : .unwrap()
1823 720 : .0;
1824 720 : if gap {
1825 228 : current_key = current_key.add(2);
1826 492 : } else {
1827 492 : current_key = current_key.add(1);
1828 492 : }
1829 : }
1830 :
1831 12 : entries
1832 12 : }
1833 :
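 : /// Summary of generated entries: covering key/LSN ranges plus a lookup index.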
1834 : struct EntriesMeta {
1835 : key_range: Range<Key>,
1836 : lsn_range: Range<Lsn>,
1837 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1838 : }
1839 :
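 : /// Compute the covering key range, LSN range, and a (key, LSN) -> value index.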
1840 12 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1841 6672 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1842 12 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1843 0 : _ => panic!("More than one entry is always expected"),
1844 : };
1845 :
1846 6672 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1847 12 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1848 0 : _ => panic!("More than one entry is always expected"),
1849 : };
1850 :
1851 12 : let mut index = BTreeMap::new();
1852 6672 : for entry in entries.iter() {
1853 6672 : index.insert((entry.key, entry.lsn), entry.value.clone());
1854 6672 : }
1855 :
1856 12 : EntriesMeta {
1857 12 : key_range,
1858 12 : lsn_range,
1859 12 : index,
1860 12 : }
1861 12 : }
1862 :
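 : /// Pick `RANGES_COUNT` non-overlapping random ranges within `key_range`, each
 : /// (where possible) at least `MIN_RANGE_SIZE` keys wide.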
1863 1200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1864 1200 : let start = key_range.start.to_i128();
1865 1200 : let end = key_range.end.to_i128();
1866 1200 :
1867 1200 : let mut keyspace = KeySpace::default();
1868 :
1869 3600 : for _ in 0..constants::RANGES_COUNT {
1870 2400 : let mut range: Option<Range<Key>> = Option::default();
1871 7464 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1872 5064 : let range_start = rng.gen_range(start..end);
1873 5064 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1874 5064 : if range_end_offset >= end {
1875 600 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1876 4464 : } else {
1877 4464 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1878 4464 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1879 4464 : }
1880 : }
1881 2400 : keyspace.ranges.push(range.unwrap());
1882 : }
1883 :
1884 1200 : keyspace
1885 1200 : }
1886 :
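 : /// Write randomized entries into a real delta layer, then repeatedly plan and
 : /// execute vectored reads over random keyspaces, checking every blob against
 : /// the original value.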
1887 : #[tokio::test]
1888 12 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1889 12 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1890 12 : let (tenant, ctx) = harness.load().await;
1891 12 :
1892 12 : let timeline_id = TimelineId::generate();
1893 12 : let timeline = tenant
1894 12 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1895 12 : .await?;
1896 12 :
1897 12 : tracing::info!("Generating test data ...");
1898 12 :
1899 12 : let rng = &mut StdRng::seed_from_u64(0);
1900 12 : let entries = generate_entries(rng);
1901 12 : let entries_meta = get_entries_meta(&entries);
1902 12 :
1903 12 : tracing::info!("Done generating {} entries", entries.len());
1904 12 :
1905 12 : tracing::info!("Writing test data to delta layer ...");
1906 12 : let mut writer = DeltaLayerWriter::new(
1907 12 : harness.conf,
1908 12 : timeline_id,
1909 12 : harness.tenant_shard_id,
1910 12 : entries_meta.key_range.start,
1911 12 : entries_meta.lsn_range.clone(),
1912 12 : &timeline.gate,
1913 12 : timeline.cancel.clone(),
1914 12 : &ctx,
1915 12 : )
1916 12 : .await?;
1917 12 :
1918 6684 : for entry in entries {
1919 6672 : let (_, res) = writer
1920 6672 : .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
1921 6672 : .await;
1922 6672 : res?;
1923 12 : }
1924 12 :
1925 12 : let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
1926 12 : let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
1927 12 :
1928 12 : let inner = resident.get_as_delta(&ctx).await?;
1929 12 :
1930 12 : let file_size = inner.file.metadata().await?.len();
1931 12 : tracing::info!(
1932 12 : "Done writing test data to delta layer. Resulting file size is: {}",
1933 12 : file_size
1934 12 : );
1935 12 :
1936 1212 : for i in 0..constants::READS_COUNT {
1937 1200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1938 12 :
1939 1200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1940 1200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1941 1200 : inner.index_start_blk,
1942 1200 : inner.index_root_blk,
1943 1200 : block_reader,
1944 1200 : );
1945 1200 :
1946 1200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1947 1200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1948 1200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1949 12 :
1950 1200 : let vectored_reads = DeltaLayerInner::plan_reads(
1951 1200 : &keyspace,
1952 1200 : entries_meta.lsn_range.clone(),
1953 1200 : data_end_offset,
1954 1200 : index_reader,
1955 1200 : planner,
1956 1200 : &ctx,
1957 1200 : )
1958 1200 : .await?;
1959 12 :
1960 1200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1961 1200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1962 1200 : &vectored_reads,
1963 1200 : constants::MAX_VECTORED_READ_BYTES,
1964 1200 : );
1965 1200 : let mut buf = Some(IoBufferMut::with_capacity(buf_size));
1966 12 :
1967 119544 : for read in vectored_reads {
1968 118344 : let blobs_buf = vectored_blob_reader
1969 118344 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1970 118344 : .await?;
1971 118344 : let view = BufView::new_slice(&blobs_buf.buf);
1972 343824 : for meta in blobs_buf.blobs.iter() {
1973 343824 : let value = meta.read(&view).await?;
1974 343824 : assert_eq!(
1975 343824 : &value[..],
1976 343824 : &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
1977 343824 : );
1978 12 : }
1979 12 :
1980 118344 : buf = Some(blobs_buf.buf);
1981 12 : }
1982 12 : }
1983 12 :
1984 12 : Ok(())
1985 12 : }
1986 :
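 : /// Smoke-test `copy_delta_prefix`: build a delta layer with a mix of images,
 : /// WAL records, and one oversized value, then copy prefixes at several
 : /// truncation LSNs and verify each copy matches the LSN-filtered source.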
1987 : #[tokio::test]
1988 12 : async fn copy_delta_prefix_smoke() {
1989 12 : use bytes::Bytes;
1990 12 : use pageserver_api::record::NeonWalRecord;
1991 12 :
1992 12 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
1993 12 : .await
1994 12 : .unwrap();
1995 12 : let (tenant, ctx) = h.load().await;
1996 12 : let ctx = &ctx;
1997 12 : let timeline = tenant
1998 12 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1999 12 : .await
2000 12 : .unwrap();
2001 12 : let ctx = &ctx.with_scope_timeline(&timeline);
2002 12 :
2003 12 : let initdb_layer = timeline
2004 12 : .layers
2005 12 : .read()
2006 12 : .await
2007 12 : .likely_resident_layers()
2008 12 : .next()
2009 12 : .cloned()
2010 12 : .unwrap();
2011 12 :
2012 12 : {
2013 12 : let mut writer = timeline.writer().await;
2014 12 :
2015 12 : let data = [
2016 12 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
2017 12 : (
2018 12 : 0x30,
2019 12 : 12,
2020 12 : Value::WalRecord(NeonWalRecord::Postgres {
2021 12 : will_init: false,
2022 12 : rec: Bytes::from_static(b"1"),
2023 12 : }),
2024 12 : ),
2025 12 : (
2026 12 : 0x40,
2027 12 : 12,
2028 12 : Value::WalRecord(NeonWalRecord::Postgres {
2029 12 : will_init: true,
2030 12 : rec: Bytes::from_static(b"2"),
2031 12 : }),
2032 12 : ),
2033 12 : // build an oversized value so we cannot extend and existing read over
2034 12 : // this
2035 12 : (
2036 12 : 0x50,
2037 12 : 12,
2038 12 : Value::WalRecord(NeonWalRecord::Postgres {
2039 12 : will_init: true,
2040 12 : rec: {
2041 12 : let mut buf =
2042 12 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2043 12 : buf.iter_mut()
2044 12 : .enumerate()
2045 1609728 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2046 12 : Bytes::from(buf)
2047 12 : },
2048 12 : }),
2049 12 : ),
2050 12 : // because the oversized read cannot be extended further, this entry is
2051 12 : // guaranteed to exercise the builder created on the last round:
2052 12 : (
2053 12 : 0x60,
2054 12 : 12,
2055 12 : Value::WalRecord(NeonWalRecord::Postgres {
2056 12 : will_init: true,
2057 12 : rec: Bytes::from_static(b"3"),
2058 12 : }),
2059 12 : ),
2060 12 : (
2061 12 : 0x60,
2062 12 : 9,
2063 12 : Value::Image(Bytes::from_static(b"something for a different key")),
2064 12 : ),
2065 12 : ];
2066 12 :
2067 12 : let mut last_lsn = None;
2068 12 :
2069 84 : for (lsn, key, value) in data {
2070 72 : let key = Key::from_i128(key);
2071 72 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2072 72 : last_lsn = Some(lsn);
2073 12 : }
2074 12 :
2075 12 : writer.finish_write(Lsn(last_lsn.unwrap()));
2076 12 : }
2077 12 : timeline.freeze_and_flush().await.unwrap();
2078 12 :
2079 12 : let new_layer = timeline
2080 12 : .layers
2081 12 : .read()
2082 12 : .await
2083 12 : .likely_resident_layers()
2084 21 : .find(|&x| x != &initdb_layer)
2085 12 : .cloned()
2086 12 : .unwrap();
2087 12 :
2088 12 : // create a branch of the timeline, so we don't overwrite the file
2089 12 : let branch = tenant
2090 12 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2091 12 : .await
2092 12 : .unwrap();
2093 12 :
2094 12 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2095 12 :
2096 12 : // truncating at 0x61 gives us a full copy; each earlier truncation point
2097 12 : // leaves fewer entries, down to just a single key
2098 12 :
2099 72 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2100 60 : let truncate_at = Lsn(truncate_at);
2101 12 :
2102 60 : let mut writer = DeltaLayerWriter::new(
2103 60 : tenant.conf,
2104 60 : branch.timeline_id,
2105 60 : tenant.tenant_shard_id,
2106 60 : Key::MIN,
2107 60 : Lsn(0x11)..truncate_at,
2108 60 : &branch.gate,
2109 60 : branch.cancel.clone(),
2110 60 : ctx,
2111 60 : )
2112 60 : .await
2113 60 : .unwrap();
2114 12 :
2115 60 : let new_layer = new_layer.download_and_keep_resident(ctx).await.unwrap();
2116 60 :
2117 60 : new_layer
2118 60 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2119 60 : .await
2120 60 : .unwrap();
2121 12 :
2122 60 : let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
2123 60 : let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
2124 60 :
2125 60 : copied_layer.get_as_delta(ctx).await.unwrap();
2126 60 :
2127 60 : assert_keys_and_values_eq(
2128 60 : new_layer.get_as_delta(ctx).await.unwrap(),
2129 60 : copied_layer.get_as_delta(ctx).await.unwrap(),
2130 60 : truncate_at,
2131 60 : ctx,
2132 60 : )
2133 60 : .await;
2134 12 : }
2135 12 : }
2136 :
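 : /// Assert that `truncated` contains exactly the entries of `source` with
 : /// LSNs below `truncated_at`, with byte-identical blob contents.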
2137 60 : async fn assert_keys_and_values_eq(
2138 60 : source: &DeltaLayerInner,
2139 60 : truncated: &DeltaLayerInner,
2140 60 : truncated_at: Lsn,
2141 60 : ctx: &RequestContext,
2142 60 : ) {
2143 : use futures::future::ready;
2144 : use futures::stream::TryStreamExt;
2145 :
2146 60 : let start_key = [0u8; DELTA_KEY_SIZE];
2147 60 :
2148 60 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2149 60 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2150 60 : source.index_start_blk,
2151 60 : source.index_root_blk,
2152 60 : &source_reader,
2153 60 : );
2154 60 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2155 360 : let source_stream = source_stream.filter(|res| match res {
2156 360 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2157 0 : _ => ready(true),
2158 360 : });
2159 60 : let mut source_stream = std::pin::pin!(source_stream);
2160 60 :
2161 60 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2162 60 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2163 60 : truncated.index_start_blk,
2164 60 : truncated.index_root_blk,
2165 60 : &truncated_reader,
2166 60 : );
2167 60 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2168 60 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2169 60 :
2170 60 : let mut scratch_left = Vec::new();
2171 60 : let mut scratch_right = Vec::new();
2172 :
2173 : loop {
2174 252 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2175 252 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2176 252 :
2177 252 : if src.is_none() {
2178 60 : assert!(truncated.is_none());
2179 60 : break;
2180 192 : }
2181 192 :
2182 192 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2183 192 :
2184 192 : // because we've filtered the source by LSN, both streams should always yield the same keys.
2185 192 : assert_eq!(src.0, truncated.0);
2186 192 : assert_eq!(src.1, truncated.1);
2187 :
2188 : // if this is needed for something else, just drop this assert.
2189 192 : assert!(
2190 192 : src.2.pos() >= truncated.2.pos(),
2191 0 : "value position should not go backwards {} vs. {}",
2192 0 : src.2.pos(),
2193 0 : truncated.2.pos()
2194 : );
2195 :
2196 192 : scratch_left.clear();
2197 192 : let src_cursor = source_reader.block_cursor();
2198 192 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2199 192 : scratch_right.clear();
2200 192 : let trunc_cursor = truncated_reader.block_cursor();
2201 192 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2202 192 :
2203 192 : tokio::try_join!(left, right).unwrap();
2204 192 :
2205 192 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2206 : }
2207 60 : }
2208 :
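 : /// Order deltas by (key, LSN), ignoring the value.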
2209 109344 : pub(crate) fn sort_delta(
2210 109344 : (k1, l1, _): &(Key, Lsn, Value),
2211 109344 : (k2, l2, _): &(Key, Lsn, Value),
2212 109344 : ) -> std::cmp::Ordering {
2213 109344 : (k1, l1).cmp(&(k2, l2))
2214 109344 : }
2215 :
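 : /// Order deltas by (key, LSN), with images sorting before WAL records.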
2216 : #[cfg(feature = "testing")]
2217 564 : pub(crate) fn sort_delta_value(
2218 564 : (k1, l1, v1): &(Key, Lsn, Value),
2219 564 : (k2, l2, v2): &(Key, Lsn, Value),
2220 564 : ) -> std::cmp::Ordering {
2221 564 : let order_1 = if v1.is_image() { 0 } else { 1 };
2222 564 : let order_2 = if v2.is_image() { 0 } else { 1 };
2223 564 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
2224 564 : }
2225 :
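 : /// Sort the given deltas and write them into a fresh delta layer spanning
 : /// their full key and LSN range. A minimal usage sketch (see the
 : /// `delta_layer_iterator` test below for a complete example):
 : /// ```ignore
 : /// let resident = produce_delta_layer(&tenant, &tline, deltas, &ctx).await?;
 : /// let delta = resident.get_as_delta(&ctx).await?;
 : /// ```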
2226 132 : pub(crate) async fn produce_delta_layer(
2227 132 : tenant: &TenantShard,
2228 132 : tline: &Arc<Timeline>,
2229 132 : mut deltas: Vec<(Key, Lsn, Value)>,
2230 132 : ctx: &RequestContext,
2231 132 : ) -> anyhow::Result<ResidentLayer> {
2232 132 : deltas.sort_by(sort_delta);
2233 132 : let (key_start, _, _) = deltas.first().unwrap();
2234 132 : let (key_max, _, _) = deltas.last().unwrap();
2235 49440 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2236 49440 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2237 132 : let lsn_end = Lsn(lsn_max.0 + 1);
2238 132 : let mut writer = DeltaLayerWriter::new(
2239 132 : tenant.conf,
2240 132 : tline.timeline_id,
2241 132 : tenant.tenant_shard_id,
2242 132 : *key_start,
2243 132 : (*lsn_min)..lsn_end,
2244 132 : &tline.gate,
2245 132 : tline.cancel.clone(),
2246 132 : ctx,
2247 132 : )
2248 132 : .await?;
2249 132 : let key_end = key_max.next();
2250 :
2251 49572 : for (key, lsn, value) in deltas {
2252 49440 : writer.put_value(key, lsn, value, ctx).await?;
2253 : }
2254 :
2255 132 : let (desc, path) = writer.finish(key_end, ctx).await?;
2256 132 : let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
2257 :
2258 132 : Ok::<_, anyhow::Error>(delta_layer)
2259 132 : }
2260 :
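 : /// Drain `delta_iter` and assert it yields exactly `expect`, in order.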
2261 168 : async fn assert_delta_iter_equal(
2262 168 : delta_iter: &mut DeltaLayerIterator<'_>,
2263 168 : expect: &[(Key, Lsn, Value)],
2264 168 : ) {
2265 168 : let mut expect_iter = expect.iter();
2266 : loop {
2267 168168 : let o1 = delta_iter.next().await.unwrap();
2268 168168 : let o2 = expect_iter.next();
2269 168168 : assert_eq!(o1.is_some(), o2.is_some());
2270 168168 : if o1.is_none() && o2.is_none() {
2271 168 : break;
2272 168000 : }
2273 168000 : let (k1, l1, v1) = o1.unwrap();
2274 168000 : let (k2, l2, v2) = o2.unwrap();
2275 168000 : assert_eq!(&k1, k2);
2276 168000 : assert_eq!(l1, *l2);
2277 168000 : assert_eq!(&v1, v2);
2278 : }
2279 168 : }
2280 :
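 : /// Iterate a delta layer of 1000 image values spread over 100 keys with
 : /// various batch sizes and max read sizes, checking both the batching
 : /// behavior and the returned contents.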
2281 : #[tokio::test]
2282 12 : async fn delta_layer_iterator() {
2283 12 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2284 12 : let (tenant, ctx) = harness.load().await;
2285 12 :
2286 12 : let tline = tenant
2287 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2288 12 : .await
2289 12 : .unwrap();
2290 12 :
2291 12000 : fn get_key(id: u32) -> Key {
2292 12000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2293 12000 : key.field6 = id;
2294 12000 : key
2295 12000 : }
2296 12 : const N: usize = 1000;
2297 12 : let test_deltas = (0..N)
2298 12000 : .map(|idx| {
2299 12000 : (
2300 12000 : get_key(idx as u32 / 10),
2301 12000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2302 12000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2303 12000 : )
2304 12000 : })
2305 12 : .collect_vec();
2306 12 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2307 12 : .await
2308 12 : .unwrap();
2309 12 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2310 36 : for max_read_size in [1, 1024] {
2311 192 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2312 168 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2313 168 : // Test if the batch size is correctly determined
2314 168 : let mut iter = delta_layer.iter(&ctx);
2315 168 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2316 168 : let mut num_items = 0;
2317 672 : for _ in 0..3 {
2318 504 : iter.next_batch().await.unwrap();
2319 504 : num_items += iter.key_values_batch.len();
2320 504 : if max_read_size == 1 {
2321 12 : // every key should form its own batch because each value is larger than max_read_size
2322 252 : assert_eq!(iter.key_values_batch.len(), 1);
2323 12 : } else {
2324 252 : assert!(iter.key_values_batch.len() <= batch_size);
2325 12 : }
2326 504 : if num_items >= N {
2327 12 : break;
2328 504 : }
2329 504 : iter.key_values_batch.clear();
2330 12 : }
2331 12 : // Test if the result is correct
2332 168 : let mut iter = delta_layer.iter(&ctx);
2333 168 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2334 168 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2335 12 : }
2336 12 : }
2337 12 : }
2338 : }
|