Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{
37 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
38 : };
39 : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
40 : use crate::tenant::timeline::GetVectoredError;
41 : use crate::tenant::vectored_blob_io::{
42 : BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
43 : VectoredReadCoalesceMode, VectoredReadPlanner,
44 : };
45 : use crate::tenant::PageReconstructError;
46 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
47 : use crate::virtual_file::{self, VirtualFile};
48 : use crate::{walrecord, TEMP_FILE_SUFFIX};
49 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
50 : use anyhow::{anyhow, bail, ensure, Context, Result};
51 : use bytes::BytesMut;
52 : use camino::{Utf8Path, Utf8PathBuf};
53 : use futures::StreamExt;
54 : use itertools::Itertools;
55 : use pageserver_api::keyspace::KeySpace;
56 : use pageserver_api::models::ImageCompressionAlgorithm;
57 : use pageserver_api::shard::TenantShardId;
58 : use rand::{distributions::Alphanumeric, Rng};
59 : use serde::{Deserialize, Serialize};
60 : use std::collections::VecDeque;
61 : use std::fs::File;
62 : use std::io::SeekFrom;
63 : use std::ops::Range;
64 : use std::os::unix::fs::FileExt;
65 : use std::str::FromStr;
66 : use std::sync::Arc;
67 : use tokio::sync::OnceCell;
68 : use tokio_epoll_uring::IoBuf;
69 : use tracing::*;
70 :
71 : use utils::{
72 : bin_ser::BeSer,
73 : id::{TenantId, TimelineId},
74 : lsn::Lsn,
75 : };
76 :
77 : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
78 :
79 : ///
80 : /// Header stored in the beginning of the file
81 : ///
82 : /// After this comes the 'values' part, starting on block 1. After that,
83 : /// the 'index' starts at the block indicated by 'index_start_blk'
84 : ///
85 3126 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
86 : pub struct Summary {
87 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
88 : pub magic: u16,
89 : pub format_version: u16,
90 :
91 : pub tenant_id: TenantId,
92 : pub timeline_id: TimelineId,
93 : pub key_range: Range<Key>,
94 : pub lsn_range: Range<Lsn>,
95 :
96 : /// Block number where the 'index' part of the file begins.
97 : pub index_start_blk: u32,
98 : /// Block within the 'index', where the B-tree root page is stored
99 : pub index_root_blk: u32,
100 : }
101 :
102 : impl From<&DeltaLayer> for Summary {
103 0 : fn from(layer: &DeltaLayer) -> Self {
104 0 : Self::expected(
105 0 : layer.desc.tenant_shard_id.tenant_id,
106 0 : layer.desc.timeline_id,
107 0 : layer.desc.key_range.clone(),
108 0 : layer.desc.lsn_range.clone(),
109 0 : )
110 0 : }
111 : }
112 :
113 : impl Summary {
114 3126 : pub(super) fn expected(
115 3126 : tenant_id: TenantId,
116 3126 : timeline_id: TimelineId,
117 3126 : keys: Range<Key>,
118 3126 : lsns: Range<Lsn>,
119 3126 : ) -> Self {
120 3126 : Self {
121 3126 : magic: DELTA_FILE_MAGIC,
122 3126 : format_version: STORAGE_FORMAT_VERSION,
123 3126 :
124 3126 : tenant_id,
125 3126 : timeline_id,
126 3126 : key_range: keys,
127 3126 : lsn_range: lsns,
128 3126 :
129 3126 : index_start_blk: 0,
130 3126 : index_root_blk: 0,
131 3126 : }
132 3126 : }
133 : }
134 :
135 : // Flag indicating that this version initialize the page
136 : const WILL_INIT: u64 = 1;
137 :
138 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
139 : /// offset, and for WAL records it also contains `will_init` flag. The flag
140 : /// helps to determine the range of records that needs to be applied, without
141 : /// reading/deserializing records themselves.
142 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
143 : pub struct BlobRef(pub u64);
144 :
145 : impl BlobRef {
146 1533241 : pub fn will_init(&self) -> bool {
147 1533241 : (self.0 & WILL_INIT) != 0
148 1533241 : }
149 :
150 20432781 : pub fn pos(&self) -> u64 {
151 20432781 : self.0 >> 1
152 20432781 : }
153 :
154 19413306 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
155 19413306 : let mut blob_ref = pos << 1;
156 19413306 : if will_init {
157 19408998 : blob_ref |= WILL_INIT;
158 19408998 : }
159 19413306 : BlobRef(blob_ref)
160 19413306 : }
161 : }
162 :
163 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
164 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
165 :
166 : /// This is the key of the B-tree index stored in the delta layer. It consists
167 : /// of the serialized representation of a Key and LSN.
168 : impl DeltaKey {
169 6192594 : fn from_slice(buf: &[u8]) -> Self {
170 6192594 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
171 6192594 : bytes.copy_from_slice(buf);
172 6192594 : DeltaKey(bytes)
173 6192594 : }
174 :
175 20155091 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
176 20155091 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
177 20155091 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
178 20155091 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
179 20155091 : DeltaKey(bytes)
180 20155091 : }
181 :
182 6192594 : fn key(&self) -> Key {
183 6192594 : Key::from_slice(&self.0)
184 6192594 : }
185 :
186 6192594 : fn lsn(&self) -> Lsn {
187 6192594 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
188 6192594 : }
189 :
190 8047893 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
191 8047893 : let mut lsn_buf = [0u8; 8];
192 8047893 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
193 8047893 : Lsn(u64::from_be_bytes(lsn_buf))
194 8047893 : }
195 : }
196 :
197 : /// This is used only from `pagectl`. Within pageserver, all layers are
198 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
199 : pub struct DeltaLayer {
200 : path: Utf8PathBuf,
201 : pub desc: PersistentLayerDesc,
202 : inner: OnceCell<Arc<DeltaLayerInner>>,
203 : }
204 :
205 : impl std::fmt::Debug for DeltaLayer {
206 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 0 : use super::RangeDisplayDebug;
208 0 :
209 0 : f.debug_struct("DeltaLayer")
210 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
211 0 : .field("lsn_range", &self.desc.lsn_range)
212 0 : .field("file_size", &self.desc.file_size)
213 0 : .field("inner", &self.inner)
214 0 : .finish()
215 0 : }
216 : }
217 :
218 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
219 : /// file.
220 : pub struct DeltaLayerInner {
221 : // values copied from summary
222 : index_start_blk: u32,
223 : index_root_blk: u32,
224 :
225 : file: VirtualFile,
226 : file_id: FileId,
227 :
228 : #[allow(dead_code)]
229 : layer_key_range: Range<Key>,
230 : #[allow(dead_code)]
231 : layer_lsn_range: Range<Lsn>,
232 :
233 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
234 : }
235 :
236 : impl DeltaLayerInner {
237 0 : pub(crate) fn layer_dbg_info(&self) -> String {
238 0 : format!(
239 0 : "delta {}..{} {}..{}",
240 0 : self.key_range().start,
241 0 : self.key_range().end,
242 0 : self.lsn_range().start,
243 0 : self.lsn_range().end
244 0 : )
245 0 : }
246 : }
247 :
248 : impl std::fmt::Debug for DeltaLayerInner {
249 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 0 : f.debug_struct("DeltaLayerInner")
251 0 : .field("index_start_blk", &self.index_start_blk)
252 0 : .field("index_root_blk", &self.index_root_blk)
253 0 : .finish()
254 0 : }
255 : }
256 :
257 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
258 : impl std::fmt::Display for DeltaLayer {
259 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 0 : write!(f, "{}", self.layer_desc().short_id())
261 0 : }
262 : }
263 :
264 : impl AsLayerDesc for DeltaLayer {
265 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
266 0 : &self.desc
267 0 : }
268 : }
269 :
270 : impl DeltaLayer {
271 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
272 0 : self.desc.dump();
273 0 :
274 0 : if !verbose {
275 0 : return Ok(());
276 0 : }
277 :
278 0 : let inner = self.load(ctx).await?;
279 :
280 0 : inner.dump(ctx).await
281 0 : }
282 :
283 4248 : fn temp_path_for(
284 4248 : conf: &PageServerConf,
285 4248 : tenant_shard_id: &TenantShardId,
286 4248 : timeline_id: &TimelineId,
287 4248 : key_start: Key,
288 4248 : lsn_range: &Range<Lsn>,
289 4248 : ) -> Utf8PathBuf {
290 4248 : let rand_string: String = rand::thread_rng()
291 4248 : .sample_iter(&Alphanumeric)
292 4248 : .take(8)
293 4248 : .map(char::from)
294 4248 : .collect();
295 4248 :
296 4248 : conf.timeline_path(tenant_shard_id, timeline_id)
297 4248 : .join(format!(
298 4248 : "{}-XXX__{:016X}-{:016X}.{}.{}",
299 4248 : key_start,
300 4248 : u64::from(lsn_range.start),
301 4248 : u64::from(lsn_range.end),
302 4248 : rand_string,
303 4248 : TEMP_FILE_SUFFIX,
304 4248 : ))
305 4248 : }
306 :
307 : ///
308 : /// Open the underlying file and read the metadata into memory, if it's
309 : /// not loaded already.
310 : ///
311 0 : async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
312 0 : // Quick exit if already loaded
313 0 : self.inner
314 0 : .get_or_try_init(|| self.load_inner(ctx))
315 0 : .await
316 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
317 0 : }
318 :
319 0 : async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
320 0 : let path = self.path();
321 :
322 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
323 :
324 : // not production code
325 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
326 0 : let expected_layer_name = self.layer_desc().layer_name();
327 0 :
328 0 : if actual_layer_name != expected_layer_name {
329 0 : println!("warning: filename does not match what is expected from in-file summary");
330 0 : println!("actual: {:?}", actual_layer_name.to_string());
331 0 : println!("expected: {:?}", expected_layer_name.to_string());
332 0 : }
333 :
334 0 : Ok(Arc::new(loaded))
335 0 : }
336 :
337 : /// Create a DeltaLayer struct representing an existing file on disk.
338 : ///
339 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
340 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
341 0 : let mut summary_buf = vec![0; PAGE_SZ];
342 0 : file.read_exact_at(&mut summary_buf, 0)?;
343 0 : let summary = Summary::des_prefix(&summary_buf)?;
344 :
345 0 : let metadata = file
346 0 : .metadata()
347 0 : .context("get file metadata to determine size")?;
348 :
349 : // This function is never used for constructing layers in a running pageserver,
350 : // so it does not need an accurate TenantShardId.
351 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
352 0 :
353 0 : Ok(DeltaLayer {
354 0 : path: path.to_path_buf(),
355 0 : desc: PersistentLayerDesc::new_delta(
356 0 : tenant_shard_id,
357 0 : summary.timeline_id,
358 0 : summary.key_range,
359 0 : summary.lsn_range,
360 0 : metadata.len(),
361 0 : ),
362 0 : inner: OnceCell::new(),
363 0 : })
364 0 : }
365 :
366 : /// Path to the layer file in pageserver workdir.
367 0 : fn path(&self) -> Utf8PathBuf {
368 0 : self.path.clone()
369 0 : }
370 : }
371 :
372 : /// A builder object for constructing a new delta layer.
373 : ///
374 : /// Usage:
375 : ///
376 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
377 : ///
378 : /// 2. Write the contents by calling `put_value` for every page
379 : /// version to store in the layer.
380 : ///
381 : /// 3. Call `finish`.
382 : ///
383 : struct DeltaLayerWriterInner {
384 : pub path: Utf8PathBuf,
385 : timeline_id: TimelineId,
386 : tenant_shard_id: TenantShardId,
387 :
388 : key_start: Key,
389 : lsn_range: Range<Lsn>,
390 :
391 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
392 :
393 : blob_writer: BlobWriter<true>,
394 :
395 : // Number of key-lsns in the layer.
396 : num_keys: usize,
397 : }
398 :
399 : impl DeltaLayerWriterInner {
400 : ///
401 : /// Start building a new delta layer.
402 : ///
403 4248 : async fn new(
404 4248 : conf: &'static PageServerConf,
405 4248 : timeline_id: TimelineId,
406 4248 : tenant_shard_id: TenantShardId,
407 4248 : key_start: Key,
408 4248 : lsn_range: Range<Lsn>,
409 4248 : ctx: &RequestContext,
410 4248 : ) -> anyhow::Result<Self> {
411 4248 : // Create the file initially with a temporary filename. We don't know
412 4248 : // the end key yet, so we cannot form the final filename yet. We will
413 4248 : // rename it when we're done.
414 4248 : //
415 4248 : // Note: This overwrites any existing file. There shouldn't be any.
416 4248 : // FIXME: throw an error instead?
417 4248 : let path =
418 4248 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
419 :
420 4248 : let mut file = VirtualFile::create(&path, ctx).await?;
421 : // make room for the header block
422 4248 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
423 4248 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
424 4248 :
425 4248 : // Initialize the b-tree index builder
426 4248 : let block_buf = BlockBuf::new();
427 4248 : let tree_builder = DiskBtreeBuilder::new(block_buf);
428 4248 :
429 4248 : Ok(Self {
430 4248 : path,
431 4248 : timeline_id,
432 4248 : tenant_shard_id,
433 4248 : key_start,
434 4248 : lsn_range,
435 4248 : tree: tree_builder,
436 4248 : blob_writer,
437 4248 : num_keys: 0,
438 4248 : })
439 4248 : }
440 :
441 : ///
442 : /// Append a key-value pair to the file.
443 : ///
444 : /// The values must be appended in key, lsn order.
445 : ///
446 6253014 : async fn put_value(
447 6253014 : &mut self,
448 6253014 : key: Key,
449 6253014 : lsn: Lsn,
450 6253014 : val: Value,
451 6253014 : ctx: &RequestContext,
452 6253014 : ) -> anyhow::Result<()> {
453 6253014 : let (_, res) = self
454 6253014 : .put_value_bytes(
455 6253014 : key,
456 6253014 : lsn,
457 6253014 : Value::ser(&val)?.slice_len(),
458 6253014 : val.will_init(),
459 6253014 : ctx,
460 : )
461 5924 : .await;
462 6253014 : res
463 6253014 : }
464 :
465 19412994 : async fn put_value_bytes<Buf>(
466 19412994 : &mut self,
467 19412994 : key: Key,
468 19412994 : lsn: Lsn,
469 19412994 : val: FullSlice<Buf>,
470 19412994 : will_init: bool,
471 19412994 : ctx: &RequestContext,
472 19412994 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
473 19412994 : where
474 19412994 : Buf: IoBuf + Send,
475 19412994 : {
476 19412994 : assert!(
477 19412994 : self.lsn_range.start <= lsn,
478 0 : "lsn_start={}, lsn={}",
479 : self.lsn_range.start,
480 : lsn
481 : );
482 : // We don't want to use compression in delta layer creation
483 19412994 : let compression = ImageCompressionAlgorithm::Disabled;
484 19412994 : let (val, res) = self
485 19412994 : .blob_writer
486 19412994 : .write_blob_maybe_compressed(val, ctx, compression)
487 14767 : .await;
488 19412994 : let off = match res {
489 19412994 : Ok((off, _)) => off,
490 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
491 : };
492 :
493 19412994 : let blob_ref = BlobRef::new(off, will_init);
494 19412994 :
495 19412994 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
496 19412994 : let res = self.tree.append(&delta_key.0, blob_ref.0);
497 19412994 :
498 19412994 : self.num_keys += 1;
499 19412994 :
500 19412994 : (val, res.map_err(|e| anyhow::anyhow!(e)))
501 19412994 : }
502 :
503 6071916 : fn size(&self) -> u64 {
504 6071916 : self.blob_writer.size() + self.tree.borrow_writer().size()
505 6071916 : }
506 :
507 : ///
508 : /// Finish writing the delta layer.
509 : ///
510 4200 : async fn finish(
511 4200 : self,
512 4200 : key_end: Key,
513 4200 : ctx: &RequestContext,
514 4200 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
515 4200 : let temp_path = self.path.clone();
516 29006 : let result = self.finish0(key_end, ctx).await;
517 4200 : if result.is_err() {
518 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
519 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
520 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
521 0 : }
522 4200 : }
523 4200 : result
524 4200 : }
525 :
526 4200 : async fn finish0(
527 4200 : self,
528 4200 : key_end: Key,
529 4200 : ctx: &RequestContext,
530 4200 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
531 4200 : let index_start_blk =
532 4200 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
533 :
534 4200 : let mut file = self.blob_writer.into_inner(ctx).await?;
535 :
536 : // Write out the index
537 4200 : let (index_root_blk, block_buf) = self.tree.finish()?;
538 4200 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
539 0 : .await?;
540 45051 : for buf in block_buf.blocks {
541 40851 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
542 40851 : res?;
543 : }
544 4200 : assert!(self.lsn_range.start < self.lsn_range.end);
545 : // Fill in the summary on blk 0
546 4200 : let summary = Summary {
547 4200 : magic: DELTA_FILE_MAGIC,
548 4200 : format_version: STORAGE_FORMAT_VERSION,
549 4200 : tenant_id: self.tenant_shard_id.tenant_id,
550 4200 : timeline_id: self.timeline_id,
551 4200 : key_range: self.key_start..key_end,
552 4200 : lsn_range: self.lsn_range.clone(),
553 4200 : index_start_blk,
554 4200 : index_root_blk,
555 4200 : };
556 4200 :
557 4200 : let mut buf = Vec::with_capacity(PAGE_SZ);
558 4200 : // TODO: could use smallvec here but it's a pain with Slice<T>
559 4200 : Summary::ser_into(&summary, &mut buf)?;
560 4200 : file.seek(SeekFrom::Start(0)).await?;
561 4200 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
562 4200 : res?;
563 :
564 4200 : let metadata = file
565 4200 : .metadata()
566 2100 : .await
567 4200 : .context("get file metadata to determine size")?;
568 :
569 : // 5GB limit for objects without multipart upload (which we don't want to use)
570 : // Make it a little bit below to account for differing GB units
571 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
572 4200 : ensure!(
573 4200 : metadata.len() <= S3_UPLOAD_LIMIT,
574 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
575 0 : file.path,
576 0 : metadata.len()
577 : );
578 :
579 : // Note: Because we opened the file in write-only mode, we cannot
580 : // reuse the same VirtualFile for reading later. That's why we don't
581 : // set inner.file here. The first read will have to re-open it.
582 :
583 4200 : let desc = PersistentLayerDesc::new_delta(
584 4200 : self.tenant_shard_id,
585 4200 : self.timeline_id,
586 4200 : self.key_start..key_end,
587 4200 : self.lsn_range.clone(),
588 4200 : metadata.len(),
589 4200 : );
590 4200 :
591 4200 : // fsync the file
592 4200 : file.sync_all().await?;
593 :
594 4200 : trace!("created delta layer {}", self.path);
595 :
596 4200 : Ok((desc, self.path))
597 4200 : }
598 : }
599 :
600 : /// A builder object for constructing a new delta layer.
601 : ///
602 : /// Usage:
603 : ///
604 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
605 : ///
606 : /// 2. Write the contents by calling `put_value` for every page
607 : /// version to store in the layer.
608 : ///
609 : /// 3. Call `finish`.
610 : ///
611 : /// # Note
612 : ///
613 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
614 : /// possible for the writer to drop before `finish` is actually called. So this
615 : /// could lead to odd temporary files in the directory, exhausting file system.
616 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
617 : /// implementation that cleans up the temporary file in failure. It's not
618 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
619 : /// out some fields, making it impossible to implement `Drop`.
620 : ///
621 : #[must_use]
622 : pub struct DeltaLayerWriter {
623 : inner: Option<DeltaLayerWriterInner>,
624 : }
625 :
626 : impl DeltaLayerWriter {
627 : ///
628 : /// Start building a new delta layer.
629 : ///
630 4248 : pub async fn new(
631 4248 : conf: &'static PageServerConf,
632 4248 : timeline_id: TimelineId,
633 4248 : tenant_shard_id: TenantShardId,
634 4248 : key_start: Key,
635 4248 : lsn_range: Range<Lsn>,
636 4248 : ctx: &RequestContext,
637 4248 : ) -> anyhow::Result<Self> {
638 4248 : Ok(Self {
639 4248 : inner: Some(
640 4248 : DeltaLayerWriterInner::new(
641 4248 : conf,
642 4248 : timeline_id,
643 4248 : tenant_shard_id,
644 4248 : key_start,
645 4248 : lsn_range,
646 4248 : ctx,
647 4248 : )
648 2159 : .await?,
649 : ),
650 : })
651 4248 : }
652 :
653 : ///
654 : /// Append a key-value pair to the file.
655 : ///
656 : /// The values must be appended in key, lsn order.
657 : ///
658 6253014 : pub async fn put_value(
659 6253014 : &mut self,
660 6253014 : key: Key,
661 6253014 : lsn: Lsn,
662 6253014 : val: Value,
663 6253014 : ctx: &RequestContext,
664 6253014 : ) -> anyhow::Result<()> {
665 6253014 : self.inner
666 6253014 : .as_mut()
667 6253014 : .unwrap()
668 6253014 : .put_value(key, lsn, val, ctx)
669 5924 : .await
670 6253014 : }
671 :
672 13159980 : pub async fn put_value_bytes<Buf>(
673 13159980 : &mut self,
674 13159980 : key: Key,
675 13159980 : lsn: Lsn,
676 13159980 : val: FullSlice<Buf>,
677 13159980 : will_init: bool,
678 13159980 : ctx: &RequestContext,
679 13159980 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
680 13159980 : where
681 13159980 : Buf: IoBuf + Send,
682 13159980 : {
683 13159980 : self.inner
684 13159980 : .as_mut()
685 13159980 : .unwrap()
686 13159980 : .put_value_bytes(key, lsn, val, will_init, ctx)
687 8843 : .await
688 13159980 : }
689 :
690 6071916 : pub fn size(&self) -> u64 {
691 6071916 : self.inner.as_ref().unwrap().size()
692 6071916 : }
693 :
694 : ///
695 : /// Finish writing the delta layer.
696 : ///
697 4200 : pub(crate) async fn finish(
698 4200 : mut self,
699 4200 : key_end: Key,
700 4200 : ctx: &RequestContext,
701 4200 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
702 29006 : self.inner.take().unwrap().finish(key_end, ctx).await
703 4200 : }
704 :
705 36498 : pub(crate) fn num_keys(&self) -> usize {
706 36498 : self.inner.as_ref().unwrap().num_keys
707 36498 : }
708 :
709 45252 : pub(crate) fn estimated_size(&self) -> u64 {
710 45252 : let inner = self.inner.as_ref().unwrap();
711 45252 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
712 45252 : }
713 : }
714 :
715 : impl Drop for DeltaLayerWriter {
716 4248 : fn drop(&mut self) {
717 4248 : if let Some(inner) = self.inner.take() {
718 48 : // We want to remove the virtual file here, so it's fine to not
719 48 : // having completely flushed unwritten data.
720 48 : let vfile = inner.blob_writer.into_inner_no_flush();
721 48 : vfile.remove();
722 4200 : }
723 4248 : }
724 : }
725 :
726 0 : #[derive(thiserror::Error, Debug)]
727 : pub enum RewriteSummaryError {
728 : #[error("magic mismatch")]
729 : MagicMismatch,
730 : #[error(transparent)]
731 : Other(#[from] anyhow::Error),
732 : }
733 :
734 : impl From<std::io::Error> for RewriteSummaryError {
735 0 : fn from(e: std::io::Error) -> Self {
736 0 : Self::Other(anyhow::anyhow!(e))
737 0 : }
738 : }
739 :
740 : impl DeltaLayer {
741 0 : pub async fn rewrite_summary<F>(
742 0 : path: &Utf8Path,
743 0 : rewrite: F,
744 0 : ctx: &RequestContext,
745 0 : ) -> Result<(), RewriteSummaryError>
746 0 : where
747 0 : F: Fn(Summary) -> Summary,
748 0 : {
749 0 : let mut file = VirtualFile::open_with_options(
750 0 : path,
751 0 : virtual_file::OpenOptions::new().read(true).write(true),
752 0 : ctx,
753 0 : )
754 0 : .await
755 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
756 0 : let file_id = page_cache::next_file_id();
757 0 : let block_reader = FileBlockReader::new(&file, file_id);
758 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
759 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
760 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
761 0 : return Err(RewriteSummaryError::MagicMismatch);
762 0 : }
763 0 :
764 0 : let new_summary = rewrite(actual_summary);
765 0 :
766 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
767 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
768 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
769 0 : file.seek(SeekFrom::Start(0)).await?;
770 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
771 0 : res?;
772 0 : Ok(())
773 0 : }
774 : }
775 :
776 : impl DeltaLayerInner {
777 1422 : pub(crate) fn key_range(&self) -> &Range<Key> {
778 1422 : &self.layer_key_range
779 1422 : }
780 :
781 1422 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
782 1422 : &self.layer_lsn_range
783 1422 : }
784 :
785 3126 : pub(super) async fn load(
786 3126 : path: &Utf8Path,
787 3126 : summary: Option<Summary>,
788 3126 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
789 3126 : ctx: &RequestContext,
790 3126 : ) -> anyhow::Result<Self> {
791 3126 : let file = VirtualFile::open(path, ctx)
792 1563 : .await
793 3126 : .context("open layer file")?;
794 :
795 3126 : let file_id = page_cache::next_file_id();
796 3126 :
797 3126 : let block_reader = FileBlockReader::new(&file, file_id);
798 :
799 3126 : let summary_blk = block_reader
800 3126 : .read_blk(0, ctx)
801 1579 : .await
802 3126 : .context("read first block")?;
803 :
804 : // TODO: this should be an assertion instead; see ImageLayerInner::load
805 3126 : let actual_summary =
806 3126 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
807 :
808 3126 : if let Some(mut expected_summary) = summary {
809 : // production code path
810 3126 : expected_summary.index_start_blk = actual_summary.index_start_blk;
811 3126 : expected_summary.index_root_blk = actual_summary.index_root_blk;
812 3126 : // mask out the timeline_id, but still require the layers to be from the same tenant
813 3126 : expected_summary.timeline_id = actual_summary.timeline_id;
814 3126 :
815 3126 : if actual_summary != expected_summary {
816 0 : bail!(
817 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
818 0 : actual_summary,
819 0 : expected_summary
820 0 : );
821 3126 : }
822 0 : }
823 :
824 3126 : Ok(DeltaLayerInner {
825 3126 : file,
826 3126 : file_id,
827 3126 : index_start_blk: actual_summary.index_start_blk,
828 3126 : index_root_blk: actual_summary.index_root_blk,
829 3126 : max_vectored_read_bytes,
830 3126 : layer_key_range: actual_summary.key_range,
831 3126 : layer_lsn_range: actual_summary.lsn_range,
832 3126 : })
833 3126 : }
834 :
835 : // Look up the keys in the provided keyspace and update
836 : // the reconstruct state with whatever is found.
837 : //
838 : // If the key is cached, go no further than the cached Lsn.
839 : //
840 : // Currently, the index is visited for each range, but this
841 : // can be further optimised to visit the index only once.
842 614615 : pub(super) async fn get_values_reconstruct_data(
843 614615 : &self,
844 614615 : keyspace: KeySpace,
845 614615 : lsn_range: Range<Lsn>,
846 614615 : reconstruct_state: &mut ValuesReconstructState,
847 614615 : ctx: &RequestContext,
848 614615 : ) -> Result<(), GetVectoredError> {
849 614615 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
850 614615 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
851 614615 : self.index_start_blk,
852 614615 : self.index_root_blk,
853 614615 : block_reader,
854 614615 : );
855 614615 :
856 614615 : let planner = VectoredReadPlanner::new(
857 614615 : self.max_vectored_read_bytes
858 614615 : .expect("Layer is loaded with max vectored bytes config")
859 614615 : .0
860 614615 : .into(),
861 614615 : );
862 614615 :
863 614615 : let data_end_offset = self.index_start_offset();
864 :
865 614615 : let reads = Self::plan_reads(
866 614615 : &keyspace,
867 614615 : lsn_range.clone(),
868 614615 : data_end_offset,
869 614615 : index_reader,
870 614615 : planner,
871 614615 : reconstruct_state,
872 614615 : ctx,
873 614615 : )
874 61968 : .await
875 614615 : .map_err(GetVectoredError::Other)?;
876 :
877 614615 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
878 205224 : .await;
879 :
880 614615 : reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
881 614615 :
882 614615 : Ok(())
883 614615 : }
884 :
885 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
886 0 : pub(super) async fn load_key_values(
887 0 : &self,
888 0 : ctx: &RequestContext,
889 0 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
890 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
891 0 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
892 0 : self.index_start_blk,
893 0 : self.index_root_blk,
894 0 : block_reader,
895 0 : );
896 0 : let mut result = Vec::new();
897 0 : let mut stream =
898 0 : Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
899 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
900 0 : let cursor = block_reader.block_cursor();
901 0 : let mut buf = Vec::new();
902 0 : while let Some(item) = stream.next().await {
903 0 : let (key, lsn, pos) = item?;
904 : // TODO: dedup code with get_reconstruct_value
905 : // TODO: ctx handling and sharding
906 0 : cursor
907 0 : .read_blob_into_buf(pos.pos(), &mut buf, ctx)
908 0 : .await
909 0 : .with_context(|| {
910 0 : format!("Failed to read blob from virtual file {}", self.file.path)
911 0 : })?;
912 0 : let val = Value::des(&buf).with_context(|| {
913 0 : format!(
914 0 : "Failed to deserialize file blob from virtual file {}",
915 0 : self.file.path
916 0 : )
917 0 : })?;
918 0 : result.push((key, lsn, val));
919 : }
920 0 : Ok(result)
921 0 : }
922 :
923 615221 : async fn plan_reads<Reader>(
924 615221 : keyspace: &KeySpace,
925 615221 : lsn_range: Range<Lsn>,
926 615221 : data_end_offset: u64,
927 615221 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
928 615221 : mut planner: VectoredReadPlanner,
929 615221 : reconstruct_state: &mut ValuesReconstructState,
930 615221 : ctx: &RequestContext,
931 615221 : ) -> anyhow::Result<Vec<VectoredRead>>
932 615221 : where
933 615221 : Reader: BlockReader + Clone,
934 615221 : {
935 615221 : let ctx = RequestContextBuilder::extend(ctx)
936 615221 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
937 615221 : .build();
938 :
939 741995 : for range in keyspace.ranges.iter() {
940 741995 : let mut range_end_handled = false;
941 741995 :
942 741995 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
943 741995 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
944 741995 : let mut index_stream = std::pin::pin!(index_stream);
945 :
946 1779079 : while let Some(index_entry) = index_stream.next().await {
947 1752215 : let (raw_key, value) = index_entry?;
948 1752215 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
949 1752215 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
950 1752215 : let blob_ref = BlobRef(value);
951 1752215 :
952 1752215 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
953 1752215 : assert!(key >= range.start);
954 :
955 1752215 : let outside_lsn_range = !lsn_range.contains(&lsn);
956 1752215 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
957 :
958 1752215 : let flag = {
959 1752215 : if outside_lsn_range || below_cached_lsn {
960 218974 : BlobFlag::Ignore
961 1533241 : } else if blob_ref.will_init() {
962 1358287 : BlobFlag::ReplaceAll
963 : } else {
964 : // Usual path: add blob to the read
965 174954 : BlobFlag::None
966 : }
967 : };
968 :
969 1752215 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
970 715131 : planner.handle_range_end(blob_ref.pos());
971 715131 : range_end_handled = true;
972 715131 : break;
973 1037084 : } else {
974 1037084 : planner.handle(key, lsn, blob_ref.pos(), flag);
975 1037084 : }
976 : }
977 :
978 741995 : if !range_end_handled {
979 26864 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
980 26864 : planner.handle_range_end(data_end_offset);
981 715131 : }
982 : }
983 :
984 615221 : Ok(planner.finish())
985 615221 : }
986 :
987 615215 : fn get_min_read_buffer_size(
988 615215 : planned_reads: &[VectoredRead],
989 615215 : read_size_soft_max: usize,
990 615215 : ) -> usize {
991 615215 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
992 248696 : return read_size_soft_max;
993 : };
994 :
995 366519 : let largest_read_size = largest_read.size();
996 366519 : if largest_read_size > read_size_soft_max {
997 : // If the read is oversized, it should only contain one key.
998 600 : let offenders = largest_read
999 600 : .blobs_at
1000 600 : .as_slice()
1001 600 : .iter()
1002 600 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
1003 600 : .join(", ");
1004 600 : tracing::warn!(
1005 0 : "Oversized vectored read ({} > {}) for keys {}",
1006 : largest_read_size,
1007 : read_size_soft_max,
1008 : offenders
1009 : );
1010 365919 : }
1011 :
1012 366519 : largest_read_size
1013 615215 : }
1014 :
    /// Executes the planned vectored `reads` against this layer's file and feeds every
    /// decoded value into `reconstruct_state`.
    ///
    /// Errors are reported per key rather than returned:
    /// - if a whole vectored read fails, `on_key_error` is called for every blob in it;
    /// - if a single blob fails to deserialize, `on_key_error` is called for its key and
    ///   the remaining blobs of that key within the current read are skipped.
    ///
    /// Panics if the layer was loaded without `max_vectored_read_bytes`.
    async fn do_reads_and_update_state(
        &self,
        reads: Vec<VectoredRead>,
        reconstruct_state: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) {
        let vectored_blob_reader = VectoredBlobReader::new(&self.file);
        // Key whose blob most recently failed to deserialize; further blobs of it are skipped.
        // NOTE(review): only the most recent failing key is remembered, and whole-read IO
        // failures do not set it — confirm `update_key` after `on_key_error` is acceptable.
        let mut ignore_key_with_err = None;

        let max_vectored_read_bytes = self
            .max_vectored_read_bytes
            .expect("Layer is loaded with max vectored bytes config")
            .0
            .into();
        // One buffer, sized for the largest planned read, is reused across all reads.
        let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
        let mut buf = Some(BytesMut::with_capacity(buf_size));

        // Note that reads are processed in reverse order (from highest key+lsn).
        // This is the order that `ReconstructState` requires such that it can
        // track when a key is done.
        for read in reads.into_iter().rev() {
            let res = vectored_blob_reader
                .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
                .await;

            let blobs_buf = match res {
                Ok(blobs_buf) => blobs_buf,
                Err(err) => {
                    let kind = err.kind();
                    for (_, blob_meta) in read.blobs_at.as_slice() {
                        reconstruct_state.on_key_error(
                            blob_meta.key,
                            PageReconstructError::Other(anyhow!(
                                "Failed to read blobs from virtual file {}: {}",
                                self.file.path,
                                kind
                            )),
                        );
                    }

                    // We have "lost" the buffer since the lower level IO api
                    // doesn't return the buffer on error. Allocate a new one.
                    buf = Some(BytesMut::with_capacity(buf_size));

                    continue;
                }
            };

            // Within a read, blobs are also visited newest-first to keep the reverse order.
            for meta in blobs_buf.blobs.iter().rev() {
                if Some(meta.meta.key) == ignore_key_with_err {
                    continue;
                }

                let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
                let value = match value {
                    Ok(v) => v,
                    Err(e) => {
                        reconstruct_state.on_key_error(
                            meta.meta.key,
                            PageReconstructError::Other(anyhow!(e).context(format!(
                                "Failed to deserialize blob from virtual file {}",
                                self.file.path,
                            ))),
                        );

                        ignore_key_with_err = Some(meta.meta.key);
                        continue;
                    }
                };

                // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
                // state, no further updates shall be made to it. The call below will
                // panic if the invariant is violated.
                reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
            }

            // Hand the buffer back for the next read.
            buf = Some(blobs_buf.buf);
        }
    }
