Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
//! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__00000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::tenant::blob_io::BlobWriter;
34 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
35 : use crate::tenant::disk_btree::{
36 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
37 : };
38 : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
39 : use crate::tenant::timeline::GetVectoredError;
40 : use crate::tenant::vectored_blob_io::{
41 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
42 : VectoredReadPlanner,
43 : };
44 : use crate::tenant::PageReconstructError;
45 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
46 : use crate::virtual_file::IoBufferMut;
47 : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
48 : use crate::TEMP_FILE_SUFFIX;
49 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
50 : use anyhow::{anyhow, bail, ensure, Context, Result};
51 : use camino::{Utf8Path, Utf8PathBuf};
52 : use futures::StreamExt;
53 : use itertools::Itertools;
54 : use pageserver_api::config::MaxVectoredReadBytes;
55 : use pageserver_api::key::DBDIR_KEY;
56 : use pageserver_api::key::{Key, KEY_SIZE};
57 : use pageserver_api::keyspace::KeySpace;
58 : use pageserver_api::models::ImageCompressionAlgorithm;
59 : use pageserver_api::shard::TenantShardId;
60 : use pageserver_api::value::Value;
61 : use rand::{distributions::Alphanumeric, Rng};
62 : use serde::{Deserialize, Serialize};
63 : use std::collections::VecDeque;
64 : use std::fs::File;
65 : use std::io::SeekFrom;
66 : use std::ops::Range;
67 : use std::os::unix::fs::FileExt;
68 : use std::str::FromStr;
69 : use std::sync::Arc;
70 : use tokio::sync::OnceCell;
71 : use tokio_epoll_uring::IoBuf;
72 : use tracing::*;
73 :
74 : use utils::{
75 : bin_ser::BeSer,
76 : id::{TenantId, TimelineId},
77 : lsn::Lsn,
78 : };
79 :
80 : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
81 :
82 : ///
83 : /// Header stored in the beginning of the file
84 : ///
85 : /// After this comes the 'values' part, starting on block 1. After that,
86 : /// the 'index' starts at the block indicated by 'index_start_blk'
87 : ///
88 1068 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
89 : pub struct Summary {
90 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
91 : pub magic: u16,
92 : pub format_version: u16,
93 :
94 : pub tenant_id: TenantId,
95 : pub timeline_id: TimelineId,
96 : pub key_range: Range<Key>,
97 : pub lsn_range: Range<Lsn>,
98 :
99 : /// Block number where the 'index' part of the file begins.
100 : pub index_start_blk: u32,
101 : /// Block within the 'index', where the B-tree root page is stored
102 : pub index_root_blk: u32,
103 : }
104 :
105 : impl From<&DeltaLayer> for Summary {
106 0 : fn from(layer: &DeltaLayer) -> Self {
107 0 : Self::expected(
108 0 : layer.desc.tenant_shard_id.tenant_id,
109 0 : layer.desc.timeline_id,
110 0 : layer.desc.key_range.clone(),
111 0 : layer.desc.lsn_range.clone(),
112 0 : )
113 0 : }
114 : }
115 :
116 : impl Summary {
117 1068 : pub(super) fn expected(
118 1068 : tenant_id: TenantId,
119 1068 : timeline_id: TimelineId,
120 1068 : keys: Range<Key>,
121 1068 : lsns: Range<Lsn>,
122 1068 : ) -> Self {
123 1068 : Self {
124 1068 : magic: DELTA_FILE_MAGIC,
125 1068 : format_version: STORAGE_FORMAT_VERSION,
126 1068 :
127 1068 : tenant_id,
128 1068 : timeline_id,
129 1068 : key_range: keys,
130 1068 : lsn_range: lsns,
131 1068 :
132 1068 : index_start_blk: 0,
133 1068 : index_root_blk: 0,
134 1068 : }
135 1068 : }
136 : }
137 :
138 : // Flag indicating that this version initialize the page
139 : const WILL_INIT: u64 = 1;
140 :
141 : /// Struct representing reference to BLOB in layers.
142 : ///
143 : /// Reference contains BLOB offset, and for WAL records it also contains
144 : /// `will_init` flag. The flag helps to determine the range of records
145 : /// that needs to be applied, without reading/deserializing records themselves.
146 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
147 : pub struct BlobRef(pub u64);
148 :
149 : impl BlobRef {
150 521338 : pub fn will_init(&self) -> bool {
151 521338 : (self.0 & WILL_INIT) != 0
152 521338 : }
153 :
154 4790261 : pub fn pos(&self) -> u64 {
155 4790261 : self.0 >> 1
156 4790261 : }
157 :
158 6471388 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
159 6471388 : let mut blob_ref = pos << 1;
160 6471388 : if will_init {
161 6469916 : blob_ref |= WILL_INIT;
162 6469916 : }
163 6471388 : BlobRef(blob_ref)
164 6471388 : }
165 : }
166 :
167 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
168 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
169 :
170 : /// This is the key of the B-tree index stored in the delta layer. It consists
171 : /// of the serialized representation of a Key and LSN.
172 : impl DeltaKey {
173 2064198 : fn from_slice(buf: &[u8]) -> Self {
174 2064198 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
175 2064198 : bytes.copy_from_slice(buf);
176 2064198 : DeltaKey(bytes)
177 2064198 : }
178 :
179 6689233 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
180 6689233 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
181 6689233 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
182 6689233 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
183 6689233 : DeltaKey(bytes)
184 6689233 : }
185 :
186 2064198 : fn key(&self) -> Key {
187 2064198 : Key::from_slice(&self.0)
188 2064198 : }
189 :
190 2064198 : fn lsn(&self) -> Lsn {
191 2064198 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
192 2064198 : }
193 :
194 2726003 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
195 2726003 : let mut lsn_buf = [0u8; 8];
196 2726003 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
197 2726003 : Lsn(u64::from_be_bytes(lsn_buf))
198 2726003 : }
199 : }
200 :
201 : /// This is used only from `pagectl`. Within pageserver, all layers are
202 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
203 : pub struct DeltaLayer {
204 : path: Utf8PathBuf,
205 : pub desc: PersistentLayerDesc,
206 : inner: OnceCell<Arc<DeltaLayerInner>>,
207 : }
208 :
209 : impl std::fmt::Debug for DeltaLayer {
210 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 : use super::RangeDisplayDebug;
212 :
213 0 : f.debug_struct("DeltaLayer")
214 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
215 0 : .field("lsn_range", &self.desc.lsn_range)
216 0 : .field("file_size", &self.desc.file_size)
217 0 : .field("inner", &self.inner)
218 0 : .finish()
219 0 : }
220 : }
221 :
222 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
223 : /// file.
224 : pub struct DeltaLayerInner {
225 : // values copied from summary
226 : index_start_blk: u32,
227 : index_root_blk: u32,
228 :
229 : file: VirtualFile,
230 : file_id: FileId,
231 :
232 : layer_key_range: Range<Key>,
233 : layer_lsn_range: Range<Lsn>,
234 :
235 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
236 : }
237 :
238 : impl DeltaLayerInner {
239 0 : pub(crate) fn layer_dbg_info(&self) -> String {
240 0 : format!(
241 0 : "delta {}..{} {}..{}",
242 0 : self.key_range().start,
243 0 : self.key_range().end,
244 0 : self.lsn_range().start,
245 0 : self.lsn_range().end
246 0 : )
247 0 : }
248 : }
249 :
250 : impl std::fmt::Debug for DeltaLayerInner {
251 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 0 : f.debug_struct("DeltaLayerInner")
253 0 : .field("index_start_blk", &self.index_start_blk)
254 0 : .field("index_root_blk", &self.index_root_blk)
255 0 : .finish()
256 0 : }
257 : }
258 :
259 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
260 : impl std::fmt::Display for DeltaLayer {
261 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 0 : write!(f, "{}", self.layer_desc().short_id())
263 0 : }
264 : }
265 :
266 : impl AsLayerDesc for DeltaLayer {
267 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
268 0 : &self.desc
269 0 : }
270 : }
271 :
272 : impl DeltaLayer {
273 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
274 0 : self.desc.dump();
275 0 :
276 0 : if !verbose {
277 0 : return Ok(());
278 0 : }
279 :
280 0 : let inner = self.load(ctx).await?;
281 :
282 0 : inner.dump(ctx).await
283 0 : }
284 :
285 1438 : fn temp_path_for(
286 1438 : conf: &PageServerConf,
287 1438 : tenant_shard_id: &TenantShardId,
288 1438 : timeline_id: &TimelineId,
289 1438 : key_start: Key,
290 1438 : lsn_range: &Range<Lsn>,
291 1438 : ) -> Utf8PathBuf {
292 1438 : let rand_string: String = rand::thread_rng()
293 1438 : .sample_iter(&Alphanumeric)
294 1438 : .take(8)
295 1438 : .map(char::from)
296 1438 : .collect();
297 1438 :
298 1438 : conf.timeline_path(tenant_shard_id, timeline_id)
299 1438 : .join(format!(
300 1438 : "{}-XXX__{:016X}-{:016X}.{}.{}",
301 1438 : key_start,
302 1438 : u64::from(lsn_range.start),
303 1438 : u64::from(lsn_range.end),
304 1438 : rand_string,
305 1438 : TEMP_FILE_SUFFIX,
306 1438 : ))
307 1438 : }
308 :
309 : ///
310 : /// Open the underlying file and read the metadata into memory, if it's
311 : /// not loaded already.
312 : ///
313 0 : async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
314 0 : // Quick exit if already loaded
315 0 : self.inner
316 0 : .get_or_try_init(|| self.load_inner(ctx))
317 0 : .await
318 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
319 0 : }
320 :
321 0 : async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
322 0 : let path = self.path();
323 :
324 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
325 :
326 : // not production code
327 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
328 0 : let expected_layer_name = self.layer_desc().layer_name();
329 0 :
330 0 : if actual_layer_name != expected_layer_name {
331 0 : println!("warning: filename does not match what is expected from in-file summary");
332 0 : println!("actual: {:?}", actual_layer_name.to_string());
333 0 : println!("expected: {:?}", expected_layer_name.to_string());
334 0 : }
335 :
336 0 : Ok(Arc::new(loaded))
337 0 : }
338 :
339 : /// Create a DeltaLayer struct representing an existing file on disk.
340 : ///
341 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
342 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
343 0 : let mut summary_buf = vec![0; PAGE_SZ];
344 0 : file.read_exact_at(&mut summary_buf, 0)?;
345 0 : let summary = Summary::des_prefix(&summary_buf)?;
346 :
347 0 : let metadata = file
348 0 : .metadata()
349 0 : .context("get file metadata to determine size")?;
350 :
351 : // This function is never used for constructing layers in a running pageserver,
352 : // so it does not need an accurate TenantShardId.
353 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
354 0 :
355 0 : Ok(DeltaLayer {
356 0 : path: path.to_path_buf(),
357 0 : desc: PersistentLayerDesc::new_delta(
358 0 : tenant_shard_id,
359 0 : summary.timeline_id,
360 0 : summary.key_range,
361 0 : summary.lsn_range,
362 0 : metadata.len(),
363 0 : ),
364 0 : inner: OnceCell::new(),
365 0 : })
366 0 : }
367 :
368 : /// Path to the layer file in pageserver workdir.
369 0 : fn path(&self) -> Utf8PathBuf {
370 0 : self.path.clone()
371 0 : }
372 : }
373 :
374 : /// A builder object for constructing a new delta layer.
375 : ///
376 : /// Usage:
377 : ///
378 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
379 : ///
380 : /// 2. Write the contents by calling `put_value` for every page
381 : /// version to store in the layer.
382 : ///
383 : /// 3. Call `finish`.
384 : ///
385 : struct DeltaLayerWriterInner {
386 : pub path: Utf8PathBuf,
387 : timeline_id: TimelineId,
388 : tenant_shard_id: TenantShardId,
389 :
390 : key_start: Key,
391 : lsn_range: Range<Lsn>,
392 :
393 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
394 :
395 : blob_writer: BlobWriter<true>,
396 :
397 : // Number of key-lsns in the layer.
398 : num_keys: usize,
399 : }
400 :
401 : impl DeltaLayerWriterInner {
402 : ///
403 : /// Start building a new delta layer.
404 : ///
405 1438 : async fn new(
406 1438 : conf: &'static PageServerConf,
407 1438 : timeline_id: TimelineId,
408 1438 : tenant_shard_id: TenantShardId,
409 1438 : key_start: Key,
410 1438 : lsn_range: Range<Lsn>,
411 1438 : ctx: &RequestContext,
412 1438 : ) -> anyhow::Result<Self> {
413 1438 : // Create the file initially with a temporary filename. We don't know
414 1438 : // the end key yet, so we cannot form the final filename yet. We will
415 1438 : // rename it when we're done.
416 1438 : //
417 1438 : // Note: This overwrites any existing file. There shouldn't be any.
418 1438 : // FIXME: throw an error instead?
419 1438 : let path =
420 1438 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
421 :
422 1438 : let mut file = VirtualFile::create(&path, ctx).await?;
423 : // make room for the header block
424 1438 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
425 1438 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
426 1438 :
427 1438 : // Initialize the b-tree index builder
428 1438 : let block_buf = BlockBuf::new();
429 1438 : let tree_builder = DiskBtreeBuilder::new(block_buf);
430 1438 :
431 1438 : Ok(Self {
432 1438 : path,
433 1438 : timeline_id,
434 1438 : tenant_shard_id,
435 1438 : key_start,
436 1438 : lsn_range,
437 1438 : tree: tree_builder,
438 1438 : blob_writer,
439 1438 : num_keys: 0,
440 1438 : })
441 1438 : }
442 :
443 : ///
444 : /// Append a key-value pair to the file.
445 : ///
446 : /// The values must be appended in key, lsn order.
447 : ///
448 2084624 : async fn put_value(
449 2084624 : &mut self,
450 2084624 : key: Key,
451 2084624 : lsn: Lsn,
452 2084624 : val: Value,
453 2084624 : ctx: &RequestContext,
454 2084624 : ) -> anyhow::Result<()> {
455 2084624 : let (_, res) = self
456 2084624 : .put_value_bytes(
457 2084624 : key,
458 2084624 : lsn,
459 2084624 : Value::ser(&val)?.slice_len(),
460 2084624 : val.will_init(),
461 2084624 : ctx,
462 : )
463 1982 : .await;
464 2084624 : res
465 2084624 : }
466 :
467 6471284 : async fn put_value_bytes<Buf>(
468 6471284 : &mut self,
469 6471284 : key: Key,
470 6471284 : lsn: Lsn,
471 6471284 : val: FullSlice<Buf>,
472 6471284 : will_init: bool,
473 6471284 : ctx: &RequestContext,
474 6471284 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
475 6471284 : where
476 6471284 : Buf: IoBuf + Send,
477 6471284 : {
478 6471284 : assert!(
479 6471284 : self.lsn_range.start <= lsn,
480 0 : "lsn_start={}, lsn={}",
481 : self.lsn_range.start,
482 : lsn
483 : );
484 : // We don't want to use compression in delta layer creation
485 6471284 : let compression = ImageCompressionAlgorithm::Disabled;
486 6471284 : let (val, res) = self
487 6471284 : .blob_writer
488 6471284 : .write_blob_maybe_compressed(val, ctx, compression)
489 4932 : .await;
490 6471284 : let off = match res {
491 6471284 : Ok((off, _)) => off,
492 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
493 : };
494 :
495 6471284 : let blob_ref = BlobRef::new(off, will_init);
496 6471284 :
497 6471284 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
498 6471284 : let res = self.tree.append(&delta_key.0, blob_ref.0);
499 6471284 :
500 6471284 : self.num_keys += 1;
501 6471284 :
502 6471284 : (val, res.map_err(|e| anyhow::anyhow!(e)))
503 6471284 : }
504 :
505 2023972 : fn size(&self) -> u64 {
506 2023972 : self.blob_writer.size() + self.tree.borrow_writer().size()
507 2023972 : }
508 :
509 : ///
510 : /// Finish writing the delta layer.
511 : ///
512 1414 : async fn finish(
513 1414 : self,
514 1414 : key_end: Key,
515 1414 : ctx: &RequestContext,
516 1414 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
517 1414 : let temp_path = self.path.clone();
518 9693 : let result = self.finish0(key_end, ctx).await;
519 1414 : if let Err(ref e) = result {
520 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
521 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
522 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
523 0 : }
524 1414 : }
525 1414 : result
526 1414 : }
527 :
528 1414 : async fn finish0(
529 1414 : self,
530 1414 : key_end: Key,
531 1414 : ctx: &RequestContext,
532 1414 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
533 1414 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
534 :
535 1414 : let mut file = self.blob_writer.into_inner(ctx).await?;
536 :
537 : // Write out the index
538 1414 : let (index_root_blk, block_buf) = self.tree.finish()?;
539 1414 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
540 0 : .await?;
541 15057 : for buf in block_buf.blocks {
542 13643 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
543 13643 : res?;
544 : }
545 1414 : assert!(self.lsn_range.start < self.lsn_range.end);
546 : // Fill in the summary on blk 0
547 1414 : let summary = Summary {
548 1414 : magic: DELTA_FILE_MAGIC,
549 1414 : format_version: STORAGE_FORMAT_VERSION,
550 1414 : tenant_id: self.tenant_shard_id.tenant_id,
551 1414 : timeline_id: self.timeline_id,
552 1414 : key_range: self.key_start..key_end,
553 1414 : lsn_range: self.lsn_range.clone(),
554 1414 : index_start_blk,
555 1414 : index_root_blk,
556 1414 : };
557 1414 :
558 1414 : let mut buf = Vec::with_capacity(PAGE_SZ);
559 1414 : // TODO: could use smallvec here but it's a pain with Slice<T>
560 1414 : Summary::ser_into(&summary, &mut buf)?;
561 1414 : file.seek(SeekFrom::Start(0)).await?;
562 1414 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
563 1414 : res?;
564 :
565 1414 : let metadata = file
566 1414 : .metadata()
567 710 : .await
568 1414 : .context("get file metadata to determine size")?;
569 :
570 : // 5GB limit for objects without multipart upload (which we don't want to use)
571 : // Make it a little bit below to account for differing GB units
572 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
573 1414 : ensure!(
574 1414 : metadata.len() <= S3_UPLOAD_LIMIT,
575 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
576 0 : file.path(),
577 0 : metadata.len()
578 : );
579 :
580 : // Note: Because we opened the file in write-only mode, we cannot
581 : // reuse the same VirtualFile for reading later. That's why we don't
582 : // set inner.file here. The first read will have to re-open it.
583 :
584 1414 : let desc = PersistentLayerDesc::new_delta(
585 1414 : self.tenant_shard_id,
586 1414 : self.timeline_id,
587 1414 : self.key_start..key_end,
588 1414 : self.lsn_range.clone(),
589 1414 : metadata.len(),
590 1414 : );
591 1414 :
592 1414 : // fsync the file
593 1414 : file.sync_all()
594 710 : .await
595 1414 : .maybe_fatal_err("delta_layer sync_all")?;
596 :
597 1414 : trace!("created delta layer {}", self.path);
598 :
599 1414 : Ok((desc, self.path))
600 1414 : }
601 : }
602 :
603 : /// A builder object for constructing a new delta layer.
604 : ///
605 : /// Usage:
606 : ///
607 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
608 : ///
609 : /// 2. Write the contents by calling `put_value` for every page
610 : /// version to store in the layer.
611 : ///
612 : /// 3. Call `finish`.
613 : ///
614 : /// # Note
615 : ///
616 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
617 : /// possible for the writer to drop before `finish` is actually called. So this
618 : /// could lead to odd temporary files in the directory, exhausting file system.
619 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
620 : /// implementation that cleans up the temporary file in failure. It's not
621 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
622 : /// out some fields, making it impossible to implement `Drop`.
623 : ///
624 : #[must_use]
625 : pub struct DeltaLayerWriter {
626 : inner: Option<DeltaLayerWriterInner>,
627 : }
628 :
629 : impl DeltaLayerWriter {
630 : ///
631 : /// Start building a new delta layer.
632 : ///
633 1438 : pub async fn new(
634 1438 : conf: &'static PageServerConf,
635 1438 : timeline_id: TimelineId,
636 1438 : tenant_shard_id: TenantShardId,
637 1438 : key_start: Key,
638 1438 : lsn_range: Range<Lsn>,
639 1438 : ctx: &RequestContext,
640 1438 : ) -> anyhow::Result<Self> {
641 1438 : Ok(Self {
642 1438 : inner: Some(
643 1438 : DeltaLayerWriterInner::new(
644 1438 : conf,
645 1438 : timeline_id,
646 1438 : tenant_shard_id,
647 1438 : key_start,
648 1438 : lsn_range,
649 1438 : ctx,
650 1438 : )
651 732 : .await?,
652 : ),
653 : })
654 1438 : }
655 :
656 0 : pub fn is_empty(&self) -> bool {
657 0 : self.inner.as_ref().unwrap().num_keys == 0
658 0 : }
659 :
660 : ///
661 : /// Append a key-value pair to the file.
662 : ///
663 : /// The values must be appended in key, lsn order.
664 : ///
665 2084624 : pub async fn put_value(
666 2084624 : &mut self,
667 2084624 : key: Key,
668 2084624 : lsn: Lsn,
669 2084624 : val: Value,
670 2084624 : ctx: &RequestContext,
671 2084624 : ) -> anyhow::Result<()> {
672 2084624 : self.inner
673 2084624 : .as_mut()
674 2084624 : .unwrap()
675 2084624 : .put_value(key, lsn, val, ctx)
676 1982 : .await
677 2084624 : }
678 :
679 4386660 : pub async fn put_value_bytes<Buf>(
680 4386660 : &mut self,
681 4386660 : key: Key,
682 4386660 : lsn: Lsn,
683 4386660 : val: FullSlice<Buf>,
684 4386660 : will_init: bool,
685 4386660 : ctx: &RequestContext,
686 4386660 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
687 4386660 : where
688 4386660 : Buf: IoBuf + Send,
689 4386660 : {
690 4386660 : self.inner
691 4386660 : .as_mut()
692 4386660 : .unwrap()
693 4386660 : .put_value_bytes(key, lsn, val, will_init, ctx)
694 2950 : .await
695 4386660 : }
696 :
697 2023972 : pub fn size(&self) -> u64 {
698 2023972 : self.inner.as_ref().unwrap().size()
699 2023972 : }
700 :
701 : ///
702 : /// Finish writing the delta layer.
703 : ///
704 1414 : pub(crate) async fn finish(
705 1414 : mut self,
706 1414 : key_end: Key,
707 1414 : ctx: &RequestContext,
708 1414 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
709 9693 : self.inner.take().unwrap().finish(key_end, ctx).await
710 1414 : }
711 :
712 12178 : pub(crate) fn num_keys(&self) -> usize {
713 12178 : self.inner.as_ref().unwrap().num_keys
714 12178 : }
715 :
716 15088 : pub(crate) fn estimated_size(&self) -> u64 {
717 15088 : let inner = self.inner.as_ref().unwrap();
718 15088 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
719 15088 : }
720 : }
721 :
722 : impl Drop for DeltaLayerWriter {
723 1438 : fn drop(&mut self) {
724 1438 : if let Some(inner) = self.inner.take() {
725 24 : // We want to remove the virtual file here, so it's fine to not
726 24 : // having completely flushed unwritten data.
727 24 : let vfile = inner.blob_writer.into_inner_no_flush();
728 24 : vfile.remove();
729 1414 : }
730 1438 : }
731 : }
732 :
733 0 : #[derive(thiserror::Error, Debug)]
734 : pub enum RewriteSummaryError {
735 : #[error("magic mismatch")]
736 : MagicMismatch,
737 : #[error(transparent)]
738 : Other(#[from] anyhow::Error),
739 : }
740 :
741 : impl From<std::io::Error> for RewriteSummaryError {
742 0 : fn from(e: std::io::Error) -> Self {
743 0 : Self::Other(anyhow::anyhow!(e))
744 0 : }
745 : }
746 :
747 : impl DeltaLayer {
748 0 : pub async fn rewrite_summary<F>(
749 0 : path: &Utf8Path,
750 0 : rewrite: F,
751 0 : ctx: &RequestContext,
752 0 : ) -> Result<(), RewriteSummaryError>
753 0 : where
754 0 : F: Fn(Summary) -> Summary,
755 0 : {
756 0 : let mut file = VirtualFile::open_with_options(
757 0 : path,
758 0 : virtual_file::OpenOptions::new().read(true).write(true),
759 0 : ctx,
760 0 : )
761 0 : .await
762 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
763 0 : let file_id = page_cache::next_file_id();
764 0 : let block_reader = FileBlockReader::new(&file, file_id);
765 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
766 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
767 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
768 0 : return Err(RewriteSummaryError::MagicMismatch);
769 0 : }
770 0 :
771 0 : let new_summary = rewrite(actual_summary);
772 0 :
773 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
774 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
775 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
776 0 : file.seek(SeekFrom::Start(0)).await?;
777 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
778 0 : res?;
779 0 : Ok(())
780 0 : }
781 : }
782 :
783 : impl DeltaLayerInner {
784 988 : pub(crate) fn key_range(&self) -> &Range<Key> {
785 988 : &self.layer_key_range
786 988 : }
787 :
788 988 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
789 988 : &self.layer_lsn_range
790 988 : }
791 :
792 1068 : pub(super) async fn load(
793 1068 : path: &Utf8Path,
794 1068 : summary: Option<Summary>,
795 1068 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
796 1068 : ctx: &RequestContext,
797 1068 : ) -> anyhow::Result<Self> {
798 1068 : let file = VirtualFile::open_v2(path, ctx)
799 534 : .await
800 1068 : .context("open layer file")?;
801 :
802 1068 : let file_id = page_cache::next_file_id();
803 1068 :
804 1068 : let block_reader = FileBlockReader::new(&file, file_id);
805 :
806 1068 : let summary_blk = block_reader
807 1068 : .read_blk(0, ctx)
808 537 : .await
809 1068 : .context("read first block")?;
810 :
811 : // TODO: this should be an assertion instead; see ImageLayerInner::load
812 1068 : let actual_summary =
813 1068 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
814 :
815 1068 : if let Some(mut expected_summary) = summary {
816 : // production code path
817 1068 : expected_summary.index_start_blk = actual_summary.index_start_blk;
818 1068 : expected_summary.index_root_blk = actual_summary.index_root_blk;
819 1068 : // mask out the timeline_id, but still require the layers to be from the same tenant
820 1068 : expected_summary.timeline_id = actual_summary.timeline_id;
821 1068 :
822 1068 : if actual_summary != expected_summary {
823 0 : bail!(
824 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
825 0 : actual_summary,
826 0 : expected_summary
827 0 : );
828 1068 : }
829 0 : }
830 :
831 1068 : Ok(DeltaLayerInner {
832 1068 : file,
833 1068 : file_id,
834 1068 : index_start_blk: actual_summary.index_start_blk,
835 1068 : index_root_blk: actual_summary.index_root_blk,
836 1068 : max_vectored_read_bytes,
837 1068 : layer_key_range: actual_summary.key_range,
838 1068 : layer_lsn_range: actual_summary.lsn_range,
839 1068 : })
840 1068 : }
841 :
842 : // Look up the keys in the provided keyspace and update
843 : // the reconstruct state with whatever is found.
844 : //
845 : // If the key is cached, go no further than the cached Lsn.
846 : //
847 : // Currently, the index is visited for each range, but this
848 : // can be further optimised to visit the index only once.
849 217511 : pub(super) async fn get_values_reconstruct_data(
850 217511 : &self,
851 217511 : keyspace: KeySpace,
852 217511 : lsn_range: Range<Lsn>,
853 217511 : reconstruct_state: &mut ValuesReconstructState,
854 217511 : ctx: &RequestContext,
855 217511 : ) -> Result<(), GetVectoredError> {
856 217511 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
857 217511 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
858 217511 : self.index_start_blk,
859 217511 : self.index_root_blk,
860 217511 : block_reader,
861 217511 : );
862 217511 :
863 217511 : let planner = VectoredReadPlanner::new(
864 217511 : self.max_vectored_read_bytes
865 217511 : .expect("Layer is loaded with max vectored bytes config")
866 217511 : .0
867 217511 : .into(),
868 217511 : );
869 217511 :
870 217511 : let data_end_offset = self.index_start_offset();
871 :
872 217511 : let reads = Self::plan_reads(
873 217511 : &keyspace,
874 217511 : lsn_range.clone(),
875 217511 : data_end_offset,
876 217511 : index_reader,
877 217511 : planner,
878 217511 : reconstruct_state,
879 217511 : ctx,
880 217511 : )
881 19370 : .await
882 217511 : .map_err(GetVectoredError::Other)?;
883 :
884 217511 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
885 63959 : .await;
886 :
887 217511 : reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
888 217511 :
889 217511 : Ok(())
890 217511 : }
891 :
892 217713 : async fn plan_reads<Reader>(
893 217713 : keyspace: &KeySpace,
894 217713 : lsn_range: Range<Lsn>,
895 217713 : data_end_offset: u64,
896 217713 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
897 217713 : mut planner: VectoredReadPlanner,
898 217713 : reconstruct_state: &mut ValuesReconstructState,
899 217713 : ctx: &RequestContext,
900 217713 : ) -> anyhow::Result<Vec<VectoredRead>>
901 217713 : where
902 217713 : Reader: BlockReader + Clone,
903 217713 : {
904 217713 : let ctx = RequestContextBuilder::extend(ctx)
905 217713 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
906 217713 : .build();
907 :
908 217915 : for range in keyspace.ranges.iter() {
909 217915 : let mut range_end_handled = false;
910 217915 :
911 217915 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
912 217915 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
913 217915 : let mut index_stream = std::pin::pin!(index_stream);
914 :
915 635643 : while let Some(index_entry) = index_stream.next().await {
916 626991 : let (raw_key, value) = index_entry?;
917 626991 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
918 626991 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
919 626991 : let blob_ref = BlobRef(value);
920 626991 :
921 626991 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
922 626991 : assert!(key >= range.start);
923 :
924 626991 : let outside_lsn_range = !lsn_range.contains(&lsn);
925 626991 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
926 :
927 626991 : let flag = {
928 626991 : if outside_lsn_range || below_cached_lsn {
929 105653 : BlobFlag::Ignore
930 521338 : } else if blob_ref.will_init() {
931 462996 : BlobFlag::ReplaceAll
932 : } else {
933 : // Usual path: add blob to the read
934 58342 : BlobFlag::None
935 : }
936 : };
937 :
938 626991 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
939 209263 : planner.handle_range_end(blob_ref.pos());
940 209263 : range_end_handled = true;
941 209263 : break;
942 417728 : } else {
943 417728 : planner.handle(key, lsn, blob_ref.pos(), flag);
944 417728 : }
945 : }
946 :
947 217915 : if !range_end_handled {
948 8652 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
949 8652 : planner.handle_range_end(data_end_offset);
950 209263 : }
951 : }
952 :
953 217713 : Ok(planner.finish())
954 217713 : }
955 :
956 217711 : fn get_min_read_buffer_size(
957 217711 : planned_reads: &[VectoredRead],
958 217711 : read_size_soft_max: usize,
959 217711 : ) -> usize {
960 217711 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
961 95356 : return read_size_soft_max;
962 : };
963 :
964 122355 : let largest_read_size = largest_read.size();
965 122355 : if largest_read_size > read_size_soft_max {
966 : // If the read is oversized, it should only contain one key.
967 200 : let offenders = largest_read
968 200 : .blobs_at
969 200 : .as_slice()
970 200 : .iter()
971 200 : .filter_map(|(_, blob_meta)| {
972 200 : if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
973 : // The size of values for these keys is unbounded and can
974 : // grow very large in pathological cases.
975 0 : None
976 : } else {
977 200 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
978 : }
979 200 : })
980 200 : .join(", ");
981 200 :
982 200 : if !offenders.is_empty() {
983 200 : tracing::warn!(
984 0 : "Oversized vectored read ({} > {}) for keys {}",
985 : largest_read_size,
986 : read_size_soft_max,
987 : offenders
988 : );
989 0 : }
990 122155 : }
991 :
992 122355 : largest_read_size
993 217711 : }
994 :
995 217511 : async fn do_reads_and_update_state(
996 217511 : &self,
997 217511 : reads: Vec<VectoredRead>,
998 217511 : reconstruct_state: &mut ValuesReconstructState,
999 217511 : ctx: &RequestContext,
1000 217511 : ) {
1001 217511 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
1002 217511 : let mut ignore_key_with_err = None;
1003 217511 :
1004 217511 : let max_vectored_read_bytes = self
1005 217511 : .max_vectored_read_bytes
1006 217511 : .expect("Layer is loaded with max vectored bytes config")
1007 217511 : .0
1008 217511 : .into();
1009 217511 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1010 217511 : let mut buf = Some(IoBufferMut::with_capacity(buf_size));
1011 :
1012 : // Note that reads are processed in reverse order (from highest key+lsn).
1013 : // This is the order that `ReconstructState` requires such that it can
1014 : // track when a key is done.
1015 217511 : for read in reads.into_iter().rev() {
1016 125842 : let res = vectored_blob_reader
1017 125842 : .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
1018 63959 : .await;
1019 :
1020 125842 : let blobs_buf = match res {
1021 125842 : Ok(blobs_buf) => blobs_buf,
1022 0 : Err(err) => {
1023 0 : let kind = err.kind();
1024 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1025 0 : reconstruct_state.on_key_error(
1026 0 : blob_meta.key,
1027 0 : PageReconstructError::Other(anyhow!(
1028 0 : "Failed to read blobs from virtual file {}: {}",
1029 0 : self.file.path(),
1030 0 : kind
1031 0 : )),
1032 0 : );
1033 0 : }
1034 :
1035 : // We have "lost" the buffer since the lower level IO api
1036 : // doesn't return the buffer on error. Allocate a new one.
1037 0 : buf = Some(IoBufferMut::with_capacity(buf_size));
1038 0 :
1039 0 : continue;
1040 : }
1041 : };
1042 125842 : let view = BufView::new_slice(&blobs_buf.buf);
1043 150259 : for meta in blobs_buf.blobs.iter().rev() {
1044 150259 : if Some(meta.meta.key) == ignore_key_with_err {
1045 0 : continue;
1046 150259 : }
1047 150259 : let blob_read = meta.read(&view).await;
1048 150259 : let blob_read = match blob_read {
1049 150259 : Ok(buf) => buf,
1050 0 : Err(e) => {
1051 0 : reconstruct_state.on_key_error(
1052 0 : meta.meta.key,
1053 0 : PageReconstructError::Other(anyhow!(e).context(format!(
1054 0 : "Failed to decompress blob from virtual file {}",
1055 0 : self.file.path(),
1056 0 : ))),
1057 0 : );
1058 0 :
1059 0 : ignore_key_with_err = Some(meta.meta.key);
1060 0 : continue;
1061 : }
1062 : };
1063 :
1064 150259 : let value = Value::des(&blob_read);
1065 :
1066 150259 : let value = match value {
1067 150259 : Ok(v) => v,
1068 0 : Err(e) => {
1069 0 : reconstruct_state.on_key_error(
1070 0 : meta.meta.key,
1071 0 : PageReconstructError::Other(anyhow!(e).context(format!(
1072 0 : "Failed to deserialize blob from virtual file {}",
1073 0 : self.file.path(),
1074 0 : ))),
1075 0 : );
1076 0 :
1077 0 : ignore_key_with_err = Some(meta.meta.key);
1078 0 : continue;
1079 : }
1080 : };
1081 :
1082 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1083 : // state, no further updates shall be made to it. The call below will
1084 : // panic if the invariant is violated.
1085 150259 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1086 : }
1087 :
1088 125842 : buf = Some(blobs_buf.buf);
1089 : }
1090 217511 : }
1091 :
1092 406 : pub(crate) async fn index_entries<'a>(
1093 406 : &'a self,
1094 406 : ctx: &RequestContext,
1095 406 : ) -> Result<Vec<DeltaEntry<'a>>> {
1096 406 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1097 406 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1098 406 : self.index_start_blk,
1099 406 : self.index_root_blk,
1100 406 : block_reader,
1101 406 : );
1102 406 :
1103 406 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1104 406 :
1105 406 : tree_reader
1106 406 : .visit(
1107 406 : &[0u8; DELTA_KEY_SIZE],
1108 406 : VisitDirection::Forwards,
1109 2064046 : |key, value| {
1110 2064046 : let delta_key = DeltaKey::from_slice(key);
1111 2064046 : let val_ref = ValueRef {
1112 2064046 : blob_ref: BlobRef(value),
1113 2064046 : layer: self,
1114 2064046 : };
1115 2064046 : let pos = BlobRef(value).pos();
1116 2064046 : if let Some(last) = all_keys.last_mut() {
1117 2063640 : // subtract offset of the current and last entries to get the size
1118 2063640 : // of the value associated with this (key, lsn) tuple
1119 2063640 : let first_pos = last.size;
1120 2063640 : last.size = pos - first_pos;
1121 2063640 : }
1122 2064046 : let entry = DeltaEntry {
1123 2064046 : key: delta_key.key(),
1124 2064046 : lsn: delta_key.lsn(),
1125 2064046 : size: pos,
1126 2064046 : val: val_ref,
1127 2064046 : };
1128 2064046 : all_keys.push(entry);
1129 2064046 : true
1130 2064046 : },
1131 406 : &RequestContextBuilder::extend(ctx)
1132 406 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1133 406 : .build(),
1134 406 : )
1135 2124 : .await?;
1136 406 : if let Some(last) = all_keys.last_mut() {
1137 406 : // Last key occupies all space till end of value storage,
1138 406 : // which corresponds to beginning of the index
1139 406 : last.size = self.index_start_offset() - last.size;
1140 406 : }
1141 406 : Ok(all_keys)
1142 406 : }
1143 :
1144 : /// Using the given writer, write out a version which has the earlier Lsns than `until`.
1145 : ///
1146 : /// Return the amount of key value records pushed to the writer.
1147 10 : pub(super) async fn copy_prefix(
1148 10 : &self,
1149 10 : writer: &mut DeltaLayerWriter,
1150 10 : until: Lsn,
1151 10 : ctx: &RequestContext,
1152 10 : ) -> anyhow::Result<usize> {
1153 : use crate::tenant::vectored_blob_io::{
1154 : BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
1155 : };
1156 : use futures::stream::TryStreamExt;
1157 :
1158 : #[derive(Debug)]
1159 : enum Item {
1160 : Actual(Key, Lsn, BlobRef),
1161 : Sentinel,
1162 : }
1163 :
1164 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1165 70 : fn from(value: Item) -> Self {
1166 70 : match value {
1167 60 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1168 10 : Item::Sentinel => None,
1169 : }
1170 70 : }
1171 : }
1172 :
1173 : impl Item {
1174 70 : fn offset(&self) -> Option<BlobRef> {
1175 70 : match self {
1176 60 : Item::Actual(_, _, blob) => Some(*blob),
1177 10 : Item::Sentinel => None,
1178 : }
1179 70 : }
1180 :
1181 70 : fn is_last(&self) -> bool {
1182 70 : matches!(self, Item::Sentinel)
1183 70 : }
1184 : }
1185 :
1186 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1187 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1188 10 : self.index_start_blk,
1189 10 : self.index_root_blk,
1190 10 : block_reader,
1191 10 : );
1192 10 :
1193 10 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1194 60 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1195 10 : // put in a sentinel value for getting the end offset for last item, and not having to
1196 10 : // repeat the whole read part
1197 10 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1198 10 : Item::Sentinel,
1199 10 : ))));
1200 10 : let mut stream = std::pin::pin!(stream);
1201 10 :
1202 10 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1203 10 :
1204 10 : let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
1205 10 :
1206 10 : let max_read_size = self
1207 10 : .max_vectored_read_bytes
1208 10 : .map(|x| x.0.get())
1209 10 : .unwrap_or(8192);
1210 10 :
1211 10 : let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
1212 10 :
1213 10 : // FIXME: buffering of DeltaLayerWriter
1214 10 : let mut per_blob_copy = Vec::new();
1215 10 :
1216 10 : let mut records = 0;
1217 :
1218 80 : while let Some(item) = stream.try_next().await? {
1219 70 : tracing::debug!(?item, "popped");
1220 70 : let offset = item
1221 70 : .offset()
1222 70 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1223 :
1224 70 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1225 60 : let end_offset = offset;
1226 60 :
1227 60 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1228 : } else {
1229 10 : None
1230 : };
1231 :
1232 70 : let is_last = item.is_last();
1233 70 :
1234 70 : prev = Option::from(item);
1235 70 :
1236 70 : let actionable = actionable.filter(|x| x.0.lsn < until);
1237 :
1238 70 : let builder = if let Some((meta, offsets)) = actionable {
1239 : // extend or create a new builder
1240 32 : if read_builder
1241 32 : .as_mut()
1242 32 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1243 32 : .unwrap_or(VectoredReadExtended::No)
1244 32 : == VectoredReadExtended::Yes
1245 : {
1246 16 : None
1247 : } else {
1248 16 : read_builder.replace(ChunkedVectoredReadBuilder::new(
1249 16 : offsets.start.pos(),
1250 16 : offsets.end.pos(),
1251 16 : meta,
1252 16 : max_read_size,
1253 16 : ))
1254 : }
1255 : } else {
1256 : // nothing to do, except perhaps flush any existing for the last element
1257 38 : None
1258 : };
1259 :
1260 : // flush the possible older builder and also the new one if the item was the last one
1261 70 : let builders = builder.into_iter();
1262 70 : let builders = if is_last {
1263 10 : builders.chain(read_builder.take())
1264 : } else {
1265 60 : builders.chain(None)
1266 : };
1267 :
1268 86 : for builder in builders {
1269 16 : let read = builder.build();
1270 16 :
1271 16 : let reader = VectoredBlobReader::new(&self.file);
1272 16 :
1273 16 : let mut buf = buffer.take().unwrap();
1274 16 :
1275 16 : buf.clear();
1276 16 : buf.reserve(read.size());
1277 16 : let res = reader.read_blobs(&read, buf, ctx).await?;
1278 :
1279 16 : let view = BufView::new_slice(&res.buf);
1280 :
1281 48 : for blob in res.blobs {
1282 32 : let key = blob.meta.key;
1283 32 : let lsn = blob.meta.lsn;
1284 :
1285 32 : let data = blob.read(&view).await?;
1286 :
1287 : #[cfg(debug_assertions)]
1288 32 : Value::des(&data)
1289 32 : .with_context(|| {
1290 0 : format!(
1291 0 : "blob failed to deserialize for {}: {:?}",
1292 0 : blob,
1293 0 : utils::Hex(&data)
1294 0 : )
1295 32 : })
1296 32 : .unwrap();
1297 32 :
1298 32 : // is it an image or will_init walrecord?
1299 32 : // FIXME: this could be handled by threading the BlobRef to the
1300 32 : // VectoredReadBuilder
1301 32 : let will_init = pageserver_api::value::ValueBytes::will_init(&data)
1302 32 : .inspect_err(|_e| {
1303 0 : #[cfg(feature = "testing")]
1304 0 : tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1305 32 : })
1306 32 : .unwrap_or(false);
1307 32 :
1308 32 : per_blob_copy.clear();
1309 32 : per_blob_copy.extend_from_slice(&data);
1310 :
1311 32 : let (tmp, res) = writer
1312 32 : .put_value_bytes(
1313 32 : key,
1314 32 : lsn,
1315 32 : std::mem::take(&mut per_blob_copy).slice_len(),
1316 32 : will_init,
1317 32 : ctx,
1318 32 : )
1319 4 : .await;
1320 32 : per_blob_copy = tmp.into_raw_slice().into_inner();
1321 32 :
1322 32 : res?;
1323 :
1324 32 : records += 1;
1325 : }
1326 :
1327 16 : buffer = Some(res.buf);
1328 : }
1329 : }
1330 :
1331 10 : assert!(
1332 10 : read_builder.is_none(),
1333 0 : "with the sentinel above loop should had handled all"
1334 : );
1335 :
1336 10 : Ok(records)
1337 10 : }
1338 :
1339 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1340 4 : println!(
1341 4 : "index_start_blk: {}, root {}",
1342 4 : self.index_start_blk, self.index_root_blk
1343 4 : );
1344 4 :
1345 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1346 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1347 4 : self.index_start_blk,
1348 4 : self.index_root_blk,
1349 4 : block_reader,
1350 4 : );
1351 4 :
1352 4 : tree_reader.dump().await?;
1353 :
1354 4 : let keys = self.index_entries(ctx).await?;
1355 :
1356 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1357 8 : let buf = val.load_raw(ctx).await?;
1358 8 : let val = Value::des(&buf)?;
1359 8 : let desc = match val {
1360 8 : Value::Image(img) => {
1361 8 : format!(" img {} bytes", img.len())
1362 : }
1363 0 : Value::WalRecord(rec) => {
1364 0 : let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
1365 0 : format!(
1366 0 : " rec {} bytes will_init: {} {}",
1367 0 : buf.len(),
1368 0 : rec.will_init(),
1369 0 : wal_desc
1370 0 : )
1371 : }
1372 : };
1373 8 : Ok(desc)
1374 8 : }
1375 :
1376 12 : for entry in keys {
1377 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1378 8 : let desc = match dump_blob(&val, ctx).await {
1379 8 : Ok(desc) => desc,
1380 0 : Err(err) => {
1381 0 : format!("ERROR: {err}")
1382 : }
1383 : };
1384 8 : println!(" key {key} at {lsn}: {desc}");
1385 :
1386 : // Print more details about CHECKPOINT records. Would be nice to print details
1387 : // of many other record types too, but these are particularly interesting, as
1388 : // have a lot of special processing for them in walingest.rs.
1389 4 : use pageserver_api::key::CHECKPOINT_KEY;
1390 4 : use postgres_ffi::CheckPoint;
1391 8 : if key == CHECKPOINT_KEY {
1392 0 : let val = val.load(ctx).await?;
1393 0 : match val {
1394 0 : Value::Image(img) => {
1395 0 : let checkpoint = CheckPoint::decode(&img)?;
1396 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1397 : }
1398 0 : Value::WalRecord(_rec) => {
1399 0 : println!(" unexpected walrecord value for checkpoint key");
1400 0 : }
1401 : }
1402 8 : }
1403 : }
1404 :
1405 4 : Ok(())
1406 4 : }
1407 :
1408 30 : fn stream_index_forwards<'a, R>(
1409 30 : &'a self,
1410 30 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1411 30 : start: &'a [u8; DELTA_KEY_SIZE],
1412 30 : ctx: &'a RequestContext,
1413 30 : ) -> impl futures::stream::Stream<
1414 30 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1415 30 : > + 'a
1416 30 : where
1417 30 : R: BlockReader + 'a,
1418 30 : {
1419 : use futures::stream::TryStreamExt;
1420 30 : let stream = reader.into_stream(start, ctx);
1421 152 : stream.map_ok(|(key, value)| {
1422 152 : let key = DeltaKey::from_slice(&key);
1423 152 : let (key, lsn) = (key.key(), key.lsn());
1424 152 : let offset = BlobRef(value);
1425 152 :
1426 152 : (key, lsn, offset)
1427 152 : })
1428 30 : }
1429 :
1430 : /// The file offset to the first block of index.
1431 : ///
1432 : /// The file structure is summary, values, and index. We often need this for the size of last blob.
1433 218509 : fn index_start_offset(&self) -> u64 {
1434 218509 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1435 218509 : let bref = BlobRef(offset);
1436 218509 : tracing::debug!(
1437 : index_start_blk = self.index_start_blk,
1438 : offset,
1439 0 : pos = bref.pos(),
1440 0 : "index_start_offset"
1441 : );
1442 218509 : offset
1443 218509 : }
1444 :
1445 550 : pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
1446 550 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1447 550 : let tree_reader =
1448 550 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1449 550 : DeltaLayerIterator {
1450 550 : delta_layer: self,
1451 550 : ctx,
1452 550 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1453 550 : key_values_batch: std::collections::VecDeque::new(),
1454 550 : is_end: false,
1455 550 : planner: StreamingVectoredReadPlanner::new(
1456 550 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1457 550 : 1024, // The default value. Unit tests might use a different value
1458 550 : ),
1459 550 : }
1460 550 : }
1461 :
1462 : /// NB: not super efficient, but not terrible either. Should prob be an iterator.
1463 : //
1464 : // We're reusing the index traversal logical in plan_reads; would be nice to
1465 : // factor that out.
1466 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
1467 0 : self.index_entries(ctx)
1468 0 : .await
1469 0 : .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
1470 0 : }
1471 : }
1472 :
/// A set of data associated with a delta layer key and its value
///
/// One instance describes a single `(key, lsn)` index entry of a delta layer.
pub struct DeltaEntry<'a> {
    /// Key part of the index entry
    pub key: Key,
    /// LSN part of the index entry
    pub lsn: Lsn,
    /// Size of the stored value, i.e. the distance from this value's offset
    /// to the next value's offset (or to the start of the index for the
    /// last value)
    pub size: u64,
    /// Reference to the on-disk value
    pub val: ValueRef<'a>,
}
1482 :
1483 : /// Reference to an on-disk value
1484 : pub struct ValueRef<'a> {
1485 : blob_ref: BlobRef,
1486 : layer: &'a DeltaLayerInner,
1487 : }
1488 :
1489 : impl<'a> ValueRef<'a> {
1490 : /// Loads the value from disk
1491 0 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1492 0 : let buf = self.load_raw(ctx).await?;
1493 0 : let val = Value::des(&buf)?;
1494 0 : Ok(val)
1495 0 : }
1496 :
1497 8 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1498 8 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1499 8 : self.layer,
1500 8 : )));
1501 8 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1502 8 : Ok(buf)
1503 8 : }
1504 : }
1505 :
1506 : pub(crate) struct Adapter<T>(T);
1507 :
1508 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1509 8 : pub(crate) async fn read_blk(
1510 8 : &self,
1511 8 : blknum: u32,
1512 8 : ctx: &RequestContext,
1513 8 : ) -> Result<BlockLease, std::io::Error> {
1514 8 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1515 8 : block_reader.read_blk(blknum, ctx).await
1516 8 : }
1517 : }
1518 :
1519 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1520 16 : fn as_ref(&self) -> &DeltaLayerInner {
1521 16 : self
1522 16 : }
1523 : }
1524 :
/// Exposes the fields of a [`DeltaEntry`] through the compaction interface.
impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    /// Key of the entry.
    fn key(&self) -> Key {
        self.key
    }
    /// LSN of the entry.
    fn lsn(&self) -> Lsn {
        self.lsn
    }
    /// Size of the stored value, as recorded in the entry.
    fn size(&self) -> u64 {
        self.size
    }
}
1536 :
1537 : pub struct DeltaLayerIterator<'a> {
1538 : delta_layer: &'a DeltaLayerInner,
1539 : ctx: &'a RequestContext,
1540 : planner: StreamingVectoredReadPlanner,
1541 : index_iter: DiskBtreeIterator<'a>,
1542 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1543 : is_end: bool,
1544 : }
1545 :
1546 : impl<'a> DeltaLayerIterator<'a> {
1547 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1548 0 : self.delta_layer.layer_dbg_info()
1549 0 : }
1550 :
1551 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1552 21380 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1553 21380 : assert!(self.key_values_batch.is_empty());
1554 21380 : assert!(!self.is_end);
1555 :
1556 21380 : let plan = loop {
1557 2099534 : if let Some(res) = self.index_iter.next().await {
1558 2099012 : let (raw_key, value) = res?;
1559 2099012 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1560 2099012 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1561 2099012 : let blob_ref = BlobRef(value);
1562 2099012 : let offset = blob_ref.pos();
1563 2099012 : if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
1564 20858 : break batch_plan;
1565 2078154 : }
1566 : } else {
1567 522 : self.is_end = true;
1568 522 : let data_end_offset = self.delta_layer.index_start_offset();
1569 522 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1570 522 : break item;
1571 : } else {
1572 0 : return Ok(()); // TODO: test empty iterator
1573 : }
1574 : }
1575 : };
1576 21380 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1577 21380 : let mut next_batch = std::collections::VecDeque::new();
1578 21380 : let buf_size = plan.size();
1579 21380 : let buf = IoBufferMut::with_capacity(buf_size);
1580 21380 : let blobs_buf = vectored_blob_reader
1581 21380 : .read_blobs(&plan, buf, self.ctx)
1582 10916 : .await?;
1583 21380 : let view = BufView::new_slice(&blobs_buf.buf);
1584 2098984 : for meta in blobs_buf.blobs.iter() {
1585 2098984 : let blob_read = meta.read(&view).await?;
1586 2098984 : let value = Value::des(&blob_read)?;
1587 :
1588 2098984 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1589 : }
1590 21380 : self.key_values_batch = next_batch;
1591 21380 : Ok(())
1592 21380 : }
1593 :
1594 2099722 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1595 2099722 : if self.key_values_batch.is_empty() {
1596 22308 : if self.is_end {
1597 1012 : return Ok(None);
1598 21296 : }
1599 21296 : self.next_batch().await?;
1600 2077414 : }
1601 2098710 : Ok(Some(
1602 2098710 : self.key_values_batch
1603 2098710 : .pop_front()
1604 2098710 : .expect("should not be empty"),
1605 2098710 : ))
1606 2099722 : }
1607 : }
1608 :
1609 : #[cfg(test)]
1610 : pub(crate) mod test {
1611 : use std::collections::BTreeMap;
1612 :
1613 : use itertools::MinMaxResult;
1614 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1615 : use rand::RngCore;
1616 :
1617 : use super::*;
1618 : use crate::tenant::harness::TIMELINE_ID;
1619 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1620 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1621 : use crate::tenant::{Tenant, Timeline};
1622 : use crate::{
1623 : context::DownloadBehavior,
1624 : task_mgr::TaskKind,
1625 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1626 : DEFAULT_PG_VERSION,
1627 : };
1628 : use bytes::Bytes;
1629 : use pageserver_api::value::Value;
1630 :
1631 : /// Construct an index for a fictional delta layer and and then
1632 : /// traverse in order to plan vectored reads for a query. Finally,
1633 : /// verify that the traversal fed the right index key and value
1634 : /// pairs into the planner.
1635 : #[tokio::test]
1636 2 : async fn test_delta_layer_index_traversal() {
1637 2 : let base_key = Key {
1638 2 : field1: 0,
1639 2 : field2: 1663,
1640 2 : field3: 12972,
1641 2 : field4: 16396,
1642 2 : field5: 0,
1643 2 : field6: 246080,
1644 2 : };
1645 2 :
1646 2 : // Populate the index with some entries
1647 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1648 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1649 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1650 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1651 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1652 2 : ]);
1653 2 :
1654 2 : let mut disk = TestDisk::default();
1655 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1656 2 :
1657 2 : let mut disk_offset = 0;
1658 10 : for (key, lsns) in &entries {
1659 42 : for lsn in lsns {
1660 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1661 34 : let blob_ref = BlobRef::new(disk_offset, false);
1662 34 : writer
1663 34 : .append(&index_key.0, blob_ref.0)
1664 34 : .expect("In memory disk append should never fail");
1665 34 :
1666 34 : disk_offset += 1;
1667 34 : }
1668 2 : }
1669 2 :
1670 2 : // Prepare all the arguments for the call into `plan_reads` below
1671 2 : let (root_offset, _writer) = writer
1672 2 : .finish()
1673 2 : .expect("In memory disk finish should never fail");
1674 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1675 2 : let planner = VectoredReadPlanner::new(100);
1676 2 : let mut reconstruct_state = ValuesReconstructState::new();
1677 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1678 2 :
1679 2 : let keyspace = KeySpace {
1680 2 : ranges: vec![
1681 2 : base_key..base_key.add(3),
1682 2 : base_key.add(3)..base_key.add(100),
1683 2 : ],
1684 2 : };
1685 2 : let lsn_range = Lsn(2)..Lsn(40);
1686 2 :
1687 2 : // Plan and validate
1688 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1689 2 : &keyspace,
1690 2 : lsn_range.clone(),
1691 2 : disk_offset,
1692 2 : reader,
1693 2 : planner,
1694 2 : &mut reconstruct_state,
1695 2 : &ctx,
1696 2 : )
1697 2 : .await
1698 2 : .expect("Read planning should not fail");
1699 2 :
1700 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1701 2 : }
1702 :
1703 2 : fn validate(
1704 2 : keyspace: KeySpace,
1705 2 : lsn_range: Range<Lsn>,
1706 2 : vectored_reads: Vec<VectoredRead>,
1707 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1708 2 : ) {
1709 : #[derive(Debug, PartialEq, Eq)]
1710 : struct BlobSpec {
1711 : key: Key,
1712 : lsn: Lsn,
1713 : at: u64,
1714 : }
1715 :
1716 2 : let mut planned_blobs = Vec::new();
1717 30 : for read in vectored_reads {
1718 28 : for (at, meta) in read.blobs_at.as_slice() {
1719 28 : planned_blobs.push(BlobSpec {
1720 28 : key: meta.key,
1721 28 : lsn: meta.lsn,
1722 28 : at: *at,
1723 28 : });
1724 28 : }
1725 : }
1726 :
1727 2 : let mut expected_blobs = Vec::new();
1728 2 : let mut disk_offset = 0;
1729 10 : for (key, lsns) in index_entries {
1730 42 : for lsn in lsns {
1731 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1732 34 : let lsn_included = lsn_range.contains(&lsn);
1733 34 :
1734 34 : if key_included && lsn_included {
1735 28 : expected_blobs.push(BlobSpec {
1736 28 : key,
1737 28 : lsn,
1738 28 : at: disk_offset,
1739 28 : });
1740 28 : }
1741 :
1742 34 : disk_offset += 1;
1743 : }
1744 : }
1745 :
1746 2 : assert_eq!(planned_blobs, expected_blobs);
1747 2 : }
1748 :
/// Tuning knobs for the randomized end-to-end vectored read test.
mod constants {
    use utils::lsn::Lsn;

