LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: 792183ae0ef4f1f8b22e9ac7e8748740ab73f873.info Lines: 84.6 % 1541 1303
Test Date: 2024-06-26 01:04:33 Functions: 67.8 % 143 97

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation is extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in the `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "values", and
      24              : //! "index". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use crate::config::PageServerConf;
      31              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache::{self, FileId, PAGE_SZ};
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::BlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37              : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
      38              : use crate::tenant::timeline::GetVectoredError;
      39              : use crate::tenant::vectored_blob_io::{
      40              :     BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
      41              : };
      42              : use crate::tenant::{PageReconstructError, Timeline};
      43              : use crate::virtual_file::{self, VirtualFile};
      44              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      45              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      46              : use anyhow::{anyhow, bail, ensure, Context, Result};
      47              : use bytes::BytesMut;
      48              : use camino::{Utf8Path, Utf8PathBuf};
      49              : use futures::StreamExt;
      50              : use itertools::Itertools;
      51              : use pageserver_api::keyspace::KeySpace;
      52              : use pageserver_api::models::LayerAccessKind;
      53              : use pageserver_api::shard::TenantShardId;
      54              : use rand::{distributions::Alphanumeric, Rng};
      55              : use serde::{Deserialize, Serialize};
      56              : use std::fs::File;
      57              : use std::io::SeekFrom;
      58              : use std::ops::Range;
      59              : use std::os::unix::fs::FileExt;
      60              : use std::str::FromStr;
      61              : use std::sync::Arc;
      62              : use tokio::sync::OnceCell;
      63              : use tracing::*;
      64              : 
      65              : use utils::{
      66              :     bin_ser::BeSer,
      67              :     id::{TenantId, TimelineId},
      68              :     lsn::Lsn,
      69              : };
      70              : 
      71              : use super::{
      72              :     AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
      73              :     ValuesReconstructState,
      74              : };
      75              : 
      76              : ///
      77              : /// Header stored at the beginning of the file
      78              : ///
      79              : /// After this comes the 'values' part, starting on block 1. After that,
      80              : /// the 'index' starts at the block indicated by 'index_start_blk'
      81              : ///
      82          998 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      83              : pub struct Summary {
      84              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      85              :     pub magic: u16,
      86              :     pub format_version: u16,
      87              : 
      88              :     pub tenant_id: TenantId,
      89              :     pub timeline_id: TimelineId,
      90              :     pub key_range: Range<Key>,
      91              :     pub lsn_range: Range<Lsn>,
      92              : 
      93              :     /// Block number where the 'index' part of the file begins.
      94              :     pub index_start_blk: u32,
      95              :     /// Block within the 'index', where the B-tree root page is stored
      96              :     pub index_root_blk: u32,
      97              : }
      98              : 
      99              : impl From<&DeltaLayer> for Summary {
     100            0 :     fn from(layer: &DeltaLayer) -> Self {
     101            0 :         Self::expected(
     102            0 :             layer.desc.tenant_shard_id.tenant_id,
     103            0 :             layer.desc.timeline_id,
     104            0 :             layer.desc.key_range.clone(),
     105            0 :             layer.desc.lsn_range.clone(),
     106            0 :         )
     107            0 :     }
     108              : }
     109              : 
     110              : impl Summary {
     111          998 :     pub(super) fn expected(
     112          998 :         tenant_id: TenantId,
     113          998 :         timeline_id: TimelineId,
     114          998 :         keys: Range<Key>,
     115          998 :         lsns: Range<Lsn>,
     116          998 :     ) -> Self {
     117          998 :         Self {
     118          998 :             magic: DELTA_FILE_MAGIC,
     119          998 :             format_version: STORAGE_FORMAT_VERSION,
     120          998 : 
     121          998 :             tenant_id,
     122          998 :             timeline_id,
     123          998 :             key_range: keys,
     124          998 :             lsn_range: lsns,
     125          998 : 
     126          998 :             index_start_blk: 0,
     127          998 :             index_root_blk: 0,
     128          998 :         }
     129          998 :     }
     130              : }
     131              : 
     132              : // Flag indicating that this version initializes the page
     133              : const WILL_INIT: u64 = 1;
     134              : 
     135              : /// Struct representing reference to BLOB in layers. Reference contains BLOB
     136              : /// offset, and for WAL records it also contains `will_init` flag. The flag
     137              : /// helps to determine the range of records that needs to be applied, without
     138              : /// reading/deserializing records themselves.
     139            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     140              : pub struct BlobRef(pub u64);
     141              : 
     142              : impl BlobRef {
     143       240472 :     pub fn will_init(&self) -> bool {
     144       240472 :         (self.0 & WILL_INIT) != 0
     145       240472 :     }
     146              : 
     147      4439720 :     pub fn pos(&self) -> u64 {
     148      4439720 :         self.0 >> 1
     149      4439720 :     }
     150              : 
     151      6450880 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     152      6450880 :         let mut blob_ref = pos << 1;
     153      6450880 :         if will_init {
     154      6449624 :             blob_ref |= WILL_INIT;
     155      6449624 :         }
     156      6450880 :         BlobRef(blob_ref)
     157      6450880 :     }
     158              : }
     159              : 
     160              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     161              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     162              : 
     163              : /// This is the key of the B-tree index stored in the delta layer. It consists
     164              : /// of the serialized representation of a Key and LSN.
     165              : impl DeltaKey {
     166      2064222 :     fn from_slice(buf: &[u8]) -> Self {
     167      2064222 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     168      2064222 :         bytes.copy_from_slice(buf);
     169      2064222 :         DeltaKey(bytes)
     170      2064222 :     }
     171              : 
     172      6698226 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     173      6698226 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     174      6698226 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     175      6698226 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     176      6698226 :         DeltaKey(bytes)
     177      6698226 :     }
     178              : 
     179      2064222 :     fn key(&self) -> Key {
     180      2064222 :         Key::from_slice(&self.0)
     181      2064222 :     }
     182              : 
     183      2064222 :     fn lsn(&self) -> Lsn {
     184      2064222 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     185      2064222 :     }
     186              : 
     187       311431 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     188       311431 :         let mut lsn_buf = [0u8; 8];
     189       311431 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     190       311431 :         Lsn(u64::from_be_bytes(lsn_buf))
     191       311431 :     }
     192              : }
     193              : 
     194              : /// This is used only from `pagectl`. Within pageserver, all layers are
     195              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     196              : pub struct DeltaLayer {
     197              :     path: Utf8PathBuf,
     198              :     pub desc: PersistentLayerDesc,
     199              :     access_stats: LayerAccessStats,
     200              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     201              : }
     202              : 
     203              : impl std::fmt::Debug for DeltaLayer {
     204            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     205            0 :         use super::RangeDisplayDebug;
     206            0 : 
     207            0 :         f.debug_struct("DeltaLayer")
     208            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     209            0 :             .field("lsn_range", &self.desc.lsn_range)
     210            0 :             .field("file_size", &self.desc.file_size)
     211            0 :             .field("inner", &self.inner)
     212            0 :             .finish()
     213            0 :     }
     214              : }
     215              : 
     216              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     217              : /// file.
     218              : pub struct DeltaLayerInner {
     219              :     // values copied from summary
     220              :     index_start_blk: u32,
     221              :     index_root_blk: u32,
     222              : 
     223              :     file: VirtualFile,
     224              :     file_id: FileId,
     225              : 
     226              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     227              : }
     228              : 
     229              : impl std::fmt::Debug for DeltaLayerInner {
     230            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     231            0 :         f.debug_struct("DeltaLayerInner")
     232            0 :             .field("index_start_blk", &self.index_start_blk)
     233            0 :             .field("index_root_blk", &self.index_root_blk)
     234            0 :             .finish()
     235            0 :     }
     236              : }
     237              : 
     238              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     239              : impl std::fmt::Display for DeltaLayer {
     240            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     241            0 :         write!(f, "{}", self.layer_desc().short_id())
     242            0 :     }
     243              : }
     244              : 
     245              : impl AsLayerDesc for DeltaLayer {
     246            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     247            0 :         &self.desc
     248            0 :     }
     249              : }
     250              : 
     251              : impl DeltaLayer {
     252            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     253            0 :         self.desc.dump();
     254            0 : 
     255            0 :         if !verbose {
     256            0 :             return Ok(());
     257            0 :         }
     258              : 
     259            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     260              : 
     261            0 :         inner.dump(ctx).await
     262            0 :     }
     263              : 
     264         1326 :     fn temp_path_for(
     265         1326 :         conf: &PageServerConf,
     266         1326 :         tenant_shard_id: &TenantShardId,
     267         1326 :         timeline_id: &TimelineId,
     268         1326 :         key_start: Key,
     269         1326 :         lsn_range: &Range<Lsn>,
     270         1326 :     ) -> Utf8PathBuf {
     271         1326 :         let rand_string: String = rand::thread_rng()
     272         1326 :             .sample_iter(&Alphanumeric)
     273         1326 :             .take(8)
     274         1326 :             .map(char::from)
     275         1326 :             .collect();
     276         1326 : 
     277         1326 :         conf.timeline_path(tenant_shard_id, timeline_id)
     278         1326 :             .join(format!(
     279         1326 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     280         1326 :                 key_start,
     281         1326 :                 u64::from(lsn_range.start),
     282         1326 :                 u64::from(lsn_range.end),
     283         1326 :                 rand_string,
     284         1326 :                 TEMP_FILE_SUFFIX,
     285         1326 :             ))
     286         1326 :     }
     287              : 
     288              :     ///
     289              :     /// Open the underlying file and read the metadata into memory, if it's
     290              :     /// not loaded already.
     291              :     ///
     292            0 :     async fn load(
     293            0 :         &self,
     294            0 :         access_kind: LayerAccessKind,
     295            0 :         ctx: &RequestContext,
     296            0 :     ) -> Result<&Arc<DeltaLayerInner>> {
     297            0 :         self.access_stats.record_access(access_kind, ctx);
     298            0 :         // Quick exit if already loaded
     299            0 :         self.inner
     300            0 :             .get_or_try_init(|| self.load_inner(ctx))
     301            0 :             .await
     302            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     303            0 :     }
     304              : 
     305            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     306            0 :         let path = self.path();
     307              : 
     308            0 :         let loaded = DeltaLayerInner::load(&path, None, None, ctx)
     309            0 :             .await
     310            0 :             .and_then(|res| res)?;
     311              : 
     312              :         // not production code
     313            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     314            0 :         let expected_layer_name = self.layer_desc().layer_name();
     315            0 : 
     316            0 :         if actual_layer_name != expected_layer_name {
     317            0 :             println!("warning: filename does not match what is expected from in-file summary");
     318            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     319            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     320            0 :         }
     321              : 
     322            0 :         Ok(Arc::new(loaded))
     323            0 :     }
     324              : 
     325              :     /// Create a DeltaLayer struct representing an existing file on disk.
     326              :     ///
     327              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     328            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     329            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     330            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     331            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     332              : 
     333            0 :         let metadata = file
     334            0 :             .metadata()
     335            0 :             .context("get file metadata to determine size")?;
     336              : 
     337              :         // This function is never used for constructing layers in a running pageserver,
     338              :         // so it does not need an accurate TenantShardId.
     339            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     340            0 : 
     341            0 :         Ok(DeltaLayer {
     342            0 :             path: path.to_path_buf(),
     343            0 :             desc: PersistentLayerDesc::new_delta(
     344            0 :                 tenant_shard_id,
     345            0 :                 summary.timeline_id,
     346            0 :                 summary.key_range,
     347            0 :                 summary.lsn_range,
     348            0 :                 metadata.len(),
     349            0 :             ),
     350            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     351            0 :             inner: OnceCell::new(),
     352            0 :         })
     353            0 :     }
     354              : 
     355              :     /// Path to the layer file in pageserver workdir.
     356            0 :     fn path(&self) -> Utf8PathBuf {
     357            0 :         self.path.clone()
     358            0 :     }
     359              : }
     360              : 
     361              : /// A builder object for constructing a new delta layer.
     362              : ///
     363              : /// Usage:
     364              : ///
     365              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     366              : ///
     367              : /// 2. Write the contents by calling `put_value` for every page
     368              : ///    version to store in the layer.
     369              : ///
     370              : /// 3. Call `finish`.
     371              : ///
     372              : struct DeltaLayerWriterInner {
     373              :     conf: &'static PageServerConf,
     374              :     pub path: Utf8PathBuf,
     375              :     timeline_id: TimelineId,
     376              :     tenant_shard_id: TenantShardId,
     377              : 
     378              :     key_start: Key,
     379              :     lsn_range: Range<Lsn>,
     380              : 
     381              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     382              : 
     383              :     blob_writer: BlobWriter<true>,
     384              : }
     385              : 
     386              : impl DeltaLayerWriterInner {
     387              :     ///
     388              :     /// Start building a new delta layer.
     389              :     ///
     390         1326 :     async fn new(
     391         1326 :         conf: &'static PageServerConf,
     392         1326 :         timeline_id: TimelineId,
     393         1326 :         tenant_shard_id: TenantShardId,
     394         1326 :         key_start: Key,
     395         1326 :         lsn_range: Range<Lsn>,
     396         1326 :         ctx: &RequestContext,
     397         1326 :     ) -> anyhow::Result<Self> {
     398         1326 :         // Create the file initially with a temporary filename. We don't know
     399         1326 :         // the end key yet, so we cannot form the final filename yet. We will
     400         1326 :         // rename it when we're done.
     401         1326 :         //
     402         1326 :         // Note: This overwrites any existing file. There shouldn't be any.
     403         1326 :         // FIXME: throw an error instead?
     404         1326 :         let path =
     405         1326 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     406              : 
     407         1326 :         let mut file = VirtualFile::create(&path, ctx).await?;
     408              :         // make room for the header block
     409         1326 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     410         1326 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     411         1326 : 
     412         1326 :         // Initialize the b-tree index builder
     413         1326 :         let block_buf = BlockBuf::new();
     414         1326 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     415         1326 : 
     416         1326 :         Ok(Self {
     417         1326 :             conf,
     418         1326 :             path,
     419         1326 :             timeline_id,
     420         1326 :             tenant_shard_id,
     421         1326 :             key_start,
     422         1326 :             lsn_range,
     423         1326 :             tree: tree_builder,
     424         1326 :             blob_writer,
     425         1326 :         })
     426         1326 :     }
     427              : 
     428              :     ///
     429              :     /// Append a key-value pair to the file.
     430              :     ///
     431              :     /// The values must be appended in key, lsn order.
     432              :     ///
     433      2064114 :     async fn put_value(
     434      2064114 :         &mut self,
     435      2064114 :         key: Key,
     436      2064114 :         lsn: Lsn,
     437      2064114 :         val: Value,
     438      2064114 :         ctx: &RequestContext,
     439      2064114 :     ) -> anyhow::Result<()> {
     440      2064114 :         let (_, res) = self
     441      2064114 :             .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
     442         1569 :             .await;
     443      2064114 :         res
     444      2064114 :     }
     445              : 
     446      6450776 :     async fn put_value_bytes(
     447      6450776 :         &mut self,
     448      6450776 :         key: Key,
     449      6450776 :         lsn: Lsn,
     450      6450776 :         val: Vec<u8>,
     451      6450776 :         will_init: bool,
     452      6450776 :         ctx: &RequestContext,
     453      6450776 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     454      6450776 :         assert!(self.lsn_range.start <= lsn);
     455      6450776 :         let (val, res) = self.blob_writer.write_blob(val, ctx).await;
     456      6450776 :         let off = match res {
     457      6450776 :             Ok(off) => off,
     458            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     459              :         };
     460              : 
     461      6450776 :         let blob_ref = BlobRef::new(off, will_init);
     462      6450776 : 
     463      6450776 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     464      6450776 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     465      6450776 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     466      6450776 :     }
     467              : 
     468      2023972 :     fn size(&self) -> u64 {
     469      2023972 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     470      2023972 :     }
     471              : 
     472              :     ///
     473              :     /// Finish writing the delta layer.
     474              :     ///
     475         1326 :     async fn finish(
     476         1326 :         self,
     477         1326 :         key_end: Key,
     478         1326 :         timeline: &Arc<Timeline>,
     479         1326 :         ctx: &RequestContext,
     480         1326 :     ) -> anyhow::Result<ResidentLayer> {
     481         1326 :         let temp_path = self.path.clone();
     482         9512 :         let result = self.finish0(key_end, timeline, ctx).await;
     483         1326 :         if result.is_err() {
     484            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
     485            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     486            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     487            0 :             }
     488         1326 :         }
     489         1326 :         result
     490         1326 :     }
     491              : 
     492         1326 :     async fn finish0(
     493         1326 :         self,
     494         1326 :         key_end: Key,
     495         1326 :         timeline: &Arc<Timeline>,
     496         1326 :         ctx: &RequestContext,
     497         1326 :     ) -> anyhow::Result<ResidentLayer> {
     498         1326 :         let index_start_blk =
     499         1326 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     500              : 
     501         1326 :         let mut file = self.blob_writer.into_inner(ctx).await?;
     502              : 
     503              :         // Write out the index
     504         1326 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     505         1326 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     506            0 :             .await?;
     507        14858 :         for buf in block_buf.blocks {
     508        13532 :             let (_buf, res) = file.write_all(buf, ctx).await;
     509        13532 :             res?;
     510              :         }
     511         1326 :         assert!(self.lsn_range.start < self.lsn_range.end);
     512              :         // Fill in the summary on blk 0
     513         1326 :         let summary = Summary {
     514         1326 :             magic: DELTA_FILE_MAGIC,
     515         1326 :             format_version: STORAGE_FORMAT_VERSION,
     516         1326 :             tenant_id: self.tenant_shard_id.tenant_id,
     517         1326 :             timeline_id: self.timeline_id,
     518         1326 :             key_range: self.key_start..key_end,
     519         1326 :             lsn_range: self.lsn_range.clone(),
     520         1326 :             index_start_blk,
     521         1326 :             index_root_blk,
     522         1326 :         };
     523         1326 : 
     524         1326 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     525         1326 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     526         1326 :         Summary::ser_into(&summary, &mut buf)?;
     527         1326 :         file.seek(SeekFrom::Start(0)).await?;
     528         1326 :         let (_buf, res) = file.write_all(buf, ctx).await;
     529         1326 :         res?;
     530              : 
     531         1326 :         let metadata = file
     532         1326 :             .metadata()
     533          668 :             .await
     534         1326 :             .context("get file metadata to determine size")?;
     535              : 
     536              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     537              :         // Make it a little bit below to account for differing GB units
     538              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     539              :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     540         1326 :         ensure!(
     541         1326 :             metadata.len() <= S3_UPLOAD_LIMIT,
     542            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     543            0 :             file.path,
     544            0 :             metadata.len()
     545              :         );
     546              : 
     547              :         // Note: Because we opened the file in write-only mode, we cannot
     548              :         // reuse the same VirtualFile for reading later. That's why we don't
     549              :         // set inner.file here. The first read will have to re-open it.
     550              : 
     551         1326 :         let desc = PersistentLayerDesc::new_delta(
     552         1326 :             self.tenant_shard_id,
     553         1326 :             self.timeline_id,
     554         1326 :             self.key_start..key_end,
     555         1326 :             self.lsn_range.clone(),
     556         1326 :             metadata.len(),
     557         1326 :         );
     558         1326 : 
     559         1326 :         // fsync the file
     560         1326 :         file.sync_all().await?;
     561              : 
     562         1326 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     563              : 
     564         1326 :         trace!("created delta layer {}", layer.local_path());
     565              : 
     566         1326 :         Ok(layer)
     567         1326 :     }
     568              : }
     569              : 
/// A builder object for constructing a new delta layer.
///
/// Usage:
///
/// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_value` for every page
///    version to store in the layer.
///
/// 3. Call `finish`.
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// possible for the writer to be dropped before `finish` is actually called.
/// That would leave stray temporary files in the directory, eventually
/// exhausting the file system. This structure wraps `DeltaLayerWriterInner`
/// and adds a `Drop` implementation that removes the temporary file when the
/// writer is dropped without having been finished. It's not possible to do
/// this directly in `DeltaLayerWriterInner`, since `finish` moves out some
/// fields, making it impossible to implement `Drop`.
///
#[must_use]
pub struct DeltaLayerWriter {
    // `Some` until `finish` takes the inner writer out; if it is still `Some`
    // when the writer is dropped, `Drop` removes the underlying file.
    inner: Option<DeltaLayerWriterInner>,
}
     595              : 
     596              : impl DeltaLayerWriter {
     597              :     ///
     598              :     /// Start building a new delta layer.
     599              :     ///
     600         1326 :     pub async fn new(
     601         1326 :         conf: &'static PageServerConf,
     602         1326 :         timeline_id: TimelineId,
     603         1326 :         tenant_shard_id: TenantShardId,
     604         1326 :         key_start: Key,
     605         1326 :         lsn_range: Range<Lsn>,
     606         1326 :         ctx: &RequestContext,
     607         1326 :     ) -> anyhow::Result<Self> {
     608         1326 :         Ok(Self {
     609         1326 :             inner: Some(
     610         1326 :                 DeltaLayerWriterInner::new(
     611         1326 :                     conf,
     612         1326 :                     timeline_id,
     613         1326 :                     tenant_shard_id,
     614         1326 :                     key_start,
     615         1326 :                     lsn_range,
     616         1326 :                     ctx,
     617         1326 :                 )
     618          681 :                 .await?,
     619              :             ),
     620              :         })
     621         1326 :     }
     622              : 
     623              :     ///
     624              :     /// Append a key-value pair to the file.
     625              :     ///
     626              :     /// The values must be appended in key, lsn order.
     627              :     ///
     628      2064114 :     pub async fn put_value(
     629      2064114 :         &mut self,
     630      2064114 :         key: Key,
     631      2064114 :         lsn: Lsn,
     632      2064114 :         val: Value,
     633      2064114 :         ctx: &RequestContext,
     634      2064114 :     ) -> anyhow::Result<()> {
     635      2064114 :         self.inner
     636      2064114 :             .as_mut()
     637      2064114 :             .unwrap()
     638      2064114 :             .put_value(key, lsn, val, ctx)
     639         1569 :             .await
     640      2064114 :     }
     641              : 
     642      4386662 :     pub async fn put_value_bytes(
     643      4386662 :         &mut self,
     644      4386662 :         key: Key,
     645      4386662 :         lsn: Lsn,
     646      4386662 :         val: Vec<u8>,
     647      4386662 :         will_init: bool,
     648      4386662 :         ctx: &RequestContext,
     649      4386662 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     650      4386662 :         self.inner
     651      4386662 :             .as_mut()
     652      4386662 :             .unwrap()
     653      4386662 :             .put_value_bytes(key, lsn, val, will_init, ctx)
     654         2948 :             .await
     655      4386662 :     }
     656              : 
     657      2023972 :     pub fn size(&self) -> u64 {
     658      2023972 :         self.inner.as_ref().unwrap().size()
     659      2023972 :     }
     660              : 
     661              :     ///
     662              :     /// Finish writing the delta layer.
     663              :     ///
     664         1326 :     pub(crate) async fn finish(
     665         1326 :         mut self,
     666         1326 :         key_end: Key,
     667         1326 :         timeline: &Arc<Timeline>,
     668         1326 :         ctx: &RequestContext,
     669         1326 :     ) -> anyhow::Result<ResidentLayer> {
     670         1326 :         self.inner
     671         1326 :             .take()
     672         1326 :             .unwrap()
     673         1326 :             .finish(key_end, timeline, ctx)
     674         9512 :             .await
     675         1326 :     }
     676              : }
     677              : 
     678              : impl Drop for DeltaLayerWriter {
     679         1326 :     fn drop(&mut self) {
     680         1326 :         if let Some(inner) = self.inner.take() {
     681            0 :             // We want to remove the virtual file here, so it's fine to not
     682            0 :             // having completely flushed unwritten data.
     683            0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     684            0 :             vfile.remove();
     685         1326 :         }
     686         1326 :     }
     687              : }
     688              : 
     689            0 : #[derive(thiserror::Error, Debug)]
     690              : pub enum RewriteSummaryError {
     691              :     #[error("magic mismatch")]
     692              :     MagicMismatch,
     693              :     #[error(transparent)]
     694              :     Other(#[from] anyhow::Error),
     695              : }
     696              : 
     697              : impl From<std::io::Error> for RewriteSummaryError {
     698            0 :     fn from(e: std::io::Error) -> Self {
     699            0 :         Self::Other(anyhow::anyhow!(e))
     700            0 :     }
     701              : }
     702              : 
     703              : impl DeltaLayer {
     704            0 :     pub async fn rewrite_summary<F>(
     705            0 :         path: &Utf8Path,
     706            0 :         rewrite: F,
     707            0 :         ctx: &RequestContext,
     708            0 :     ) -> Result<(), RewriteSummaryError>
     709            0 :     where
     710            0 :         F: Fn(Summary) -> Summary,
     711            0 :     {
     712            0 :         let mut file = VirtualFile::open_with_options(
     713            0 :             path,
     714            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     715            0 :             ctx,
     716            0 :         )
     717            0 :         .await
     718            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     719            0 :         let file_id = page_cache::next_file_id();
     720            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     721            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     722            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     723            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     724            0 :             return Err(RewriteSummaryError::MagicMismatch);
     725            0 :         }
     726            0 : 
     727            0 :         let new_summary = rewrite(actual_summary);
     728            0 : 
     729            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     730            0 :         // TODO: could use smallvec here, but it's a pain with Slice<T>
     731            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     732            0 :         file.seek(SeekFrom::Start(0)).await?;
     733            0 :         let (_buf, res) = file.write_all(buf, ctx).await;
     734            0 :         res?;
     735            0 :         Ok(())
     736            0 :     }
     737              : }
     738              : 
     739              : impl DeltaLayerInner {
     740              :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     741              :     /// - inner has the success or transient failure
     742              :     /// - outer has the permanent failure
     743          998 :     pub(super) async fn load(
     744          998 :         path: &Utf8Path,
     745          998 :         summary: Option<Summary>,
     746          998 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     747          998 :         ctx: &RequestContext,
     748          998 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     749          998 :         let file = match VirtualFile::open(path, ctx).await {
     750          998 :             Ok(file) => file,
     751            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     752              :         };
     753          998 :         let file_id = page_cache::next_file_id();
     754          998 : 
     755          998 :         let block_reader = FileBlockReader::new(&file, file_id);
     756              : 
     757          998 :         let summary_blk = match block_reader.read_blk(0, ctx).await {
     758          998 :             Ok(blk) => blk,
     759            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     760              :         };
     761              : 
     762              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     763          998 :         let actual_summary =
     764          998 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     765              : 
     766          998 :         if let Some(mut expected_summary) = summary {
     767              :             // production code path
     768          998 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     769          998 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     770          998 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     771          998 :             expected_summary.timeline_id = actual_summary.timeline_id;
     772          998 : 
     773          998 :             if actual_summary != expected_summary {
     774            0 :                 bail!(
     775            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     776            0 :                     actual_summary,
     777            0 :                     expected_summary
     778            0 :                 );
     779          998 :             }
     780            0 :         }
     781              : 
     782          998 :         Ok(Ok(DeltaLayerInner {
     783          998 :             file,
     784          998 :             file_id,
     785          998 :             index_start_blk: actual_summary.index_start_blk,
     786          998 :             index_root_blk: actual_summary.index_root_blk,
     787          998 :             max_vectored_read_bytes,
     788          998 :         }))
     789          998 :     }
     790              : 
     791       204764 :     pub(super) async fn get_value_reconstruct_data(
     792       204764 :         &self,
     793       204764 :         key: Key,
     794       204764 :         lsn_range: Range<Lsn>,
     795       204764 :         reconstruct_state: &mut ValueReconstructState,
     796       204764 :         ctx: &RequestContext,
     797       204764 :     ) -> anyhow::Result<ValueReconstructResult> {
     798       204764 :         let mut need_image = true;
     799       204764 :         // Scan the page versions backwards, starting from `lsn`.
     800       204764 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     801       204764 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     802       204764 :             self.index_start_blk,
     803       204764 :             self.index_root_blk,
     804       204764 :             &block_reader,
     805       204764 :         );
     806       204764 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     807       204764 : 
     808       204764 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     809       204764 : 
     810       204764 :         tree_reader
     811       204764 :             .visit(
     812       204764 :                 &search_key.0,
     813       204764 :                 VisitDirection::Backwards,
     814       204764 :                 |key, value| {
     815       198724 :                     let blob_ref = BlobRef(value);
     816       198724 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     817        76693 :                         return false;
     818       122031 :                     }
     819       122031 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     820       122031 :                     if entry_lsn < lsn_range.start {
     821           31 :                         return false;
     822       122000 :                     }
     823       122000 :                     offsets.push((entry_lsn, blob_ref.pos()));
     824       122000 : 
     825       122000 :                     !blob_ref.will_init()
     826       204764 :                 },
     827       204764 :                 &RequestContextBuilder::extend(ctx)
     828       204764 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     829       204764 :                     .build(),
     830       204764 :             )
     831        21105 :             .await?;
     832              : 
     833       204764 :         let ctx = &RequestContextBuilder::extend(ctx)
     834       204764 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     835       204764 :             .build();
     836       204764 : 
     837       204764 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     838       204764 :         let cursor = block_reader.block_cursor();
     839       204764 :         let mut buf = Vec::new();
     840       204808 :         for (entry_lsn, pos) in offsets {
     841       122000 :             cursor
     842       122000 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     843         7953 :                 .await
     844       122000 :                 .with_context(|| {
     845            0 :                     format!("Failed to read blob from virtual file {}", self.file.path)
     846       122000 :                 })?;
     847       122000 :             let val = Value::des(&buf).with_context(|| {
     848            0 :                 format!(
     849            0 :                     "Failed to deserialize file blob from virtual file {}",
     850            0 :                     self.file.path
     851            0 :                 )
     852       122000 :             })?;
     853       122000 :             match val {
     854       121956 :                 Value::Image(img) => {
     855       121956 :                     reconstruct_state.img = Some((entry_lsn, img));
     856       121956 :                     need_image = false;
     857       121956 :                     break;
     858              :                 }
     859           44 :                 Value::WalRecord(rec) => {
     860           44 :                     let will_init = rec.will_init();
     861           44 :                     reconstruct_state.records.push((entry_lsn, rec));
     862           44 :                     if will_init {
     863              :                         // This WAL record initializes the page, so no need to go further back
     864            0 :                         need_image = false;
     865            0 :                         break;
     866           44 :                     }
     867              :                 }
     868              :             }
     869              :         }
     870              : 
     871              :         // If an older page image is needed to reconstruct the page, let the
     872              :         // caller know.
     873       204764 :         if need_image {
     874        82808 :             Ok(ValueReconstructResult::Continue)
     875              :         } else {
     876       121956 :             Ok(ValueReconstructResult::Complete)
     877              :         }
     878       204764 :     }
     879              : 
     880              :     // Look up the keys in the provided keyspace and update
     881              :     // the reconstruct state with whatever is found.
     882              :     //
     883              :     // If the key is cached, go no further than the cached Lsn.
     884              :     //
     885              :     // Currently, the index is visited for each range, but this
     886              :     // can be further optimised to visit the index only once.
     887          154 :     pub(super) async fn get_values_reconstruct_data(
     888          154 :         &self,
     889          154 :         keyspace: KeySpace,
     890          154 :         lsn_range: Range<Lsn>,
     891          154 :         reconstruct_state: &mut ValuesReconstructState,
     892          154 :         ctx: &RequestContext,
     893          154 :     ) -> Result<(), GetVectoredError> {
     894          154 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     895          154 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     896          154 :             self.index_start_blk,
     897          154 :             self.index_root_blk,
     898          154 :             block_reader,
     899          154 :         );
     900          154 : 
     901          154 :         let planner = VectoredReadPlanner::new(
     902          154 :             self.max_vectored_read_bytes
     903          154 :                 .expect("Layer is loaded with max vectored bytes config")
     904          154 :                 .0
     905          154 :                 .into(),
     906          154 :         );
     907          154 : 
     908          154 :         let data_end_offset = self.index_start_offset();
     909              : 
     910          154 :         let reads = Self::plan_reads(
     911          154 :             &keyspace,
     912          154 :             lsn_range.clone(),
     913          154 :             data_end_offset,
     914          154 :             index_reader,
     915          154 :             planner,
     916          154 :             reconstruct_state,
     917          154 :             ctx,
     918          154 :         )
     919         1428 :         .await
     920          154 :         .map_err(GetVectoredError::Other)?;
     921              : 
     922          154 :         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
     923         8735 :             .await;
     924              : 
     925          154 :         reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
     926          154 : 
     927          154 :         Ok(())
     928          154 :     }
     929              : 
     930              :     /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
     931            8 :     pub(super) async fn load_key_values(
     932            8 :         &self,
     933            8 :         ctx: &RequestContext,
     934            8 :     ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
     935            8 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     936            8 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     937            8 :             self.index_start_blk,
     938            8 :             self.index_root_blk,
     939            8 :             block_reader,
     940            8 :         );
     941            8 :         let mut result = Vec::new();
     942            8 :         let mut stream =
     943            8 :             Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
     944            8 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     945            8 :         let cursor = block_reader.block_cursor();
     946            8 :         let mut buf = Vec::new();
     947           32 :         while let Some(item) = stream.next().await {
     948           24 :             let (key, lsn, pos) = item?;
     949              :             // TODO: dedup code with get_reconstruct_value
     950              :             // TODO: ctx handling and sharding
     951           24 :             cursor
     952           24 :                 .read_blob_into_buf(pos.pos(), &mut buf, ctx)
     953            0 :                 .await
     954           24 :                 .with_context(|| {
     955            0 :                     format!("Failed to read blob from virtual file {}", self.file.path)
     956           24 :                 })?;
     957           24 :             let val = Value::des(&buf).with_context(|| {
     958            0 :                 format!(
     959            0 :                     "Failed to deserialize file blob from virtual file {}",
     960            0 :                     self.file.path
     961            0 :                 )
     962           24 :             })?;
     963           24 :             result.push((key, lsn, val));
     964              :         }
     965            8 :         Ok(result)
     966            8 :     }
     967              : 
     968          356 :     async fn plan_reads<Reader>(
     969          356 :         keyspace: &KeySpace,
     970          356 :         lsn_range: Range<Lsn>,
     971          356 :         data_end_offset: u64,
     972          356 :         index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
     973          356 :         mut planner: VectoredReadPlanner,
     974          356 :         reconstruct_state: &mut ValuesReconstructState,
     975          356 :         ctx: &RequestContext,
     976          356 :     ) -> anyhow::Result<Vec<VectoredRead>>
     977          356 :     where
     978          356 :         Reader: BlockReader + Clone,
     979          356 :     {
     980          356 :         let ctx = RequestContextBuilder::extend(ctx)
     981          356 :             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     982          356 :             .build();
     983              : 
     984        42652 :         for range in keyspace.ranges.iter() {
     985        42652 :             let mut range_end_handled = false;
     986        42652 : 
     987        42652 :             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
     988        42652 :             let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
     989        42652 :             let mut index_stream = std::pin::pin!(index_stream);
     990              : 
     991       189599 :             while let Some(index_entry) = index_stream.next().await {
     992       189400 :                 let (raw_key, value) = index_entry?;
     993       189400 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     994       189400 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
     995       189400 :                 let blob_ref = BlobRef(value);
     996       189400 : 
     997       189400 :                 // Lsns are not monotonically increasing across keys, so we don't assert on them.
     998       189400 :                 assert!(key >= range.start);
     999              : 
    1000       189400 :                 let outside_lsn_range = !lsn_range.contains(&lsn);
    1001       189400 :                 let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
    1002              : 
    1003       189400 :                 let flag = {
    1004       189400 :                     if outside_lsn_range || below_cached_lsn {
    1005        70928 :                         BlobFlag::Ignore
    1006       118472 :                     } else if blob_ref.will_init() {
    1007        60824 :                         BlobFlag::ReplaceAll
    1008              :                     } else {
    1009              :                         // Usual path: add blob to the read
    1010        57648 :                         BlobFlag::None
    1011              :                     }
    1012              :                 };
    1013              : 
    1014       189400 :                 if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
    1015        42453 :                     planner.handle_range_end(blob_ref.pos());
    1016        42453 :                     range_end_handled = true;
    1017        42453 :                     break;
    1018       146947 :                 } else {
    1019       146947 :                     planner.handle(key, lsn, blob_ref.pos(), flag);
    1020       146947 :                 }
    1021              :             }
    1022              : 
    1023        42652 :             if !range_end_handled {
    1024          199 :                 tracing::debug!("Handling range end fallback at {}", data_end_offset);
    1025          199 :                 planner.handle_range_end(data_end_offset);
    1026        42453 :             }
    1027              :         }
    1028              : 
    1029          356 :         Ok(planner.finish())
    1030          356 :     }
    1031              : 
    1032          354 :     fn get_min_read_buffer_size(
    1033          354 :         planned_reads: &[VectoredRead],
    1034          354 :         read_size_soft_max: usize,
    1035          354 :     ) -> usize {
    1036        36917 :         let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
    1037           14 :             return read_size_soft_max;
    1038              :         };
    1039              : 
    1040          340 :         let largest_read_size = largest_read.size();
    1041          340 :         if largest_read_size > read_size_soft_max {
    1042              :             // If the read is oversized, it should only contain one key.
    1043          200 :             let offenders = largest_read
    1044          200 :                 .blobs_at
    1045          200 :                 .as_slice()
    1046          200 :                 .iter()
    1047          200 :                 .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
    1048          200 :                 .join(", ");
    1049          200 :             tracing::warn!(
    1050            0 :                 "Oversized vectored read ({} > {}) for keys {}",
    1051              :                 largest_read_size,
    1052              :                 read_size_soft_max,
    1053              :                 offenders
    1054              :             );
    1055          140 :         }
    1056              : 
    1057          340 :         largest_read_size
    1058          354 :     }
    1059              : 
    1060          154 :     async fn do_reads_and_update_state(
    1061          154 :         &self,
    1062          154 :         reads: Vec<VectoredRead>,
    1063          154 :         reconstruct_state: &mut ValuesReconstructState,
    1064          154 :         ctx: &RequestContext,
    1065          154 :     ) {
    1066          154 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
    1067          154 :         let mut ignore_key_with_err = None;
    1068          154 : 
    1069          154 :         let max_vectored_read_bytes = self
    1070          154 :             .max_vectored_read_bytes
    1071          154 :             .expect("Layer is loaded with max vectored bytes config")
    1072          154 :             .0
    1073          154 :             .into();
    1074          154 :         let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
    1075          154 :         let mut buf = Some(BytesMut::with_capacity(buf_size));
    1076              : 
    1077              :         // Note that reads are processed in reverse order (from highest key+lsn).
    1078              :         // This is the order that `ReconstructState` requires such that it can
    1079              :         // track when a key is done.
    1080        17193 :         for read in reads.into_iter().rev() {
    1081        17193 :             let res = vectored_blob_reader
    1082        17193 :                 .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
    1083         8735 :                 .await;
    1084              : 
    1085        17193 :             let blobs_buf = match res {
    1086        17193 :                 Ok(blobs_buf) => blobs_buf,
    1087            0 :                 Err(err) => {
    1088            0 :                     let kind = err.kind();
    1089            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
    1090            0 :                         reconstruct_state.on_key_error(
    1091            0 :                             blob_meta.key,
    1092            0 :                             PageReconstructError::from(anyhow!(
    1093            0 :                                 "Failed to read blobs from virtual file {}: {}",
    1094            0 :                                 self.file.path,
    1095            0 :                                 kind
    1096            0 :                             )),
    1097            0 :                         );
    1098            0 :                     }
    1099              : 
    1100              :                     // We have "lost" the buffer since the lower level IO api
    1101              :                     // doesn't return the buffer on error. Allocate a new one.
    1102            0 :                     buf = Some(BytesMut::with_capacity(buf_size));
    1103            0 : 
    1104            0 :                     continue;
    1105              :                 }
    1106              :             };
    1107              : 
    1108        28180 :             for meta in blobs_buf.blobs.iter().rev() {
    1109        28180 :                 if Some(meta.meta.key) == ignore_key_with_err {
    1110            0 :                     continue;
    1111        28180 :                 }
    1112        28180 : 
    1113        28180 :                 let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
    1114        28180 :                 let value = match value {
    1115        28180 :                     Ok(v) => v,
    1116            0 :                     Err(e) => {
    1117            0 :                         reconstruct_state.on_key_error(
    1118            0 :                             meta.meta.key,
    1119            0 :                             PageReconstructError::from(anyhow!(e).context(format!(
    1120            0 :                                 "Failed to deserialize blob from virtual file {}",
    1121            0 :                                 self.file.path,
    1122            0 :                             ))),
    1123            0 :                         );
    1124            0 : 
    1125            0 :                         ignore_key_with_err = Some(meta.meta.key);
    1126            0 :                         continue;
    1127              :                     }
    1128              :                 };
    1129              : 
    1130              :                 // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
    1131              :                 // state, no further updates shall be made to it. The call below will
    1132              :                 // panic if the invariant is violated.
    1133        28180 :                 reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
    1134              :             }
    1135              : 
    1136        17193 :             buf = Some(blobs_buf.buf);
    1137              :         }
    1138          154 :     }
    1139              : 
    1140          406 :     pub(super) async fn load_keys<'a>(
    1141          406 :         &'a self,
    1142          406 :         ctx: &RequestContext,
    1143          406 :     ) -> Result<Vec<DeltaEntry<'a>>> {
    1144          406 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1145          406 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1146          406 :             self.index_start_blk,
    1147          406 :             self.index_root_blk,
    1148          406 :             block_reader,
    1149          406 :         );
    1150          406 : 
    1151          406 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
    1152          406 : 
    1153          406 :         tree_reader
    1154          406 :             .visit(
    1155          406 :                 &[0u8; DELTA_KEY_SIZE],
    1156          406 :                 VisitDirection::Forwards,
    1157      2064046 :                 |key, value| {
    1158      2064046 :                     let delta_key = DeltaKey::from_slice(key);
    1159      2064046 :                     let val_ref = ValueRef {
    1160      2064046 :                         blob_ref: BlobRef(value),
    1161      2064046 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
    1162      2064046 :                             Adapter(self),
    1163      2064046 :                         )),
    1164      2064046 :                     };
    1165      2064046 :                     let pos = BlobRef(value).pos();
    1166      2064046 :                     if let Some(last) = all_keys.last_mut() {
    1167      2063640 :                         // subtract offset of the current and last entries to get the size
    1168      2063640 :                         // of the value associated with this (key, lsn) tuple
    1169      2063640 :                         let first_pos = last.size;
    1170      2063640 :                         last.size = pos - first_pos;
    1171      2063640 :                     }
    1172      2064046 :                     let entry = DeltaEntry {
    1173      2064046 :                         key: delta_key.key(),
    1174      2064046 :                         lsn: delta_key.lsn(),
    1175      2064046 :                         size: pos,
    1176      2064046 :                         val: val_ref,
    1177      2064046 :                     };
    1178      2064046 :                     all_keys.push(entry);
    1179      2064046 :                     true
    1180      2064046 :                 },
    1181          406 :                 &RequestContextBuilder::extend(ctx)
    1182          406 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1183          406 :                     .build(),
    1184          406 :             )
    1185         2170 :             .await?;
    1186          406 :         if let Some(last) = all_keys.last_mut() {
    1187          406 :             // Last key occupies all space till end of value storage,
    1188          406 :             // which corresponds to beginning of the index
    1189          406 :             last.size = self.index_start_offset() - last.size;
    1190          406 :         }
    1191          406 :         Ok(all_keys)
    1192          406 :     }
    1193              : 
    1194              :     /// Using the given writer, write out a copy containing only the entries whose Lsn is earlier than `until`.
    1195              :     ///
    1196              :     /// Returns the number of key-value records pushed to the writer.
    1197           10 :     pub(super) async fn copy_prefix(
    1198           10 :         &self,
    1199           10 :         writer: &mut DeltaLayerWriter,
    1200           10 :         until: Lsn,
    1201           10 :         ctx: &RequestContext,
    1202           10 :     ) -> anyhow::Result<usize> {
    1203           10 :         use crate::tenant::vectored_blob_io::{
    1204           10 :             BlobMeta, VectoredReadBuilder, VectoredReadExtended,
    1205           10 :         };
    1206           10 :         use futures::stream::TryStreamExt;
    1207           10 : 
    1208           10 :         #[derive(Debug)]
    1209           10 :         enum Item {
    1210           10 :             Actual(Key, Lsn, BlobRef),
    1211           10 :             Sentinel,
    1212           10 :         }
    1213           10 : 
    1214           10 :         impl From<Item> for Option<(Key, Lsn, BlobRef)> {
    1215           70 :             fn from(value: Item) -> Self {
    1216           70 :                 match value {
    1217           60 :                     Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
    1218           10 :                     Item::Sentinel => None,
    1219           10 :                 }
    1220           70 :             }
    1221           10 :         }
    1222           10 : 
    1223           10 :         impl Item {
    1224           70 :             fn offset(&self) -> Option<BlobRef> {
    1225           70 :                 match self {
    1226           60 :                     Item::Actual(_, _, blob) => Some(*blob),
    1227           10 :                     Item::Sentinel => None,
    1228           10 :                 }
    1229           70 :             }
    1230           10 : 
    1231           70 :             fn is_last(&self) -> bool {
    1232           70 :                 matches!(self, Item::Sentinel)
    1233           70 :             }
    1234           10 :         }
    1235           10 : 
    1236           10 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1237           10 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1238           10 :             self.index_start_blk,
    1239           10 :             self.index_root_blk,
    1240           10 :             block_reader,
    1241           10 :         );
    1242           10 : 
    1243           10 :         let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
    1244           60 :         let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
    1245           10 :         // put in a sentinel value for getting the end offset for last item, and not having to
    1246           10 :         // repeat the whole read part
    1247           10 :         let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
    1248           10 :             Item::Sentinel,
    1249           10 :         ))));
    1250           10 :         let mut stream = std::pin::pin!(stream);
    1251           10 : 
    1252           10 :         let mut prev: Option<(Key, Lsn, BlobRef)> = None;
    1253           10 : 
    1254           10 :         let mut read_builder: Option<VectoredReadBuilder> = None;
    1255           10 : 
    1256           10 :         let max_read_size = self
    1257           10 :             .max_vectored_read_bytes
    1258           10 :             .map(|x| x.0.get())
    1259           10 :             .unwrap_or(8192);
    1260           10 : 
    1261           10 :         let mut buffer = Some(BytesMut::with_capacity(max_read_size));
    1262           10 : 
    1263           10 :         // FIXME: buffering of DeltaLayerWriter
    1264           10 :         let mut per_blob_copy = Vec::new();
    1265           10 : 
    1266           10 :         let mut records = 0;
    1267              : 
    1268           80 :         while let Some(item) = stream.try_next().await? {
    1269           70 :             tracing::debug!(?item, "popped");
    1270           70 :             let offset = item
    1271           70 :                 .offset()
    1272           70 :                 .unwrap_or(BlobRef::new(self.index_start_offset(), false));
    1273              : 
    1274           70 :             let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
    1275           60 :                 let end_offset = offset;
    1276           60 : 
    1277           60 :                 Some((BlobMeta { key, lsn }, start_offset..end_offset))
    1278              :             } else {
    1279           10 :                 None
    1280              :             };
    1281              : 
    1282           70 :             let is_last = item.is_last();
    1283           70 : 
    1284           70 :             prev = Option::from(item);
    1285           70 : 
    1286           70 :             let actionable = actionable.filter(|x| x.0.lsn < until);
    1287              : 
    1288           70 :             let builder = if let Some((meta, offsets)) = actionable {
    1289              :                 // extend or create a new builder
    1290           32 :                 if read_builder
    1291           32 :                     .as_mut()
    1292           32 :                     .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
    1293           32 :                     .unwrap_or(VectoredReadExtended::No)
    1294           32 :                     == VectoredReadExtended::Yes
    1295              :                 {
    1296           16 :                     None
    1297              :                 } else {
    1298           16 :                     read_builder.replace(VectoredReadBuilder::new(
    1299           16 :                         offsets.start.pos(),
    1300           16 :                         offsets.end.pos(),
    1301           16 :                         meta,
    1302           16 :                         Some(max_read_size),
    1303           16 :                     ))
    1304              :                 }
    1305              :             } else {
    1306              :                 // nothing to do, except perhaps flush any existing for the last element
    1307           38 :                 None
    1308              :             };
    1309              : 
    1310              :             // flush the possible older builder and also the new one if the item was the last one
    1311           70 :             let builders = builder.into_iter();
    1312           70 :             let builders = if is_last {
    1313           10 :                 builders.chain(read_builder.take())
    1314              :             } else {
    1315           60 :                 builders.chain(None)
    1316              :             };
    1317              : 
    1318           86 :             for builder in builders {
    1319           16 :                 let read = builder.build();
    1320           16 : 
    1321           16 :                 let reader = VectoredBlobReader::new(&self.file);
    1322           16 : 
    1323           16 :                 let mut buf = buffer.take().unwrap();
    1324           16 : 
    1325           16 :                 buf.clear();
    1326           16 :                 buf.reserve(read.size());
    1327           16 :                 let res = reader.read_blobs(&read, buf, ctx).await?;
    1328              : 
    1329           48 :                 for blob in res.blobs {
    1330           32 :                     let key = blob.meta.key;
    1331           32 :                     let lsn = blob.meta.lsn;
    1332           32 :                     let data = &res.buf[blob.start..blob.end];
    1333           32 : 
    1334           32 :                     #[cfg(debug_assertions)]
    1335           32 :                     Value::des(data)
    1336           32 :                         .with_context(|| {
    1337            0 :                             format!(
    1338            0 :                                 "blob failed to deserialize for {}@{}, {}..{}: {:?}",
    1339            0 :                                 blob.meta.key,
    1340            0 :                                 blob.meta.lsn,
    1341            0 :                                 blob.start,
    1342            0 :                                 blob.end,
    1343            0 :                                 utils::Hex(data)
    1344            0 :                             )
    1345           32 :                         })
    1346           32 :                         .unwrap();
    1347           32 : 
    1348           32 :                     // is it an image or will_init walrecord?
    1349           32 :                     // FIXME: this could be handled by threading the BlobRef to the
    1350           32 :                     // VectoredReadBuilder
    1351           32 :                     let will_init = crate::repository::ValueBytes::will_init(data)
    1352           32 :                         .inspect_err(|_e| {
    1353            0 :                             #[cfg(feature = "testing")]
    1354            0 :                             tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
    1355           32 :                         })
    1356           32 :                         .unwrap_or(false);
    1357           32 : 
    1358           32 :                     per_blob_copy.clear();
    1359           32 :                     per_blob_copy.extend_from_slice(data);
    1360              : 
    1361           32 :                     let (tmp, res) = writer
    1362           32 :                         .put_value_bytes(
    1363           32 :                             key,
    1364           32 :                             lsn,
    1365           32 :                             std::mem::take(&mut per_blob_copy),
    1366           32 :                             will_init,
    1367           32 :                             ctx,
    1368           32 :                         )
    1369            4 :                         .await;
    1370           32 :                     per_blob_copy = tmp;
    1371           32 : 
    1372           32 :                     res?;
    1373              : 
    1374           32 :                     records += 1;
    1375              :                 }
    1376              : 
    1377           16 :                 buffer = Some(res.buf);
    1378              :             }
    1379              :         }
    1380              : 
    1381           10 :         assert!(
    1382           10 :             read_builder.is_none(),
    1383            0 :             "with the sentinel above loop should had handled all"
    1384              :         );
    1385              : 
    1386           10 :         Ok(records)
    1387           10 :     }
    1388              : 
    1389            4 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1390            4 :         println!(
    1391            4 :             "index_start_blk: {}, root {}",
    1392            4 :             self.index_start_blk, self.index_root_blk
    1393            4 :         );
    1394            4 : 
    1395            4 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1396            4 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1397            4 :             self.index_start_blk,
    1398            4 :             self.index_root_blk,
    1399            4 :             block_reader,
    1400            4 :         );
    1401            4 : 
    1402            4 :         tree_reader.dump().await?;
    1403              : 
    1404            4 :         let keys = self.load_keys(ctx).await?;
    1405              : 
    1406            8 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1407            8 :             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1408            8 :             let val = Value::des(&buf)?;
    1409            8 :             let desc = match val {
    1410            8 :                 Value::Image(img) => {
    1411            8 :                     format!(" img {} bytes", img.len())
    1412              :                 }
    1413            0 :                 Value::WalRecord(rec) => {
    1414            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
    1415            0 :                     format!(
    1416            0 :                         " rec {} bytes will_init: {} {}",
    1417            0 :                         buf.len(),
    1418            0 :                         rec.will_init(),
    1419            0 :                         wal_desc
    1420            0 :                     )
    1421              :                 }
    1422              :             };
    1423            8 :             Ok(desc)
    1424            8 :         }
    1425              : 
    1426           12 :         for entry in keys {
    1427            8 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1428            8 :             let desc = match dump_blob(&val, ctx).await {
    1429            8 :                 Ok(desc) => desc,
    1430            0 :                 Err(err) => {
    1431            0 :                     format!("ERROR: {err}")
    1432              :                 }
    1433              :             };
    1434            8 :             println!("  key {key} at {lsn}: {desc}");
    1435            8 : 
    1436            8 :             // Print more details about CHECKPOINT records. Would be nice to print details
    1437            8 :             // of many other record types too, but these are particularly interesting, as
    1438            8 :             // we have a lot of special processing for them in walingest.rs.
    1439            8 :             use pageserver_api::key::CHECKPOINT_KEY;
    1440            8 :             use postgres_ffi::CheckPoint;
    1441            8 :             if key == CHECKPOINT_KEY {
    1442            0 :                 let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1443            0 :                 let val = Value::des(&buf)?;
    1444            0 :                 match val {
    1445            0 :                     Value::Image(img) => {
    1446            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1447            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
    1448              :                     }
    1449            0 :                     Value::WalRecord(_rec) => {
    1450            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1451            0 :                     }
    1452              :                 }
    1453            8 :             }
    1454              :         }
    1455              : 
    1456            4 :         Ok(())
    1457            4 :     }
    1458              : 
    1459           38 :     fn stream_index_forwards<'a, R>(
    1460           38 :         &'a self,
    1461           38 :         reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
    1462           38 :         start: &'a [u8; DELTA_KEY_SIZE],
    1463           38 :         ctx: &'a RequestContext,
    1464           38 :     ) -> impl futures::stream::Stream<
    1465           38 :         Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
    1466           38 :     > + 'a
    1467           38 :     where
    1468           38 :         R: BlockReader + 'a,
    1469           38 :     {
    1470           38 :         use futures::stream::TryStreamExt;
    1471           38 :         let stream = reader.into_stream(start, ctx);
    1472          176 :         stream.map_ok(|(key, value)| {
    1473          176 :             let key = DeltaKey::from_slice(&key);
    1474          176 :             let (key, lsn) = (key.key(), key.lsn());
    1475          176 :             let offset = BlobRef(value);
    1476          176 : 
    1477          176 :             (key, lsn, offset)
    1478          176 :         })
    1479           38 :     }
    1480              : 
    1481              :     /// The file offset to the first block of index.
    1482              :     ///
    1483              :     /// The file structure is summary, values, and index. We often need this for the size of the last blob.
    1484          630 :     fn index_start_offset(&self) -> u64 {
    1485          630 :         let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
    1486          630 :         let bref = BlobRef(offset);
    1487          630 :         tracing::debug!(
    1488              :             index_start_blk = self.index_start_blk,
    1489              :             offset,
    1490            0 :             pos = bref.pos(),
    1491            0 :             "index_start_offset"
    1492              :         );
    1493          630 :         offset
    1494          630 :     }
    1495              : }
    1496              : 
    1497              : /// A set of data associated with a delta layer key and its value
    1498              : pub struct DeltaEntry<'a> {
    1499              :     pub key: Key,
    1500              :     pub lsn: Lsn,
    1501              :     /// Size of the stored value
    1502              :     pub size: u64,
    1503              :     /// Reference to the on-disk value
    1504              :     pub val: ValueRef<'a>,
    1505              : }
    1506              : 
    1507              : /// Reference to an on-disk value
    1508              : pub struct ValueRef<'a> {
    1509              :     blob_ref: BlobRef,
    1510              :     reader: BlockCursor<'a>,
    1511              : }
    1512              : 
    1513              : impl<'a> ValueRef<'a> {
    1514              :     /// Loads the value from disk
    1515      2064038 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1516              :         // theoretically we *could* record an access time for each, but it does not really matter
    1517      2064038 :         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1518      2064038 :         let val = Value::des(&buf)?;
    1519      2064038 :         Ok(val)
    1520      2064038 :     }
    1521              : }
    1522              : 
    1523              : pub(crate) struct Adapter<T>(T);
    1524              : 
    1525              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1526      2083002 :     pub(crate) async fn read_blk(
    1527      2083002 :         &self,
    1528      2083002 :         blknum: u32,
    1529      2083002 :         ctx: &RequestContext,
    1530      2083002 :     ) -> Result<BlockLease, std::io::Error> {
    1531      2083002 :         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
    1532      2083002 :         block_reader.read_blk(blknum, ctx).await
    1533      2083002 :     }
    1534              : }
    1535              : 
    1536              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1537      4166004 :     fn as_ref(&self) -> &DeltaLayerInner {
    1538      4166004 :         self
    1539      4166004 :     }
    1540              : }
    1541              : 
    1542              : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    1543            0 :     fn key(&self) -> Key {
    1544            0 :         self.key
    1545            0 :     }
    1546            0 :     fn lsn(&self) -> Lsn {
    1547            0 :         self.lsn
    1548            0 :     }
    1549            0 :     fn size(&self) -> u64 {
    1550            0 :         self.size
    1551            0 :     }
    1552              : }
    1553              : 
    1554              : #[cfg(test)]
    1555              : mod test {
    1556              :     use std::collections::BTreeMap;
    1557              : 
    1558              :     use itertools::MinMaxResult;
    1559              :     use rand::prelude::{SeedableRng, SliceRandom, StdRng};
    1560              :     use rand::RngCore;
    1561              : 
    1562              :     use super::*;
    1563              :     use crate::{
    1564              :         context::DownloadBehavior,
    1565              :         task_mgr::TaskKind,
    1566              :         tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
    1567              :         DEFAULT_PG_VERSION,
    1568              :     };
    1569              : 
    1570              :     /// Construct an index for a fictional delta layer and then
    1571              :     /// traverse in order to plan vectored reads for a query. Finally,
    1572              :     /// verify that the traversal fed the right index key and value
    1573              :     /// pairs into the planner.
    1574              :     #[tokio::test]
    1575            2 :     async fn test_delta_layer_index_traversal() {
    1576            2 :         let base_key = Key {
    1577            2 :             field1: 0,
    1578            2 :             field2: 1663,
    1579            2 :             field3: 12972,
    1580            2 :             field4: 16396,
    1581            2 :             field5: 0,
    1582            2 :             field6: 246080,
    1583            2 :         };
    1584            2 : 
    1585            2 :         // Populate the index with some entries
    1586            2 :         let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
    1587            2 :             (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
    1588            2 :             (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1589            2 :             (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1590            2 :             (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
    1591            2 :         ]);
    1592            2 : 
    1593            2 :         let mut disk = TestDisk::default();
    1594            2 :         let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
    1595            2 : 
    1596            2 :         let mut disk_offset = 0;
    1597           10 :         for (key, lsns) in &entries {
    1598           42 :             for lsn in lsns {
    1599           34 :                 let index_key = DeltaKey::from_key_lsn(key, *lsn);
    1600           34 :                 let blob_ref = BlobRef::new(disk_offset, false);
    1601           34 :                 writer
    1602           34 :                     .append(&index_key.0, blob_ref.0)
    1603           34 :                     .expect("In memory disk append should never fail");
    1604           34 : 
    1605           34 :                 disk_offset += 1;
    1606           34 :             }
    1607            2 :         }
    1608            2 : 
    1609            2 :         // Prepare all the arguments for the call into `plan_reads` below
    1610            2 :         let (root_offset, _writer) = writer
    1611            2 :             .finish()
    1612            2 :             .expect("In memory disk finish should never fail");
    1613            2 :         let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
    1614            2 :         let planner = VectoredReadPlanner::new(100);
    1615            2 :         let mut reconstruct_state = ValuesReconstructState::new();
    1616            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1617            2 : 
    1618            2 :         let keyspace = KeySpace {
    1619            2 :             ranges: vec![
    1620            2 :                 base_key..base_key.add(3),
    1621            2 :                 base_key.add(3)..base_key.add(100),
    1622            2 :             ],
    1623            2 :         };
    1624            2 :         let lsn_range = Lsn(2)..Lsn(40);
    1625            2 : 
    1626            2 :         // Plan and validate
    1627            2 :         let vectored_reads = DeltaLayerInner::plan_reads(
    1628            2 :             &keyspace,
    1629            2 :             lsn_range.clone(),
    1630            2 :             disk_offset,
    1631            2 :             reader,
    1632            2 :             planner,
    1633            2 :             &mut reconstruct_state,
    1634            2 :             &ctx,
    1635            2 :         )
    1636            2 :         .await
    1637            2 :         .expect("Read planning should not fail");
    1638            2 : 
    1639            2 :         validate(keyspace, lsn_range, vectored_reads, entries);
    1640            2 :     }
    1641              : 
    1642            2 :     fn validate(
    1643            2 :         keyspace: KeySpace,
    1644            2 :         lsn_range: Range<Lsn>,
    1645            2 :         vectored_reads: Vec<VectoredRead>,
    1646            2 :         index_entries: BTreeMap<Key, Vec<Lsn>>,
    1647            2 :     ) {
    1648            2 :         #[derive(Debug, PartialEq, Eq)]
    1649            2 :         struct BlobSpec {
    1650            2 :             key: Key,
    1651            2 :             lsn: Lsn,
    1652            2 :             at: u64,
    1653            2 :         }
    1654            2 : 
    1655            2 :         let mut planned_blobs = Vec::new();
    1656            8 :         for read in vectored_reads {
    1657           28 :             for (at, meta) in read.blobs_at.as_slice() {
    1658           28 :                 planned_blobs.push(BlobSpec {
    1659           28 :                     key: meta.key,
    1660           28 :                     lsn: meta.lsn,
    1661           28 :                     at: *at,
    1662           28 :                 });
    1663           28 :             }
    1664              :         }
    1665              : 
    1666            2 :         let mut expected_blobs = Vec::new();
    1667            2 :         let mut disk_offset = 0;
    1668           10 :         for (key, lsns) in index_entries {
    1669           42 :             for lsn in lsns {
    1670           42 :                 let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
    1671           34 :                 let lsn_included = lsn_range.contains(&lsn);
    1672           34 : 
    1673           34 :                 if key_included && lsn_included {
    1674           28 :                     expected_blobs.push(BlobSpec {
    1675           28 :                         key,
    1676           28 :                         lsn,
    1677           28 :                         at: disk_offset,
    1678           28 :                     });
    1679           28 :                 }
    1680              : 
    1681           34 :                 disk_offset += 1;
    1682              :             }
    1683              :         }
    1684              : 
    1685            2 :         assert_eq!(planned_blobs, expected_blobs);
    1686            2 :     }
    1687              : 
    1688              :     mod constants {
    1689              :         use utils::lsn::Lsn;
    1690              : 
    1691              :         /// Offset used by all lsns in this test
    1692              :         pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
    1693              :         /// Number of unique keys included in the test data
    1694              :         pub(super) const KEY_COUNT: u8 = 60;
    1695              :         /// Max number of different lsns for each key
    1696              :         pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    1697              :         /// Possible value sizes for each key along with a probability weight
    1698              :         pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    1699              :         /// Probability that there will be a gap between the current key and the next one (33.3%)
    1700              :         pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    1701              :         /// The minimum size of a key range in all the generated reads
    1702              :         pub(super) const MIN_RANGE_SIZE: i128 = 10;
    1703              :         /// The number of ranges included in each vectored read
    1704              :         pub(super) const RANGES_COUNT: u8 = 2;
    1705              :         /// The number of vectored reads performed
    1706              :         pub(super) const READS_COUNT: u8 = 100;
    1707              :         /// Soft max size of a vectored read. Will be violated if we have to read keys
    1708              :         /// with values larger than the limit
    1709              :         pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    1710              :     }
    1711              : 
    1712              :     struct Entry {
    1713              :         key: Key,
    1714              :         lsn: Lsn,
    1715              :         value: Vec<u8>,
    1716              :     }
    1717              : 
    1718            2 :     fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
    1719            2 :         let mut current_key = Key::MIN;
    1720            2 : 
    1721            2 :         let mut entries = Vec::new();
    1722          122 :         for _ in 0..constants::KEY_COUNT {
    1723          120 :             let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
    1724          120 :             let mut lsns_iter =
    1725         2260 :                 std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
    1726         2260 :                     Some(Lsn(lsn.0 + 0x08))
    1727         2260 :                 });
    1728          120 :             let mut lsns = Vec::new();
    1729         2380 :             while lsns.len() < count as usize {
    1730         2260 :                 let take = rng.gen_bool(0.5);
    1731         2260 :                 let lsn = lsns_iter.next().unwrap();
    1732         2260 :                 if take {
    1733         1112 :                     lsns.push(lsn);
    1734         1148 :                 }
    1735              :             }
    1736              : 
    1737         1232 :             for lsn in lsns {
    1738         1112 :                 let size = constants::VALUE_SIZES
    1739         3336 :                     .choose_weighted(rng, |item| item.1)
    1740         1112 :                     .unwrap()
    1741         1112 :                     .0;
    1742         1112 :                 let mut buf = vec![0; size];
    1743         1112 :                 rng.fill_bytes(&mut buf);
    1744         1112 : 
    1745         1112 :                 entries.push(Entry {
    1746         1112 :                     key: current_key,
    1747         1112 :                     lsn,
    1748         1112 :                     value: buf,
    1749         1112 :                 })
    1750              :             }
    1751              : 
    1752          120 :             let gap = constants::KEY_GAP_CHANGES
    1753          240 :                 .choose_weighted(rng, |item| item.1)
    1754          120 :                 .unwrap()
    1755          120 :                 .0;
    1756          120 :             if gap {
    1757           38 :                 current_key = current_key.add(2);
    1758           82 :             } else {
    1759           82 :                 current_key = current_key.add(1);
    1760           82 :             }
    1761              :         }
    1762              : 
    1763            2 :         entries
    1764            2 :     }
    1765              : 
    1766              :     struct EntriesMeta {
    1767              :         key_range: Range<Key>,
    1768              :         lsn_range: Range<Lsn>,
    1769              :         index: BTreeMap<(Key, Lsn), Vec<u8>>,
    1770              :     }
    1771              : 
    1772            2 :     fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
    1773         1112 :         let key_range = match entries.iter().minmax_by_key(|e| e.key) {
    1774            2 :             MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
    1775            0 :             _ => panic!("More than one entry is always expected"),
    1776              :         };
    1777              : 
    1778         1112 :         let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
    1779            2 :             MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
    1780            0 :             _ => panic!("More than one entry is always expected"),
    1781              :         };
    1782              : 
    1783            2 :         let mut index = BTreeMap::new();
    1784         1112 :         for entry in entries.iter() {
    1785         1112 :             index.insert((entry.key, entry.lsn), entry.value.clone());
    1786         1112 :         }
    1787              : 
    1788            2 :         EntriesMeta {
    1789            2 :             key_range,
    1790            2 :             lsn_range,
    1791            2 :             index,
    1792            2 :         }
    1793            2 :     }
    1794              : 
    1795          200 :     fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
    1796          200 :         let start = key_range.start.to_i128();
    1797          200 :         let end = key_range.end.to_i128();
    1798          200 : 
    1799          200 :         let mut keyspace = KeySpace::default();
    1800              : 
    1801          600 :         for _ in 0..constants::RANGES_COUNT {
    1802          400 :             let mut range: Option<Range<Key>> = Option::default();
    1803         1244 :             while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
    1804          844 :                 let range_start = rng.gen_range(start..end);
    1805          844 :                 let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
    1806          844 :                 if range_end_offset >= end {
    1807          100 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(end));
    1808          744 :                 } else {
    1809          744 :                     let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
    1810          744 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
    1811          744 :                 }
    1812              :             }
    1813          400 :             keyspace.ranges.push(range.unwrap());
    1814              :         }
    1815              : 
    1816          200 :         keyspace
    1817          200 :     }
    1818              : 
    1819              :     #[tokio::test]
    1820            2 :     async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
    1821            2 :         let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
    1822            8 :         let (tenant, ctx) = harness.load().await;
    1823            2 : 
    1824            2 :         let timeline_id = TimelineId::generate();
    1825            2 :         let timeline = tenant
    1826            2 :             .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
    1827            6 :             .await?;
    1828            2 : 
    1829            2 :         tracing::info!("Generating test data ...");
    1830            2 : 
    1831            2 :         let rng = &mut StdRng::seed_from_u64(0);
    1832            2 :         let entries = generate_entries(rng);
    1833            2 :         let entries_meta = get_entries_meta(&entries);
    1834            2 : 
    1835            2 :         tracing::info!("Done generating {} entries", entries.len());
    1836            2 : 
    1837            2 :         tracing::info!("Writing test data to delta layer ...");
    1838            2 :         let mut writer = DeltaLayerWriter::new(
    1839            2 :             harness.conf,
    1840            2 :             timeline_id,
    1841            2 :             harness.tenant_shard_id,
    1842            2 :             entries_meta.key_range.start,
    1843            2 :             entries_meta.lsn_range.clone(),
    1844            2 :             &ctx,
    1845            2 :         )
    1846            2 :         .await?;
    1847            2 : 
    1848         1114 :         for entry in entries {
    1849         1112 :             let (_, res) = writer
    1850         1112 :                 .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx)
    1851          215 :                 .await;
    1852         1112 :             res?;
    1853            2 :         }
    1854            2 : 
    1855            2 :         let resident = writer
    1856            2 :             .finish(entries_meta.key_range.end, &timeline, &ctx)
    1857            5 :             .await?;
    1858            2 : 
    1859            2 :         let inner = resident.get_as_delta(&ctx).await?;
    1860            2 : 
    1861            2 :         let file_size = inner.file.metadata().await?.len();
    1862            2 :         tracing::info!(
    1863            2 :             "Done writing test data to delta layer. Resulting file size is: {}",
    1864            2 :             file_size
    1865            2 :         );
    1866            2 : 
    1867          202 :         for i in 0..constants::READS_COUNT {
    1868          200 :             tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
    1869            2 : 
    1870          200 :             let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
    1871          200 :             let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1872          200 :                 inner.index_start_blk,
    1873          200 :                 inner.index_root_blk,
    1874          200 :                 block_reader,
    1875          200 :             );
    1876          200 : 
    1877          200 :             let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
    1878          200 :             let mut reconstruct_state = ValuesReconstructState::new();
    1879          200 :             let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
    1880          200 :             let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
    1881            2 : 
    1882          200 :             let vectored_reads = DeltaLayerInner::plan_reads(
    1883          200 :                 &keyspace,
    1884          200 :                 entries_meta.lsn_range.clone(),
    1885          200 :                 data_end_offset,
    1886          200 :                 index_reader,
    1887          200 :                 planner,
    1888          200 :                 &mut reconstruct_state,
    1889          200 :                 &ctx,
    1890          200 :             )
    1891            4 :             .await?;
    1892            2 : 
    1893          200 :             let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
    1894          200 :             let buf_size = DeltaLayerInner::get_min_read_buffer_size(
    1895          200 :                 &vectored_reads,
    1896          200 :                 constants::MAX_VECTORED_READ_BYTES,
    1897          200 :             );
    1898          200 :             let mut buf = Some(BytesMut::with_capacity(buf_size));
    1899            2 : 
    1900        19924 :             for read in vectored_reads {
    1901        19724 :                 let blobs_buf = vectored_blob_reader
    1902        19724 :                     .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
    1903        10016 :                     .await?;
    1904        57304 :                 for meta in blobs_buf.blobs.iter() {
    1905        57304 :                     let value = &blobs_buf.buf[meta.start..meta.end];
    1906        57304 :                     assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
    1907            2 :                 }
    1908            2 : 
    1909        19724 :                 buf = Some(blobs_buf.buf);
    1910            2 :             }
    1911            2 :         }
    1912            2 : 
    1913            2 :         Ok(())
    1914            2 :     }
    1915              : 
    1916              :     #[tokio::test]
    1917            2 :     async fn copy_delta_prefix_smoke() {
    1918            2 :         use crate::walrecord::NeonWalRecord;
    1919            2 :         use bytes::Bytes;
    1920            2 : 
    1921            2 :         let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
    1922            8 :         let (tenant, ctx) = h.load().await;
    1923            2 :         let ctx = &ctx;
    1924            2 :         let timeline = tenant
    1925            2 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
    1926            6 :             .await
    1927            2 :             .unwrap();
    1928            2 : 
    1929            2 :         let initdb_layer = timeline
    1930            2 :             .layers
    1931            2 :             .read()
    1932            2 :             .await
    1933            2 :             .likely_resident_layers()
    1934            2 :             .next()
    1935            2 :             .unwrap();
    1936            2 : 
    1937            2 :         {
    1938            2 :             let mut writer = timeline.writer().await;
    1939            2 : 
    1940            2 :             let data = [
    1941            2 :                 (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
    1942            2 :                 (
    1943            2 :                     0x30,
    1944            2 :                     12,
    1945            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1946            2 :                         will_init: false,
    1947            2 :                         rec: Bytes::from_static(b"1"),
    1948            2 :                     }),
    1949            2 :                 ),
    1950            2 :                 (
    1951            2 :                     0x40,
    1952            2 :                     12,
    1953            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1954            2 :                         will_init: true,
    1955            2 :                         rec: Bytes::from_static(b"2"),
    1956            2 :                     }),
    1957            2 :                 ),
    1958            2 :                 // build an oversized value so we cannot extend an existing read over
    1959            2 :                 // this
    1960            2 :                 (
    1961            2 :                     0x50,
    1962            2 :                     12,
    1963            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1964            2 :                         will_init: true,
    1965            2 :                         rec: {
    1966            2 :                             let mut buf =
    1967            2 :                                 vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
    1968            2 :                             buf.iter_mut()
    1969            2 :                                 .enumerate()
    1970       264192 :                                 .for_each(|(i, slot)| *slot = (i % 256) as u8);
    1971            2 :                             Bytes::from(buf)
    1972            2 :                         },
    1973            2 :                     }),
    1974            2 :                 ),
    1975            2 :                 // because the oversized read cannot be extended further, we are sure to exercise the
    1976            2 :                 // builder created on the last round with this:
    1977            2 :                 (
    1978            2 :                     0x60,
    1979            2 :                     12,
    1980            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1981            2 :                         will_init: true,
    1982            2 :                         rec: Bytes::from_static(b"3"),
    1983            2 :                     }),
    1984            2 :                 ),
    1985            2 :                 (
    1986            2 :                     0x60,
    1987            2 :                     9,
    1988            2 :                     Value::Image(Bytes::from_static(b"something for a different key")),
    1989            2 :                 ),
    1990            2 :             ];
    1991            2 : 
    1992            2 :             let mut last_lsn = None;
    1993            2 : 
    1994           14 :             for (lsn, key, value) in data {
    1995           12 :                 let key = Key::from_i128(key);
    1996           12 :                 writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
    1997           12 :                 last_lsn = Some(lsn);
    1998            2 :             }
    1999            2 : 
    2000            2 :             writer.finish_write(Lsn(last_lsn.unwrap()));
    2001            2 :         }
    2002            2 :         timeline.freeze_and_flush().await.unwrap();
    2003            2 : 
    2004            2 :         let new_layer = timeline
    2005            2 :             .layers
    2006            2 :             .read()
    2007            2 :             .await
    2008            2 :             .likely_resident_layers()
    2009            4 :             .find(|x| x != &initdb_layer)
    2010            2 :             .unwrap();
    2011            2 : 
    2012            2 :         // create a copy for the timeline, so we don't overwrite the file
    2013            2 :         let branch = tenant
    2014            2 :             .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
    2015            2 :             .await
    2016            2 :             .unwrap();
    2017            2 : 
    2018            2 :         assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
    2019            2 : 
    2020            2 :         // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
    2021            2 :         // a single key
    2022            2 : 
    2023           12 :         for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
    2024           10 :             let truncate_at = Lsn(truncate_at);
    2025            2 : 
    2026           10 :             let mut writer = DeltaLayerWriter::new(
    2027           10 :                 tenant.conf,
    2028           10 :                 branch.timeline_id,
    2029           10 :                 tenant.tenant_shard_id,
    2030           10 :                 Key::MIN,
    2031           10 :                 Lsn(0x11)..truncate_at,
    2032           10 :                 ctx,
    2033           10 :             )
    2034            5 :             .await
    2035           10 :             .unwrap();
    2036            2 : 
    2037           10 :             let new_layer = new_layer.download_and_keep_resident().await.unwrap();
    2038           10 : 
    2039           10 :             new_layer
    2040           10 :                 .copy_delta_prefix(&mut writer, truncate_at, ctx)
    2041           15 :                 .await
    2042           10 :                 .unwrap();
    2043            2 : 
    2044           24 :             let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
    2045           10 : 
    2046           11 :             copied_layer.get_as_delta(ctx).await.unwrap();
    2047           10 : 
    2048           10 :             assert_keys_and_values_eq(
    2049           10 :                 new_layer.get_as_delta(ctx).await.unwrap(),
    2050           10 :                 copied_layer.get_as_delta(ctx).await.unwrap(),
    2051           10 :                 truncate_at,
    2052           10 :                 ctx,
    2053            2 :             )
    2054           55 :             .await;
    2055            2 :         }
    2056            2 :     }
    2057              : 
    2058           10 :     async fn assert_keys_and_values_eq(
    2059           10 :         source: &DeltaLayerInner,
    2060           10 :         truncated: &DeltaLayerInner,
    2061           10 :         truncated_at: Lsn,
    2062           10 :         ctx: &RequestContext,
    2063           10 :     ) {
    2064           10 :         use futures::future::ready;
    2065           10 :         use futures::stream::TryStreamExt;
    2066           10 : 
    2067           10 :         let start_key = [0u8; DELTA_KEY_SIZE];
    2068           10 : 
    2069           10 :         let source_reader = FileBlockReader::new(&source.file, source.file_id);
    2070           10 :         let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2071           10 :             source.index_start_blk,
    2072           10 :             source.index_root_blk,
    2073           10 :             &source_reader,
    2074           10 :         );
    2075           10 :         let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
    2076           60 :         let source_stream = source_stream.filter(|res| match res {
    2077           60 :             Ok((_, lsn, _)) => ready(lsn < &truncated_at),
    2078            0 :             _ => ready(true),
    2079           60 :         });
    2080           10 :         let mut source_stream = std::pin::pin!(source_stream);
    2081           10 : 
    2082           10 :         let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
    2083           10 :         let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2084           10 :             truncated.index_start_blk,
    2085           10 :             truncated.index_root_blk,
    2086           10 :             &truncated_reader,
    2087           10 :         );
    2088           10 :         let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
    2089           10 :         let mut truncated_stream = std::pin::pin!(truncated_stream);
    2090           10 : 
    2091           10 :         let mut scratch_left = Vec::new();
    2092           10 :         let mut scratch_right = Vec::new();
    2093              : 
    2094              :         loop {
    2095           42 :             let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
    2096           42 :             let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
    2097           42 : 
    2098           42 :             if src.is_none() {
    2099           10 :                 assert!(truncated.is_none());
    2100           10 :                 break;
    2101           32 :             }
    2102           32 : 
    2103           32 :             let (src, truncated) = (src.unwrap(), truncated.unwrap());
    2104           32 : 
    2105           32 :             // because we've filtered the source with Lsn, we should always have the same keys from both.
    2106           32 :             assert_eq!(src.0, truncated.0);
    2107           32 :             assert_eq!(src.1, truncated.1);
    2108              : 
    2109              :             // if this is needed for something else, just drop this assert.
    2110           32 :             assert!(
    2111           32 :                 src.2.pos() >= truncated.2.pos(),
    2112            0 :                 "value position should not go backwards {} vs. {}",
    2113            0 :                 src.2.pos(),
    2114            0 :                 truncated.2.pos()
    2115              :             );
    2116              : 
    2117           32 :             scratch_left.clear();
    2118           32 :             let src_cursor = source_reader.block_cursor();
    2119           32 :             let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
    2120           32 :             scratch_right.clear();
    2121           32 :             let trunc_cursor = truncated_reader.block_cursor();
    2122           32 :             let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
    2123              : 
    2124           32 :             tokio::try_join!(left, right).unwrap();
    2125           32 : 
    2126           32 :             assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
    2127              :         }
    2128           10 :     }
    2129              : }
        

Generated by: LCOV version 2.1-beta