1094 :
1095 1218 : pub(super) async fn load_keys<'a>(
1096 1218 : &'a self,
1097 1218 : ctx: &RequestContext,
1098 1218 : ) -> Result<Vec<DeltaEntry<'a>>> {
1099 1218 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1100 1218 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1101 1218 : self.index_start_blk,
1102 1218 : self.index_root_blk,
1103 1218 : block_reader,
1104 1218 : );
1105 1218 :
1106 1218 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1107 1218 :
1108 1218 : tree_reader
1109 1218 : .visit(
1110 1218 : &[0u8; DELTA_KEY_SIZE],
1111 1218 : VisitDirection::Forwards,
1112 6192138 : |key, value| {
1113 6192138 : let delta_key = DeltaKey::from_slice(key);
1114 6192138 : let val_ref = ValueRef {
1115 6192138 : blob_ref: BlobRef(value),
1116 6192138 : layer: self,
1117 6192138 : };
1118 6192138 : let pos = BlobRef(value).pos();
1119 6192138 : if let Some(last) = all_keys.last_mut() {
1120 6190920 : // subtract offset of the current and last entries to get the size
1121 6190920 : // of the value associated with this (key, lsn) tuple
1122 6190920 : let first_pos = last.size;
1123 6190920 : last.size = pos - first_pos;
1124 6190920 : }
1125 6192138 : let entry = DeltaEntry {
1126 6192138 : key: delta_key.key(),
1127 6192138 : lsn: delta_key.lsn(),
1128 6192138 : size: pos,
1129 6192138 : val: val_ref,
1130 6192138 : };
1131 6192138 : all_keys.push(entry);
1132 6192138 : true
1133 6192138 : },
1134 1218 : &RequestContextBuilder::extend(ctx)
1135 1218 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1136 1218 : .build(),
1137 1218 : )
1138 6357 : .await?;
1139 1218 : if let Some(last) = all_keys.last_mut() {
1140 1218 : // Last key occupies all space till end of value storage,
1141 1218 : // which corresponds to beginning of the index
1142 1218 : last.size = self.index_start_offset() - last.size;
1143 1218 : }
1144 1218 : Ok(all_keys)
1145 1218 : }
1146 :
1147 : /// Using the given writer, write out a version which has the earlier Lsns than `until`.
1148 : ///
1149 : /// Return the amount of key value records pushed to the writer.
1150 30 : pub(super) async fn copy_prefix(
1151 30 : &self,
1152 30 : writer: &mut DeltaLayerWriter,
1153 30 : until: Lsn,
1154 30 : ctx: &RequestContext,
1155 30 : ) -> anyhow::Result<usize> {
1156 30 : use crate::tenant::vectored_blob_io::{
1157 30 : BlobMeta, VectoredReadBuilder, VectoredReadExtended,
1158 30 : };
1159 30 : use futures::stream::TryStreamExt;
1160 30 :
1161 30 : #[derive(Debug)]
1162 30 : enum Item {
1163 30 : Actual(Key, Lsn, BlobRef),
1164 30 : Sentinel,
1165 30 : }
1166 30 :
1167 30 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1168 210 : fn from(value: Item) -> Self {
1169 210 : match value {
1170 180 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1171 30 : Item::Sentinel => None,
1172 30 : }
1173 210 : }
1174 30 : }
1175 30 :
1176 30 : impl Item {
1177 210 : fn offset(&self) -> Option<BlobRef> {
1178 210 : match self {
1179 180 : Item::Actual(_, _, blob) => Some(*blob),
1180 30 : Item::Sentinel => None,
1181 30 : }
1182 210 : }
1183 30 :
1184 210 : fn is_last(&self) -> bool {
1185 210 : matches!(self, Item::Sentinel)
1186 210 : }
1187 30 : }
1188 30 :
1189 30 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1190 30 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1191 30 : self.index_start_blk,
1192 30 : self.index_root_blk,
1193 30 : block_reader,
1194 30 : );
1195 30 :
1196 30 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1197 180 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1198 30 : // put in a sentinel value for getting the end offset for last item, and not having to
1199 30 : // repeat the whole read part
1200 30 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1201 30 : Item::Sentinel,
1202 30 : ))));
1203 30 : let mut stream = std::pin::pin!(stream);
1204 30 :
1205 30 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1206 30 :
1207 30 : let mut read_builder: Option<VectoredReadBuilder> = None;
1208 30 : let read_mode = VectoredReadCoalesceMode::get();
1209 30 :
1210 30 : let max_read_size = self
1211 30 : .max_vectored_read_bytes
1212 30 : .map(|x| x.0.get())
1213 30 : .unwrap_or(8192);
1214 30 :
1215 30 : let mut buffer = Some(BytesMut::with_capacity(max_read_size));
1216 30 :
1217 30 : // FIXME: buffering of DeltaLayerWriter
1218 30 : let mut per_blob_copy = Vec::new();
1219 30 :
1220 30 : let mut records = 0;
1221 :
1222 240 : while let Some(item) = stream.try_next().await? {
1223 210 : tracing::debug!(?item, "popped");
1224 210 : let offset = item
1225 210 : .offset()
1226 210 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1227 :
1228 210 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1229 180 : let end_offset = offset;
1230 180 :
1231 180 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1232 : } else {
1233 30 : None
1234 : };
1235 :
1236 210 : let is_last = item.is_last();
1237 210 :
1238 210 : prev = Option::from(item);
1239 210 :
1240 210 : let actionable = actionable.filter(|x| x.0.lsn < until);
1241 :
1242 210 : let builder = if let Some((meta, offsets)) = actionable {
1243 : // extend or create a new builder
1244 96 : if read_builder
1245 96 : .as_mut()
1246 96 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1247 96 : .unwrap_or(VectoredReadExtended::No)
1248 96 : == VectoredReadExtended::Yes
1249 : {
1250 48 : None
1251 : } else {
1252 48 : read_builder.replace(VectoredReadBuilder::new(
1253 48 : offsets.start.pos(),
1254 48 : offsets.end.pos(),
1255 48 : meta,
1256 48 : max_read_size,
1257 48 : read_mode,
1258 48 : ))
1259 : }
1260 : } else {
1261 : // nothing to do, except perhaps flush any existing for the last element
1262 114 : None
1263 : };
1264 :
1265 : // flush the possible older builder and also the new one if the item was the last one
1266 210 : let builders = builder.into_iter();
1267 210 : let builders = if is_last {
1268 30 : builders.chain(read_builder.take())
1269 : } else {
1270 180 : builders.chain(None)
1271 : };
1272 :
1273 258 : for builder in builders {
1274 48 : let read = builder.build();
1275 48 :
1276 48 : let reader = VectoredBlobReader::new(&self.file);
1277 48 :
1278 48 : let mut buf = buffer.take().unwrap();
1279 48 :
1280 48 : buf.clear();
1281 48 : buf.reserve(read.size());
1282 48 : let res = reader.read_blobs(&read, buf, ctx).await?;
1283 :
1284 144 : for blob in res.blobs {
1285 96 : let key = blob.meta.key;
1286 96 : let lsn = blob.meta.lsn;
1287 96 : let data = &res.buf[blob.start..blob.end];
1288 96 :
1289 96 : #[cfg(debug_assertions)]
1290 96 : Value::des(data)
1291 96 : .with_context(|| {
1292 0 : format!(
1293 0 : "blob failed to deserialize for {}@{}, {}..{}: {:?}",
1294 0 : blob.meta.key,
1295 0 : blob.meta.lsn,
1296 0 : blob.start,
1297 0 : blob.end,
1298 0 : utils::Hex(data)
1299 0 : )
1300 96 : })
1301 96 : .unwrap();
1302 96 :
1303 96 : // is it an image or will_init walrecord?
1304 96 : // FIXME: this could be handled by threading the BlobRef to the
1305 96 : // VectoredReadBuilder
1306 96 : let will_init = crate::repository::ValueBytes::will_init(data)
1307 96 : .inspect_err(|_e| {
1308 0 : #[cfg(feature = "testing")]
1309 0 : tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1310 96 : })
1311 96 : .unwrap_or(false);
1312 96 :
1313 96 : per_blob_copy.clear();
1314 96 : per_blob_copy.extend_from_slice(data);
1315 :
1316 96 : let (tmp, res) = writer
1317 96 : .put_value_bytes(
1318 96 : key,
1319 96 : lsn,
1320 96 : std::mem::take(&mut per_blob_copy).slice_len(),
1321 96 : will_init,
1322 96 : ctx,
1323 96 : )
1324 12 : .await;
1325 96 : per_blob_copy = tmp.into_raw_slice().into_inner();
1326 96 :
1327 96 : res?;
1328 :
1329 96 : records += 1;
1330 : }
1331 :
1332 48 : buffer = Some(res.buf);
1333 : }
1334 : }
1335 :
1336 30 : assert!(
1337 30 : read_builder.is_none(),
1338 0 : "with the sentinel above loop should had handled all"
1339 : );
1340 :
1341 30 : Ok(records)
1342 30 : }
1343 :
1344 12 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1345 12 : println!(
1346 12 : "index_start_blk: {}, root {}",
1347 12 : self.index_start_blk, self.index_root_blk
1348 12 : );
1349 12 :
1350 12 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1351 12 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1352 12 : self.index_start_blk,
1353 12 : self.index_root_blk,
1354 12 : block_reader,
1355 12 : );
1356 12 :
1357 12 : tree_reader.dump().await?;
1358 :
1359 12 : let keys = self.load_keys(ctx).await?;
1360 :
1361 24 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1362 24 : let buf = val.load_raw(ctx).await?;
1363 24 : let val = Value::des(&buf)?;
1364 24 : let desc = match val {
1365 24 : Value::Image(img) => {
1366 24 : format!(" img {} bytes", img.len())
1367 : }
1368 0 : Value::WalRecord(rec) => {
1369 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1370 0 : format!(
1371 0 : " rec {} bytes will_init: {} {}",
1372 0 : buf.len(),
1373 0 : rec.will_init(),
1374 0 : wal_desc
1375 0 : )
1376 : }
1377 : };
1378 24 : Ok(desc)
1379 24 : }
1380 :
1381 36 : for entry in keys {
1382 24 : let DeltaEntry { key, lsn, val, .. } = entry;
1383 24 : let desc = match dump_blob(&val, ctx).await {
1384 24 : Ok(desc) => desc,
1385 0 : Err(err) => {
1386 0 : format!("ERROR: {err}")
1387 : }
1388 : };
1389 24 : println!(" key {key} at {lsn}: {desc}");
1390 24 :
1391 24 : // Print more details about CHECKPOINT records. Would be nice to print details
1392 24 : // of many other record types too, but these are particularly interesting, as
1393 24 : // have a lot of special processing for them in walingest.rs.
1394 24 : use pageserver_api::key::CHECKPOINT_KEY;
1395 24 : use postgres_ffi::CheckPoint;
1396 24 : if key == CHECKPOINT_KEY {
1397 0 : let val = val.load(ctx).await?;
1398 0 : match val {
1399 0 : Value::Image(img) => {
1400 0 : let checkpoint = CheckPoint::decode(&img)?;
1401 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1402 : }
1403 0 : Value::WalRecord(_rec) => {
1404 0 : println!(" unexpected walrecord value for checkpoint key");
1405 0 : }
1406 : }
1407 24 : }
1408 : }
1409 :
1410 12 : Ok(())
1411 12 : }
1412 :
1413 90 : fn stream_index_forwards<'a, R>(
1414 90 : &'a self,
1415 90 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1416 90 : start: &'a [u8; DELTA_KEY_SIZE],
1417 90 : ctx: &'a RequestContext,
1418 90 : ) -> impl futures::stream::Stream<
1419 90 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1420 90 : > + 'a
1421 90 : where
1422 90 : R: BlockReader + 'a,
1423 90 : {
1424 90 : use futures::stream::TryStreamExt;
1425 90 : let stream = reader.into_stream(start, ctx);
1426 456 : stream.map_ok(|(key, value)| {
1427 456 : let key = DeltaKey::from_slice(&key);
1428 456 : let (key, lsn) = (key.key(), key.lsn());
1429 456 : let offset = BlobRef(value);
1430 456 :
1431 456 : (key, lsn, offset)
1432 456 : })
1433 90 : }
1434 :
1435 : /// The file offset to the first block of index.
1436 : ///
1437 : /// The file structure is summary, values, and index. We often need this for the size of last blob.
1438 617549 : fn index_start_offset(&self) -> u64 {
1439 617549 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1440 617549 : let bref = BlobRef(offset);
1441 617549 : tracing::debug!(
1442 : index_start_blk = self.index_start_blk,
1443 : offset,
1444 0 : pos = bref.pos(),
1445 0 : "index_start_offset"
1446 : );
1447 617549 : offset
1448 617549 : }
1449 :
1450 1590 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
1451 1590 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1452 1590 : let tree_reader =
1453 1590 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1454 1590 : DeltaLayerIterator {
1455 1590 : delta_layer: self,
1456 1590 : ctx,
1457 1590 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1458 1590 : key_values_batch: std::collections::VecDeque::new(),
1459 1590 : is_end: false,
1460 1590 : planner: StreamingVectoredReadPlanner::new(
1461 1590 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1462 1590 : 1024, // The default value. Unit tests might use a different value
1463 1590 : ),
1464 1590 : }
1465 1590 : }
1466 : }
1467 :
/// A set of data associated with a delta layer key and its value
///
/// One `(key, lsn)` record of the layer index. It is built, for example, by
/// `DeltaLayerInner::load_keys`.
pub struct DeltaEntry<'a> {
    /// Key of the record.
    pub key: Key,
    /// LSN at which the value was recorded.
    pub lsn: Lsn,
    /// Size of the stored value
    ///
    /// As computed by `load_keys`, this is the byte distance from this entry's blob offset to
    /// the next entry's blob offset, or to the start of the index for the last entry.
    pub size: u64,
    /// Reference to the on-disk value
    pub val: ValueRef<'a>,
}
1477 :
/// Reference to an on-disk value
///
/// Pairs the value's [`BlobRef`] with the layer whose file holds it, so the value can be
/// loaded on demand with [`ValueRef::load`].
pub struct ValueRef<'a> {
    /// Location of the value blob within the layer file.
    blob_ref: BlobRef,
    /// Layer whose file contains the blob.
    layer: &'a DeltaLayerInner,
}
1483 :
1484 : impl<'a> ValueRef<'a> {
1485 : /// Loads the value from disk
1486 6192114 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1487 6192114 : let buf = self.load_raw(ctx).await?;
1488 6192114 : let val = Value::des(&buf)?;
1489 6192114 : Ok(val)
1490 6192114 : }
1491 :
1492 6192138 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1493 6192138 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1494 6192138 : self.layer,
1495 6192138 : )));
1496 6192138 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1497 6192138 : Ok(buf)
1498 6192138 : }
1499 : }
1500 :
/// Block-reading wrapper over a delta layer.
///
/// `T` is anything that can be viewed as a [`DeltaLayerInner`]; blocks are read from that
/// layer's file.
pub(crate) struct Adapter<T>(T);
1502 :
1503 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1504 6249006 : pub(crate) async fn read_blk(
1505 6249006 : &self,
1506 6249006 : blknum: u32,
1507 6249006 : ctx: &RequestContext,
1508 6249006 : ) -> Result<BlockLease, std::io::Error> {
1509 6249006 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1510 6249006 : block_reader.read_blk(blknum, ctx).await
1511 6249006 : }
1512 : }
1513 :
// Identity conversion. Together with std's blanket `AsRef` impl for references, it lets
// `Adapter<&DeltaLayerInner>` (as built in `ValueRef::load_raw`) satisfy the
// `T: AsRef<DeltaLayerInner>` bound on `Adapter`'s methods.
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    fn as_ref(&self) -> &DeltaLayerInner {
        self
    }
}
1519 :
// Exposes `DeltaEntry` to the generic compaction code by forwarding its fields.
impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    fn key(&self) -> Key {
        self.key
    }
    fn lsn(&self) -> Lsn {
        self.lsn
    }
    // Size of the stored value in bytes, as computed when the entry was built.
    fn size(&self) -> u64 {
        self.size
    }
}
1531 :
/// Streaming iterator over all `(key, lsn, value)` records of a delta layer, in index order.
///
/// Index entries are fed to a [`StreamingVectoredReadPlanner`]. Each plan it emits is
/// fetched with one vectored read and decoded into `key_values_batch`, which `next` drains.
pub struct DeltaLayerIterator<'a> {
    /// Layer being iterated.
    delta_layer: &'a DeltaLayerInner,
    ctx: &'a RequestContext,
    /// Groups consecutive index entries into batched vectored reads.
    planner: StreamingVectoredReadPlanner,
    /// Forward iterator over the layer's B-tree index.
    index_iter: DiskBtreeIterator<'a>,
    /// Decoded records of the current batch that have not been returned yet.
    key_values_batch: VecDeque<(Key, Lsn, Value)>,
    /// Set once `index_iter` has been exhausted.
    is_end: bool,
}
1540 :
1541 : impl<'a> DeltaLayerIterator<'a> {
1542 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1543 0 : self.delta_layer.layer_dbg_info()
1544 0 : }
1545 :
1546 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1547 63800 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1548 63800 : assert!(self.key_values_batch.is_empty());
1549 63800 : assert!(!self.is_end);
1550 :
1551 63800 : let plan = loop {
1552 6297184 : if let Some(res) = self.index_iter.next().await {
1553 6295678 : let (raw_key, value) = res?;
1554 6295678 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1555 6295678 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1556 6295678 : let blob_ref = BlobRef(value);
1557 6295678 : let offset = blob_ref.pos();
1558 6295678 : if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
1559 62294 : break batch_plan;
1560 6233384 : }
1561 : } else {
1562 1506 : self.is_end = true;
1563 1506 : let data_end_offset = self.delta_layer.index_start_offset();
1564 1506 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1565 1506 : break item;
1566 : } else {
1567 0 : return Ok(()); // TODO: test empty iterator
1568 : }
1569 : }
1570 : };
1571 63800 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1572 63800 : let mut next_batch = std::collections::VecDeque::new();
1573 63800 : let buf_size = plan.size();
1574 63800 : let buf = BytesMut::with_capacity(buf_size);
1575 63800 : let blobs_buf = vectored_blob_reader
1576 63800 : .read_blobs(&plan, buf, self.ctx)
1577 32572 : .await?;
1578 63800 : let frozen_buf = blobs_buf.buf.freeze();
1579 6295594 : for meta in blobs_buf.blobs.iter() {
1580 6295594 : let value = Value::des(&frozen_buf[meta.start..meta.end])?;
1581 6295594 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1582 : }
1583 63800 : self.key_values_batch = next_batch;
1584 63800 : Ok(())
1585 63800 : }
1586 :
1587 6297714 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1588 6297714 : if self.key_values_batch.is_empty() {
1589 66476 : if self.is_end {
1590 2928 : return Ok(None);
1591 63548 : }
1592 63548 : self.next_batch().await?;
1593 6231238 : }
1594 6294786 : Ok(Some(
1595 6294786 : self.key_values_batch
1596 6294786 : .pop_front()
1597 6294786 : .expect("should not be empty"),
1598 6294786 : ))
1599 6297714 : }
1600 : }
1601 :
1602 : #[cfg(test)]
1603 : pub(crate) mod test {
1604 : use std::collections::BTreeMap;
1605 :
1606 : use itertools::MinMaxResult;
1607 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1608 : use rand::RngCore;
1609 :
1610 : use super::*;
1611 : use crate::repository::Value;
1612 : use crate::tenant::harness::TIMELINE_ID;
1613 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1614 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1615 : use crate::tenant::{Tenant, Timeline};
1616 : use crate::{
1617 : context::DownloadBehavior,
1618 : task_mgr::TaskKind,
1619 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1620 : DEFAULT_PG_VERSION,
1621 : };
1622 : use bytes::Bytes;
1623 :
1624 : /// Construct an index for a fictional delta layer and and then
1625 : /// traverse in order to plan vectored reads for a query. Finally,
1626 : /// verify that the traversal fed the right index key and value
1627 : /// pairs into the planner.
1628 : #[tokio::test]
1629 6 : async fn test_delta_layer_index_traversal() {
1630 6 : let base_key = Key {
1631 6 : field1: 0,
1632 6 : field2: 1663,
1633 6 : field3: 12972,
1634 6 : field4: 16396,
1635 6 : field5: 0,
1636 6 : field6: 246080,
1637 6 : };
1638 6 :
1639 6 : // Populate the index with some entries
1640 6 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1641 6 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1642 6 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1643 6 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1644 6 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1645 6 : ]);
1646 6 :
1647 6 : let mut disk = TestDisk::default();
1648 6 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1649 6 :
1650 6 : let mut disk_offset = 0;
1651 30 : for (key, lsns) in &entries {
1652 126 : for lsn in lsns {
1653 102 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1654 102 : let blob_ref = BlobRef::new(disk_offset, false);
1655 102 : writer
1656 102 : .append(&index_key.0, blob_ref.0)
1657 102 : .expect("In memory disk append should never fail");
1658 102 :
1659 102 : disk_offset += 1;
1660 102 : }
1661 6 : }
1662 6 :
1663 6 : // Prepare all the arguments for the call into `plan_reads` below
1664 6 : let (root_offset, _writer) = writer
1665 6 : .finish()
1666 6 : .expect("In memory disk finish should never fail");
1667 6 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1668 6 : let planner = VectoredReadPlanner::new(100);
1669 6 : let mut reconstruct_state = ValuesReconstructState::new();
1670 6 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1671 6 :
1672 6 : let keyspace = KeySpace {
1673 6 : ranges: vec![
1674 6 : base_key..base_key.add(3),
1675 6 : base_key.add(3)..base_key.add(100),
1676 6 : ],
1677 6 : };
1678 6 : let lsn_range = Lsn(2)..Lsn(40);
1679 6 :
1680 6 : // Plan and validate
1681 6 : let vectored_reads = DeltaLayerInner::plan_reads(
1682 6 : &keyspace,
1683 6 : lsn_range.clone(),
1684 6 : disk_offset,
1685 6 : reader,
1686 6 : planner,
1687 6 : &mut reconstruct_state,
1688 6 : &ctx,
1689 6 : )
1690 6 : .await
1691 6 : .expect("Read planning should not fail");
1692 6 :
1693 6 : validate(keyspace, lsn_range, vectored_reads, entries);
1694 6 : }
1695 :
1696 6 : fn validate(
1697 6 : keyspace: KeySpace,
1698 6 : lsn_range: Range<Lsn>,
1699 6 : vectored_reads: Vec<VectoredRead>,
1700 6 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1701 6 : ) {
1702 6 : #[derive(Debug, PartialEq, Eq)]
1703 6 : struct BlobSpec {
1704 6 : key: Key,
1705 6 : lsn: Lsn,
1706 6 : at: u64,
1707 6 : }
1708 6 :
1709 6 : let mut planned_blobs = Vec::new();
1710 46 : for read in vectored_reads {
1711 84 : for (at, meta) in read.blobs_at.as_slice() {
1712 84 : planned_blobs.push(BlobSpec {
1713 84 : key: meta.key,
1714 84 : lsn: meta.lsn,
1715 84 : at: *at,
1716 84 : });
1717 84 : }
1718 : }
1719 :
1720 6 : let mut expected_blobs = Vec::new();
1721 6 : let mut disk_offset = 0;
1722 30 : for (key, lsns) in index_entries {
1723 126 : for lsn in lsns {
1724 126 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1725 102 : let lsn_included = lsn_range.contains(&lsn);
1726 102 :
1727 102 : if key_included && lsn_included {
1728 84 : expected_blobs.push(BlobSpec {
1729 84 : key,
1730 84 : lsn,
1731 84 : at: disk_offset,
1732 84 : });
1733 84 : }
1734 :
1735 102 : disk_offset += 1;
1736 : }
1737 : }
1738 :
1739 6 : assert_eq!(planned_blobs, expected_blobs);
1740 6 : }
1741 :
    /// Tunables for the randomized end-to-end vectored read test.
    mod constants {
        use utils::lsn::Lsn;

        /// Offset used by all lsns in this test
        pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
        /// Number of unique keys included in the test data
        pub(super) const KEY_COUNT: u8 = 60;
        /// Exclusive upper bound on the number of different lsns for each key
        pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
        /// Possible value sizes for each key along with a probability weight
        pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
        /// Probability that there will be a gap between the current key and the next one (33.3%)
        pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
        /// The minimum size of a key range in all the generated reads
        pub(super) const MIN_RANGE_SIZE: i128 = 10;
        /// The number of ranges included in each vectored read
        pub(super) const RANGES_COUNT: u8 = 2;
        /// The number of vectored reads performed
        pub(super) const READS_COUNT: u8 = 100;
        /// Soft max size of a vectored read. Will be violated if we have to read keys
        /// with values larger than the limit
        pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    }
