Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
41 : };
42 : use crate::tenant::{PageReconstructError, Timeline};
43 : use crate::virtual_file::{self, VirtualFile};
44 : use crate::{walrecord, TEMP_FILE_SUFFIX};
45 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::BytesMut;
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use futures::StreamExt;
50 : use itertools::Itertools;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::models::LayerAccessKind;
53 : use pageserver_api::shard::TenantShardId;
54 : use rand::{distributions::Alphanumeric, Rng};
55 : use serde::{Deserialize, Serialize};
56 : use std::fs::File;
57 : use std::io::SeekFrom;
58 : use std::ops::Range;
59 : use std::os::unix::fs::FileExt;
60 : use std::str::FromStr;
61 : use std::sync::Arc;
62 : use tokio::sync::OnceCell;
63 : use tracing::*;
64 :
65 : use utils::{
66 : bin_ser::BeSer,
67 : id::{TenantId, TimelineId},
68 : lsn::Lsn,
69 : };
70 :
71 : use super::{
72 : AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
73 : ValuesReconstructState,
74 : };
75 :
76 : ///
77 : /// Header stored in the beginning of the file
78 : ///
79 : /// After this comes the 'values' part, starting on block 1. After that,
80 : /// the 'index' starts at the block indicated by 'index_start_blk'
81 : ///
82 1000 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
83 : pub struct Summary {
84 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
85 : pub magic: u16,
86 : pub format_version: u16,
87 :
88 : pub tenant_id: TenantId,
89 : pub timeline_id: TimelineId,
90 : pub key_range: Range<Key>,
91 : pub lsn_range: Range<Lsn>,
92 :
93 : /// Block number where the 'index' part of the file begins.
94 : pub index_start_blk: u32,
95 : /// Block within the 'index', where the B-tree root page is stored
96 : pub index_root_blk: u32,
97 : }
98 :
99 : impl From<&DeltaLayer> for Summary {
100 0 : fn from(layer: &DeltaLayer) -> Self {
101 0 : Self::expected(
102 0 : layer.desc.tenant_shard_id.tenant_id,
103 0 : layer.desc.timeline_id,
104 0 : layer.desc.key_range.clone(),
105 0 : layer.desc.lsn_range.clone(),
106 0 : )
107 0 : }
108 : }
109 :
110 : impl Summary {
111 1000 : pub(super) fn expected(
112 1000 : tenant_id: TenantId,
113 1000 : timeline_id: TimelineId,
114 1000 : keys: Range<Key>,
115 1000 : lsns: Range<Lsn>,
116 1000 : ) -> Self {
117 1000 : Self {
118 1000 : magic: DELTA_FILE_MAGIC,
119 1000 : format_version: STORAGE_FORMAT_VERSION,
120 1000 :
121 1000 : tenant_id,
122 1000 : timeline_id,
123 1000 : key_range: keys,
124 1000 : lsn_range: lsns,
125 1000 :
126 1000 : index_start_blk: 0,
127 1000 : index_root_blk: 0,
128 1000 : }
129 1000 : }
130 : }
131 :
132 : // Flag indicating that this version initialize the page
133 : const WILL_INIT: u64 = 1;
134 :
135 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
136 : /// offset, and for WAL records it also contains `will_init` flag. The flag
137 : /// helps to determine the range of records that needs to be applied, without
138 : /// reading/deserializing records themselves.
139 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
140 : pub struct BlobRef(pub u64);
141 :
142 : impl BlobRef {
143 240724 : pub fn will_init(&self) -> bool {
144 240724 : (self.0 & WILL_INIT) != 0
145 240724 : }
146 :
147 4468228 : pub fn pos(&self) -> u64 {
148 4468228 : self.0 >> 1
149 4468228 : }
150 :
151 6452880 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
152 6452880 : let mut blob_ref = pos << 1;
153 6452880 : if will_init {
154 6451624 : blob_ref |= WILL_INIT;
155 6451624 : }
156 6452880 : BlobRef(blob_ref)
157 6452880 : }
158 : }
159 :
160 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
161 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
162 :
163 : /// This is the key of the B-tree index stored in the delta layer. It consists
164 : /// of the serialized representation of a Key and LSN.
165 : impl DeltaKey {
166 2064222 : fn from_slice(buf: &[u8]) -> Self {
167 2064222 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
168 2064222 : bytes.copy_from_slice(buf);
169 2064222 : DeltaKey(bytes)
170 2064222 : }
171 :
172 6699520 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
173 6699520 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
174 6699520 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
175 6699520 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
176 6699520 : DeltaKey(bytes)
177 6699520 : }
178 :
179 2064222 : fn key(&self) -> Key {
180 2064222 : Key::from_slice(&self.0)
181 2064222 : }
182 :
183 2064222 : fn lsn(&self) -> Lsn {
184 2064222 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
185 2064222 : }
186 :
187 339947 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
188 339947 : let mut lsn_buf = [0u8; 8];
189 339947 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
190 339947 : Lsn(u64::from_be_bytes(lsn_buf))
191 339947 : }
192 : }
193 :
194 : /// This is used only from `pagectl`. Within pageserver, all layers are
195 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
196 : pub struct DeltaLayer {
197 : path: Utf8PathBuf,
198 : pub desc: PersistentLayerDesc,
199 : access_stats: LayerAccessStats,
200 : inner: OnceCell<Arc<DeltaLayerInner>>,
201 : }
202 :
203 : impl std::fmt::Debug for DeltaLayer {
204 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 0 : use super::RangeDisplayDebug;
206 0 :
207 0 : f.debug_struct("DeltaLayer")
208 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
209 0 : .field("lsn_range", &self.desc.lsn_range)
210 0 : .field("file_size", &self.desc.file_size)
211 0 : .field("inner", &self.inner)
212 0 : .finish()
213 0 : }
214 : }
215 :
216 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
217 : /// file.
218 : pub struct DeltaLayerInner {
219 : // values copied from summary
220 : index_start_blk: u32,
221 : index_root_blk: u32,
222 :
223 : file: VirtualFile,
224 : file_id: FileId,
225 :
226 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
227 : }
228 :
229 : impl std::fmt::Debug for DeltaLayerInner {
230 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231 0 : f.debug_struct("DeltaLayerInner")
232 0 : .field("index_start_blk", &self.index_start_blk)
233 0 : .field("index_root_blk", &self.index_root_blk)
234 0 : .finish()
235 0 : }
236 : }
237 :
238 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
239 : impl std::fmt::Display for DeltaLayer {
240 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 0 : write!(f, "{}", self.layer_desc().short_id())
242 0 : }
243 : }
244 :
245 : impl AsLayerDesc for DeltaLayer {
246 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
247 0 : &self.desc
248 0 : }
249 : }
250 :
251 : impl DeltaLayer {
252 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
253 0 : self.desc.dump();
254 0 :
255 0 : if !verbose {
256 0 : return Ok(());
257 0 : }
258 :
259 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
260 :
261 0 : inner.dump(ctx).await
262 0 : }
263 :
264 1328 : fn temp_path_for(
265 1328 : conf: &PageServerConf,
266 1328 : tenant_shard_id: &TenantShardId,
267 1328 : timeline_id: &TimelineId,
268 1328 : key_start: Key,
269 1328 : lsn_range: &Range<Lsn>,
270 1328 : ) -> Utf8PathBuf {
271 1328 : let rand_string: String = rand::thread_rng()
272 1328 : .sample_iter(&Alphanumeric)
273 1328 : .take(8)
274 1328 : .map(char::from)
275 1328 : .collect();
276 1328 :
277 1328 : conf.timeline_path(tenant_shard_id, timeline_id)
278 1328 : .join(format!(
279 1328 : "{}-XXX__{:016X}-{:016X}.{}.{}",
280 1328 : key_start,
281 1328 : u64::from(lsn_range.start),
282 1328 : u64::from(lsn_range.end),
283 1328 : rand_string,
284 1328 : TEMP_FILE_SUFFIX,
285 1328 : ))
286 1328 : }
287 :
288 : ///
289 : /// Open the underlying file and read the metadata into memory, if it's
290 : /// not loaded already.
291 : ///
292 0 : async fn load(
293 0 : &self,
294 0 : access_kind: LayerAccessKind,
295 0 : ctx: &RequestContext,
296 0 : ) -> Result<&Arc<DeltaLayerInner>> {
297 0 : self.access_stats.record_access(access_kind, ctx);
298 0 : // Quick exit if already loaded
299 0 : self.inner
300 0 : .get_or_try_init(|| self.load_inner(ctx))
301 0 : .await
302 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
303 0 : }
304 :
305 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
306 0 : let path = self.path();
307 :
308 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx)
309 0 : .await
310 0 : .and_then(|res| res)?;
311 :
312 : // not production code
313 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
314 0 : let expected_layer_name = self.layer_desc().layer_name();
315 0 :
316 0 : if actual_layer_name != expected_layer_name {
317 0 : println!("warning: filename does not match what is expected from in-file summary");
318 0 : println!("actual: {:?}", actual_layer_name.to_string());
319 0 : println!("expected: {:?}", expected_layer_name.to_string());
320 0 : }
321 :
322 0 : Ok(Arc::new(loaded))
323 0 : }
324 :
325 : /// Create a DeltaLayer struct representing an existing file on disk.
326 : ///
327 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
328 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
329 0 : let mut summary_buf = vec![0; PAGE_SZ];
330 0 : file.read_exact_at(&mut summary_buf, 0)?;
331 0 : let summary = Summary::des_prefix(&summary_buf)?;
332 :
333 0 : let metadata = file
334 0 : .metadata()
335 0 : .context("get file metadata to determine size")?;
336 :
337 : // This function is never used for constructing layers in a running pageserver,
338 : // so it does not need an accurate TenantShardId.
339 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
340 0 :
341 0 : Ok(DeltaLayer {
342 0 : path: path.to_path_buf(),
343 0 : desc: PersistentLayerDesc::new_delta(
344 0 : tenant_shard_id,
345 0 : summary.timeline_id,
346 0 : summary.key_range,
347 0 : summary.lsn_range,
348 0 : metadata.len(),
349 0 : ),
350 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
351 0 : inner: OnceCell::new(),
352 0 : })
353 0 : }
354 :
355 : /// Path to the layer file in pageserver workdir.
356 0 : fn path(&self) -> Utf8PathBuf {
357 0 : self.path.clone()
358 0 : }
359 : }
360 :
361 : /// A builder object for constructing a new delta layer.
362 : ///
363 : /// Usage:
364 : ///
365 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
366 : ///
367 : /// 2. Write the contents by calling `put_value` for every page
368 : /// version to store in the layer.
369 : ///
370 : /// 3. Call `finish`.
371 : ///
372 : struct DeltaLayerWriterInner {
373 : conf: &'static PageServerConf,
374 : pub path: Utf8PathBuf,
375 : timeline_id: TimelineId,
376 : tenant_shard_id: TenantShardId,
377 :
378 : key_start: Key,
379 : lsn_range: Range<Lsn>,
380 :
381 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
382 :
383 : blob_writer: BlobWriter<true>,
384 : }
385 :
386 : impl DeltaLayerWriterInner {
387 : ///
388 : /// Start building a new delta layer.
389 : ///
390 1328 : async fn new(
391 1328 : conf: &'static PageServerConf,
392 1328 : timeline_id: TimelineId,
393 1328 : tenant_shard_id: TenantShardId,
394 1328 : key_start: Key,
395 1328 : lsn_range: Range<Lsn>,
396 1328 : ctx: &RequestContext,
397 1328 : ) -> anyhow::Result<Self> {
398 1328 : // Create the file initially with a temporary filename. We don't know
399 1328 : // the end key yet, so we cannot form the final filename yet. We will
400 1328 : // rename it when we're done.
401 1328 : //
402 1328 : // Note: This overwrites any existing file. There shouldn't be any.
403 1328 : // FIXME: throw an error instead?
404 1328 : let path =
405 1328 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
406 :
407 1328 : let mut file = VirtualFile::create(&path, ctx).await?;
408 : // make room for the header block
409 1328 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
410 1328 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
411 1328 :
412 1328 : // Initialize the b-tree index builder
413 1328 : let block_buf = BlockBuf::new();
414 1328 : let tree_builder = DiskBtreeBuilder::new(block_buf);
415 1328 :
416 1328 : Ok(Self {
417 1328 : conf,
418 1328 : path,
419 1328 : timeline_id,
420 1328 : tenant_shard_id,
421 1328 : key_start,
422 1328 : lsn_range,
423 1328 : tree: tree_builder,
424 1328 : blob_writer,
425 1328 : })
426 1328 : }
427 :
428 : ///
429 : /// Append a key-value pair to the file.
430 : ///
431 : /// The values must be appended in key, lsn order.
432 : ///
433 2066114 : async fn put_value(
434 2066114 : &mut self,
435 2066114 : key: Key,
436 2066114 : lsn: Lsn,
437 2066114 : val: Value,
438 2066114 : ctx: &RequestContext,
439 2066114 : ) -> anyhow::Result<()> {
440 2066114 : let (_, res) = self
441 2066114 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
442 1567 : .await;
443 2066114 : res
444 2066114 : }
445 :
446 6452776 : async fn put_value_bytes(
447 6452776 : &mut self,
448 6452776 : key: Key,
449 6452776 : lsn: Lsn,
450 6452776 : val: Vec<u8>,
451 6452776 : will_init: bool,
452 6452776 : ctx: &RequestContext,
453 6452776 : ) -> (Vec<u8>, anyhow::Result<()>) {
454 6452776 : assert!(self.lsn_range.start <= lsn);
455 : // We don't want to use compression in delta layer creation
456 6452776 : let compression = None;
457 6452776 : let (val, res) = self
458 6452776 : .blob_writer
459 6452776 : .write_blob_maybe_compressed(val, ctx, compression)
460 4515 : .await;
461 6452776 : let off = match res {
462 6452776 : Ok(off) => off,
463 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
464 : };
465 :
466 6452776 : let blob_ref = BlobRef::new(off, will_init);
467 6452776 :
468 6452776 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
469 6452776 : let res = self.tree.append(&delta_key.0, blob_ref.0);
470 6452776 : (val, res.map_err(|e| anyhow::anyhow!(e)))
471 6452776 : }
472 :
473 2023972 : fn size(&self) -> u64 {
474 2023972 : self.blob_writer.size() + self.tree.borrow_writer().size()
475 2023972 : }
476 :
477 : ///
478 : /// Finish writing the delta layer.
479 : ///
480 1328 : async fn finish(
481 1328 : self,
482 1328 : key_end: Key,
483 1328 : timeline: &Arc<Timeline>,
484 1328 : ctx: &RequestContext,
485 1328 : ) -> anyhow::Result<ResidentLayer> {
486 1328 : let temp_path = self.path.clone();
487 9512 : let result = self.finish0(key_end, timeline, ctx).await;
488 1328 : if result.is_err() {
489 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
490 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
491 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
492 0 : }
493 1328 : }
494 1328 : result
495 1328 : }
496 :
497 1328 : async fn finish0(
498 1328 : self,
499 1328 : key_end: Key,
500 1328 : timeline: &Arc<Timeline>,
501 1328 : ctx: &RequestContext,
502 1328 : ) -> anyhow::Result<ResidentLayer> {
503 1328 : let index_start_blk =
504 1328 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
505 :
506 1328 : let mut file = self.blob_writer.into_inner(ctx).await?;
507 :
508 : // Write out the index
509 1328 : let (index_root_blk, block_buf) = self.tree.finish()?;
510 1328 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
511 0 : .await?;
512 14861 : for buf in block_buf.blocks {
513 13533 : let (_buf, res) = file.write_all(buf, ctx).await;
514 13533 : res?;
515 : }
516 1328 : assert!(self.lsn_range.start < self.lsn_range.end);
517 : // Fill in the summary on blk 0
518 1328 : let summary = Summary {
519 1328 : magic: DELTA_FILE_MAGIC,
520 1328 : format_version: STORAGE_FORMAT_VERSION,
521 1328 : tenant_id: self.tenant_shard_id.tenant_id,
522 1328 : timeline_id: self.timeline_id,
523 1328 : key_range: self.key_start..key_end,
524 1328 : lsn_range: self.lsn_range.clone(),
525 1328 : index_start_blk,
526 1328 : index_root_blk,
527 1328 : };
528 1328 :
529 1328 : let mut buf = Vec::with_capacity(PAGE_SZ);
530 1328 : // TODO: could use smallvec here but it's a pain with Slice<T>
531 1328 : Summary::ser_into(&summary, &mut buf)?;
532 1328 : file.seek(SeekFrom::Start(0)).await?;
533 1328 : let (_buf, res) = file.write_all(buf, ctx).await;
534 1328 : res?;
535 :
536 1328 : let metadata = file
537 1328 : .metadata()
538 669 : .await
539 1328 : .context("get file metadata to determine size")?;
540 :
541 : // 5GB limit for objects without multipart upload (which we don't want to use)
542 : // Make it a little bit below to account for differing GB units
543 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
544 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
545 1328 : ensure!(
546 1328 : metadata.len() <= S3_UPLOAD_LIMIT,
547 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
548 0 : file.path,
549 0 : metadata.len()
550 : );
551 :
552 : // Note: Because we opened the file in write-only mode, we cannot
553 : // reuse the same VirtualFile for reading later. That's why we don't
554 : // set inner.file here. The first read will have to re-open it.
555 :
556 1328 : let desc = PersistentLayerDesc::new_delta(
557 1328 : self.tenant_shard_id,
558 1328 : self.timeline_id,
559 1328 : self.key_start..key_end,
560 1328 : self.lsn_range.clone(),
561 1328 : metadata.len(),
562 1328 : );
563 1328 :
564 1328 : // fsync the file
565 1328 : file.sync_all().await?;
566 :
567 1328 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
568 :
569 1328 : trace!("created delta layer {}", layer.local_path());
570 :
571 1328 : Ok(layer)
572 1328 : }
573 : }
574 :
575 : /// A builder object for constructing a new delta layer.
576 : ///
577 : /// Usage:
578 : ///
579 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
580 : ///
581 : /// 2. Write the contents by calling `put_value` for every page
582 : /// version to store in the layer.
583 : ///
584 : /// 3. Call `finish`.
585 : ///
586 : /// # Note
587 : ///
588 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
589 : /// possible for the writer to drop before `finish` is actually called. So this
590 : /// could lead to odd temporary files in the directory, exhausting file system.
591 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
592 : /// implementation that cleans up the temporary file in failure. It's not
593 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
594 : /// out some fields, making it impossible to implement `Drop`.
595 : ///
596 : #[must_use]
597 : pub struct DeltaLayerWriter {
598 : inner: Option<DeltaLayerWriterInner>,
599 : }
600 :
601 : impl DeltaLayerWriter {
602 : ///
603 : /// Start building a new delta layer.
604 : ///
605 1328 : pub async fn new(
606 1328 : conf: &'static PageServerConf,
607 1328 : timeline_id: TimelineId,
608 1328 : tenant_shard_id: TenantShardId,
609 1328 : key_start: Key,
610 1328 : lsn_range: Range<Lsn>,
611 1328 : ctx: &RequestContext,
612 1328 : ) -> anyhow::Result<Self> {
613 1328 : Ok(Self {
614 1328 : inner: Some(
615 1328 : DeltaLayerWriterInner::new(
616 1328 : conf,
617 1328 : timeline_id,
618 1328 : tenant_shard_id,
619 1328 : key_start,
620 1328 : lsn_range,
621 1328 : ctx,
622 1328 : )
623 676 : .await?,
624 : ),
625 : })
626 1328 : }
627 :
628 : ///
629 : /// Append a key-value pair to the file.
630 : ///
631 : /// The values must be appended in key, lsn order.
632 : ///
633 2066114 : pub async fn put_value(
634 2066114 : &mut self,
635 2066114 : key: Key,
636 2066114 : lsn: Lsn,
637 2066114 : val: Value,
638 2066114 : ctx: &RequestContext,
639 2066114 : ) -> anyhow::Result<()> {
640 2066114 : self.inner
641 2066114 : .as_mut()
642 2066114 : .unwrap()
643 2066114 : .put_value(key, lsn, val, ctx)
644 1567 : .await
645 2066114 : }
646 :
647 4386662 : pub async fn put_value_bytes(
648 4386662 : &mut self,
649 4386662 : key: Key,
650 4386662 : lsn: Lsn,
651 4386662 : val: Vec<u8>,
652 4386662 : will_init: bool,
653 4386662 : ctx: &RequestContext,
654 4386662 : ) -> (Vec<u8>, anyhow::Result<()>) {
655 4386662 : self.inner
656 4386662 : .as_mut()
657 4386662 : .unwrap()
658 4386662 : .put_value_bytes(key, lsn, val, will_init, ctx)
659 2948 : .await
660 4386662 : }
661 :
662 2023972 : pub fn size(&self) -> u64 {
663 2023972 : self.inner.as_ref().unwrap().size()
664 2023972 : }
665 :
666 : ///
667 : /// Finish writing the delta layer.
668 : ///
669 1328 : pub(crate) async fn finish(
670 1328 : mut self,
671 1328 : key_end: Key,
672 1328 : timeline: &Arc<Timeline>,
673 1328 : ctx: &RequestContext,
674 1328 : ) -> anyhow::Result<ResidentLayer> {
675 1328 : self.inner
676 1328 : .take()
677 1328 : .unwrap()
678 1328 : .finish(key_end, timeline, ctx)
679 9512 : .await
680 1328 : }
681 : }
682 :
683 : impl Drop for DeltaLayerWriter {
684 1328 : fn drop(&mut self) {
685 1328 : if let Some(inner) = self.inner.take() {
686 0 : // We want to remove the virtual file here, so it's fine to not
687 0 : // having completely flushed unwritten data.
688 0 : let vfile = inner.blob_writer.into_inner_no_flush();
689 0 : vfile.remove();
690 1328 : }
691 1328 : }
692 : }
693 :
694 0 : #[derive(thiserror::Error, Debug)]
695 : pub enum RewriteSummaryError {
696 : #[error("magic mismatch")]
697 : MagicMismatch,
698 : #[error(transparent)]
699 : Other(#[from] anyhow::Error),
700 : }
701 :
702 : impl From<std::io::Error> for RewriteSummaryError {
703 0 : fn from(e: std::io::Error) -> Self {
704 0 : Self::Other(anyhow::anyhow!(e))
705 0 : }
706 : }
707 :
708 : impl DeltaLayer {
709 0 : pub async fn rewrite_summary<F>(
710 0 : path: &Utf8Path,
711 0 : rewrite: F,
712 0 : ctx: &RequestContext,
713 0 : ) -> Result<(), RewriteSummaryError>
714 0 : where
715 0 : F: Fn(Summary) -> Summary,
716 0 : {
717 0 : let mut file = VirtualFile::open_with_options(
718 0 : path,
719 0 : virtual_file::OpenOptions::new().read(true).write(true),
720 0 : ctx,
721 0 : )
722 0 : .await
723 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
724 0 : let file_id = page_cache::next_file_id();
725 0 : let block_reader = FileBlockReader::new(&file, file_id);
726 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
727 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
728 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
729 0 : return Err(RewriteSummaryError::MagicMismatch);
730 0 : }
731 0 :
732 0 : let new_summary = rewrite(actual_summary);
733 0 :
734 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
735 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
736 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
737 0 : file.seek(SeekFrom::Start(0)).await?;
738 0 : let (_buf, res) = file.write_all(buf, ctx).await;
739 0 : res?;
740 0 : Ok(())
741 0 : }
742 : }
743 :
744 : impl DeltaLayerInner {
745 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
746 : /// - inner has the success or transient failure
747 : /// - outer has the permanent failure
748 1000 : pub(super) async fn load(
749 1000 : path: &Utf8Path,
750 1000 : summary: Option<Summary>,
751 1000 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
752 1000 : ctx: &RequestContext,
753 1000 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
754 1000 : let file = match VirtualFile::open(path, ctx).await {
755 1000 : Ok(file) => file,
756 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
757 : };
758 1000 : let file_id = page_cache::next_file_id();
759 1000 :
760 1000 : let block_reader = FileBlockReader::new(&file, file_id);
761 :
762 1000 : let summary_blk = match block_reader.read_blk(0, ctx).await {
763 1000 : Ok(blk) => blk,
764 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
765 : };
766 :
767 : // TODO: this should be an assertion instead; see ImageLayerInner::load
768 1000 : let actual_summary =
769 1000 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
770 :
771 1000 : if let Some(mut expected_summary) = summary {
772 : // production code path
773 1000 : expected_summary.index_start_blk = actual_summary.index_start_blk;
774 1000 : expected_summary.index_root_blk = actual_summary.index_root_blk;
775 1000 : // mask out the timeline_id, but still require the layers to be from the same tenant
776 1000 : expected_summary.timeline_id = actual_summary.timeline_id;
777 1000 :
778 1000 : if actual_summary != expected_summary {
779 0 : bail!(
780 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
781 0 : actual_summary,
782 0 : expected_summary
783 0 : );
784 1000 : }
785 0 : }
786 :
787 1000 : Ok(Ok(DeltaLayerInner {
788 1000 : file,
789 1000 : file_id,
790 1000 : index_start_blk: actual_summary.index_start_blk,
791 1000 : index_root_blk: actual_summary.index_root_blk,
792 1000 : max_vectored_read_bytes,
793 1000 : }))
794 1000 : }
795 :
796 204163 : pub(super) async fn get_value_reconstruct_data(
797 204163 : &self,
798 204163 : key: Key,
799 204163 : lsn_range: Range<Lsn>,
800 204163 : reconstruct_state: &mut ValueReconstructState,
801 204163 : ctx: &RequestContext,
802 204163 : ) -> anyhow::Result<ValueReconstructResult> {
803 204163 : let mut need_image = true;
804 204163 : // Scan the page versions backwards, starting from `lsn`.
805 204163 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
806 204163 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
807 204163 : self.index_start_blk,
808 204163 : self.index_root_blk,
809 204163 : &block_reader,
810 204163 : );
811 204163 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
812 204163 :
813 204163 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
814 204163 :
815 204163 : tree_reader
816 204163 : .visit(
817 204163 : &search_key.0,
818 204163 : VisitDirection::Backwards,
819 204163 : |key, value| {
820 198162 : let blob_ref = BlobRef(value);
821 198162 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
822 76287 : return false;
823 121875 : }
824 121875 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
825 121875 : if entry_lsn < lsn_range.start {
826 39 : return false;
827 121836 : }
828 121836 : offsets.push((entry_lsn, blob_ref.pos()));
829 121836 :
830 121836 : !blob_ref.will_init()
831 204163 : },
832 204163 : &RequestContextBuilder::extend(ctx)
833 204163 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
834 204163 : .build(),
835 204163 : )
836 20726 : .await?;
837 :
838 204163 : let ctx = &RequestContextBuilder::extend(ctx)
839 204163 : .page_content_kind(PageContentKind::DeltaLayerValue)
840 204163 : .build();
841 204163 :
842 204163 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
843 204163 : let cursor = block_reader.block_cursor();
844 204163 : let mut buf = Vec::new();
845 204207 : for (entry_lsn, pos) in offsets {
846 121836 : cursor
847 121836 : .read_blob_into_buf(pos, &mut buf, ctx)
848 7891 : .await
849 121836 : .with_context(|| {
850 0 : format!("Failed to read blob from virtual file {}", self.file.path)
851 121836 : })?;
852 121836 : let val = Value::des(&buf).with_context(|| {
853 0 : format!(
854 0 : "Failed to deserialize file blob from virtual file {}",
855 0 : self.file.path
856 0 : )
857 121836 : })?;
858 121836 : match val {
859 121792 : Value::Image(img) => {
860 121792 : reconstruct_state.img = Some((entry_lsn, img));
861 121792 : need_image = false;
862 121792 : break;
863 : }
864 44 : Value::WalRecord(rec) => {
865 44 : let will_init = rec.will_init();
866 44 : reconstruct_state.records.push((entry_lsn, rec));
867 44 : if will_init {
868 : // This WAL record initializes the page, so no need to go further back
869 0 : need_image = false;
870 0 : break;
871 44 : }
872 : }
873 : }
874 : }
875 :
876 : // If an older page image is needed to reconstruct the page, let the
877 : // caller know.
878 204163 : if need_image {
879 82371 : Ok(ValueReconstructResult::Continue)
880 : } else {
881 121792 : Ok(ValueReconstructResult::Complete)
882 : }
883 204163 : }
884 :
885 : // Look up the keys in the provided keyspace and update
886 : // the reconstruct state with whatever is found.
887 : //
888 : // If the key is cached, go no further than the cached Lsn.
889 : //
890 : // Currently, the index is visited for each range, but this
891 : // can be further optimised to visit the index only once.
892 154 : pub(super) async fn get_values_reconstruct_data(
893 154 : &self,
894 154 : keyspace: KeySpace,
895 154 : lsn_range: Range<Lsn>,
896 154 : reconstruct_state: &mut ValuesReconstructState,
897 154 : ctx: &RequestContext,
898 154 : ) -> Result<(), GetVectoredError> {
899 154 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
900 154 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
901 154 : self.index_start_blk,
902 154 : self.index_root_blk,
903 154 : block_reader,
904 154 : );
905 154 :
906 154 : let planner = VectoredReadPlanner::new(
907 154 : self.max_vectored_read_bytes
908 154 : .expect("Layer is loaded with max vectored bytes config")
909 154 : .0
910 154 : .into(),
911 154 : );
912 154 :
913 154 : let data_end_offset = self.index_start_offset();
914 :
915 154 : let reads = Self::plan_reads(
916 154 : &keyspace,
917 154 : lsn_range.clone(),
918 154 : data_end_offset,
919 154 : index_reader,
920 154 : planner,
921 154 : reconstruct_state,
922 154 : ctx,
923 154 : )
924 1426 : .await
925 154 : .map_err(GetVectoredError::Other)?;
926 :
927 154 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
928 8784 : .await;
929 :
930 154 : reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
931 154 :
932 154 : Ok(())
933 154 : }
934 :
935 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
936 8 : pub(super) async fn load_key_values(
937 8 : &self,
938 8 : ctx: &RequestContext,
939 8 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
940 8 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
941 8 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
942 8 : self.index_start_blk,
943 8 : self.index_root_blk,
944 8 : block_reader,
945 8 : );
946 8 : let mut result = Vec::new();
947 8 : let mut stream =
948 8 : Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
949 8 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
950 8 : let cursor = block_reader.block_cursor();
951 8 : let mut buf = Vec::new();
952 32 : while let Some(item) = stream.next().await {
953 24 : let (key, lsn, pos) = item?;
954 : // TODO: dedup code with get_reconstruct_value
955 : // TODO: ctx handling and sharding
956 24 : cursor
957 24 : .read_blob_into_buf(pos.pos(), &mut buf, ctx)
958 0 : .await
959 24 : .with_context(|| {
960 0 : format!("Failed to read blob from virtual file {}", self.file.path)
961 24 : })?;
962 24 : let val = Value::des(&buf).with_context(|| {
963 0 : format!(
964 0 : "Failed to deserialize file blob from virtual file {}",
965 0 : self.file.path
966 0 : )
967 24 : })?;
968 24 : result.push((key, lsn, val));
969 : }
970 8 : Ok(result)
971 8 : }
972 :
973 356 : async fn plan_reads<Reader>(
974 356 : keyspace: &KeySpace,
975 356 : lsn_range: Range<Lsn>,
976 356 : data_end_offset: u64,
977 356 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
978 356 : mut planner: VectoredReadPlanner,
979 356 : reconstruct_state: &mut ValuesReconstructState,
980 356 : ctx: &RequestContext,
981 356 : ) -> anyhow::Result<Vec<VectoredRead>>
982 356 : where
983 356 : Reader: BlockReader + Clone,
984 356 : {
985 356 : let ctx = RequestContextBuilder::extend(ctx)
986 356 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
987 356 : .build();
988 :
989 42547 : for range in keyspace.ranges.iter() {
990 42547 : let mut range_end_handled = false;
991 42547 :
992 42547 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
993 42547 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
994 42547 : let mut index_stream = std::pin::pin!(index_stream);
995 :
996 189981 : while let Some(index_entry) = index_stream.next().await {
997 189774 : let (raw_key, value) = index_entry?;
998 189774 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
999 189774 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1000 189774 : let blob_ref = BlobRef(value);
1001 189774 :
1002 189774 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
1003 189774 : assert!(key >= range.start);
1004 :
1005 189774 : let outside_lsn_range = !lsn_range.contains(&lsn);
1006 189774 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
1007 :
1008 189774 : let flag = {
1009 189774 : if outside_lsn_range || below_cached_lsn {
1010 70886 : BlobFlag::Ignore
1011 118888 : } else if blob_ref.will_init() {
1012 61240 : BlobFlag::ReplaceAll
1013 : } else {
1014 : // Usual path: add blob to the read
1015 57648 : BlobFlag::None
1016 : }
1017 : };
1018 :
1019 189774 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
1020 42340 : planner.handle_range_end(blob_ref.pos());
1021 42340 : range_end_handled = true;
1022 42340 : break;
1023 147434 : } else {
1024 147434 : planner.handle(key, lsn, blob_ref.pos(), flag);
1025 147434 : }
1026 : }
1027 :
1028 42547 : if !range_end_handled {
1029 207 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
1030 207 : planner.handle_range_end(data_end_offset);
1031 42340 : }
1032 : }
1033 :
1034 356 : Ok(planner.finish())
1035 356 : }
1036 :
1037 354 : fn get_min_read_buffer_size(
1038 354 : planned_reads: &[VectoredRead],
1039 354 : read_size_soft_max: usize,
1040 354 : ) -> usize {
1041 37031 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
1042 14 : return read_size_soft_max;
1043 : };
1044 :
1045 340 : let largest_read_size = largest_read.size();
1046 340 : if largest_read_size > read_size_soft_max {
1047 : // If the read is oversized, it should only contain one key.
1048 200 : let offenders = largest_read
1049 200 : .blobs_at
1050 200 : .as_slice()
1051 200 : .iter()
1052 200 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
1053 200 : .join(", ");
1054 200 : tracing::warn!(
1055 0 : "Oversized vectored read ({} > {}) for keys {}",
1056 : largest_read_size,
1057 : read_size_soft_max,
1058 : offenders
1059 : );
1060 140 : }
1061 :
1062 340 : largest_read_size
1063 354 : }
1064 :
1065 154 : async fn do_reads_and_update_state(
1066 154 : &self,
1067 154 : reads: Vec<VectoredRead>,
1068 154 : reconstruct_state: &mut ValuesReconstructState,
1069 154 : ctx: &RequestContext,
1070 154 : ) {
1071 154 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
1072 154 : let mut ignore_key_with_err = None;
1073 154 :
1074 154 : let max_vectored_read_bytes = self
1075 154 : .max_vectored_read_bytes
1076 154 : .expect("Layer is loaded with max vectored bytes config")
1077 154 : .0
1078 154 : .into();
1079 154 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1080 154 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1081 :
1082 : // Note that reads are processed in reverse order (from highest key+lsn).
1083 : // This is the order that `ReconstructState` requires such that it can
1084 : // track when a key is done.
1085 17307 : for read in reads.into_iter().rev() {
1086 17307 : let res = vectored_blob_reader
1087 17307 : .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
1088 8784 : .await;
1089 :
1090 17307 : let blobs_buf = match res {
1091 17307 : Ok(blobs_buf) => blobs_buf,
1092 0 : Err(err) => {
1093 0 : let kind = err.kind();
1094 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1095 0 : reconstruct_state.on_key_error(
1096 0 : blob_meta.key,
1097 0 : PageReconstructError::from(anyhow!(
1098 0 : "Failed to read blobs from virtual file {}: {}",
1099 0 : self.file.path,
1100 0 : kind
1101 0 : )),
1102 0 : );
1103 0 : }
1104 :
1105 : // We have "lost" the buffer since the lower level IO api
1106 : // doesn't return the buffer on error. Allocate a new one.
1107 0 : buf = Some(BytesMut::with_capacity(buf_size));
1108 0 :
1109 0 : continue;
1110 : }
1111 : };
1112 :
1113 28084 : for meta in blobs_buf.blobs.iter().rev() {
1114 28084 : if Some(meta.meta.key) == ignore_key_with_err {
1115 0 : continue;
1116 28084 : }
1117 28084 :
1118 28084 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
1119 28084 : let value = match value {
1120 28084 : Ok(v) => v,
1121 0 : Err(e) => {
1122 0 : reconstruct_state.on_key_error(
1123 0 : meta.meta.key,
1124 0 : PageReconstructError::from(anyhow!(e).context(format!(
1125 0 : "Failed to deserialize blob from virtual file {}",
1126 0 : self.file.path,
1127 0 : ))),
1128 0 : );
1129 0 :
1130 0 : ignore_key_with_err = Some(meta.meta.key);
1131 0 : continue;
1132 : }
1133 : };
1134 :
1135 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1136 : // state, no further updates shall be made to it. The call below will
1137 : // panic if the invariant is violated.
1138 28084 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1139 : }
1140 :
1141 17307 : buf = Some(blobs_buf.buf);
1142 : }
1143 154 : }
1144 :
1145 406 : pub(super) async fn load_keys<'a>(
1146 406 : &'a self,
1147 406 : ctx: &RequestContext,
1148 406 : ) -> Result<Vec<DeltaEntry<'a>>> {
1149 406 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1150 406 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1151 406 : self.index_start_blk,
1152 406 : self.index_root_blk,
1153 406 : block_reader,
1154 406 : );
1155 406 :
1156 406 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1157 406 :
1158 406 : tree_reader
1159 406 : .visit(
1160 406 : &[0u8; DELTA_KEY_SIZE],
1161 406 : VisitDirection::Forwards,
1162 2064046 : |key, value| {
1163 2064046 : let delta_key = DeltaKey::from_slice(key);
1164 2064046 : let val_ref = ValueRef {
1165 2064046 : blob_ref: BlobRef(value),
1166 2064046 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
1167 2064046 : Adapter(self),
1168 2064046 : )),
1169 2064046 : };
1170 2064046 : let pos = BlobRef(value).pos();
1171 2064046 : if let Some(last) = all_keys.last_mut() {
1172 2063640 : // subtract offset of the current and last entries to get the size
1173 2063640 : // of the value associated with this (key, lsn) tuple
1174 2063640 : let first_pos = last.size;
1175 2063640 : last.size = pos - first_pos;
1176 2063640 : }
1177 2064046 : let entry = DeltaEntry {
1178 2064046 : key: delta_key.key(),
1179 2064046 : lsn: delta_key.lsn(),
1180 2064046 : size: pos,
1181 2064046 : val: val_ref,
1182 2064046 : };
1183 2064046 : all_keys.push(entry);
1184 2064046 : true
1185 2064046 : },
1186 406 : &RequestContextBuilder::extend(ctx)
1187 406 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1188 406 : .build(),
1189 406 : )
1190 2170 : .await?;
1191 406 : if let Some(last) = all_keys.last_mut() {
1192 406 : // Last key occupies all space till end of value storage,
1193 406 : // which corresponds to beginning of the index
1194 406 : last.size = self.index_start_offset() - last.size;
1195 406 : }
1196 406 : Ok(all_keys)
1197 406 : }
1198 :
1199 : /// Using the given writer, write out a version which has the earlier Lsns than `until`.
1200 : ///
1201 : /// Return the amount of key value records pushed to the writer.
1202 10 : pub(super) async fn copy_prefix(
1203 10 : &self,
1204 10 : writer: &mut DeltaLayerWriter,
1205 10 : until: Lsn,
1206 10 : ctx: &RequestContext,
1207 10 : ) -> anyhow::Result<usize> {
1208 10 : use crate::tenant::vectored_blob_io::{
1209 10 : BlobMeta, VectoredReadBuilder, VectoredReadExtended,
1210 10 : };
1211 10 : use futures::stream::TryStreamExt;
1212 10 :
1213 10 : #[derive(Debug)]
1214 10 : enum Item {
1215 10 : Actual(Key, Lsn, BlobRef),
1216 10 : Sentinel,
1217 10 : }
1218 10 :
1219 10 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1220 70 : fn from(value: Item) -> Self {
1221 70 : match value {
1222 60 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1223 10 : Item::Sentinel => None,
1224 10 : }
1225 70 : }
1226 10 : }
1227 10 :
1228 10 : impl Item {
1229 70 : fn offset(&self) -> Option<BlobRef> {
1230 70 : match self {
1231 60 : Item::Actual(_, _, blob) => Some(*blob),
1232 10 : Item::Sentinel => None,
1233 10 : }
1234 70 : }
1235 10 :
1236 70 : fn is_last(&self) -> bool {
1237 70 : matches!(self, Item::Sentinel)
1238 70 : }
1239 10 : }
1240 10 :
1241 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1242 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1243 10 : self.index_start_blk,
1244 10 : self.index_root_blk,
1245 10 : block_reader,
1246 10 : );
1247 10 :
1248 10 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1249 60 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1250 10 : // put in a sentinel value for getting the end offset for last item, and not having to
1251 10 : // repeat the whole read part
1252 10 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1253 10 : Item::Sentinel,
1254 10 : ))));
1255 10 : let mut stream = std::pin::pin!(stream);
1256 10 :
1257 10 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1258 10 :
1259 10 : let mut read_builder: Option<VectoredReadBuilder> = None;
1260 10 :
1261 10 : let max_read_size = self
1262 10 : .max_vectored_read_bytes
1263 10 : .map(|x| x.0.get())
1264 10 : .unwrap_or(8192);
1265 10 :
1266 10 : let mut buffer = Some(BytesMut::with_capacity(max_read_size));
1267 10 :
1268 10 : // FIXME: buffering of DeltaLayerWriter
1269 10 : let mut per_blob_copy = Vec::new();
1270 10 :
1271 10 : let mut records = 0;
1272 :
1273 80 : while let Some(item) = stream.try_next().await? {
1274 70 : tracing::debug!(?item, "popped");
1275 70 : let offset = item
1276 70 : .offset()
1277 70 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1278 :
1279 70 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1280 60 : let end_offset = offset;
1281 60 :
1282 60 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1283 : } else {
1284 10 : None
1285 : };
1286 :
1287 70 : let is_last = item.is_last();
1288 70 :
1289 70 : prev = Option::from(item);
1290 70 :
1291 70 : let actionable = actionable.filter(|x| x.0.lsn < until);
1292 :
1293 70 : let builder = if let Some((meta, offsets)) = actionable {
1294 : // extend or create a new builder
1295 32 : if read_builder
1296 32 : .as_mut()
1297 32 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1298 32 : .unwrap_or(VectoredReadExtended::No)
1299 32 : == VectoredReadExtended::Yes
1300 : {
1301 16 : None
1302 : } else {
1303 16 : read_builder.replace(VectoredReadBuilder::new(
1304 16 : offsets.start.pos(),
1305 16 : offsets.end.pos(),
1306 16 : meta,
1307 16 : Some(max_read_size),
1308 16 : ))
1309 : }
1310 : } else {
1311 : // nothing to do, except perhaps flush any existing for the last element
1312 38 : None
1313 : };
1314 :
1315 : // flush the possible older builder and also the new one if the item was the last one
1316 70 : let builders = builder.into_iter();
1317 70 : let builders = if is_last {
1318 10 : builders.chain(read_builder.take())
1319 : } else {
1320 60 : builders.chain(None)
1321 : };
1322 :
1323 86 : for builder in builders {
1324 16 : let read = builder.build();
1325 16 :
1326 16 : let reader = VectoredBlobReader::new(&self.file);
1327 16 :
1328 16 : let mut buf = buffer.take().unwrap();
1329 16 :
1330 16 : buf.clear();
1331 16 : buf.reserve(read.size());
1332 16 : let res = reader.read_blobs(&read, buf, ctx).await?;
1333 :
1334 48 : for blob in res.blobs {
1335 32 : let key = blob.meta.key;
1336 32 : let lsn = blob.meta.lsn;
1337 32 : let data = &res.buf[blob.start..blob.end];
1338 32 :
1339 32 : #[cfg(debug_assertions)]
1340 32 : Value::des(data)
1341 32 : .with_context(|| {
1342 0 : format!(
1343 0 : "blob failed to deserialize for {}@{}, {}..{}: {:?}",
1344 0 : blob.meta.key,
1345 0 : blob.meta.lsn,
1346 0 : blob.start,
1347 0 : blob.end,
1348 0 : utils::Hex(data)
1349 0 : )
1350 32 : })
1351 32 : .unwrap();
1352 32 :
1353 32 : // is it an image or will_init walrecord?
1354 32 : // FIXME: this could be handled by threading the BlobRef to the
1355 32 : // VectoredReadBuilder
1356 32 : let will_init = crate::repository::ValueBytes::will_init(data)
1357 32 : .inspect_err(|_e| {
1358 0 : #[cfg(feature = "testing")]
1359 0 : tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1360 32 : })
1361 32 : .unwrap_or(false);
1362 32 :
1363 32 : per_blob_copy.clear();
1364 32 : per_blob_copy.extend_from_slice(data);
1365 :
1366 32 : let (tmp, res) = writer
1367 32 : .put_value_bytes(
1368 32 : key,
1369 32 : lsn,
1370 32 : std::mem::take(&mut per_blob_copy),
1371 32 : will_init,
1372 32 : ctx,
1373 32 : )
1374 4 : .await;
1375 32 : per_blob_copy = tmp;
1376 32 :
1377 32 : res?;
1378 :
1379 32 : records += 1;
1380 : }
1381 :
1382 16 : buffer = Some(res.buf);
1383 : }
1384 : }
1385 :
1386 10 : assert!(
1387 10 : read_builder.is_none(),
1388 0 : "with the sentinel above loop should had handled all"
1389 : );
1390 :
1391 10 : Ok(records)
1392 10 : }
1393 :
1394 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1395 4 : println!(
1396 4 : "index_start_blk: {}, root {}",
1397 4 : self.index_start_blk, self.index_root_blk
1398 4 : );
1399 4 :
1400 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1401 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1402 4 : self.index_start_blk,
1403 4 : self.index_root_blk,
1404 4 : block_reader,
1405 4 : );
1406 4 :
1407 4 : tree_reader.dump().await?;
1408 :
1409 4 : let keys = self.load_keys(ctx).await?;
1410 :
1411 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1412 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1413 8 : let val = Value::des(&buf)?;
1414 8 : let desc = match val {
1415 8 : Value::Image(img) => {
1416 8 : format!(" img {} bytes", img.len())
1417 : }
1418 0 : Value::WalRecord(rec) => {
1419 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1420 0 : format!(
1421 0 : " rec {} bytes will_init: {} {}",
1422 0 : buf.len(),
1423 0 : rec.will_init(),
1424 0 : wal_desc
1425 0 : )
1426 : }
1427 : };
1428 8 : Ok(desc)
1429 8 : }
1430 :
1431 12 : for entry in keys {
1432 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1433 8 : let desc = match dump_blob(&val, ctx).await {
1434 8 : Ok(desc) => desc,
1435 0 : Err(err) => {
1436 0 : format!("ERROR: {err}")
1437 : }
1438 : };
1439 8 : println!(" key {key} at {lsn}: {desc}");
1440 8 :
1441 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1442 8 : // of many other record types too, but these are particularly interesting, as
1443 8 : // have a lot of special processing for them in walingest.rs.
1444 8 : use pageserver_api::key::CHECKPOINT_KEY;
1445 8 : use postgres_ffi::CheckPoint;
1446 8 : if key == CHECKPOINT_KEY {
1447 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1448 0 : let val = Value::des(&buf)?;
1449 0 : match val {
1450 0 : Value::Image(img) => {
1451 0 : let checkpoint = CheckPoint::decode(&img)?;
1452 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1453 : }
1454 0 : Value::WalRecord(_rec) => {
1455 0 : println!(" unexpected walrecord value for checkpoint key");
1456 0 : }
1457 : }
1458 8 : }
1459 : }
1460 :
1461 4 : Ok(())
1462 4 : }
1463 :
1464 38 : fn stream_index_forwards<'a, R>(
1465 38 : &'a self,
1466 38 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1467 38 : start: &'a [u8; DELTA_KEY_SIZE],
1468 38 : ctx: &'a RequestContext,
1469 38 : ) -> impl futures::stream::Stream<
1470 38 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1471 38 : > + 'a
1472 38 : where
1473 38 : R: BlockReader + 'a,
1474 38 : {
1475 38 : use futures::stream::TryStreamExt;
1476 38 : let stream = reader.into_stream(start, ctx);
1477 176 : stream.map_ok(|(key, value)| {
1478 176 : let key = DeltaKey::from_slice(&key);
1479 176 : let (key, lsn) = (key.key(), key.lsn());
1480 176 : let offset = BlobRef(value);
1481 176 :
1482 176 : (key, lsn, offset)
1483 176 : })
1484 38 : }
1485 :
1486 : /// The file offset to the first block of index.
1487 : ///
1488 : /// The file structure is summary, values, and index. We often need this for the size of last blob.
1489 658 : fn index_start_offset(&self) -> u64 {
1490 658 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1491 658 : let bref = BlobRef(offset);
1492 658 : tracing::debug!(
1493 : index_start_blk = self.index_start_blk,
1494 : offset,
1495 0 : pos = bref.pos(),
1496 0 : "index_start_offset"
1497 : );
1498 658 : offset
1499 658 : }
1500 :
/// Creates a test-only iterator over all `(key, lsn, value)` entries of the
/// layer, starting from the smallest index key. Values are fetched in
/// batches planned by a streaming vectored read planner.
#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
    let tree_reader = DiskBtreeReader::new(
        self.index_start_blk,
        self.index_root_blk,
        FileBlockReader::new(&self.file, self.file_id),
    );
    let index_iter = tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx);
    let planner = crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new(
        1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
        1024,        // The default value. Unit tests might use a different value
    );

    DeltaLayerIterator {
        delta_layer: self,
        ctx,
        index_iter,
        key_values_batch: std::collections::VecDeque::new(),
        is_end: false,
        planner,
    }
}
1518 : }
1519 :
1520 : /// A set of data associated with a delta layer key and its value
1521 : pub struct DeltaEntry<'a> {
1522 : pub key: Key,
1523 : pub lsn: Lsn,
1524 : /// Size of the stored value
1525 : pub size: u64,
1526 : /// Reference to the on-disk value
1527 : pub val: ValueRef<'a>,
1528 : }
1529 :
1530 : /// Reference to an on-disk value
1531 : pub struct ValueRef<'a> {
1532 : blob_ref: BlobRef,
1533 : reader: BlockCursor<'a>,
1534 : }
1535 :
1536 : impl<'a> ValueRef<'a> {
1537 : /// Loads the value from disk
1538 2064038 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1539 : // theoretically we *could* record an access time for each, but it does not really matter
1540 2064038 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1541 2064038 : let val = Value::des(&buf)?;
1542 2064038 : Ok(val)
1543 2064038 : }
1544 : }
1545 :
1546 : pub(crate) struct Adapter<T>(T);
1547 :
1548 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1549 2083002 : pub(crate) async fn read_blk(
1550 2083002 : &self,
1551 2083002 : blknum: u32,
1552 2083002 : ctx: &RequestContext,
1553 2083002 : ) -> Result<BlockLease, std::io::Error> {
1554 2083002 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1555 2083002 : block_reader.read_blk(blknum, ctx).await
1556 2083002 : }
1557 : }
1558 :
1559 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1560 4166004 : fn as_ref(&self) -> &DeltaLayerInner {
1561 4166004 : self
1562 4166004 : }
1563 : }
1564 :
1565 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1566 0 : fn key(&self) -> Key {
1567 0 : self.key
1568 0 : }
1569 0 : fn lsn(&self) -> Lsn {
1570 0 : self.lsn
1571 0 : }
1572 0 : fn size(&self) -> u64 {
1573 0 : self.size
1574 0 : }
1575 : }
1576 :
/// Test-only iterator over the `(key, lsn, value)` entries of a delta layer.
/// Values are read in batches and handed out one at a time.
#[cfg(test)]
pub struct DeltaLayerIterator<'a> {
    /// Layer being iterated
    delta_layer: &'a DeltaLayerInner,
    /// Request context used for the blob reads
    ctx: &'a RequestContext,
    /// Groups index entries into batched reads
    planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner,
    /// Iterator over the layer's index
    index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
    /// Entries already read from disk but not yet returned
    key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>,
    /// Set once the index iterator has been exhausted
    is_end: bool,
}
1586 :
#[cfg(test)]
impl<'a> DeltaLayerIterator<'a> {
    /// Retrieve a batch of key-value pairs into the iterator buffer.
    ///
    /// Index entries are fed to the planner until it hands out a read plan,
    /// or until the index is exhausted, in which case the planner is asked
    /// for the final plan and the iterator is marked as ended. The plan is
    /// then executed and the resulting values become the new batch.
    ///
    /// Must only be called while the batch is empty and the end has not
    /// been reached.
    async fn next_batch(&mut self) -> anyhow::Result<()> {
        assert!(self.key_values_batch.is_empty());
        assert!(!self.is_end);

        let plan = loop {
            match self.index_iter.next().await {
                None => {
                    self.is_end = true;
                    let values_end = self.delta_layer.index_start_offset();
                    break self.planner.handle_range_end(values_end);
                }
                Some(entry) => {
                    let (raw_key, raw_value) = entry?;
                    let key = Key::from_slice(&raw_key[..KEY_SIZE]);
                    let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
                    let offset = BlobRef(raw_value).pos();
                    if let Some(ready) = self.planner.handle(key, lsn, offset, BlobFlag::None) {
                        break ready;
                    }
                }
            }
        };

        let reader = VectoredBlobReader::new(&self.delta_layer.file);
        let read_buf = BytesMut::with_capacity(plan.size());
        let fetched = reader.read_blobs(&plan, read_buf, self.ctx).await?;
        let bytes = fetched.buf.freeze();

        // The current batch is only replaced once every blob deserialized.
        let batch = fetched
            .blobs
            .iter()
            .map(|blob| -> anyhow::Result<(Key, Lsn, Value)> {
                let value = Value::des(&bytes[blob.start..blob.end])?;
                Ok((blob.meta.key, blob.meta.lsn, value))
            })
            .collect::<anyhow::Result<std::collections::VecDeque<_>>>()?;
        self.key_values_batch = batch;
        Ok(())
    }