    /// Offset used by all lsns in this test
    pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
    /// Number of unique keys included in the test data
    pub(super) const KEY_COUNT: u8 = 60;
    /// Max number of different lsns for each key
    pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    /// Possible value sizes for each key along with a probability weight
    pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    /// Weighted choice of whether there will be a gap between the current key
    /// and the next one (a gap is chosen with probability 33.3%)
    pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    /// The minimum size of a key range in all the generated reads
    pub(super) const MIN_RANGE_SIZE: i128 = 10;
    /// The number of ranges included in each vectored read
    pub(super) const RANGES_COUNT: u8 = 2;
    /// The number of vectored reads performed
    pub(super) const READS_COUNT: u8 = 100;
    /// Soft max size of a vectored read. Will be violated if we have to read keys
    /// with values larger than the limit
    pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
}
1772 :
1773 : struct Entry {
1774 : key: Key,
1775 : lsn: Lsn,
1776 : value: Vec<u8>,
1777 : }
1778 :
1779 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1780 2 : let mut current_key = Key::MIN;
1781 2 :
1782 2 : let mut entries = Vec::new();
1783 122 : for _ in 0..constants::KEY_COUNT {
1784 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1785 120 : let mut lsns_iter =
1786 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1787 2260 : Some(Lsn(lsn.0 + 0x08))
1788 2260 : });
1789 120 : let mut lsns = Vec::new();
1790 2380 : while lsns.len() < count as usize {
1791 2260 : let take = rng.gen_bool(0.5);
1792 2260 : let lsn = lsns_iter.next().unwrap();
1793 2260 : if take {
1794 1112 : lsns.push(lsn);
1795 1148 : }
1796 : }
1797 :
1798 1232 : for lsn in lsns {
1799 1112 : let size = constants::VALUE_SIZES
1800 3336 : .choose_weighted(rng, |item| item.1)
1801 1112 : .unwrap()
1802 1112 : .0;
1803 1112 : let mut buf = vec![0; size];
1804 1112 : rng.fill_bytes(&mut buf);
1805 1112 :
1806 1112 : entries.push(Entry {
1807 1112 : key: current_key,
1808 1112 : lsn,
1809 1112 : value: buf,
1810 1112 : })
1811 : }
1812 :
1813 120 : let gap = constants::KEY_GAP_CHANGES
1814 240 : .choose_weighted(rng, |item| item.1)
1815 120 : .unwrap()
1816 120 : .0;
1817 120 : if gap {
1818 38 : current_key = current_key.add(2);
1819 82 : } else {
1820 82 : current_key = current_key.add(1);
1821 82 : }
1822 : }
1823 :
1824 2 : entries
1825 2 : }
1826 :
1827 : struct EntriesMeta {
1828 : key_range: Range<Key>,
1829 : lsn_range: Range<Lsn>,
1830 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1831 : }
1832 :
1833 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1834 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1835 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1836 0 : _ => panic!("More than one entry is always expected"),
1837 : };
1838 :
1839 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1840 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1841 0 : _ => panic!("More than one entry is always expected"),
1842 : };
1843 :
1844 2 : let mut index = BTreeMap::new();
1845 1112 : for entry in entries.iter() {
1846 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1847 1112 : }
1848 :
1849 2 : EntriesMeta {
1850 2 : key_range,
1851 2 : lsn_range,
1852 2 : index,
1853 2 : }
1854 2 : }
1855 :
1856 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1857 200 : let start = key_range.start.to_i128();
1858 200 : let end = key_range.end.to_i128();
1859 200 :
1860 200 : let mut keyspace = KeySpace::default();
1861 :
1862 600 : for _ in 0..constants::RANGES_COUNT {
1863 400 : let mut range: Option<Range<Key>> = Option::default();
1864 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1865 844 : let range_start = rng.gen_range(start..end);
1866 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1867 844 : if range_end_offset >= end {
1868 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1869 744 : } else {
1870 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1871 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1872 744 : }
1873 : }
1874 400 : keyspace.ranges.push(range.unwrap());
1875 : }
1876 :
1877 200 : keyspace
1878 200 : }
1879 :
1880 : #[tokio::test]
1881 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1882 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1883 20 : let (tenant, ctx) = harness.load().await;
1884 2 :
1885 2 : let timeline_id = TimelineId::generate();
1886 2 : let timeline = tenant
1887 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1888 6 : .await?;
1889 2 :
1890 2 : tracing::info!("Generating test data ...");
1891 2 :
1892 2 : let rng = &mut StdRng::seed_from_u64(0);
1893 2 : let entries = generate_entries(rng);
1894 2 : let entries_meta = get_entries_meta(&entries);
1895 2 :
1896 2 : tracing::info!("Done generating {} entries", entries.len());
1897 2 :
1898 2 : tracing::info!("Writing test data to delta layer ...");
1899 2 : let mut writer = DeltaLayerWriter::new(
1900 2 : harness.conf,
1901 2 : timeline_id,
1902 2 : harness.tenant_shard_id,
1903 2 : entries_meta.key_range.start,
1904 2 : entries_meta.lsn_range.clone(),
1905 2 : &ctx,
1906 2 : )
1907 2 : .await?;
1908 2 :
1909 1114 : for entry in entries {
1910 1112 : let (_, res) = writer
1911 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
1912 215 : .await;
1913 1112 : res?;
1914 2 : }
1915 2 :
1916 5 : let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
1917 2 : let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
1918 2 :
1919 2 : let inner = resident.get_as_delta(&ctx).await?;
1920 2 :
1921 2 : let file_size = inner.file.metadata().await?.len();
1922 2 : tracing::info!(
1923 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1924 2 : file_size
1925 2 : );
1926 2 :
1927 202 : for i in 0..constants::READS_COUNT {
1928 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1929 2 :
1930 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1931 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1932 200 : inner.index_start_blk,
1933 200 : inner.index_root_blk,
1934 200 : block_reader,
1935 200 : );
1936 200 :
1937 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1938 200 : let mut reconstruct_state = ValuesReconstructState::new();
1939 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1940 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1941 2 :
1942 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1943 200 : &keyspace,
1944 200 : entries_meta.lsn_range.clone(),
1945 200 : data_end_offset,
1946 200 : index_reader,
1947 200 : planner,
1948 200 : &mut reconstruct_state,
1949 200 : &ctx,
1950 200 : )
1951 4 : .await?;
1952 2 :
1953 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1954 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1955 200 : &vectored_reads,
1956 200 : constants::MAX_VECTORED_READ_BYTES,
1957 200 : );
1958 200 : let mut buf = Some(IoBufferMut::with_capacity(buf_size));
1959 2 :
1960 19924 : for read in vectored_reads {
1961 19724 : let blobs_buf = vectored_blob_reader
1962 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1963 10016 : .await?;
1964 19724 : let view = BufView::new_slice(&blobs_buf.buf);
1965 57304 : for meta in blobs_buf.blobs.iter() {
1966 57304 : let value = meta.read(&view).await?;
1967 57304 : assert_eq!(
1968 57304 : &value[..],
1969 57304 : &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
1970 57304 : );
1971 2 : }
1972 2 :
1973 19724 : buf = Some(blobs_buf.buf);
1974 2 : }
1975 2 : }
1976 2 :
1977 2 : Ok(())
1978 2 : }
1979 :
1980 : #[tokio::test]
1981 2 : async fn copy_delta_prefix_smoke() {
1982 2 : use bytes::Bytes;
1983 2 : use pageserver_api::record::NeonWalRecord;
1984 2 :
1985 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
1986 2 : .await
1987 2 : .unwrap();
1988 20 : let (tenant, ctx) = h.load().await;
1989 2 : let ctx = &ctx;
1990 2 : let timeline = tenant
1991 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1992 6 : .await
1993 2 : .unwrap();
1994 2 :
1995 2 : let initdb_layer = timeline
1996 2 : .layers
1997 2 : .read()
1998 2 : .await
1999 2 : .likely_resident_layers()
2000 2 : .next()
2001 2 : .cloned()
2002 2 : .unwrap();
2003 2 :
2004 2 : {
2005 2 : let mut writer = timeline.writer().await;
2006 2 :
2007 2 : let data = [
2008 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
2009 2 : (
2010 2 : 0x30,
2011 2 : 12,
2012 2 : Value::WalRecord(NeonWalRecord::Postgres {
2013 2 : will_init: false,
2014 2 : rec: Bytes::from_static(b"1"),
2015 2 : }),
2016 2 : ),
2017 2 : (
2018 2 : 0x40,
2019 2 : 12,
2020 2 : Value::WalRecord(NeonWalRecord::Postgres {
2021 2 : will_init: true,
2022 2 : rec: Bytes::from_static(b"2"),
2023 2 : }),
2024 2 : ),
2025 2 : // build an oversized value so we cannot extend and existing read over
2026 2 : // this
2027 2 : (
2028 2 : 0x50,
2029 2 : 12,
2030 2 : Value::WalRecord(NeonWalRecord::Postgres {
2031 2 : will_init: true,
2032 2 : rec: {
2033 2 : let mut buf =
2034 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2035 2 : buf.iter_mut()
2036 2 : .enumerate()
2037 268288 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2038 2 : Bytes::from(buf)
2039 2 : },
2040 2 : }),
2041 2 : ),
2042 2 : // because the oversized read cannot be extended further, we are sure to exercise the
2043 2 : // builder created on the last round with this:
2044 2 : (
2045 2 : 0x60,
2046 2 : 12,
2047 2 : Value::WalRecord(NeonWalRecord::Postgres {
2048 2 : will_init: true,
2049 2 : rec: Bytes::from_static(b"3"),
2050 2 : }),
2051 2 : ),
2052 2 : (
2053 2 : 0x60,
2054 2 : 9,
2055 2 : Value::Image(Bytes::from_static(b"something for a different key")),
2056 2 : ),
2057 2 : ];
2058 2 :
2059 2 : let mut last_lsn = None;
2060 2 :
2061 14 : for (lsn, key, value) in data {
2062 12 : let key = Key::from_i128(key);
2063 12 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2064 12 : last_lsn = Some(lsn);
2065 2 : }
2066 2 :
2067 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
2068 2 : }
2069 2 : timeline.freeze_and_flush().await.unwrap();
2070 2 :
2071 2 : let new_layer = timeline
2072 2 : .layers
2073 2 : .read()
2074 2 : .await
2075 2 : .likely_resident_layers()
2076 3 : .find(|&x| x != &initdb_layer)
2077 2 : .cloned()
2078 2 : .unwrap();
2079 2 :
2080 2 : // create a copy for the timeline, so we don't overwrite the file
2081 2 : let branch = tenant
2082 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2083 2 : .await
2084 2 : .unwrap();
2085 2 :
2086 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2087 2 :
2088 2 : // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
2089 2 : // a single key
2090 2 :
2091 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2092 10 : let truncate_at = Lsn(truncate_at);
2093 2 :
2094 10 : let mut writer = DeltaLayerWriter::new(
2095 10 : tenant.conf,
2096 10 : branch.timeline_id,
2097 10 : tenant.tenant_shard_id,
2098 10 : Key::MIN,
2099 10 : Lsn(0x11)..truncate_at,
2100 10 : ctx,
2101 10 : )
2102 5 : .await
2103 10 : .unwrap();
2104 2 :
2105 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
2106 10 :
2107 10 : new_layer
2108 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2109 15 : .await
2110 10 : .unwrap();
2111 2 :
2112 24 : let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
2113 10 : let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
2114 10 :
2115 10 : copied_layer.get_as_delta(ctx).await.unwrap();
2116 10 :
2117 10 : assert_keys_and_values_eq(
2118 10 : new_layer.get_as_delta(ctx).await.unwrap(),
2119 10 : copied_layer.get_as_delta(ctx).await.unwrap(),
2120 10 : truncate_at,
2121 10 : ctx,
2122 2 : )
2123 51 : .await;
2124 2 : }
2125 2 : }
2126 :
2127 10 : async fn assert_keys_and_values_eq(
2128 10 : source: &DeltaLayerInner,
2129 10 : truncated: &DeltaLayerInner,
2130 10 : truncated_at: Lsn,
2131 10 : ctx: &RequestContext,
2132 10 : ) {
2133 : use futures::future::ready;
2134 : use futures::stream::TryStreamExt;
2135 :
2136 10 : let start_key = [0u8; DELTA_KEY_SIZE];
2137 10 :
2138 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2139 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2140 10 : source.index_start_blk,
2141 10 : source.index_root_blk,
2142 10 : &source_reader,
2143 10 : );
2144 10 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2145 60 : let source_stream = source_stream.filter(|res| match res {
2146 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2147 0 : _ => ready(true),
2148 60 : });
2149 10 : let mut source_stream = std::pin::pin!(source_stream);
2150 10 :
2151 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2152 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2153 10 : truncated.index_start_blk,
2154 10 : truncated.index_root_blk,
2155 10 : &truncated_reader,
2156 10 : );
2157 10 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2158 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2159 10 :
2160 10 : let mut scratch_left = Vec::new();
2161 10 : let mut scratch_right = Vec::new();
2162 :
2163 : loop {
2164 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2165 42 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2166 42 :
2167 42 : if src.is_none() {
2168 10 : assert!(truncated.is_none());
2169 10 : break;
2170 32 : }
2171 32 :
2172 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2173 32 :
2174 32 : // because we've filtered the source with Lsn, we should always have the same keys from both.
2175 32 : assert_eq!(src.0, truncated.0);
2176 32 : assert_eq!(src.1, truncated.1);
2177 :
2178 : // if this is needed for something else, just drop this assert.
2179 32 : assert!(
2180 32 : src.2.pos() >= truncated.2.pos(),
2181 0 : "value position should not go backwards {} vs. {}",
2182 0 : src.2.pos(),
2183 0 : truncated.2.pos()
2184 : );
2185 :
2186 32 : scratch_left.clear();
2187 32 : let src_cursor = source_reader.block_cursor();
2188 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2189 32 : scratch_right.clear();
2190 32 : let trunc_cursor = truncated_reader.block_cursor();
2191 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2192 32 :
2193 32 : tokio::try_join!(left, right).unwrap();
2194 32 :
2195 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2196 : }
2197 10 : }
2198 :
2199 18224 : pub(crate) fn sort_delta(
2200 18224 : (k1, l1, _): &(Key, Lsn, Value),
2201 18224 : (k2, l2, _): &(Key, Lsn, Value),
2202 18224 : ) -> std::cmp::Ordering {
2203 18224 : (k1, l1).cmp(&(k2, l2))
2204 18224 : }
2205 :
/// Comparator ordering `(key, lsn, value)` tuples by key, then by LSN, and,
/// for equal key and LSN, placing image values before all other values.
#[cfg(feature = "testing")]
pub(crate) fn sort_delta_value(
    (k1, l1, v1): &(Key, Lsn, Value),
    (k2, l2, v2): &(Key, Lsn, Value),
) -> std::cmp::Ordering {
    // Images rank lower so that they sort first among equal (key, lsn) pairs.
    let rank = |value: &Value| -> u8 {
        if value.is_image() {
            0
        } else {
            1
        }
    };

    k1.cmp(k2)
        .then_with(|| l1.cmp(l2))
        .then_with(|| rank(v1).cmp(&rank(v2)))
}
2215 :
2216 22 : pub(crate) async fn produce_delta_layer(
2217 22 : tenant: &Tenant,
2218 22 : tline: &Arc<Timeline>,
2219 22 : mut deltas: Vec<(Key, Lsn, Value)>,
2220 22 : ctx: &RequestContext,
2221 22 : ) -> anyhow::Result<ResidentLayer> {
2222 22 : deltas.sort_by(sort_delta);
2223 22 : let (key_start, _, _) = deltas.first().unwrap();
2224 22 : let (key_max, _, _) = deltas.last().unwrap();
2225 8240 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2226 8240 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2227 22 : let lsn_end = Lsn(lsn_max.0 + 1);
2228 22 : let mut writer = DeltaLayerWriter::new(
2229 22 : tenant.conf,
2230 22 : tline.timeline_id,
2231 22 : tenant.tenant_shard_id,
2232 22 : *key_start,
2233 22 : (*lsn_min)..lsn_end,
2234 22 : ctx,
2235 22 : )
2236 11 : .await?;
2237 22 : let key_end = key_max.next();
2238 :
2239 8262 : for (key, lsn, value) in deltas {
2240 8240 : writer.put_value(key, lsn, value, ctx).await?;
2241 : }
2242 :
2243 63 : let (desc, path) = writer.finish(key_end, ctx).await?;
2244 22 : let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
2245 :
2246 22 : Ok::<_, anyhow::Error>(delta_layer)
2247 22 : }
2248 :
2249 28 : async fn assert_delta_iter_equal(
2250 28 : delta_iter: &mut DeltaLayerIterator<'_>,
2251 28 : expect: &[(Key, Lsn, Value)],
2252 28 : ) {
2253 28 : let mut expect_iter = expect.iter();
2254 : loop {
2255 28028 : let o1 = delta_iter.next().await.unwrap();
2256 28028 : let o2 = expect_iter.next();
2257 28028 : assert_eq!(o1.is_some(), o2.is_some());
2258 28028 : if o1.is_none() && o2.is_none() {
2259 28 : break;
2260 28000 : }
2261 28000 : let (k1, l1, v1) = o1.unwrap();
2262 28000 : let (k2, l2, v2) = o2.unwrap();
2263 28000 : assert_eq!(&k1, k2);
2264 28000 : assert_eq!(l1, *l2);
2265 28000 : assert_eq!(&v1, v2);
2266 : }
2267 28 : }
2268 :
2269 : #[tokio::test]
2270 2 : async fn delta_layer_iterator() {
2271 2 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2272 20 : let (tenant, ctx) = harness.load().await;
2273 2 :
2274 2 : let tline = tenant
2275 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2276 6 : .await
2277 2 : .unwrap();
2278 2 :
2279 2000 : fn get_key(id: u32) -> Key {
2280 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2281 2000 : key.field6 = id;
2282 2000 : key
2283 2000 : }
2284 2 : const N: usize = 1000;
2285 2 : let test_deltas = (0..N)
2286 2000 : .map(|idx| {
2287 2000 : (
2288 2000 : get_key(idx as u32 / 10),
2289 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2290 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2291 2000 : )
2292 2000 : })
2293 2 : .collect_vec();
2294 2 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2295 8 : .await
2296 2 : .unwrap();
2297 2 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2298 6 : for max_read_size in [1, 1024] {
2299 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2300 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2301 28 : // Test if the batch size is correctly determined
2302 28 : let mut iter = delta_layer.iter(&ctx);
2303 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2304 28 : let mut num_items = 0;
2305 112 : for _ in 0..3 {
2306 84 : iter.next_batch().await.unwrap();
2307 84 : num_items += iter.key_values_batch.len();
2308 84 : if max_read_size == 1 {
2309 2 : // every key should be a batch b/c the value is larger than max_read_size
2310 42 : assert_eq!(iter.key_values_batch.len(), 1);
2311 2 : } else {
2312 42 : assert!(iter.key_values_batch.len() <= batch_size);
2313 2 : }
2314 84 : if num_items >= N {
2315 2 : break;
2316 84 : }
2317 84 : iter.key_values_batch.clear();
2318 2 : }
2319 2 : // Test if the result is correct
2320 28 : let mut iter = delta_layer.iter(&ctx);
2321 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2322 9649 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2323 2 : }
2324 2 : }
2325 2 : }
2326 : }
|