1765 :
    /// A single generated test record: the raw value bytes stored at `(key, lsn)`.
    struct Entry {
        key: Key,
        lsn: Lsn,
        value: Vec<u8>,
    }
1771 :
1772 6 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1773 6 : let mut current_key = Key::MIN;
1774 6 :
1775 6 : let mut entries = Vec::new();
1776 366 : for _ in 0..constants::KEY_COUNT {
1777 360 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1778 360 : let mut lsns_iter =
1779 6780 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1780 6780 : Some(Lsn(lsn.0 + 0x08))
1781 6780 : });
1782 360 : let mut lsns = Vec::new();
1783 7140 : while lsns.len() < count as usize {
1784 6780 : let take = rng.gen_bool(0.5);
1785 6780 : let lsn = lsns_iter.next().unwrap();
1786 6780 : if take {
1787 3336 : lsns.push(lsn);
1788 3444 : }
1789 : }
1790 :
1791 3696 : for lsn in lsns {
1792 3336 : let size = constants::VALUE_SIZES
1793 10008 : .choose_weighted(rng, |item| item.1)
1794 3336 : .unwrap()
1795 3336 : .0;
1796 3336 : let mut buf = vec![0; size];
1797 3336 : rng.fill_bytes(&mut buf);
1798 3336 :
1799 3336 : entries.push(Entry {
1800 3336 : key: current_key,
1801 3336 : lsn,
1802 3336 : value: buf,
1803 3336 : })
1804 : }
1805 :
1806 360 : let gap = constants::KEY_GAP_CHANGES
1807 720 : .choose_weighted(rng, |item| item.1)
1808 360 : .unwrap()
1809 360 : .0;
1810 360 : if gap {
1811 114 : current_key = current_key.add(2);
1812 246 : } else {
1813 246 : current_key = current_key.add(1);
1814 246 : }
1815 : }
1816 :
1817 6 : entries
1818 6 : }
1819 :
    /// Summary of a generated test data set, as built by `get_entries_meta`.
    struct EntriesMeta {
        /// Half-open range covering every generated key.
        key_range: Range<Key>,
        /// Half-open range covering every generated LSN.
        lsn_range: Range<Lsn>,
        /// Expected value bytes for each `(key, lsn)` pair.
        index: BTreeMap<(Key, Lsn), Vec<u8>>,
    }