    /// Returns the next `(key, lsn, value)` entry, or `None` once the layer
    /// is exhausted. A new batch is fetched whenever the buffered one runs
    /// out.
    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        if let Some(buffered) = self.key_values_batch.pop_front() {
            return Ok(Some(buffered));
        }
        if self.is_end {
            return Ok(None);
        }

        self.next_batch().await?;
        let head = self
            .key_values_batch
            .pop_front()
            .expect("should not be empty");
        Ok(Some(head))
    }
}
1640 :
1641 : #[cfg(test)]
1642 : mod test {
1643 : use std::collections::BTreeMap;
1644 :
1645 : use itertools::MinMaxResult;
1646 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1647 : use rand::RngCore;
1648 :
1649 : use super::*;
1650 : use crate::tenant::harness::TIMELINE_ID;
1651 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1652 : use crate::tenant::Tenant;
1653 : use crate::{
1654 : context::DownloadBehavior,
1655 : task_mgr::TaskKind,
1656 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1657 : DEFAULT_PG_VERSION,
1658 : };
1659 :
1660 : /// Construct an index for a fictional delta layer and and then
1661 : /// traverse in order to plan vectored reads for a query. Finally,
1662 : /// verify that the traversal fed the right index key and value
1663 : /// pairs into the planner.
1664 : #[tokio::test]
1665 2 : async fn test_delta_layer_index_traversal() {
1666 2 : let base_key = Key {
1667 2 : field1: 0,
1668 2 : field2: 1663,
1669 2 : field3: 12972,
1670 2 : field4: 16396,
1671 2 : field5: 0,
1672 2 : field6: 246080,
1673 2 : };
1674 2 :
1675 2 : // Populate the index with some entries
1676 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1677 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1678 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1679 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1680 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1681 2 : ]);
1682 2 :
1683 2 : let mut disk = TestDisk::default();
1684 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1685 2 :
1686 2 : let mut disk_offset = 0;
1687 10 : for (key, lsns) in &entries {
1688 42 : for lsn in lsns {
1689 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1690 34 : let blob_ref = BlobRef::new(disk_offset, false);
1691 34 : writer
1692 34 : .append(&index_key.0, blob_ref.0)
1693 34 : .expect("In memory disk append should never fail");
1694 34 :
1695 34 : disk_offset += 1;
1696 34 : }
1697 2 : }
1698 2 :
1699 2 : // Prepare all the arguments for the call into `plan_reads` below
1700 2 : let (root_offset, _writer) = writer
1701 2 : .finish()
1702 2 : .expect("In memory disk finish should never fail");
1703 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1704 2 : let planner = VectoredReadPlanner::new(100);
1705 2 : let mut reconstruct_state = ValuesReconstructState::new();
1706 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1707 2 :
1708 2 : let keyspace = KeySpace {
1709 2 : ranges: vec![
1710 2 : base_key..base_key.add(3),
1711 2 : base_key.add(3)..base_key.add(100),
1712 2 : ],
1713 2 : };
1714 2 : let lsn_range = Lsn(2)..Lsn(40);
1715 2 :
1716 2 : // Plan and validate
1717 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1718 2 : &keyspace,
1719 2 : lsn_range.clone(),
1720 2 : disk_offset,
1721 2 : reader,
1722 2 : planner,
1723 2 : &mut reconstruct_state,
1724 2 : &ctx,
1725 2 : )
1726 2 : .await
1727 2 : .expect("Read planning should not fail");
1728 2 :
1729 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1730 2 : }
1731 :
1732 2 : fn validate(
1733 2 : keyspace: KeySpace,
1734 2 : lsn_range: Range<Lsn>,
1735 2 : vectored_reads: Vec<VectoredRead>,
1736 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1737 2 : ) {
1738 2 : #[derive(Debug, PartialEq, Eq)]
1739 2 : struct BlobSpec {
1740 2 : key: Key,
1741 2 : lsn: Lsn,
1742 2 : at: u64,
1743 2 : }
1744 2 :
1745 2 : let mut planned_blobs = Vec::new();
1746 8 : for read in vectored_reads {
1747 28 : for (at, meta) in read.blobs_at.as_slice() {
1748 28 : planned_blobs.push(BlobSpec {
1749 28 : key: meta.key,
1750 28 : lsn: meta.lsn,
1751 28 : at: *at,
1752 28 : });
1753 28 : }
1754 : }
1755 :
1756 2 : let mut expected_blobs = Vec::new();
1757 2 : let mut disk_offset = 0;
1758 10 : for (key, lsns) in index_entries {
1759 42 : for lsn in lsns {
1760 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1761 34 : let lsn_included = lsn_range.contains(&lsn);
1762 34 :
1763 34 : if key_included && lsn_included {
1764 28 : expected_blobs.push(BlobSpec {
1765 28 : key,
1766 28 : lsn,
1767 28 : at: disk_offset,
1768 28 : });
1769 28 : }
1770 :
1771 34 : disk_offset += 1;
1772 : }
1773 : }
1774 :
1775 2 : assert_eq!(planned_blobs, expected_blobs);
1776 2 : }
1777 :
1778 : mod constants {
1779 : use utils::lsn::Lsn;
1780 :
1781 : /// Offset used by all lsns in this test
1782 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1783 : /// Number of unique keys including in the test data
1784 : pub(super) const KEY_COUNT: u8 = 60;
1785 : /// Max number of different lsns for each key
1786 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1787 : /// Possible value sizes for each key along with a probability weight
1788 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1789 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1790 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1791 : /// The minimum size of a key range in all the generated reads
1792 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1793 : /// The number of ranges included in each vectored read
1794 : pub(super) const RANGES_COUNT: u8 = 2;
1795 : /// The number of vectored reads performed
1796 : pub(super) const READS_COUNT: u8 = 100;
1797 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1798 : /// with values larger than the limit
1799 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1800 : }
1801 :
1802 : struct Entry {
1803 : key: Key,
1804 : lsn: Lsn,
1805 : value: Vec<u8>,
1806 : }
1807 :
1808 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1809 2 : let mut current_key = Key::MIN;
1810 2 :
1811 2 : let mut entries = Vec::new();
1812 122 : for _ in 0..constants::KEY_COUNT {
1813 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1814 120 : let mut lsns_iter =
1815 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1816 2260 : Some(Lsn(lsn.0 + 0x08))
1817 2260 : });
1818 120 : let mut lsns = Vec::new();
1819 2380 : while lsns.len() < count as usize {
1820 2260 : let take = rng.gen_bool(0.5);
1821 2260 : let lsn = lsns_iter.next().unwrap();
1822 2260 : if take {
1823 1112 : lsns.push(lsn);
1824 1148 : }
1825 : }
1826 :
1827 1232 : for lsn in lsns {
1828 1112 : let size = constants::VALUE_SIZES
1829 3336 : .choose_weighted(rng, |item| item.1)
1830 1112 : .unwrap()
1831 1112 : .0;
1832 1112 : let mut buf = vec![0; size];
1833 1112 : rng.fill_bytes(&mut buf);
1834 1112 :
1835 1112 : entries.push(Entry {
1836 1112 : key: current_key,
1837 1112 : lsn,
1838 1112 : value: buf,
1839 1112 : })
1840 : }
1841 :
1842 120 : let gap = constants::KEY_GAP_CHANGES
1843 240 : .choose_weighted(rng, |item| item.1)
1844 120 : .unwrap()
1845 120 : .0;
1846 120 : if gap {
1847 38 : current_key = current_key.add(2);
1848 82 : } else {
1849 82 : current_key = current_key.add(1);
1850 82 : }
1851 : }
1852 :
1853 2 : entries
1854 2 : }
1855 :
1856 : struct EntriesMeta {
1857 : key_range: Range<Key>,
1858 : lsn_range: Range<Lsn>,
1859 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1860 : }
1861 :
1862 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1863 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1864 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1865 0 : _ => panic!("More than one entry is always expected"),
1866 : };
1867 :
1868 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1869 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1870 0 : _ => panic!("More than one entry is always expected"),
1871 : };
1872 :
1873 2 : let mut index = BTreeMap::new();
1874 1112 : for entry in entries.iter() {
1875 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1876 1112 : }
1877 :
1878 2 : EntriesMeta {
1879 2 : key_range,
1880 2 : lsn_range,
1881 2 : index,
1882 2 : }
1883 2 : }
1884 :
1885 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1886 200 : let start = key_range.start.to_i128();
1887 200 : let end = key_range.end.to_i128();
1888 200 :
1889 200 : let mut keyspace = KeySpace::default();
1890 :
1891 600 : for _ in 0..constants::RANGES_COUNT {
1892 400 : let mut range: Option<Range<Key>> = Option::default();
1893 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1894 844 : let range_start = rng.gen_range(start..end);
1895 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1896 844 : if range_end_offset >= end {
1897 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1898 744 : } else {
1899 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1900 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1901 744 : }
1902 : }
1903 400 : keyspace.ranges.push(range.unwrap());
1904 : }
1905 :
1906 200 : keyspace
1907 200 : }
1908 :
1909 : #[tokio::test]
1910 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1911 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
1912 8 : let (tenant, ctx) = harness.load().await;
1913 2 :
1914 2 : let timeline_id = TimelineId::generate();
1915 2 : let timeline = tenant
1916 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1917 6 : .await?;
1918 2 :
1919 2 : tracing::info!("Generating test data ...");
1920 2 :
1921 2 : let rng = &mut StdRng::seed_from_u64(0);
1922 2 : let entries = generate_entries(rng);
1923 2 : let entries_meta = get_entries_meta(&entries);
1924 2 :
1925 2 : tracing::info!("Done generating {} entries", entries.len());
1926 2 :
1927 2 : tracing::info!("Writing test data to delta layer ...");
1928 2 : let mut writer = DeltaLayerWriter::new(
1929 2 : harness.conf,
1930 2 : timeline_id,
1931 2 : harness.tenant_shard_id,
1932 2 : entries_meta.key_range.start,
1933 2 : entries_meta.lsn_range.clone(),
1934 2 : &ctx,
1935 2 : )
1936 2 : .await?;
1937 2 :
1938 1114 : for entry in entries {
1939 1112 : let (_, res) = writer
1940 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx)
1941 215 : .await;
1942 1112 : res?;
1943 2 : }
1944 2 :
1945 2 : let resident = writer
1946 2 : .finish(entries_meta.key_range.end, &timeline, &ctx)
1947 5 : .await?;
1948 2 :
1949 2 : let inner = resident.get_as_delta(&ctx).await?;
1950 2 :
1951 2 : let file_size = inner.file.metadata().await?.len();
1952 2 : tracing::info!(
1953 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1954 2 : file_size
1955 2 : );
1956 2 :
1957 202 : for i in 0..constants::READS_COUNT {
1958 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1959 2 :
1960 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1961 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1962 200 : inner.index_start_blk,
1963 200 : inner.index_root_blk,
1964 200 : block_reader,
1965 200 : );
1966 200 :
1967 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1968 200 : let mut reconstruct_state = ValuesReconstructState::new();
1969 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1970 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1971 2 :
1972 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1973 200 : &keyspace,
1974 200 : entries_meta.lsn_range.clone(),
1975 200 : data_end_offset,
1976 200 : index_reader,
1977 200 : planner,
1978 200 : &mut reconstruct_state,
1979 200 : &ctx,
1980 200 : )
1981 4 : .await?;
1982 2 :
1983 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1984 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1985 200 : &vectored_reads,
1986 200 : constants::MAX_VECTORED_READ_BYTES,
1987 200 : );
1988 200 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1989 2 :
1990 19924 : for read in vectored_reads {
1991 19724 : let blobs_buf = vectored_blob_reader
1992 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1993 10016 : .await?;
1994 57304 : for meta in blobs_buf.blobs.iter() {
1995 57304 : let value = &blobs_buf.buf[meta.start..meta.end];
1996 57304 : assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
1997 2 : }
1998 2 :
1999 19724 : buf = Some(blobs_buf.buf);
2000 2 : }
2001 2 : }
2002 2 :
2003 2 : Ok(())
2004 2 : }
2005 :
2006 : #[tokio::test]
2007 2 : async fn copy_delta_prefix_smoke() {
2008 2 : use crate::walrecord::NeonWalRecord;
2009 2 : use bytes::Bytes;
2010 2 :
2011 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
2012 8 : let (tenant, ctx) = h.load().await;
2013 2 : let ctx = &ctx;
2014 2 : let timeline = tenant
2015 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
2016 6 : .await
2017 2 : .unwrap();
2018 2 :
2019 2 : let initdb_layer = timeline
2020 2 : .layers
2021 2 : .read()
2022 2 : .await
2023 2 : .likely_resident_layers()
2024 2 : .next()
2025 2 : .unwrap();
2026 2 :
2027 2 : {
2028 2 : let mut writer = timeline.writer().await;
2029 2 :
2030 2 : let data = [
2031 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
2032 2 : (
2033 2 : 0x30,
2034 2 : 12,
2035 2 : Value::WalRecord(NeonWalRecord::Postgres {
2036 2 : will_init: false,
2037 2 : rec: Bytes::from_static(b"1"),
2038 2 : }),
2039 2 : ),
2040 2 : (
2041 2 : 0x40,
2042 2 : 12,
2043 2 : Value::WalRecord(NeonWalRecord::Postgres {
2044 2 : will_init: true,
2045 2 : rec: Bytes::from_static(b"2"),
2046 2 : }),
2047 2 : ),
2048 2 : // build an oversized value so we cannot extend and existing read over
2049 2 : // this
2050 2 : (
2051 2 : 0x50,
2052 2 : 12,
2053 2 : Value::WalRecord(NeonWalRecord::Postgres {
2054 2 : will_init: true,
2055 2 : rec: {
2056 2 : let mut buf =
2057 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2058 2 : buf.iter_mut()
2059 2 : .enumerate()
2060 264192 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2061 2 : Bytes::from(buf)
2062 2 : },
2063 2 : }),
2064 2 : ),
2065 2 : // because the oversized read cannot be extended further, we are sure to exercise the
2066 2 : // builder created on the last round with this:
2067 2 : (
2068 2 : 0x60,
2069 2 : 12,
2070 2 : Value::WalRecord(NeonWalRecord::Postgres {
2071 2 : will_init: true,
2072 2 : rec: Bytes::from_static(b"3"),
2073 2 : }),
2074 2 : ),
2075 2 : (
2076 2 : 0x60,
2077 2 : 9,
2078 2 : Value::Image(Bytes::from_static(b"something for a different key")),
2079 2 : ),
2080 2 : ];
2081 2 :
2082 2 : let mut last_lsn = None;
2083 2 :
2084 14 : for (lsn, key, value) in data {
2085 12 : let key = Key::from_i128(key);
2086 12 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2087 12 : last_lsn = Some(lsn);
2088 2 : }
2089 2 :
2090 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
2091 2 : }
2092 2 : timeline.freeze_and_flush().await.unwrap();
2093 2 :
2094 2 : let new_layer = timeline
2095 2 : .layers
2096 2 : .read()
2097 2 : .await
2098 2 : .likely_resident_layers()
2099 4 : .find(|x| x != &initdb_layer)
2100 2 : .unwrap();
2101 2 :
2102 2 : // create a copy for the timeline, so we don't overwrite the file
2103 2 : let branch = tenant
2104 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2105 2 : .await
2106 2 : .unwrap();
2107 2 :
2108 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2109 2 :
2110 2 : // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
2111 2 : // a single key
2112 2 :
2113 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2114 10 : let truncate_at = Lsn(truncate_at);
2115 2 :
2116 10 : let mut writer = DeltaLayerWriter::new(
2117 10 : tenant.conf,
2118 10 : branch.timeline_id,
2119 10 : tenant.tenant_shard_id,
2120 10 : Key::MIN,
2121 10 : Lsn(0x11)..truncate_at,
2122 10 : ctx,
2123 10 : )
2124 5 : .await
2125 10 : .unwrap();
2126 2 :
2127 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
2128 10 :
2129 10 : new_layer
2130 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2131 15 : .await
2132 10 : .unwrap();
2133 2 :
2134 24 : let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
2135 10 :
2136 11 : copied_layer.get_as_delta(ctx).await.unwrap();
2137 10 :
2138 10 : assert_keys_and_values_eq(
2139 10 : new_layer.get_as_delta(ctx).await.unwrap(),
2140 10 : copied_layer.get_as_delta(ctx).await.unwrap(),
2141 10 : truncate_at,
2142 10 : ctx,
2143 2 : )
2144 53 : .await;
2145 2 : }
2146 2 : }
2147 :
2148 10 : async fn assert_keys_and_values_eq(
2149 10 : source: &DeltaLayerInner,
2150 10 : truncated: &DeltaLayerInner,
2151 10 : truncated_at: Lsn,
2152 10 : ctx: &RequestContext,
2153 10 : ) {
2154 10 : use futures::future::ready;
2155 10 : use futures::stream::TryStreamExt;
2156 10 :
2157 10 : let start_key = [0u8; DELTA_KEY_SIZE];
2158 10 :
2159 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2160 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2161 10 : source.index_start_blk,
2162 10 : source.index_root_blk,
2163 10 : &source_reader,
2164 10 : );
2165 10 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2166 60 : let source_stream = source_stream.filter(|res| match res {
2167 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2168 0 : _ => ready(true),
2169 60 : });
2170 10 : let mut source_stream = std::pin::pin!(source_stream);
2171 10 :
2172 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2173 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2174 10 : truncated.index_start_blk,
2175 10 : truncated.index_root_blk,
2176 10 : &truncated_reader,
2177 10 : );
2178 10 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2179 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2180 10 :
2181 10 : let mut scratch_left = Vec::new();
2182 10 : let mut scratch_right = Vec::new();
2183 :
2184 : loop {
2185 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2186 42 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2187 42 :
2188 42 : if src.is_none() {
2189 10 : assert!(truncated.is_none());
2190 10 : break;
2191 32 : }
2192 32 :
2193 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2194 32 :
2195 32 : // because we've filtered the source with Lsn, we should always have the same keys from both.
2196 32 : assert_eq!(src.0, truncated.0);
2197 32 : assert_eq!(src.1, truncated.1);
2198 :
2199 : // if this is needed for something else, just drop this assert.
2200 32 : assert!(
2201 32 : src.2.pos() >= truncated.2.pos(),
2202 0 : "value position should not go backwards {} vs. {}",
2203 0 : src.2.pos(),
2204 0 : truncated.2.pos()
2205 : );
2206 :
2207 32 : scratch_left.clear();
2208 32 : let src_cursor = source_reader.block_cursor();
2209 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2210 32 : scratch_right.clear();
2211 32 : let trunc_cursor = truncated_reader.block_cursor();
2212 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2213 :
2214 32 : tokio::try_join!(left, right).unwrap();
2215 32 :
2216 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2217 : }
2218 10 : }
2219 :
2220 2 : async fn produce_delta_layer(
2221 2 : tenant: &Tenant,
2222 2 : tline: &Arc<Timeline>,
2223 2 : mut deltas: Vec<(Key, Lsn, Value)>,
2224 2 : ctx: &RequestContext,
2225 2 : ) -> anyhow::Result<ResidentLayer> {
2226 1998 : deltas.sort_by(|(k1, l1, _), (k2, l2, _)| (k1, l1).cmp(&(k2, l2)));
2227 2 : let (key_start, _, _) = deltas.first().unwrap();
2228 2 : let (key_max, _, _) = deltas.first().unwrap();
2229 2000 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2230 2000 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2231 2 : let lsn_end = Lsn(lsn_max.0 + 1);
2232 2 : let mut writer = DeltaLayerWriter::new(
2233 2 : tenant.conf,
2234 2 : tline.timeline_id,
2235 2 : tenant.tenant_shard_id,
2236 2 : *key_start,
2237 2 : (*lsn_min)..lsn_end,
2238 2 : ctx,
2239 2 : )
2240 1 : .await?;
2241 2 : let key_end = key_max.next();
2242 :
2243 2002 : for (key, lsn, value) in deltas {
2244 2000 : writer.put_value(key, lsn, value, ctx).await?;
2245 : }
2246 7 : let delta_layer = writer.finish(key_end, tline, ctx).await?;
2247 :
2248 2 : Ok::<_, anyhow::Error>(delta_layer)
2249 2 : }
2250 :
2251 28 : async fn assert_delta_iter_equal(
2252 28 : delta_iter: &mut DeltaLayerIterator<'_>,
2253 28 : expect: &[(Key, Lsn, Value)],
2254 28 : ) {
2255 28 : let mut expect_iter = expect.iter();
2256 : loop {
2257 28028 : let o1 = delta_iter.next().await.unwrap();
2258 28028 : let o2 = expect_iter.next();
2259 28028 : assert_eq!(o1.is_some(), o2.is_some());
2260 28028 : if o1.is_none() && o2.is_none() {
2261 28 : break;
2262 28000 : }
2263 28000 : let (k1, l1, v1) = o1.unwrap();
2264 28000 : let (k2, l2, v2) = o2.unwrap();
2265 28000 : assert_eq!(&k1, k2);
2266 28000 : assert_eq!(l1, *l2);
2267 28000 : assert_eq!(&v1, v2);
2268 : }
2269 28 : }
2270 :
2271 : #[tokio::test]
2272 2 : async fn delta_layer_iterator() {
2273 2 : use crate::repository::Value;
2274 2 : use bytes::Bytes;
2275 2 :
2276 2 : let harness = TenantHarness::create("delta_layer_iterator").unwrap();
2277 8 : let (tenant, ctx) = harness.load().await;
2278 2 :
2279 2 : let tline = tenant
2280 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2281 6 : .await
2282 2 : .unwrap();
2283 2 :
2284 2000 : fn get_key(id: u32) -> Key {
2285 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2286 2000 : key.field6 = id;
2287 2000 : key
2288 2000 : }
2289 2 : const N: usize = 1000;
2290 2 : let test_deltas = (0..N)
2291 2000 : .map(|idx| {
2292 2000 : (
2293 2000 : get_key(idx as u32 / 10),
2294 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2295 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2296 2000 : )
2297 2000 : })
2298 2 : .collect_vec();
2299 2 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2300 8 : .await
2301 2 : .unwrap();
2302 2 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2303 6 : for max_read_size in [1, 1024] {
2304 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2305 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2306 28 : // Test if the batch size is correctly determined
2307 28 : let mut iter = delta_layer.iter(&ctx);
2308 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2309 28 : let mut num_items = 0;
2310 112 : for _ in 0..3 {
2311 84 : iter.next_batch().await.unwrap();
2312 84 : num_items += iter.key_values_batch.len();
2313 84 : if max_read_size == 1 {
2314 2 : // every key should be a batch b/c the value is larger than max_read_size
2315 42 : assert_eq!(iter.key_values_batch.len(), 1);
2316 2 : } else {
2317 42 : assert_eq!(iter.key_values_batch.len(), batch_size);
2318 2 : }
2319 84 : if num_items >= N {
2320 2 : break;
2321 84 : }
2322 84 : iter.key_values_batch.clear();
2323 2 : }
2324 2 : // Test if the result is correct
2325 28 : let mut iter = delta_layer.iter(&ctx);
2326 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2327 9577 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2328 2 : }
2329 2 : }
2330 2 : }
2331 : }
|