1825 :
1826 6 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1827 3336 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1828 6 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1829 0 : _ => panic!("More than one entry is always expected"),
1830 : };
1831 :
1832 3336 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1833 6 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1834 0 : _ => panic!("More than one entry is always expected"),
1835 : };
1836 :
1837 6 : let mut index = BTreeMap::new();
1838 3336 : for entry in entries.iter() {
1839 3336 : index.insert((entry.key, entry.lsn), entry.value.clone());
1840 3336 : }
1841 :
1842 6 : EntriesMeta {
1843 6 : key_range,
1844 6 : lsn_range,
1845 6 : index,
1846 6 : }
1847 6 : }
1848 :
1849 600 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1850 600 : let start = key_range.start.to_i128();
1851 600 : let end = key_range.end.to_i128();
1852 600 :
1853 600 : let mut keyspace = KeySpace::default();
1854 :
1855 1800 : for _ in 0..constants::RANGES_COUNT {
1856 1200 : let mut range: Option<Range<Key>> = Option::default();
1857 3732 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1858 2532 : let range_start = rng.gen_range(start..end);
1859 2532 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1860 2532 : if range_end_offset >= end {
1861 300 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1862 2232 : } else {
1863 2232 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1864 2232 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1865 2232 : }
1866 : }
1867 1200 : keyspace.ranges.push(range.unwrap());
1868 : }
1869 :
1870 600 : keyspace
1871 600 : }
1872 :
1873 : #[tokio::test]
1874 6 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1875 6 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1876 24 : let (tenant, ctx) = harness.load().await;
1877 6 :
1878 6 : let timeline_id = TimelineId::generate();
1879 6 : let timeline = tenant
1880 6 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1881 12 : .await?;
1882 6 :
1883 6 : tracing::info!("Generating test data ...");
1884 6 :
1885 6 : let rng = &mut StdRng::seed_from_u64(0);
1886 6 : let entries = generate_entries(rng);
1887 6 : let entries_meta = get_entries_meta(&entries);
1888 6 :
1889 6 : tracing::info!("Done generating {} entries", entries.len());
1890 6 :
1891 6 : tracing::info!("Writing test data to delta layer ...");
1892 6 : let mut writer = DeltaLayerWriter::new(
1893 6 : harness.conf,
1894 6 : timeline_id,
1895 6 : harness.tenant_shard_id,
1896 6 : entries_meta.key_range.start,
1897 6 : entries_meta.lsn_range.clone(),
1898 6 : &ctx,
1899 6 : )
1900 6 : .await?;
1901 6 :
1902 3342 : for entry in entries {
1903 3336 : let (_, res) = writer
1904 3336 : .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
1905 645 : .await;
1906 3336 : res?;
1907 6 : }
1908 6 :
1909 15 : let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
1910 6 : let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
1911 6 :
1912 6 : let inner = resident.get_as_delta(&ctx).await?;
1913 6 :
1914 6 : let file_size = inner.file.metadata().await?.len();
1915 6 : tracing::info!(
1916 6 : "Done writing test data to delta layer. Resulting file size is: {}",
1917 6 : file_size
1918 6 : );
1919 6 :
1920 606 : for i in 0..constants::READS_COUNT {
1921 600 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1922 6 :
1923 600 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1924 600 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1925 600 : inner.index_start_blk,
1926 600 : inner.index_root_blk,
1927 600 : block_reader,
1928 600 : );
1929 600 :
1930 600 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1931 600 : let mut reconstruct_state = ValuesReconstructState::new();
1932 600 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1933 600 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1934 6 :
1935 600 : let vectored_reads = DeltaLayerInner::plan_reads(
1936 600 : &keyspace,
1937 600 : entries_meta.lsn_range.clone(),
1938 600 : data_end_offset,
1939 600 : index_reader,
1940 600 : planner,
1941 600 : &mut reconstruct_state,
1942 600 : &ctx,
1943 600 : )
1944 12 : .await?;
1945 6 :
1946 600 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1947 600 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1948 600 : &vectored_reads,
1949 600 : constants::MAX_VECTORED_READ_BYTES,
1950 600 : );
1951 600 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1952 6 :
1953 59772 : for read in vectored_reads {
1954 59172 : let blobs_buf = vectored_blob_reader
1955 59172 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1956 30048 : .await?;
1957 171912 : for meta in blobs_buf.blobs.iter() {
1958 171912 : let value = &blobs_buf.buf[meta.start..meta.end];
1959 171912 : assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
1960 6 : }
1961 6 :
1962 59172 : buf = Some(blobs_buf.buf);
1963 6 : }
1964 6 : }
1965 6 :
1966 6 : Ok(())
1967 6 : }
1968 :
1969 : #[tokio::test]
1970 6 : async fn copy_delta_prefix_smoke() {
1971 6 : use crate::walrecord::NeonWalRecord;
1972 6 : use bytes::Bytes;
1973 6 :
1974 6 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
1975 6 : .await
1976 6 : .unwrap();
1977 24 : let (tenant, ctx) = h.load().await;
1978 6 : let ctx = &ctx;
1979 6 : let timeline = tenant
1980 6 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1981 12 : .await
1982 6 : .unwrap();
1983 6 :
1984 6 : let initdb_layer = timeline
1985 6 : .layers
1986 6 : .read()
1987 6 : .await
1988 6 : .likely_resident_layers()
1989 6 : .next()
1990 6 : .cloned()
1991 6 : .unwrap();
1992 6 :
1993 6 : {
1994 6 : let mut writer = timeline.writer().await;
1995 6 :
1996 6 : let data = [
1997 6 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
1998 6 : (
1999 6 : 0x30,
2000 6 : 12,
2001 6 : Value::WalRecord(NeonWalRecord::Postgres {
2002 6 : will_init: false,
2003 6 : rec: Bytes::from_static(b"1"),
2004 6 : }),
2005 6 : ),
2006 6 : (
2007 6 : 0x40,
2008 6 : 12,
2009 6 : Value::WalRecord(NeonWalRecord::Postgres {
2010 6 : will_init: true,
2011 6 : rec: Bytes::from_static(b"2"),
2012 6 : }),
2013 6 : ),
2014 6 : // build an oversized value so we cannot extend and existing read over
2015 6 : // this
2016 6 : (
2017 6 : 0x50,
2018 6 : 12,
2019 6 : Value::WalRecord(NeonWalRecord::Postgres {
2020 6 : will_init: true,
2021 6 : rec: {
2022 6 : let mut buf =
2023 6 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2024 6 : buf.iter_mut()
2025 6 : .enumerate()
2026 792576 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2027 6 : Bytes::from(buf)
2028 6 : },
2029 6 : }),
2030 6 : ),
2031 6 : // because the oversized read cannot be extended further, we are sure to exercise the
2032 6 : // builder created on the last round with this:
2033 6 : (
2034 6 : 0x60,
2035 6 : 12,
2036 6 : Value::WalRecord(NeonWalRecord::Postgres {
2037 6 : will_init: true,
2038 6 : rec: Bytes::from_static(b"3"),
2039 6 : }),
2040 6 : ),
2041 6 : (
2042 6 : 0x60,
2043 6 : 9,
2044 6 : Value::Image(Bytes::from_static(b"something for a different key")),
2045 6 : ),
2046 6 : ];
2047 6 :
2048 6 : let mut last_lsn = None;
2049 6 :
2050 42 : for (lsn, key, value) in data {
2051 36 : let key = Key::from_i128(key);
2052 36 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2053 36 : last_lsn = Some(lsn);
2054 6 : }
2055 6 :
2056 6 : writer.finish_write(Lsn(last_lsn.unwrap()));
2057 6 : }
2058 6 : timeline.freeze_and_flush().await.unwrap();
2059 6 :
2060 6 : let new_layer = timeline
2061 6 : .layers
2062 6 : .read()
2063 6 : .await
2064 6 : .likely_resident_layers()
2065 9 : .find(|&x| x != &initdb_layer)
2066 6 : .cloned()
2067 6 : .unwrap();
2068 6 :
2069 6 : // create a copy for the timeline, so we don't overwrite the file
2070 6 : let branch = tenant
2071 6 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2072 6 : .await
2073 6 : .unwrap();
2074 6 :
2075 6 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2076 6 :
2077 6 : // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
2078 6 : // a single key
2079 6 :
2080 36 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2081 30 : let truncate_at = Lsn(truncate_at);
2082 6 :
2083 30 : let mut writer = DeltaLayerWriter::new(
2084 30 : tenant.conf,
2085 30 : branch.timeline_id,
2086 30 : tenant.tenant_shard_id,
2087 30 : Key::MIN,
2088 30 : Lsn(0x11)..truncate_at,
2089 30 : ctx,
2090 30 : )
2091 15 : .await
2092 30 : .unwrap();
2093 6 :
2094 30 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
2095 30 :
2096 30 : new_layer
2097 30 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2098 45 : .await
2099 30 : .unwrap();
2100 6 :
2101 72 : let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
2102 30 : let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
2103 30 :
2104 30 : copied_layer.get_as_delta(ctx).await.unwrap();
2105 30 :
2106 30 : assert_keys_and_values_eq(
2107 30 : new_layer.get_as_delta(ctx).await.unwrap(),
2108 30 : copied_layer.get_as_delta(ctx).await.unwrap(),
2109 30 : truncate_at,
2110 30 : ctx,
2111 6 : )
2112 158 : .await;
2113 6 : }
2114 6 : }
2115 :
2116 30 : async fn assert_keys_and_values_eq(
2117 30 : source: &DeltaLayerInner,
2118 30 : truncated: &DeltaLayerInner,
2119 30 : truncated_at: Lsn,
2120 30 : ctx: &RequestContext,
2121 30 : ) {
2122 30 : use futures::future::ready;
2123 30 : use futures::stream::TryStreamExt;
2124 30 :
2125 30 : let start_key = [0u8; DELTA_KEY_SIZE];
2126 30 :
2127 30 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2128 30 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2129 30 : source.index_start_blk,
2130 30 : source.index_root_blk,
2131 30 : &source_reader,
2132 30 : );
2133 30 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2134 180 : let source_stream = source_stream.filter(|res| match res {
2135 180 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2136 0 : _ => ready(true),
2137 180 : });
2138 30 : let mut source_stream = std::pin::pin!(source_stream);
2139 30 :
2140 30 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2141 30 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2142 30 : truncated.index_start_blk,
2143 30 : truncated.index_root_blk,
2144 30 : &truncated_reader,
2145 30 : );
2146 30 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2147 30 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2148 30 :
2149 30 : let mut scratch_left = Vec::new();
2150 30 : let mut scratch_right = Vec::new();
2151 :
2152 : loop {
2153 126 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2154 126 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2155 126 :
2156 126 : if src.is_none() {
2157 30 : assert!(truncated.is_none());
2158 30 : break;
2159 96 : }
2160 96 :
2161 96 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2162 96 :
2163 96 : // because we've filtered the source with Lsn, we should always have the same keys from both.
2164 96 : assert_eq!(src.0, truncated.0);
2165 96 : assert_eq!(src.1, truncated.1);
2166 :
2167 : // if this is needed for something else, just drop this assert.
2168 96 : assert!(
2169 96 : src.2.pos() >= truncated.2.pos(),
2170 0 : "value position should not go backwards {} vs. {}",
2171 0 : src.2.pos(),
2172 0 : truncated.2.pos()
2173 : );
2174 :
2175 96 : scratch_left.clear();
2176 96 : let src_cursor = source_reader.block_cursor();
2177 96 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2178 96 : scratch_right.clear();
2179 96 : let trunc_cursor = truncated_reader.block_cursor();
2180 96 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2181 :
2182 96 : tokio::try_join!(left, right).unwrap();
2183 96 :
2184 96 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2185 : }
2186 30 : }
2187 :
2188 54078 : pub(crate) fn sort_delta(
2189 54078 : (k1, l1, _): &(Key, Lsn, Value),
2190 54078 : (k2, l2, _): &(Key, Lsn, Value),
2191 54078 : ) -> std::cmp::Ordering {
2192 54078 : (k1, l1).cmp(&(k2, l2))
2193 54078 : }
2194 :
2195 282 : pub(crate) fn sort_delta_value(
2196 282 : (k1, l1, v1): &(Key, Lsn, Value),
2197 282 : (k2, l2, v2): &(Key, Lsn, Value),
2198 282 : ) -> std::cmp::Ordering {
2199 282 : let order_1 = if v1.is_image() { 0 } else { 1 };
2200 282 : let order_2 = if v2.is_image() { 0 } else { 1 };
2201 282 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
2202 282 : }
2203 :
2204 60 : pub(crate) async fn produce_delta_layer(
2205 60 : tenant: &Tenant,
2206 60 : tline: &Arc<Timeline>,
2207 60 : mut deltas: Vec<(Key, Lsn, Value)>,
2208 60 : ctx: &RequestContext,
2209 60 : ) -> anyhow::Result<ResidentLayer> {
2210 60 : deltas.sort_by(sort_delta);
2211 60 : let (key_start, _, _) = deltas.first().unwrap();
2212 60 : let (key_max, _, _) = deltas.last().unwrap();
2213 24120 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2214 24120 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2215 60 : let lsn_end = Lsn(lsn_max.0 + 1);
2216 60 : let mut writer = DeltaLayerWriter::new(
2217 60 : tenant.conf,
2218 60 : tline.timeline_id,
2219 60 : tenant.tenant_shard_id,
2220 60 : *key_start,
2221 60 : (*lsn_min)..lsn_end,
2222 60 : ctx,
2223 60 : )
2224 30 : .await?;
2225 60 : let key_end = key_max.next();
2226 :
2227 24180 : for (key, lsn, value) in deltas {
2228 24120 : writer.put_value(key, lsn, value, ctx).await?;
2229 : }
2230 :
2231 174 : let (desc, path) = writer.finish(key_end, ctx).await?;
2232 60 : let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
2233 :
2234 60 : Ok::<_, anyhow::Error>(delta_layer)
2235 60 : }
2236 :
2237 84 : async fn assert_delta_iter_equal(
2238 84 : delta_iter: &mut DeltaLayerIterator<'_>,
2239 84 : expect: &[(Key, Lsn, Value)],
2240 84 : ) {
2241 84 : let mut expect_iter = expect.iter();
2242 : loop {
2243 84084 : let o1 = delta_iter.next().await.unwrap();
2244 84084 : let o2 = expect_iter.next();
2245 84084 : assert_eq!(o1.is_some(), o2.is_some());
2246 84084 : if o1.is_none() && o2.is_none() {
2247 84 : break;
2248 84000 : }
2249 84000 : let (k1, l1, v1) = o1.unwrap();
2250 84000 : let (k2, l2, v2) = o2.unwrap();
2251 84000 : assert_eq!(&k1, k2);
2252 84000 : assert_eq!(l1, *l2);
2253 84000 : assert_eq!(&v1, v2);
2254 : }
2255 84 : }
2256 :
2257 : #[tokio::test]
2258 6 : async fn delta_layer_iterator() {
2259 6 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2260 24 : let (tenant, ctx) = harness.load().await;
2261 6 :
2262 6 : let tline = tenant
2263 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2264 12 : .await
2265 6 : .unwrap();
2266 6 :
2267 6000 : fn get_key(id: u32) -> Key {
2268 6000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2269 6000 : key.field6 = id;
2270 6000 : key
2271 6000 : }
2272 6 : const N: usize = 1000;
2273 6 : let test_deltas = (0..N)
2274 6000 : .map(|idx| {
2275 6000 : (
2276 6000 : get_key(idx as u32 / 10),
2277 6000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2278 6000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2279 6000 : )
2280 6000 : })
2281 6 : .collect_vec();
2282 6 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2283 24 : .await
2284 6 : .unwrap();
2285 6 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2286 18 : for max_read_size in [1, 1024] {
2287 96 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2288 84 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2289 84 : // Test if the batch size is correctly determined
2290 84 : let mut iter = delta_layer.iter(&ctx);
2291 84 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2292 84 : let mut num_items = 0;
2293 336 : for _ in 0..3 {
2294 252 : iter.next_batch().await.unwrap();
2295 252 : num_items += iter.key_values_batch.len();
2296 252 : if max_read_size == 1 {
2297 6 : // every key should be a batch b/c the value is larger than max_read_size
2298 126 : assert_eq!(iter.key_values_batch.len(), 1);
2299 6 : } else {
2300 126 : assert!(iter.key_values_batch.len() <= batch_size);
2301 6 : }
2302 252 : if num_items >= N {
2303 6 : break;
2304 252 : }
2305 252 : iter.key_values_batch.clear();
2306 6 : }
2307 6 : // Test if the result is correct
2308 84 : let mut iter = delta_layer.iter(&ctx);
2309 84 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2310 28803 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2311 6 : }
2312 6 : }
2313 6 : }
2314 : }
|