LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 84.7 % 1734 1469
Test Date: 2024-07-21 16:16:09 Functions: 70.7 % 164 116

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored in a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation is extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in the `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "values", and
      24              : //! "index". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use crate::config::PageServerConf;
      31              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache::{self, FileId, PAGE_SZ};
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::BlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{
      37              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      38              : };
      39              : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
      40              : use crate::tenant::timeline::GetVectoredError;
      41              : use crate::tenant::vectored_blob_io::{
      42              :     BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      43              :     VectoredReadPlanner,
      44              : };
      45              : use crate::tenant::{PageReconstructError, Timeline};
      46              : use crate::virtual_file::{self, VirtualFile};
      47              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      48              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      49              : use anyhow::{anyhow, bail, ensure, Context, Result};
      50              : use bytes::BytesMut;
      51              : use camino::{Utf8Path, Utf8PathBuf};
      52              : use futures::StreamExt;
      53              : use itertools::Itertools;
      54              : use pageserver_api::keyspace::KeySpace;
      55              : use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
      56              : use pageserver_api::shard::TenantShardId;
      57              : use rand::{distributions::Alphanumeric, Rng};
      58              : use serde::{Deserialize, Serialize};
      59              : use std::collections::VecDeque;
      60              : use std::fs::File;
      61              : use std::io::SeekFrom;
      62              : use std::ops::Range;
      63              : use std::os::unix::fs::FileExt;
      64              : use std::str::FromStr;
      65              : use std::sync::Arc;
      66              : use tokio::sync::OnceCell;
      67              : use tracing::*;
      68              : 
      69              : use utils::{
      70              :     bin_ser::BeSer,
      71              :     id::{TenantId, TimelineId},
      72              :     lsn::Lsn,
      73              : };
      74              : 
      75              : use super::{
      76              :     AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
      77              :     ValuesReconstructState,
      78              : };
      79              : 
      80              : ///
      81              : /// Header stored in the beginning of the file
      82              : ///
      83              : /// After this comes the 'values' part, starting on block 1. After that,
      84              : /// the 'index' starts at the block indicated by 'index_start_blk'
      85              : ///
      86         1018 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      87              : pub struct Summary {
      88              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      89              :     pub magic: u16,
      90              :     pub format_version: u16,
      91              : 
      92              :     pub tenant_id: TenantId,
      93              :     pub timeline_id: TimelineId,
      94              :     pub key_range: Range<Key>,
      95              :     pub lsn_range: Range<Lsn>,
      96              : 
      97              :     /// Block number where the 'index' part of the file begins.
      98              :     pub index_start_blk: u32,
      99              :     /// Block within the 'index', where the B-tree root page is stored
     100              :     pub index_root_blk: u32,
     101              : }
     102              : 
     103              : impl From<&DeltaLayer> for Summary {
     104            0 :     fn from(layer: &DeltaLayer) -> Self {
     105            0 :         Self::expected(
     106            0 :             layer.desc.tenant_shard_id.tenant_id,
     107            0 :             layer.desc.timeline_id,
     108            0 :             layer.desc.key_range.clone(),
     109            0 :             layer.desc.lsn_range.clone(),
     110            0 :         )
     111            0 :     }
     112              : }
     113              : 
     114              : impl Summary {
     115         1018 :     pub(super) fn expected(
     116         1018 :         tenant_id: TenantId,
     117         1018 :         timeline_id: TimelineId,
     118         1018 :         keys: Range<Key>,
     119         1018 :         lsns: Range<Lsn>,
     120         1018 :     ) -> Self {
     121         1018 :         Self {
     122         1018 :             magic: DELTA_FILE_MAGIC,
     123         1018 :             format_version: STORAGE_FORMAT_VERSION,
     124         1018 : 
     125         1018 :             tenant_id,
     126         1018 :             timeline_id,
     127         1018 :             key_range: keys,
     128         1018 :             lsn_range: lsns,
     129         1018 : 
     130         1018 :             index_start_blk: 0,
     131         1018 :             index_root_blk: 0,
     132         1018 :         }
     133         1018 :     }
     134              : }
     135              : 
     136              : // Flag indicating that this version initializes the page
     137              : const WILL_INIT: u64 = 1;
     138              : 
     139              : /// Struct representing a reference to a BLOB in a layer. The reference contains the
     140              : /// BLOB offset, and for WAL records it also contains the `will_init` flag. The flag
     141              : /// helps to determine the range of records that need to be applied, without
     142              : /// reading/deserializing the records themselves.
     143            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     144              : pub struct BlobRef(pub u64);
     145              : 
     146              : impl BlobRef {
     147       240547 :     pub fn will_init(&self) -> bool {
     148       240547 :         (self.0 & WILL_INIT) != 0
     149       240547 :     }
     150              : 
     151      4474241 :     pub fn pos(&self) -> u64 {
     152      4474241 :         self.0 >> 1
     153      4474241 :     }
     154              : 
     155      6458920 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     156      6458920 :         let mut blob_ref = pos << 1;
     157      6458920 :         if will_init {
     158      6457656 :             blob_ref |= WILL_INIT;
     159      6457656 :         }
     160      6458920 :         BlobRef(blob_ref)
     161      6458920 :     }
     162              : }
     163              : 
     164              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     165              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     166              : 
     167              : /// This is the key of the B-tree index stored in the delta layer. It consists
     168              : /// of the serialized representation of a Key and LSN.
     169              : impl DeltaKey {
     170      2064198 :     fn from_slice(buf: &[u8]) -> Self {
     171      2064198 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     172      2064198 :         bytes.copy_from_slice(buf);
     173      2064198 :         DeltaKey(bytes)
     174      2064198 :     }
     175              : 
     176      6705952 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     177      6705952 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     178      6705952 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     179      6705952 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     180      6705952 :         DeltaKey(bytes)
     181      6705952 :     }
     182              : 
     183      2064198 :     fn key(&self) -> Key {
     184      2064198 :         Key::from_slice(&self.0)
     185      2064198 :     }
     186              : 
     187      2064198 :     fn lsn(&self) -> Lsn {
     188      2064198 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     189      2064198 :     }
     190              : 
     191       345977 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     192       345977 :         let mut lsn_buf = [0u8; 8];
     193       345977 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     194       345977 :         Lsn(u64::from_be_bytes(lsn_buf))
     195       345977 :     }
     196              : }
     197              : 
     198              : /// This is used only from `pagectl`. Within pageserver, all layers are
     199              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     200              : pub struct DeltaLayer {
     201              :     path: Utf8PathBuf,
     202              :     pub desc: PersistentLayerDesc,
     203              :     access_stats: LayerAccessStats,
     204              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     205              : }
     206              : 
     207              : impl std::fmt::Debug for DeltaLayer {
     208            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     209            0 :         use super::RangeDisplayDebug;
     210            0 : 
     211            0 :         f.debug_struct("DeltaLayer")
     212            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     213            0 :             .field("lsn_range", &self.desc.lsn_range)
     214            0 :             .field("file_size", &self.desc.file_size)
     215            0 :             .field("inner", &self.inner)
     216            0 :             .finish()
     217            0 :     }
     218              : }
     219              : 
     220              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     221              : /// file.
     222              : pub struct DeltaLayerInner {
     223              :     // values copied from summary
     224              :     index_start_blk: u32,
     225              :     index_root_blk: u32,
     226              : 
     227              :     file: VirtualFile,
     228              :     file_id: FileId,
     229              : 
     230              :     #[allow(dead_code)]
     231              :     layer_key_range: Range<Key>,
     232              :     #[allow(dead_code)]
     233              :     layer_lsn_range: Range<Lsn>,
     234              : 
     235              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     236              : }
     237              : 
     238              : impl std::fmt::Debug for DeltaLayerInner {
     239            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     240            0 :         f.debug_struct("DeltaLayerInner")
     241            0 :             .field("index_start_blk", &self.index_start_blk)
     242            0 :             .field("index_root_blk", &self.index_root_blk)
     243            0 :             .finish()
     244            0 :     }
     245              : }
     246              : 
     247              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     248              : impl std::fmt::Display for DeltaLayer {
     249            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     250            0 :         write!(f, "{}", self.layer_desc().short_id())
     251            0 :     }
     252              : }
     253              : 
     254              : impl AsLayerDesc for DeltaLayer {
     255            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     256            0 :         &self.desc
     257            0 :     }
     258              : }
     259              : 
     260              : impl DeltaLayer {
     261            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     262            0 :         self.desc.dump();
     263            0 : 
     264            0 :         if !verbose {
     265            0 :             return Ok(());
     266            0 :         }
     267              : 
     268            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     269              : 
     270            0 :         inner.dump(ctx).await
     271            0 :     }
     272              : 
     273         1346 :     fn temp_path_for(
     274         1346 :         conf: &PageServerConf,
     275         1346 :         tenant_shard_id: &TenantShardId,
     276         1346 :         timeline_id: &TimelineId,
     277         1346 :         key_start: Key,
     278         1346 :         lsn_range: &Range<Lsn>,
     279         1346 :     ) -> Utf8PathBuf {
     280         1346 :         let rand_string: String = rand::thread_rng()
     281         1346 :             .sample_iter(&Alphanumeric)
     282         1346 :             .take(8)
     283         1346 :             .map(char::from)
     284         1346 :             .collect();
     285         1346 : 
     286         1346 :         conf.timeline_path(tenant_shard_id, timeline_id)
     287         1346 :             .join(format!(
     288         1346 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     289         1346 :                 key_start,
     290         1346 :                 u64::from(lsn_range.start),
     291         1346 :                 u64::from(lsn_range.end),
     292         1346 :                 rand_string,
     293         1346 :                 TEMP_FILE_SUFFIX,
     294         1346 :             ))
     295         1346 :     }
     296              : 
     297              :     ///
     298              :     /// Open the underlying file and read the metadata into memory, if it's
     299              :     /// not loaded already.
     300              :     ///
     301            0 :     async fn load(
     302            0 :         &self,
     303            0 :         access_kind: LayerAccessKind,
     304            0 :         ctx: &RequestContext,
     305            0 :     ) -> Result<&Arc<DeltaLayerInner>> {
     306            0 :         self.access_stats.record_access(access_kind, ctx);
     307            0 :         // Quick exit if already loaded
     308            0 :         self.inner
     309            0 :             .get_or_try_init(|| self.load_inner(ctx))
     310            0 :             .await
     311            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     312            0 :     }
     313              : 
     314            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     315            0 :         let path = self.path();
     316              : 
     317            0 :         let loaded = DeltaLayerInner::load(&path, None, None, ctx)
     318            0 :             .await
     319            0 :             .and_then(|res| res)?;
     320              : 
     321              :         // not production code
     322            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     323            0 :         let expected_layer_name = self.layer_desc().layer_name();
     324            0 : 
     325            0 :         if actual_layer_name != expected_layer_name {
     326            0 :             println!("warning: filename does not match what is expected from in-file summary");
     327            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     328            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     329            0 :         }
     330              : 
     331            0 :         Ok(Arc::new(loaded))
     332            0 :     }
     333              : 
     334              :     /// Create a DeltaLayer struct representing an existing file on disk.
     335              :     ///
     336              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     337            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     338            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     339            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     340            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     341              : 
     342            0 :         let metadata = file
     343            0 :             .metadata()
     344            0 :             .context("get file metadata to determine size")?;
     345              : 
     346              :         // This function is never used for constructing layers in a running pageserver,
     347              :         // so it does not need an accurate TenantShardId.
     348            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     349            0 : 
     350            0 :         Ok(DeltaLayer {
     351            0 :             path: path.to_path_buf(),
     352            0 :             desc: PersistentLayerDesc::new_delta(
     353            0 :                 tenant_shard_id,
     354            0 :                 summary.timeline_id,
     355            0 :                 summary.key_range,
     356            0 :                 summary.lsn_range,
     357            0 :                 metadata.len(),
     358            0 :             ),
     359            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     360            0 :             inner: OnceCell::new(),
     361            0 :         })
     362            0 :     }
     363              : 
     364              :     /// Path to the layer file in pageserver workdir.
     365            0 :     fn path(&self) -> Utf8PathBuf {
     366            0 :         self.path.clone()
     367            0 :     }
     368              : }
     369              : 
     370              : /// A builder object for constructing a new delta layer.
     371              : ///
     372              : /// Usage:
     373              : ///
     374              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     375              : ///
     376              : /// 2. Write the contents by calling `put_value` for every page
     377              : ///    version to store in the layer.
     378              : ///
     379              : /// 3. Call `finish`.
     380              : ///
     381              : struct DeltaLayerWriterInner {
     382              :     conf: &'static PageServerConf,
     383              :     pub path: Utf8PathBuf,
     384              :     timeline_id: TimelineId,
     385              :     tenant_shard_id: TenantShardId,
     386              : 
     387              :     key_start: Key,
     388              :     lsn_range: Range<Lsn>,
     389              : 
     390              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     391              : 
     392              :     blob_writer: BlobWriter<true>,
     393              : }
     394              : 
     395              : impl DeltaLayerWriterInner {
     396              :     ///
     397              :     /// Start building a new delta layer.
     398              :     ///
     399         1346 :     async fn new(
     400         1346 :         conf: &'static PageServerConf,
     401         1346 :         timeline_id: TimelineId,
     402         1346 :         tenant_shard_id: TenantShardId,
     403         1346 :         key_start: Key,
     404         1346 :         lsn_range: Range<Lsn>,
     405         1346 :         ctx: &RequestContext,
     406         1346 :     ) -> anyhow::Result<Self> {
     407         1346 :         // Create the file initially with a temporary filename. We don't know
     408         1346 :         // the end key yet, so we cannot form the final filename yet. We will
     409         1346 :         // rename it when we're done.
     410         1346 :         //
     411         1346 :         // Note: This overwrites any existing file. There shouldn't be any.
     412         1346 :         // FIXME: throw an error instead?
     413         1346 :         let path =
     414         1346 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     415              : 
     416         1346 :         let mut file = VirtualFile::create(&path, ctx).await?;
     417              :         // make room for the header block
     418         1346 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     419         1346 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     420         1346 : 
     421         1346 :         // Initialize the b-tree index builder
     422         1346 :         let block_buf = BlockBuf::new();
     423         1346 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     424         1346 : 
     425         1346 :         Ok(Self {
     426         1346 :             conf,
     427         1346 :             path,
     428         1346 :             timeline_id,
     429         1346 :             tenant_shard_id,
     430         1346 :             key_start,
     431         1346 :             lsn_range,
     432         1346 :             tree: tree_builder,
     433         1346 :             blob_writer,
     434         1346 :         })
     435         1346 :     }
     436              : 
     437              :     ///
     438              :     /// Append a key-value pair to the file.
     439              :     ///
     440              :     /// The values must be appended in key, lsn order.
     441              :     ///
     442      2072154 :     async fn put_value(
     443      2072154 :         &mut self,
     444      2072154 :         key: Key,
     445      2072154 :         lsn: Lsn,
     446      2072154 :         val: Value,
     447      2072154 :         ctx: &RequestContext,
     448      2072154 :     ) -> anyhow::Result<()> {
     449      2072154 :         let (_, res) = self
     450      2072154 :             .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
     451         1566 :             .await;
     452      2072154 :         res
     453      2072154 :     }
     454              : 
     455      6458816 :     async fn put_value_bytes(
     456      6458816 :         &mut self,
     457      6458816 :         key: Key,
     458      6458816 :         lsn: Lsn,
     459      6458816 :         val: Vec<u8>,
     460      6458816 :         will_init: bool,
     461      6458816 :         ctx: &RequestContext,
     462      6458816 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     463      6458816 :         assert!(self.lsn_range.start <= lsn);
     464              :         // We don't want to use compression in delta layer creation
     465      6458816 :         let compression = ImageCompressionAlgorithm::Disabled;
     466      6458816 :         let (val, res) = self
     467      6458816 :             .blob_writer
     468      6458816 :             .write_blob_maybe_compressed(val, ctx, compression)
     469         4517 :             .await;
     470      6458816 :         let off = match res {
     471      6458816 :             Ok(off) => off,
     472            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     473              :         };
     474              : 
     475      6458816 :         let blob_ref = BlobRef::new(off, will_init);
     476      6458816 : 
     477      6458816 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     478      6458816 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     479      6458816 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     480      6458816 :     }
     481              : 
     482      2023972 :     fn size(&self) -> u64 {
     483      2023972 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     484      2023972 :     }
     485              : 
     486              :     ///
     487              :     /// Finish writing the delta layer.
     488              :     ///
     489         1346 :     async fn finish(
     490         1346 :         self,
     491         1346 :         key_end: Key,
     492         1346 :         timeline: &Arc<Timeline>,
     493         1346 :         ctx: &RequestContext,
     494         1346 :     ) -> anyhow::Result<ResidentLayer> {
     495         1346 :         let temp_path = self.path.clone();
     496         9573 :         let result = self.finish0(key_end, timeline, ctx).await;
     497         1346 :         if result.is_err() {
     498            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
     499            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     500            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     501            0 :             }
     502         1346 :         }
     503         1346 :         result
     504         1346 :     }
     505              : 
     506         1346 :     async fn finish0(
     507         1346 :         self,
     508         1346 :         key_end: Key,
     509         1346 :         timeline: &Arc<Timeline>,
     510         1346 :         ctx: &RequestContext,
     511         1346 :     ) -> anyhow::Result<ResidentLayer> {
     512         1346 :         let index_start_blk =
     513         1346 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     514              : 
     515         1346 :         let mut file = self.blob_writer.into_inner(ctx).await?;
     516              : 
     517              :         // Write out the index
     518         1346 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     519         1346 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     520            0 :             .await?;
     521        14904 :         for buf in block_buf.blocks {
     522        13558 :             let (_buf, res) = file.write_all(buf, ctx).await;
     523        13558 :             res?;
     524              :         }
     525         1346 :         assert!(self.lsn_range.start < self.lsn_range.end);
     526              :         // Fill in the summary on blk 0
     527         1346 :         let summary = Summary {
     528         1346 :             magic: DELTA_FILE_MAGIC,
     529         1346 :             format_version: STORAGE_FORMAT_VERSION,
     530         1346 :             tenant_id: self.tenant_shard_id.tenant_id,
     531         1346 :             timeline_id: self.timeline_id,
     532         1346 :             key_range: self.key_start..key_end,
     533         1346 :             lsn_range: self.lsn_range.clone(),
     534         1346 :             index_start_blk,
     535         1346 :             index_root_blk,
     536         1346 :         };
     537         1346 : 
     538         1346 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     539         1346 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     540         1346 :         Summary::ser_into(&summary, &mut buf)?;
     541         1346 :         file.seek(SeekFrom::Start(0)).await?;
     542         1346 :         let (_buf, res) = file.write_all(buf, ctx).await;
     543         1346 :         res?;
     544              : 
     545         1346 :         let metadata = file
     546         1346 :             .metadata()
     547          677 :             .await
     548         1346 :             .context("get file metadata to determine size")?;
     549              : 
     550              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     551              :         // Make it a little bit below to account for differing GB units
     552              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     553              :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     554         1346 :         ensure!(
     555         1346 :             metadata.len() <= S3_UPLOAD_LIMIT,
     556            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     557            0 :             file.path,
     558            0 :             metadata.len()
     559              :         );
     560              : 
     561              :         // Note: Because we opened the file in write-only mode, we cannot
     562              :         // reuse the same VirtualFile for reading later. That's why we don't
     563              :         // set inner.file here. The first read will have to re-open it.
     564              : 
     565         1346 :         let desc = PersistentLayerDesc::new_delta(
     566         1346 :             self.tenant_shard_id,
     567         1346 :             self.timeline_id,
     568         1346 :             self.key_start..key_end,
     569         1346 :             self.lsn_range.clone(),
     570         1346 :             metadata.len(),
     571         1346 :         );
     572         1346 : 
     573         1346 :         // fsync the file
     574         1346 :         file.sync_all().await?;
     575              : 
     576         1346 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     577              : 
     578         1346 :         trace!("created delta layer {}", layer.local_path());
     579              : 
     580         1346 :         Ok(layer)
     581         1346 :     }
     582              : }
     583              : 
     584              : /// A builder object for constructing a new delta layer.
     585              : ///
     586              : /// Usage:
     587              : ///
     588              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     589              : ///
     590              : /// 2. Write the contents by calling `put_value` for every page
     591              : ///    version to store in the layer.
     592              : ///
     593              : /// 3. Call `finish`.
     594              : ///
     595              : /// # Note
     596              : ///
     597              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     598              : /// possible for the writer to drop before `finish` is actually called. So this
     599              : /// could lead to odd temporary files in the directory, exhausting file system.
     600              : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     601              : /// implementation that cleans up the temporary file in failure. It's not
     602              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     603              : /// out some fields, making it impossible to implement `Drop`.
     604              : ///
     605              : #[must_use]
     606              : pub struct DeltaLayerWriter {
     607              :     inner: Option<DeltaLayerWriterInner>,
     608              : }
     609              : 
     610              : impl DeltaLayerWriter {
     611              :     ///
     612              :     /// Start building a new delta layer.
     613              :     ///
     614         1346 :     pub async fn new(
     615         1346 :         conf: &'static PageServerConf,
     616         1346 :         timeline_id: TimelineId,
     617         1346 :         tenant_shard_id: TenantShardId,
     618         1346 :         key_start: Key,
     619         1346 :         lsn_range: Range<Lsn>,
     620         1346 :         ctx: &RequestContext,
     621         1346 :     ) -> anyhow::Result<Self> {
     622         1346 :         Ok(Self {
     623         1346 :             inner: Some(
     624         1346 :                 DeltaLayerWriterInner::new(
     625         1346 :                     conf,
     626         1346 :                     timeline_id,
     627         1346 :                     tenant_shard_id,
     628         1346 :                     key_start,
     629         1346 :                     lsn_range,
     630         1346 :                     ctx,
     631         1346 :                 )
     632          686 :                 .await?,
     633              :             ),
     634              :         })
     635         1346 :     }
     636              : 
     637              :     ///
     638              :     /// Append a key-value pair to the file.
     639              :     ///
     640              :     /// The values must be appended in key, lsn order.
     641              :     ///
     642      2072154 :     pub async fn put_value(
     643      2072154 :         &mut self,
     644      2072154 :         key: Key,
     645      2072154 :         lsn: Lsn,
     646      2072154 :         val: Value,
     647      2072154 :         ctx: &RequestContext,
     648      2072154 :     ) -> anyhow::Result<()> {
     649      2072154 :         self.inner
     650      2072154 :             .as_mut()
     651      2072154 :             .unwrap()
     652      2072154 :             .put_value(key, lsn, val, ctx)
     653         1566 :             .await
     654      2072154 :     }
     655              : 
     656      4386662 :     pub async fn put_value_bytes(
     657      4386662 :         &mut self,
     658      4386662 :         key: Key,
     659      4386662 :         lsn: Lsn,
     660      4386662 :         val: Vec<u8>,
     661      4386662 :         will_init: bool,
     662      4386662 :         ctx: &RequestContext,
     663      4386662 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     664      4386662 :         self.inner
     665      4386662 :             .as_mut()
     666      4386662 :             .unwrap()
     667      4386662 :             .put_value_bytes(key, lsn, val, will_init, ctx)
     668         2951 :             .await
     669      4386662 :     }
     670              : 
     671      2023972 :     pub fn size(&self) -> u64 {
     672      2023972 :         self.inner.as_ref().unwrap().size()
     673      2023972 :     }
     674              : 
     675              :     ///
     676              :     /// Finish writing the delta layer.
     677              :     ///
     678         1346 :     pub(crate) async fn finish(
     679         1346 :         mut self,
     680         1346 :         key_end: Key,
     681         1346 :         timeline: &Arc<Timeline>,
     682         1346 :         ctx: &RequestContext,
     683         1346 :     ) -> anyhow::Result<ResidentLayer> {
     684         1346 :         self.inner
     685         1346 :             .take()
     686         1346 :             .unwrap()
     687         1346 :             .finish(key_end, timeline, ctx)
     688         9573 :             .await
     689         1346 :     }
     690              : }
     691              : 
     692              : impl Drop for DeltaLayerWriter {
     693         1346 :     fn drop(&mut self) {
     694         1346 :         if let Some(inner) = self.inner.take() {
     695            0 :             // We want to remove the virtual file here, so it's fine to not
     696            0 :             // having completely flushed unwritten data.
     697            0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     698            0 :             vfile.remove();
     699         1346 :         }
     700         1346 :     }
     701              : }
     702              : 
     703            0 : #[derive(thiserror::Error, Debug)]
     704              : pub enum RewriteSummaryError {
     705              :     #[error("magic mismatch")]
     706              :     MagicMismatch,
     707              :     #[error(transparent)]
     708              :     Other(#[from] anyhow::Error),
     709              : }
     710              : 
     711              : impl From<std::io::Error> for RewriteSummaryError {
     712            0 :     fn from(e: std::io::Error) -> Self {
     713            0 :         Self::Other(anyhow::anyhow!(e))
     714            0 :     }
     715              : }
     716              : 
     717              : impl DeltaLayer {
     718            0 :     pub async fn rewrite_summary<F>(
     719            0 :         path: &Utf8Path,
     720            0 :         rewrite: F,
     721            0 :         ctx: &RequestContext,
     722            0 :     ) -> Result<(), RewriteSummaryError>
     723            0 :     where
     724            0 :         F: Fn(Summary) -> Summary,
     725            0 :     {
     726            0 :         let mut file = VirtualFile::open_with_options(
     727            0 :             path,
     728            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     729            0 :             ctx,
     730            0 :         )
     731            0 :         .await
     732            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     733            0 :         let file_id = page_cache::next_file_id();
     734            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     735            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     736            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     737            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     738            0 :             return Err(RewriteSummaryError::MagicMismatch);
     739            0 :         }
     740            0 : 
     741            0 :         let new_summary = rewrite(actual_summary);
     742            0 : 
     743            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     744            0 :         // TODO: could use smallvec here, but it's a pain with Slice<T>
     745            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     746            0 :         file.seek(SeekFrom::Start(0)).await?;
     747            0 :         let (_buf, res) = file.write_all(buf, ctx).await;
     748            0 :         res?;
     749            0 :         Ok(())
     750            0 :     }
     751              : }
     752              : 
     753              : impl DeltaLayerInner {
     754           34 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     755           34 :         &self.layer_key_range
     756           34 :     }
     757              : 
     758           34 :     pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
     759           34 :         &self.layer_lsn_range
     760           34 :     }
     761              : 
     762              :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     763              :     /// - inner has the success or transient failure
     764              :     /// - outer has the permanent failure
     765         1018 :     pub(super) async fn load(
     766         1018 :         path: &Utf8Path,
     767         1018 :         summary: Option<Summary>,
     768         1018 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     769         1018 :         ctx: &RequestContext,
     770         1018 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     771         1018 :         let file = match VirtualFile::open(path, ctx).await {
     772         1018 :             Ok(file) => file,
     773            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     774              :         };
     775         1018 :         let file_id = page_cache::next_file_id();
     776         1018 : 
     777         1018 :         let block_reader = FileBlockReader::new(&file, file_id);
     778              : 
     779         1018 :         let summary_blk = match block_reader.read_blk(0, ctx).await {
     780         1018 :             Ok(blk) => blk,
     781            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     782              :         };
     783              : 
     784              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     785         1018 :         let actual_summary =
     786         1018 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     787              : 
     788         1018 :         if let Some(mut expected_summary) = summary {
     789              :             // production code path
     790         1018 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     791         1018 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     792         1018 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     793         1018 :             expected_summary.timeline_id = actual_summary.timeline_id;
     794         1018 : 
     795         1018 :             if actual_summary != expected_summary {
     796            0 :                 bail!(
     797            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     798            0 :                     actual_summary,
     799            0 :                     expected_summary
     800            0 :                 );
     801         1018 :             }
     802            0 :         }
     803              : 
     804         1018 :         Ok(Ok(DeltaLayerInner {
     805         1018 :             file,
     806         1018 :             file_id,
     807         1018 :             index_start_blk: actual_summary.index_start_blk,
     808         1018 :             index_root_blk: actual_summary.index_root_blk,
     809         1018 :             max_vectored_read_bytes,
     810         1018 :             layer_key_range: actual_summary.key_range,
     811         1018 :             layer_lsn_range: actual_summary.lsn_range,
     812         1018 :         }))
     813         1018 :     }
     814              : 
     815       204429 :     pub(super) async fn get_value_reconstruct_data(
     816       204429 :         &self,
     817       204429 :         key: Key,
     818       204429 :         lsn_range: Range<Lsn>,
     819       204429 :         reconstruct_state: &mut ValueReconstructState,
     820       204429 :         ctx: &RequestContext,
     821       204429 :     ) -> anyhow::Result<ValueReconstructResult> {
     822       204429 :         let mut need_image = true;
     823       204429 :         // Scan the page versions backwards, starting from `lsn`.
     824       204429 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     825       204429 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     826       204429 :             self.index_start_blk,
     827       204429 :             self.index_root_blk,
     828       204429 :             &block_reader,
     829       204429 :         );
     830       204429 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     831       204429 : 
     832       204429 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     833       204429 : 
     834       204429 :         tree_reader
     835       204429 :             .visit(
     836       204429 :                 &search_key.0,
     837       204429 :                 VisitDirection::Backwards,
     838       204429 :                 |key, value| {
     839       198369 :                     let blob_ref = BlobRef(value);
     840       198369 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     841        76385 :                         return false;
     842       121984 :                     }
     843       121984 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     844       121984 :                     if entry_lsn < lsn_range.start {
     845           32 :                         return false;
     846       121952 :                     }
     847       121952 :                     offsets.push((entry_lsn, blob_ref.pos()));
     848       121952 : 
     849       121952 :                     !blob_ref.will_init()
     850       204429 :                 },
     851       204429 :                 &RequestContextBuilder::extend(ctx)
     852       204429 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     853       204429 :                     .build(),
     854       204429 :             )
     855        20923 :             .await?;
     856              : 
     857       204429 :         let ctx = &RequestContextBuilder::extend(ctx)
     858       204429 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     859       204429 :             .build();
     860       204429 : 
     861       204429 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     862       204429 :         let cursor = block_reader.block_cursor();
     863       204429 :         let mut buf = Vec::new();
     864       204473 :         for (entry_lsn, pos) in offsets {
     865       121952 :             cursor
     866       121952 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     867         7958 :                 .await
     868       121952 :                 .with_context(|| {
     869            0 :                     format!("Failed to read blob from virtual file {}", self.file.path)
     870       121952 :                 })?;
     871       121952 :             let val = Value::des(&buf).with_context(|| {
     872            0 :                 format!(
     873            0 :                     "Failed to deserialize file blob from virtual file {}",
     874            0 :                     self.file.path
     875            0 :                 )
     876       121952 :             })?;
     877       121952 :             match val {
     878       121908 :                 Value::Image(img) => {
     879       121908 :                     reconstruct_state.img = Some((entry_lsn, img));
     880       121908 :                     need_image = false;
     881       121908 :                     break;
     882              :                 }
     883           44 :                 Value::WalRecord(rec) => {
     884           44 :                     let will_init = rec.will_init();
     885           44 :                     reconstruct_state.records.push((entry_lsn, rec));
     886           44 :                     if will_init {
     887              :                         // This WAL record initializes the page, so no need to go further back
     888            0 :                         need_image = false;
     889            0 :                         break;
     890           44 :                     }
     891              :                 }
     892              :             }
     893              :         }
     894              : 
     895              :         // If an older page image is needed to reconstruct the page, let the
     896              :         // caller know.
     897       204429 :         if need_image {
     898        82521 :             Ok(ValueReconstructResult::Continue)
     899              :         } else {
     900       121908 :             Ok(ValueReconstructResult::Complete)
     901              :         }
     902       204429 :     }
     903              : 
     904              :     // Look up the keys in the provided keyspace and update
     905              :     // the reconstruct state with whatever is found.
     906              :     //
     907              :     // If the key is cached, go no further than the cached Lsn.
     908              :     //
     909              :     // Currently, the index is visited for each range, but this
     910              :     // can be further optimised to visit the index only once.
     911          152 :     pub(super) async fn get_values_reconstruct_data(
     912          152 :         &self,
     913          152 :         keyspace: KeySpace,
     914          152 :         lsn_range: Range<Lsn>,
     915          152 :         reconstruct_state: &mut ValuesReconstructState,
     916          152 :         ctx: &RequestContext,
     917          152 :     ) -> Result<(), GetVectoredError> {
     918          152 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     919          152 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     920          152 :             self.index_start_blk,
     921          152 :             self.index_root_blk,
     922          152 :             block_reader,
     923          152 :         );
     924          152 : 
     925          152 :         let planner = VectoredReadPlanner::new(
     926          152 :             self.max_vectored_read_bytes
     927          152 :                 .expect("Layer is loaded with max vectored bytes config")
     928          152 :                 .0
     929          152 :                 .into(),
     930          152 :         );
     931          152 : 
     932          152 :         let data_end_offset = self.index_start_offset();
     933              : 
     934          152 :         let reads = Self::plan_reads(
     935          152 :             &keyspace,
     936          152 :             lsn_range.clone(),
     937          152 :             data_end_offset,
     938          152 :             index_reader,
     939          152 :             planner,
     940          152 :             reconstruct_state,
     941          152 :             ctx,
     942          152 :         )
     943         1425 :         .await
     944          152 :         .map_err(GetVectoredError::Other)?;
     945              : 
     946          152 :         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
     947         8676 :             .await;
     948              : 
     949          152 :         reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
     950          152 : 
     951          152 :         Ok(())
     952          152 :     }
     953              : 
     954              :     /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
     955            0 :     pub(super) async fn load_key_values(
     956            0 :         &self,
     957            0 :         ctx: &RequestContext,
     958            0 :     ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
     959            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     960            0 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     961            0 :             self.index_start_blk,
     962            0 :             self.index_root_blk,
     963            0 :             block_reader,
     964            0 :         );
     965            0 :         let mut result = Vec::new();
     966            0 :         let mut stream =
     967            0 :             Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
     968            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     969            0 :         let cursor = block_reader.block_cursor();
     970            0 :         let mut buf = Vec::new();
     971            0 :         while let Some(item) = stream.next().await {
     972            0 :             let (key, lsn, pos) = item?;
     973              :             // TODO: dedup code with get_reconstruct_value
     974              :             // TODO: ctx handling and sharding
     975            0 :             cursor
     976            0 :                 .read_blob_into_buf(pos.pos(), &mut buf, ctx)
     977            0 :                 .await
     978            0 :                 .with_context(|| {
     979            0 :                     format!("Failed to read blob from virtual file {}", self.file.path)
     980            0 :                 })?;
     981            0 :             let val = Value::des(&buf).with_context(|| {
     982            0 :                 format!(
     983            0 :                     "Failed to deserialize file blob from virtual file {}",
     984            0 :                     self.file.path
     985            0 :                 )
     986            0 :             })?;
     987            0 :             result.push((key, lsn, val));
     988              :         }
     989            0 :         Ok(result)
     990            0 :     }
     991              : 
     992          354 :     async fn plan_reads<Reader>(
     993          354 :         keyspace: &KeySpace,
     994          354 :         lsn_range: Range<Lsn>,
     995          354 :         data_end_offset: u64,
     996          354 :         index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
     997          354 :         mut planner: VectoredReadPlanner,
     998          354 :         reconstruct_state: &mut ValuesReconstructState,
     999          354 :         ctx: &RequestContext,
    1000          354 :     ) -> anyhow::Result<Vec<VectoredRead>>
    1001          354 :     where
    1002          354 :         Reader: BlockReader + Clone,
    1003          354 :     {
    1004          354 :         let ctx = RequestContextBuilder::extend(ctx)
    1005          354 :             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1006          354 :             .build();
    1007              : 
    1008        42673 :         for range in keyspace.ranges.iter() {
    1009        42673 :             let mut range_end_handled = false;
    1010        42673 : 
    1011        42673 :             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
    1012        42673 :             let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
    1013        42673 :             let mut index_stream = std::pin::pin!(index_stream);
    1014              : 
    1015       189810 :             while let Some(index_entry) = index_stream.next().await {
    1016       189599 :                 let (raw_key, value) = index_entry?;
    1017       189599 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
    1018       189599 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
    1019       189599 :                 let blob_ref = BlobRef(value);
    1020       189599 : 
    1021       189599 :                 // Lsns are not monotonically increasing across keys, so we don't assert on them.
    1022       189599 :                 assert!(key >= range.start);
    1023              : 
    1024       189599 :                 let outside_lsn_range = !lsn_range.contains(&lsn);
    1025       189599 :                 let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
    1026              : 
    1027       189599 :                 let flag = {
    1028       189599 :                     if outside_lsn_range || below_cached_lsn {
    1029        71004 :                         BlobFlag::Ignore
    1030       118595 :                     } else if blob_ref.will_init() {
    1031        60947 :                         BlobFlag::ReplaceAll
    1032              :                     } else {
    1033              :                         // Usual path: add blob to the read
    1034        57648 :                         BlobFlag::None
    1035              :                     }
    1036              :                 };
    1037              : 
    1038       189599 :                 if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
    1039        42462 :                     planner.handle_range_end(blob_ref.pos());
    1040        42462 :                     range_end_handled = true;
    1041        42462 :                     break;
    1042       147137 :                 } else {
    1043       147137 :                     planner.handle(key, lsn, blob_ref.pos(), flag);
    1044       147137 :                 }
    1045              :             }
    1046              : 
    1047        42673 :             if !range_end_handled {
    1048          211 :                 tracing::debug!("Handling range end fallback at {}", data_end_offset);
    1049          211 :                 planner.handle_range_end(data_end_offset);
    1050        42462 :             }
    1051              :         }
    1052              : 
    1053          354 :         Ok(planner.finish())
    1054          354 :     }
    1055              : 
    1056          352 :     fn get_min_read_buffer_size(
    1057          352 :         planned_reads: &[VectoredRead],
    1058          352 :         read_size_soft_max: usize,
    1059          352 :     ) -> usize {
    1060        36924 :         let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
    1061           12 :             return read_size_soft_max;
    1062              :         };
    1063              : 
    1064          340 :         let largest_read_size = largest_read.size();
    1065          340 :         if largest_read_size > read_size_soft_max {
    1066              :             // If the read is oversized, it should only contain one key.
    1067          200 :             let offenders = largest_read
    1068          200 :                 .blobs_at
    1069          200 :                 .as_slice()
    1070          200 :                 .iter()
    1071          200 :                 .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
    1072          200 :                 .join(", ");
    1073          200 :             tracing::warn!(
    1074            0 :                 "Oversized vectored read ({} > {}) for keys {}",
    1075              :                 largest_read_size,
    1076              :                 read_size_soft_max,
    1077              :                 offenders
    1078              :             );
    1079          140 :         }
    1080              : 
    1081          340 :         largest_read_size
    1082          352 :     }
    1083              : 
    1084          152 :     async fn do_reads_and_update_state(
    1085          152 :         &self,
    1086          152 :         reads: Vec<VectoredRead>,
    1087          152 :         reconstruct_state: &mut ValuesReconstructState,
    1088          152 :         ctx: &RequestContext,
    1089          152 :     ) {
    1090          152 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
    1091          152 :         let mut ignore_key_with_err = None;
    1092          152 : 
    1093          152 :         let max_vectored_read_bytes = self
    1094          152 :             .max_vectored_read_bytes
    1095          152 :             .expect("Layer is loaded with max vectored bytes config")
    1096          152 :             .0
    1097          152 :             .into();
    1098          152 :         let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
    1099          152 :         let mut buf = Some(BytesMut::with_capacity(buf_size));
    1100              : 
    1101              :         // Note that reads are processed in reverse order (from highest key+lsn).
    1102              :         // This is the order that `ReconstructState` requires such that it can
    1103              :         // track when a key is done.
    1104        17200 :         for read in reads.into_iter().rev() {
    1105        17200 :             let res = vectored_blob_reader
    1106        17200 :                 .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
    1107         8676 :                 .await;
    1108              : 
    1109        17200 :             let blobs_buf = match res {
    1110        17200 :                 Ok(blobs_buf) => blobs_buf,
    1111            0 :                 Err(err) => {
    1112            0 :                     let kind = err.kind();
    1113            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
    1114            0 :                         reconstruct_state.on_key_error(
    1115            0 :                             blob_meta.key,
    1116            0 :                             PageReconstructError::from(anyhow!(
    1117            0 :                                 "Failed to read blobs from virtual file {}: {}",
    1118            0 :                                 self.file.path,
    1119            0 :                                 kind
    1120            0 :                             )),
    1121            0 :                         );
    1122            0 :                     }
    1123              : 
    1124              :                     // We have "lost" the buffer since the lower level IO api
    1125              :                     // doesn't return the buffer on error. Allocate a new one.
    1126            0 :                     buf = Some(BytesMut::with_capacity(buf_size));
    1127            0 : 
    1128            0 :                     continue;
    1129              :                 }
    1130              :             };
    1131              : 
    1132        28120 :             for meta in blobs_buf.blobs.iter().rev() {
    1133        28120 :                 if Some(meta.meta.key) == ignore_key_with_err {
    1134            0 :                     continue;
    1135        28120 :                 }
    1136        28120 : 
    1137        28120 :                 let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
    1138        28120 :                 let value = match value {
    1139        28120 :                     Ok(v) => v,
    1140            0 :                     Err(e) => {
    1141            0 :                         reconstruct_state.on_key_error(
    1142            0 :                             meta.meta.key,
    1143            0 :                             PageReconstructError::from(anyhow!(e).context(format!(
    1144            0 :                                 "Failed to deserialize blob from virtual file {}",
    1145            0 :                                 self.file.path,
    1146            0 :                             ))),
    1147            0 :                         );
    1148            0 : 
    1149            0 :                         ignore_key_with_err = Some(meta.meta.key);
    1150            0 :                         continue;
    1151              :                     }
    1152              :                 };
    1153              : 
    1154              :                 // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
    1155              :                 // state, no further updates shall be made to it. The call below will
    1156              :                 // panic if the invariant is violated.
    1157        28120 :                 reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
    1158              :             }
    1159              : 
    1160        17200 :             buf = Some(blobs_buf.buf);
    1161              :         }
    1162          152 :     }
    1163              : 
    1164          406 :     pub(super) async fn load_keys<'a>(
    1165          406 :         &'a self,
    1166          406 :         ctx: &RequestContext,
    1167          406 :     ) -> Result<Vec<DeltaEntry<'a>>> {
    1168          406 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1169          406 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1170          406 :             self.index_start_blk,
    1171          406 :             self.index_root_blk,
    1172          406 :             block_reader,
    1173          406 :         );
    1174          406 : 
    1175          406 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
    1176          406 : 
    1177          406 :         tree_reader
    1178          406 :             .visit(
    1179          406 :                 &[0u8; DELTA_KEY_SIZE],
    1180          406 :                 VisitDirection::Forwards,
    1181      2064046 :                 |key, value| {
    1182      2064046 :                     let delta_key = DeltaKey::from_slice(key);
    1183      2064046 :                     let val_ref = ValueRef {
    1184      2064046 :                         blob_ref: BlobRef(value),
    1185      2064046 :                         layer: self,
    1186      2064046 :                     };
    1187      2064046 :                     let pos = BlobRef(value).pos();
    1188      2064046 :                     if let Some(last) = all_keys.last_mut() {
    1189      2063640 :                         // subtract offset of the current and last entries to get the size
    1190      2063640 :                         // of the value associated with this (key, lsn) tuple
    1191      2063640 :                         let first_pos = last.size;
    1192      2063640 :                         last.size = pos - first_pos;
    1193      2063640 :                     }
    1194      2064046 :                     let entry = DeltaEntry {
    1195      2064046 :                         key: delta_key.key(),
    1196      2064046 :                         lsn: delta_key.lsn(),
    1197      2064046 :                         size: pos,
    1198      2064046 :                         val: val_ref,
    1199      2064046 :                     };
    1200      2064046 :                     all_keys.push(entry);
    1201      2064046 :                     true
    1202      2064046 :                 },
    1203          406 :                 &RequestContextBuilder::extend(ctx)
    1204          406 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1205          406 :                     .build(),
    1206          406 :             )
    1207         2170 :             .await?;
    1208          406 :         if let Some(last) = all_keys.last_mut() {
    1209          406 :             // Last key occupies all space till end of value storage,
    1210          406 :             // which corresponds to beginning of the index
    1211          406 :             last.size = self.index_start_offset() - last.size;
    1212          406 :         }
    1213          406 :         Ok(all_keys)
    1214          406 :     }
    1215              : 
    1216              :     /// Using the given writer, write out a version which has the earlier Lsns than `until`.
    1217              :     ///
    1218              :     /// Return the amount of key value records pushed to the writer.
    1219           10 :     pub(super) async fn copy_prefix(
    1220           10 :         &self,
    1221           10 :         writer: &mut DeltaLayerWriter,
    1222           10 :         until: Lsn,
    1223           10 :         ctx: &RequestContext,
    1224           10 :     ) -> anyhow::Result<usize> {
    1225           10 :         use crate::tenant::vectored_blob_io::{
    1226           10 :             BlobMeta, VectoredReadBuilder, VectoredReadExtended,
    1227           10 :         };
    1228           10 :         use futures::stream::TryStreamExt;
    1229           10 : 
    1230           10 :         #[derive(Debug)]
    1231           10 :         enum Item {
    1232           10 :             Actual(Key, Lsn, BlobRef),
    1233           10 :             Sentinel,
    1234           10 :         }
    1235           10 : 
    1236           10 :         impl From<Item> for Option<(Key, Lsn, BlobRef)> {
    1237           70 :             fn from(value: Item) -> Self {
    1238           70 :                 match value {
    1239           60 :                     Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
    1240           10 :                     Item::Sentinel => None,
    1241           10 :                 }
    1242           70 :             }
    1243           10 :         }
    1244           10 : 
    1245           10 :         impl Item {
    1246           70 :             fn offset(&self) -> Option<BlobRef> {
    1247           70 :                 match self {
    1248           60 :                     Item::Actual(_, _, blob) => Some(*blob),
    1249           10 :                     Item::Sentinel => None,
    1250           10 :                 }
    1251           70 :             }
    1252           10 : 
    1253           70 :             fn is_last(&self) -> bool {
    1254           70 :                 matches!(self, Item::Sentinel)
    1255           70 :             }
    1256           10 :         }
    1257           10 : 
    1258           10 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1259           10 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1260           10 :             self.index_start_blk,
    1261           10 :             self.index_root_blk,
    1262           10 :             block_reader,
    1263           10 :         );
    1264           10 : 
    1265           10 :         let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
    1266           60 :         let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
    1267           10 :         // put in a sentinel value for getting the end offset for last item, and not having to
    1268           10 :         // repeat the whole read part
    1269           10 :         let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
    1270           10 :             Item::Sentinel,
    1271           10 :         ))));
    1272           10 :         let mut stream = std::pin::pin!(stream);
    1273           10 : 
    1274           10 :         let mut prev: Option<(Key, Lsn, BlobRef)> = None;
    1275           10 : 
    1276           10 :         let mut read_builder: Option<VectoredReadBuilder> = None;
    1277           10 : 
    1278           10 :         let max_read_size = self
    1279           10 :             .max_vectored_read_bytes
    1280           10 :             .map(|x| x.0.get())
    1281           10 :             .unwrap_or(8192);
    1282           10 : 
    1283           10 :         let mut buffer = Some(BytesMut::with_capacity(max_read_size));
    1284           10 : 
    1285           10 :         // FIXME: buffering of DeltaLayerWriter
    1286           10 :         let mut per_blob_copy = Vec::new();
    1287           10 : 
    1288           10 :         let mut records = 0;
    1289              : 
    1290           80 :         while let Some(item) = stream.try_next().await? {
    1291           70 :             tracing::debug!(?item, "popped");
    1292           70 :             let offset = item
    1293           70 :                 .offset()
    1294           70 :                 .unwrap_or(BlobRef::new(self.index_start_offset(), false));
    1295              : 
    1296           70 :             let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
    1297           60 :                 let end_offset = offset;
    1298           60 : 
    1299           60 :                 Some((BlobMeta { key, lsn }, start_offset..end_offset))
    1300              :             } else {
    1301           10 :                 None
    1302              :             };
    1303              : 
    1304           70 :             let is_last = item.is_last();
    1305           70 : 
    1306           70 :             prev = Option::from(item);
    1307           70 : 
    1308           70 :             let actionable = actionable.filter(|x| x.0.lsn < until);
    1309              : 
    1310           70 :             let builder = if let Some((meta, offsets)) = actionable {
    1311              :                 // extend or create a new builder
    1312           32 :                 if read_builder
    1313           32 :                     .as_mut()
    1314           32 :                     .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
    1315           32 :                     .unwrap_or(VectoredReadExtended::No)
    1316           32 :                     == VectoredReadExtended::Yes
    1317              :                 {
    1318           16 :                     None
    1319              :                 } else {
    1320           16 :                     read_builder.replace(VectoredReadBuilder::new(
    1321           16 :                         offsets.start.pos(),
    1322           16 :                         offsets.end.pos(),
    1323           16 :                         meta,
    1324           16 :                         max_read_size,
    1325           16 :                     ))
    1326              :                 }
    1327              :             } else {
    1328              :                 // nothing to do, except perhaps flush any existing for the last element
    1329           38 :                 None
    1330              :             };
    1331              : 
    1332              :             // flush the possible older builder and also the new one if the item was the last one
    1333           70 :             let builders = builder.into_iter();
    1334           70 :             let builders = if is_last {
    1335           10 :                 builders.chain(read_builder.take())
    1336              :             } else {
    1337           60 :                 builders.chain(None)
    1338              :             };
    1339              : 
    1340           86 :             for builder in builders {
    1341           16 :                 let read = builder.build();
    1342           16 : 
    1343           16 :                 let reader = VectoredBlobReader::new(&self.file);
    1344           16 : 
    1345           16 :                 let mut buf = buffer.take().unwrap();
    1346           16 : 
    1347           16 :                 buf.clear();
    1348           16 :                 buf.reserve(read.size());
    1349           16 :                 let res = reader.read_blobs(&read, buf, ctx).await?;
    1350              : 
    1351           48 :                 for blob in res.blobs {
    1352           32 :                     let key = blob.meta.key;
    1353           32 :                     let lsn = blob.meta.lsn;
    1354           32 :                     let data = &res.buf[blob.start..blob.end];
    1355           32 : 
    1356           32 :                     #[cfg(debug_assertions)]
    1357           32 :                     Value::des(data)
    1358           32 :                         .with_context(|| {
    1359            0 :                             format!(
    1360            0 :                                 "blob failed to deserialize for {}@{}, {}..{}: {:?}",
    1361            0 :                                 blob.meta.key,
    1362            0 :                                 blob.meta.lsn,
    1363            0 :                                 blob.start,
    1364            0 :                                 blob.end,
    1365            0 :                                 utils::Hex(data)
    1366            0 :                             )
    1367           32 :                         })
    1368           32 :                         .unwrap();
    1369           32 : 
    1370           32 :                     // is it an image or will_init walrecord?
    1371           32 :                     // FIXME: this could be handled by threading the BlobRef to the
    1372           32 :                     // VectoredReadBuilder
    1373           32 :                     let will_init = crate::repository::ValueBytes::will_init(data)
    1374           32 :                         .inspect_err(|_e| {
    1375            0 :                             #[cfg(feature = "testing")]
    1376            0 :                             tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
    1377           32 :                         })
    1378           32 :                         .unwrap_or(false);
    1379           32 : 
    1380           32 :                     per_blob_copy.clear();
    1381           32 :                     per_blob_copy.extend_from_slice(data);
    1382              : 
    1383           32 :                     let (tmp, res) = writer
    1384           32 :                         .put_value_bytes(
    1385           32 :                             key,
    1386           32 :                             lsn,
    1387           32 :                             std::mem::take(&mut per_blob_copy),
    1388           32 :                             will_init,
    1389           32 :                             ctx,
    1390           32 :                         )
    1391            4 :                         .await;
    1392           32 :                     per_blob_copy = tmp;
    1393           32 : 
    1394           32 :                     res?;
    1395              : 
    1396           32 :                     records += 1;
    1397              :                 }
    1398              : 
    1399           16 :                 buffer = Some(res.buf);
    1400              :             }
    1401              :         }
    1402              : 
    1403           10 :         assert!(
    1404           10 :             read_builder.is_none(),
    1405            0 :             "with the sentinel above loop should had handled all"
    1406              :         );
    1407              : 
    1408           10 :         Ok(records)
    1409           10 :     }
    1410              : 
    1411            4 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1412            4 :         println!(
    1413            4 :             "index_start_blk: {}, root {}",
    1414            4 :             self.index_start_blk, self.index_root_blk
    1415            4 :         );
    1416            4 : 
    1417            4 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1418            4 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1419            4 :             self.index_start_blk,
    1420            4 :             self.index_root_blk,
    1421            4 :             block_reader,
    1422            4 :         );
    1423            4 : 
    1424            4 :         tree_reader.dump().await?;
    1425              : 
    1426            4 :         let keys = self.load_keys(ctx).await?;
    1427              : 
    1428            8 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1429            8 :             let buf = val.load_raw(ctx).await?;
    1430            8 :             let val = Value::des(&buf)?;
    1431            8 :             let desc = match val {
    1432            8 :                 Value::Image(img) => {
    1433            8 :                     format!(" img {} bytes", img.len())
    1434              :                 }
    1435            0 :                 Value::WalRecord(rec) => {
    1436            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
    1437            0 :                     format!(
    1438            0 :                         " rec {} bytes will_init: {} {}",
    1439            0 :                         buf.len(),
    1440            0 :                         rec.will_init(),
    1441            0 :                         wal_desc
    1442            0 :                     )
    1443              :                 }
    1444              :             };
    1445            8 :             Ok(desc)
    1446            8 :         }
    1447              : 
    1448           12 :         for entry in keys {
    1449            8 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1450            8 :             let desc = match dump_blob(&val, ctx).await {
    1451            8 :                 Ok(desc) => desc,
    1452            0 :                 Err(err) => {
    1453            0 :                     format!("ERROR: {err}")
    1454              :                 }
    1455              :             };
    1456            8 :             println!("  key {key} at {lsn}: {desc}");
    1457            8 : 
    1458            8 :             // Print more details about CHECKPOINT records. Would be nice to print details
    1459            8 :             // of many other record types too, but these are particularly interesting, as
    1460            8 :             // have a lot of special processing for them in walingest.rs.
    1461            8 :             use pageserver_api::key::CHECKPOINT_KEY;
    1462            8 :             use postgres_ffi::CheckPoint;
    1463            8 :             if key == CHECKPOINT_KEY {
    1464            0 :                 let val = val.load(ctx).await?;
    1465            0 :                 match val {
    1466            0 :                     Value::Image(img) => {
    1467            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1468            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
    1469              :                     }
    1470            0 :                     Value::WalRecord(_rec) => {
    1471            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1472            0 :                     }
    1473              :                 }
    1474            8 :             }
    1475              :         }
    1476              : 
    1477            4 :         Ok(())
    1478            4 :     }
    1479              : 
    1480           30 :     fn stream_index_forwards<'a, R>(
    1481           30 :         &'a self,
    1482           30 :         reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
    1483           30 :         start: &'a [u8; DELTA_KEY_SIZE],
    1484           30 :         ctx: &'a RequestContext,
    1485           30 :     ) -> impl futures::stream::Stream<
    1486           30 :         Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
    1487           30 :     > + 'a
    1488           30 :     where
    1489           30 :         R: BlockReader + 'a,
    1490           30 :     {
    1491           30 :         use futures::stream::TryStreamExt;
    1492           30 :         let stream = reader.into_stream(start, ctx);
    1493          152 :         stream.map_ok(|(key, value)| {
    1494          152 :             let key = DeltaKey::from_slice(&key);
    1495          152 :             let (key, lsn) = (key.key(), key.lsn());
    1496          152 :             let offset = BlobRef(value);
    1497          152 : 
    1498          152 :             (key, lsn, offset)
    1499          152 :         })
    1500           30 :     }
    1501              : 
    1502              :     /// The file offset to the first block of index.
    1503              :     ///
    1504              :     /// The file structure is summary, values, and index. We often need this for the size of last blob.
    1505          690 :     fn index_start_offset(&self) -> u64 {
    1506          690 :         let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
    1507          690 :         let bref = BlobRef(offset);
    1508          690 :         tracing::debug!(
    1509              :             index_start_blk = self.index_start_blk,
    1510              :             offset,
    1511            0 :             pos = bref.pos(),
    1512            0 :             "index_start_offset"
    1513              :         );
    1514          690 :         offset
    1515          690 :     }
    1516              : 
    1517           90 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
    1518           90 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1519           90 :         let tree_reader =
    1520           90 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
    1521           90 :         DeltaLayerIterator {
    1522           90 :             delta_layer: self,
    1523           90 :             ctx,
    1524           90 :             index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
    1525           90 :             key_values_batch: std::collections::VecDeque::new(),
    1526           90 :             is_end: false,
    1527           90 :             planner: StreamingVectoredReadPlanner::new(
    1528           90 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
    1529           90 :                 1024,        // The default value. Unit tests might use a different value
    1530           90 :             ),
    1531           90 :         }
    1532           90 :     }
    1533              : }
    1534              : 
    1535              : /// A set of data associated with a delta layer key and its value
    1536              : pub struct DeltaEntry<'a> {
    1537              :     pub key: Key,
    1538              :     pub lsn: Lsn,
    1539              :     /// Size of the stored value
    1540              :     pub size: u64,
    1541              :     /// Reference to the on-disk value
    1542              :     pub val: ValueRef<'a>,
    1543              : }
    1544              : 
    1545              : /// Reference to an on-disk value
    1546              : pub struct ValueRef<'a> {
    1547              :     blob_ref: BlobRef,
    1548              :     layer: &'a DeltaLayerInner,
    1549              : }
    1550              : 
    1551              : impl<'a> ValueRef<'a> {
    1552              :     /// Loads the value from disk
    1553      2064038 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1554      2064038 :         let buf = self.load_raw(ctx).await?;
    1555      2064038 :         let val = Value::des(&buf)?;
    1556      2064038 :         Ok(val)
    1557      2064038 :     }
    1558              : 
    1559      2064046 :     async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
    1560      2064046 :         let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
    1561      2064046 :             self.layer,
    1562      2064046 :         )));
    1563      2064046 :         let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1564      2064046 :         Ok(buf)
    1565      2064046 :     }
    1566              : }
    1567              : 
    1568              : pub(crate) struct Adapter<T>(T);
    1569              : 
    1570              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1571      2083002 :     pub(crate) async fn read_blk(
    1572      2083002 :         &self,
    1573      2083002 :         blknum: u32,
    1574      2083002 :         ctx: &RequestContext,
    1575      2083002 :     ) -> Result<BlockLease, std::io::Error> {
    1576      2083002 :         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
    1577      2083002 :         block_reader.read_blk(blknum, ctx).await
    1578      2083002 :     }
    1579              : }
    1580              : 
    1581              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1582      4166004 :     fn as_ref(&self) -> &DeltaLayerInner {
    1583      4166004 :         self
    1584      4166004 :     }
    1585              : }
    1586              : 
    1587              : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    1588            0 :     fn key(&self) -> Key {
    1589            0 :         self.key
    1590            0 :     }
    1591            0 :     fn lsn(&self) -> Lsn {
    1592            0 :         self.lsn
    1593            0 :     }
    1594            0 :     fn size(&self) -> u64 {
    1595            0 :         self.size
    1596            0 :     }
    1597              : }
    1598              : 
    1599              : pub struct DeltaLayerIterator<'a> {
    1600              :     delta_layer: &'a DeltaLayerInner,
    1601              :     ctx: &'a RequestContext,
    1602              :     planner: StreamingVectoredReadPlanner,
    1603              :     index_iter: DiskBtreeIterator<'a>,
    1604              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1605              :     is_end: bool,
    1606              : }
    1607              : 
    1608              : impl<'a> DeltaLayerIterator<'a> {
    1609              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1610        18976 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1611        18976 :         assert!(self.key_values_batch.is_empty());
    1612        18976 :         assert!(!self.is_end);
    1613              : 
    1614        18976 :         let plan = loop {
    1615        34456 :             if let Some(res) = self.index_iter.next().await {
    1616        34394 :                 let (raw_key, value) = res?;
    1617        34394 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
    1618        34394 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
    1619        34394 :                 let blob_ref = BlobRef(value);
    1620        34394 :                 let offset = blob_ref.pos();
    1621        34394 :                 if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
    1622        18914 :                     break batch_plan;
    1623        15480 :                 }
    1624              :             } else {
    1625           62 :                 self.is_end = true;
    1626           62 :                 let data_end_offset = self.delta_layer.index_start_offset();
    1627           62 :                 if let Some(item) = self.planner.handle_range_end(data_end_offset) {
    1628           62 :                     break item;
    1629              :                 } else {
    1630            0 :                     return Ok(()); // TODO: test empty iterator
    1631              :                 }
    1632              :             }
    1633              :         };
    1634        18976 :         let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
    1635        18976 :         let mut next_batch = std::collections::VecDeque::new();
    1636        18976 :         let buf_size = plan.size();
    1637        18976 :         let buf = BytesMut::with_capacity(buf_size);
    1638        18976 :         let blobs_buf = vectored_blob_reader
    1639        18976 :             .read_blobs(&plan, buf, self.ctx)
    1640         9637 :             .await?;
    1641        18976 :         let frozen_buf = blobs_buf.buf.freeze();
    1642        34366 :         for meta in blobs_buf.blobs.iter() {
    1643        34366 :             let value = Value::des(&frozen_buf[meta.start..meta.end])?;
    1644        34366 :             next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
    1645              :         }
    1646        18976 :         self.key_values_batch = next_batch;
    1647        18976 :         Ok(())
    1648        18976 :     }
    1649              : 
    1650        34192 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1651        34192 :         if self.key_values_batch.is_empty() {
    1652        18988 :             if self.is_end {
    1653           96 :                 return Ok(None);
    1654        18892 :             }
    1655        18892 :             self.next_batch().await?;
    1656        15204 :         }
    1657        34096 :         Ok(Some(
    1658        34096 :             self.key_values_batch
    1659        34096 :                 .pop_front()
    1660        34096 :                 .expect("should not be empty"),
    1661        34096 :         ))
    1662        34192 :     }
    1663              : }
    1664              : 
    1665              : #[cfg(test)]
    1666              : pub(crate) mod test {
    1667              :     use std::collections::BTreeMap;
    1668              : 
    1669              :     use itertools::MinMaxResult;
    1670              :     use rand::prelude::{SeedableRng, SliceRandom, StdRng};
    1671              :     use rand::RngCore;
    1672              : 
    1673              :     use super::*;
    1674              :     use crate::repository::Value;
    1675              :     use crate::tenant::harness::TIMELINE_ID;
    1676              :     use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
    1677              :     use crate::tenant::Tenant;
    1678              :     use crate::{
    1679              :         context::DownloadBehavior,
    1680              :         task_mgr::TaskKind,
    1681              :         tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
    1682              :         DEFAULT_PG_VERSION,
    1683              :     };
    1684              :     use bytes::Bytes;
    1685              : 
    1686              :     /// Construct an index for a fictional delta layer and then
    1687              :     /// traverse in order to plan vectored reads for a query. Finally,
    1688              :     /// verify that the traversal fed the right index key and value
    1689              :     /// pairs into the planner.
    1690              :     #[tokio::test]
    1691            2 :     async fn test_delta_layer_index_traversal() {
    1692            2 :         let base_key = Key {
    1693            2 :             field1: 0,
    1694            2 :             field2: 1663,
    1695            2 :             field3: 12972,
    1696            2 :             field4: 16396,
    1697            2 :             field5: 0,
    1698            2 :             field6: 246080,
    1699            2 :         };
    1700            2 : 
    1701            2 :         // Populate the index with some entries
    1702            2 :         let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
    1703            2 :             (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
    1704            2 :             (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1705            2 :             (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1706            2 :             (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
    1707            2 :         ]);
    1708            2 : 
    1709            2 :         let mut disk = TestDisk::default();
    1710            2 :         let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
    1711            2 : 
    1712            2 :         let mut disk_offset = 0;
    1713           10 :         for (key, lsns) in &entries {
    1714           42 :             for lsn in lsns {
    1715           34 :                 let index_key = DeltaKey::from_key_lsn(key, *lsn);
    1716           34 :                 let blob_ref = BlobRef::new(disk_offset, false);
    1717           34 :                 writer
    1718           34 :                     .append(&index_key.0, blob_ref.0)
    1719           34 :                     .expect("In memory disk append should never fail");
    1720           34 : 
    1721           34 :                 disk_offset += 1;
    1722           34 :             }
    1723            2 :         }
    1724            2 : 
    1725            2 :         // Prepare all the arguments for the call into `plan_reads` below
    1726            2 :         let (root_offset, _writer) = writer
    1727            2 :             .finish()
    1728            2 :             .expect("In memory disk finish should never fail");
    1729            2 :         let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
    1730            2 :         let planner = VectoredReadPlanner::new(100);
    1731            2 :         let mut reconstruct_state = ValuesReconstructState::new();
    1732            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1733            2 : 
    1734            2 :         let keyspace = KeySpace {
    1735            2 :             ranges: vec![
    1736            2 :                 base_key..base_key.add(3),
    1737            2 :                 base_key.add(3)..base_key.add(100),
    1738            2 :             ],
    1739            2 :         };
    1740            2 :         let lsn_range = Lsn(2)..Lsn(40);
    1741            2 : 
    1742            2 :         // Plan and validate
    1743            2 :         let vectored_reads = DeltaLayerInner::plan_reads(
    1744            2 :             &keyspace,
    1745            2 :             lsn_range.clone(),
    1746            2 :             disk_offset,
    1747            2 :             reader,
    1748            2 :             planner,
    1749            2 :             &mut reconstruct_state,
    1750            2 :             &ctx,
    1751            2 :         )
    1752            2 :         .await
    1753            2 :         .expect("Read planning should not fail");
    1754            2 : 
    1755            2 :         validate(keyspace, lsn_range, vectored_reads, entries);
    1756            2 :     }
    1757              : 
    1758            2 :     fn validate(
    1759            2 :         keyspace: KeySpace,
    1760            2 :         lsn_range: Range<Lsn>,
    1761            2 :         vectored_reads: Vec<VectoredRead>,
    1762            2 :         index_entries: BTreeMap<Key, Vec<Lsn>>,
    1763            2 :     ) {
    1764            2 :         #[derive(Debug, PartialEq, Eq)]
    1765            2 :         struct BlobSpec {
    1766            2 :             key: Key,
    1767            2 :             lsn: Lsn,
    1768            2 :             at: u64,
    1769            2 :         }
    1770            2 : 
    1771            2 :         let mut planned_blobs = Vec::new();
    1772            8 :         for read in vectored_reads {
    1773           28 :             for (at, meta) in read.blobs_at.as_slice() {
    1774           28 :                 planned_blobs.push(BlobSpec {
    1775           28 :                     key: meta.key,
    1776           28 :                     lsn: meta.lsn,
    1777           28 :                     at: *at,
    1778           28 :                 });
    1779           28 :             }
    1780              :         }
    1781              : 
    1782            2 :         let mut expected_blobs = Vec::new();
    1783            2 :         let mut disk_offset = 0;
    1784           10 :         for (key, lsns) in index_entries {
    1785           42 :             for lsn in lsns {
    1786           42 :                 let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
    1787           34 :                 let lsn_included = lsn_range.contains(&lsn);
    1788           34 : 
    1789           34 :                 if key_included && lsn_included {
    1790           28 :                     expected_blobs.push(BlobSpec {
    1791           28 :                         key,
    1792           28 :                         lsn,
    1793           28 :                         at: disk_offset,
    1794           28 :                     });
    1795           28 :                 }
    1796              : 
    1797           34 :                 disk_offset += 1;
    1798              :             }
    1799              :         }
    1800              : 
    1801            2 :         assert_eq!(planned_blobs, expected_blobs);
    1802            2 :     }
    1803              : 
    1804              :     mod constants {
    1805              :         use utils::lsn::Lsn;
    1806              : 
    1807              :         /// Offset used by all lsns in this test
    1808              :         pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
    1809              :         /// Number of unique keys included in the test data
    1810              :         pub(super) const KEY_COUNT: u8 = 60;
    1811              :         /// Max number of different lsns for each key
    1812              :         pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    1813              :         /// Possible value sizes for each key along with a probability weight
    1814              :         pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    1815              :         /// Probability that there will be a gap between the current key and the next one (33.3%)
    1816              :         pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    1817              :         /// The minimum size of a key range in all the generated reads
    1818              :         pub(super) const MIN_RANGE_SIZE: i128 = 10;
    1819              :         /// The number of ranges included in each vectored read
    1820              :         pub(super) const RANGES_COUNT: u8 = 2;
    1821              :         /// The number of vectored reads performed
    1822              :         pub(super) const READS_COUNT: u8 = 100;
    1823              :         /// Soft max size of a vectored read. Will be violated if we have to read keys
    1824              :         /// with values larger than the limit
    1825              :         pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    1826              :     }
    1827              : 
    1828              :     struct Entry {
    1829              :         key: Key,
    1830              :         lsn: Lsn,
    1831              :         value: Vec<u8>,
    1832              :     }
    1833              : 
    1834            2 :     fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
    1835            2 :         let mut current_key = Key::MIN;
    1836            2 : 
    1837            2 :         let mut entries = Vec::new();
    1838          122 :         for _ in 0..constants::KEY_COUNT {
    1839          120 :             let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
    1840          120 :             let mut lsns_iter =
    1841         2260 :                 std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
    1842         2260 :                     Some(Lsn(lsn.0 + 0x08))
    1843         2260 :                 });
    1844          120 :             let mut lsns = Vec::new();
    1845         2380 :             while lsns.len() < count as usize {
    1846         2260 :                 let take = rng.gen_bool(0.5);
    1847         2260 :                 let lsn = lsns_iter.next().unwrap();
    1848         2260 :                 if take {
    1849         1112 :                     lsns.push(lsn);
    1850         1148 :                 }
    1851              :             }
    1852              : 
    1853         1232 :             for lsn in lsns {
    1854         1112 :                 let size = constants::VALUE_SIZES
    1855         3336 :                     .choose_weighted(rng, |item| item.1)
    1856         1112 :                     .unwrap()
    1857         1112 :                     .0;
    1858         1112 :                 let mut buf = vec![0; size];
    1859         1112 :                 rng.fill_bytes(&mut buf);
    1860         1112 : 
    1861         1112 :                 entries.push(Entry {
    1862         1112 :                     key: current_key,
    1863         1112 :                     lsn,
    1864         1112 :                     value: buf,
    1865         1112 :                 })
    1866              :             }
    1867              : 
    1868          120 :             let gap = constants::KEY_GAP_CHANGES
    1869          240 :                 .choose_weighted(rng, |item| item.1)
    1870          120 :                 .unwrap()
    1871          120 :                 .0;
    1872          120 :             if gap {
    1873           38 :                 current_key = current_key.add(2);
    1874           82 :             } else {
    1875           82 :                 current_key = current_key.add(1);
    1876           82 :             }
    1877              :         }
    1878              : 
    1879            2 :         entries
    1880            2 :     }
    1881              : 
    1882              :     struct EntriesMeta {
    1883              :         key_range: Range<Key>,
    1884              :         lsn_range: Range<Lsn>,
    1885              :         index: BTreeMap<(Key, Lsn), Vec<u8>>,
    1886              :     }
    1887              : 
    1888            2 :     fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
    1889         1112 :         let key_range = match entries.iter().minmax_by_key(|e| e.key) {
    1890            2 :             MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
    1891            0 :             _ => panic!("More than one entry is always expected"),
    1892              :         };
    1893              : 
    1894         1112 :         let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
    1895            2 :             MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
    1896            0 :             _ => panic!("More than one entry is always expected"),
    1897              :         };
    1898              : 
    1899            2 :         let mut index = BTreeMap::new();
    1900         1112 :         for entry in entries.iter() {
    1901         1112 :             index.insert((entry.key, entry.lsn), entry.value.clone());
    1902         1112 :         }
    1903              : 
    1904            2 :         EntriesMeta {
    1905            2 :             key_range,
    1906            2 :             lsn_range,
    1907            2 :             index,
    1908            2 :         }
    1909            2 :     }
    1910              : 
    1911          200 :     fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
    1912          200 :         let start = key_range.start.to_i128();
    1913          200 :         let end = key_range.end.to_i128();
    1914          200 : 
    1915          200 :         let mut keyspace = KeySpace::default();
    1916              : 
    1917          600 :         for _ in 0..constants::RANGES_COUNT {
    1918          400 :             let mut range: Option<Range<Key>> = Option::default();
    1919         1244 :             while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
    1920          844 :                 let range_start = rng.gen_range(start..end);
    1921          844 :                 let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
    1922          844 :                 if range_end_offset >= end {
    1923          100 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(end));
    1924          744 :                 } else {
    1925          744 :                     let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
    1926          744 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
    1927          744 :                 }
    1928              :             }
    1929          400 :             keyspace.ranges.push(range.unwrap());
    1930              :         }
    1931              : 
    1932          200 :         keyspace
    1933          200 :     }
    1934              : 
    1935              :     #[tokio::test]
    1936            2 :     async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
    1937            2 :         let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
    1938            8 :         let (tenant, ctx) = harness.load().await;
    1939            2 : 
    1940            2 :         let timeline_id = TimelineId::generate();
    1941            2 :         let timeline = tenant
    1942            2 :             .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
    1943            6 :             .await?;
    1944            2 : 
    1945            2 :         tracing::info!("Generating test data ...");
    1946            2 : 
    1947            2 :         let rng = &mut StdRng::seed_from_u64(0);
    1948            2 :         let entries = generate_entries(rng);
    1949            2 :         let entries_meta = get_entries_meta(&entries);
    1950            2 : 
    1951            2 :         tracing::info!("Done generating {} entries", entries.len());
    1952            2 : 
    1953            2 :         tracing::info!("Writing test data to delta layer ...");
    1954            2 :         let mut writer = DeltaLayerWriter::new(
    1955            2 :             harness.conf,
    1956            2 :             timeline_id,
    1957            2 :             harness.tenant_shard_id,
    1958            2 :             entries_meta.key_range.start,
    1959            2 :             entries_meta.lsn_range.clone(),
    1960            2 :             &ctx,
    1961            2 :         )
    1962            2 :         .await?;
    1963            2 : 
    1964         1114 :         for entry in entries {
    1965         1112 :             let (_, res) = writer
    1966         1112 :                 .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx)
    1967          215 :                 .await;
    1968         1112 :             res?;
    1969            2 :         }
    1970            2 : 
    1971            2 :         let resident = writer
    1972            2 :             .finish(entries_meta.key_range.end, &timeline, &ctx)
    1973            5 :             .await?;
    1974            2 : 
    1975            2 :         let inner = resident.get_as_delta(&ctx).await?;
    1976            2 : 
    1977            2 :         let file_size = inner.file.metadata().await?.len();
    1978            2 :         tracing::info!(
    1979            2 :             "Done writing test data to delta layer. Resulting file size is: {}",
    1980            2 :             file_size
    1981            2 :         );
    1982            2 : 
    1983          202 :         for i in 0..constants::READS_COUNT {
    1984          200 :             tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
    1985            2 : 
    1986          200 :             let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
    1987          200 :             let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1988          200 :                 inner.index_start_blk,
    1989          200 :                 inner.index_root_blk,
    1990          200 :                 block_reader,
    1991          200 :             );
    1992          200 : 
    1993          200 :             let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
    1994          200 :             let mut reconstruct_state = ValuesReconstructState::new();
    1995          200 :             let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
    1996          200 :             let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
    1997            2 : 
    1998          200 :             let vectored_reads = DeltaLayerInner::plan_reads(
    1999          200 :                 &keyspace,
    2000          200 :                 entries_meta.lsn_range.clone(),
    2001          200 :                 data_end_offset,
    2002          200 :                 index_reader,
    2003          200 :                 planner,
    2004          200 :                 &mut reconstruct_state,
    2005          200 :                 &ctx,
    2006          200 :             )
    2007            4 :             .await?;
    2008            2 : 
    2009          200 :             let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
    2010          200 :             let buf_size = DeltaLayerInner::get_min_read_buffer_size(
    2011          200 :                 &vectored_reads,
    2012          200 :                 constants::MAX_VECTORED_READ_BYTES,
    2013          200 :             );
    2014          200 :             let mut buf = Some(BytesMut::with_capacity(buf_size));
    2015            2 : 
    2016        19924 :             for read in vectored_reads {
    2017        19724 :                 let blobs_buf = vectored_blob_reader
    2018        19724 :                     .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
    2019        10016 :                     .await?;
    2020        57304 :                 for meta in blobs_buf.blobs.iter() {
    2021        57304 :                     let value = &blobs_buf.buf[meta.start..meta.end];
    2022        57304 :                     assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
    2023            2 :                 }
    2024            2 : 
    2025        19724 :                 buf = Some(blobs_buf.buf);
    2026            2 :             }
    2027            2 :         }
    2028            2 : 
    2029            2 :         Ok(())
    2030            2 :     }
    2031              : 
    2032              :     #[tokio::test]
    2033            2 :     async fn copy_delta_prefix_smoke() {
    2034            2 :         use crate::walrecord::NeonWalRecord;
    2035            2 :         use bytes::Bytes;
    2036            2 : 
    2037            2 :         let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
    2038            2 :             .await
    2039            2 :             .unwrap();
    2040            8 :         let (tenant, ctx) = h.load().await;
    2041            2 :         let ctx = &ctx;
    2042            2 :         let timeline = tenant
    2043            2 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
    2044            6 :             .await
    2045            2 :             .unwrap();
    2046            2 : 
    2047            2 :         let initdb_layer = timeline
    2048            2 :             .layers
    2049            2 :             .read()
    2050            2 :             .await
    2051            2 :             .likely_resident_layers()
    2052            2 :             .next()
    2053            2 :             .unwrap();
    2054            2 : 
    2055            2 :         {
    2056            2 :             let mut writer = timeline.writer().await;
    2057            2 : 
    2058            2 :             let data = [
    2059            2 :                 (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
    2060            2 :                 (
    2061            2 :                     0x30,
    2062            2 :                     12,
    2063            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2064            2 :                         will_init: false,
    2065            2 :                         rec: Bytes::from_static(b"1"),
    2066            2 :                     }),
    2067            2 :                 ),
    2068            2 :                 (
    2069            2 :                     0x40,
    2070            2 :                     12,
    2071            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2072            2 :                         will_init: true,
    2073            2 :                         rec: Bytes::from_static(b"2"),
    2074            2 :                     }),
    2075            2 :                 ),
    2076            2 :                 // build an oversized value so we cannot extend an existing read over
    2077            2 :                 // this
    2078            2 :                 (
    2079            2 :                     0x50,
    2080            2 :                     12,
    2081            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2082            2 :                         will_init: true,
    2083            2 :                         rec: {
    2084            2 :                             let mut buf =
    2085            2 :                                 vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
    2086            2 :                             buf.iter_mut()
    2087            2 :                                 .enumerate()
    2088       264192 :                                 .for_each(|(i, slot)| *slot = (i % 256) as u8);
    2089            2 :                             Bytes::from(buf)
    2090            2 :                         },
    2091            2 :                     }),
    2092            2 :                 ),
    2093            2 :                 // because the oversized read cannot be extended further, we are sure to exercise the
    2094            2 :                 // builder created on the last round with this:
    2095            2 :                 (
    2096            2 :                     0x60,
    2097            2 :                     12,
    2098            2 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2099            2 :                         will_init: true,
    2100            2 :                         rec: Bytes::from_static(b"3"),
    2101            2 :                     }),
    2102            2 :                 ),
    2103            2 :                 (
    2104            2 :                     0x60,
    2105            2 :                     9,
    2106            2 :                     Value::Image(Bytes::from_static(b"something for a different key")),
    2107            2 :                 ),
    2108            2 :             ];
    2109            2 : 
    2110            2 :             let mut last_lsn = None;
    2111            2 : 
    2112           14 :             for (lsn, key, value) in data {
    2113           12 :                 let key = Key::from_i128(key);
    2114           12 :                 writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
    2115           12 :                 last_lsn = Some(lsn);
    2116            2 :             }
    2117            2 : 
    2118            2 :             writer.finish_write(Lsn(last_lsn.unwrap()));
    2119            2 :         }
    2120            2 :         timeline.freeze_and_flush().await.unwrap();
    2121            2 : 
    2122            2 :         let new_layer = timeline
    2123            2 :             .layers
    2124            2 :             .read()
    2125            2 :             .await
    2126            2 :             .likely_resident_layers()
    2127            4 :             .find(|x| x != &initdb_layer)
    2128            2 :             .unwrap();
    2129            2 : 
    2130            2 :         // create a copy for the timeline, so we don't overwrite the file
    2131            2 :         let branch = tenant
    2132            2 :             .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
    2133            2 :             .await
    2134            2 :             .unwrap();
    2135            2 : 
    2136            2 :         assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
    2137            2 : 
    2138            2 :         // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
    2139            2 :         // a single key
    2140            2 : 
    2141           12 :         for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
    2142           10 :             let truncate_at = Lsn(truncate_at);
    2143            2 : 
    2144           10 :             let mut writer = DeltaLayerWriter::new(
    2145           10 :                 tenant.conf,
    2146           10 :                 branch.timeline_id,
    2147           10 :                 tenant.tenant_shard_id,
    2148           10 :                 Key::MIN,
    2149           10 :                 Lsn(0x11)..truncate_at,
    2150           10 :                 ctx,
    2151           10 :             )
    2152            5 :             .await
    2153           10 :             .unwrap();
    2154            2 : 
    2155           10 :             let new_layer = new_layer.download_and_keep_resident().await.unwrap();
    2156           10 : 
    2157           10 :             new_layer
    2158           10 :                 .copy_delta_prefix(&mut writer, truncate_at, ctx)
    2159           15 :                 .await
    2160           10 :                 .unwrap();
    2161            2 : 
    2162           24 :             let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
    2163           10 : 
    2164           11 :             copied_layer.get_as_delta(ctx).await.unwrap();
    2165           10 : 
    2166           10 :             assert_keys_and_values_eq(
    2167           10 :                 new_layer.get_as_delta(ctx).await.unwrap(),
    2168           10 :                 copied_layer.get_as_delta(ctx).await.unwrap(),
    2169           10 :                 truncate_at,
    2170           10 :                 ctx,
    2171            2 :             )
    2172           50 :             .await;
    2173            2 :         }
    2174            2 :     }
    2175              : 
    2176           10 :     async fn assert_keys_and_values_eq(
    2177           10 :         source: &DeltaLayerInner,
    2178           10 :         truncated: &DeltaLayerInner,
    2179           10 :         truncated_at: Lsn,
    2180           10 :         ctx: &RequestContext,
    2181           10 :     ) {
    2182           10 :         use futures::future::ready;
    2183           10 :         use futures::stream::TryStreamExt;
    2184           10 : 
    2185           10 :         let start_key = [0u8; DELTA_KEY_SIZE];
    2186           10 : 
    2187           10 :         let source_reader = FileBlockReader::new(&source.file, source.file_id);
    2188           10 :         let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2189           10 :             source.index_start_blk,
    2190           10 :             source.index_root_blk,
    2191           10 :             &source_reader,
    2192           10 :         );
    2193           10 :         let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
    2194           60 :         let source_stream = source_stream.filter(|res| match res {
    2195           60 :             Ok((_, lsn, _)) => ready(lsn < &truncated_at),
    2196            0 :             _ => ready(true),
    2197           60 :         });
    2198           10 :         let mut source_stream = std::pin::pin!(source_stream);
    2199           10 : 
    2200           10 :         let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
    2201           10 :         let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2202           10 :             truncated.index_start_blk,
    2203           10 :             truncated.index_root_blk,
    2204           10 :             &truncated_reader,
    2205           10 :         );
    2206           10 :         let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
    2207           10 :         let mut truncated_stream = std::pin::pin!(truncated_stream);
    2208           10 : 
    2209           10 :         let mut scratch_left = Vec::new();
    2210           10 :         let mut scratch_right = Vec::new();
    2211              : 
    2212              :         loop {
    2213           42 :             let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
    2214           42 :             let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
    2215           42 : 
    2216           42 :             if src.is_none() {
    2217           10 :                 assert!(truncated.is_none());
    2218           10 :                 break;
    2219           32 :             }
    2220           32 : 
    2221           32 :             let (src, truncated) = (src.unwrap(), truncated.unwrap());
    2222           32 : 
    2223           32 :             // because we've filtered the source with Lsn, we should always have the same keys from both.
    2224           32 :             assert_eq!(src.0, truncated.0);
    2225           32 :             assert_eq!(src.1, truncated.1);
    2226              : 
    2227              :             // if this is needed for something else, just drop this assert.
    2228           32 :             assert!(
    2229           32 :                 src.2.pos() >= truncated.2.pos(),
    2230            0 :                 "value position should not go backwards {} vs. {}",
    2231            0 :                 src.2.pos(),
    2232            0 :                 truncated.2.pos()
    2233              :             );
    2234              : 
    2235           32 :             scratch_left.clear();
    2236           32 :             let src_cursor = source_reader.block_cursor();
    2237           32 :             let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
    2238           32 :             scratch_right.clear();
    2239           32 :             let trunc_cursor = truncated_reader.block_cursor();
    2240           32 :             let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
    2241              : 
    2242           32 :             tokio::try_join!(left, right).unwrap();
    2243           32 : 
    2244           32 :             assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
    2245              :         }
    2246           10 :     }
    2247              : 
    2248        18026 :     pub(crate) fn sort_delta(
    2249        18026 :         (k1, l1, _): &(Key, Lsn, Value),
    2250        18026 :         (k2, l2, _): &(Key, Lsn, Value),
    2251        18026 :     ) -> std::cmp::Ordering {
    2252        18026 :         (k1, l1).cmp(&(k2, l2))
    2253        18026 :     }
    2254              : 
    2255           94 :     pub(crate) fn sort_delta_value(
    2256           94 :         (k1, l1, v1): &(Key, Lsn, Value),
    2257           94 :         (k2, l2, v2): &(Key, Lsn, Value),
    2258           94 :     ) -> std::cmp::Ordering {
    2259           94 :         let order_1 = if v1.is_image() { 0 } else { 1 };
    2260           94 :         let order_2 = if v2.is_image() { 0 } else { 1 };
    2261           94 :         (k1, l1, order_1).cmp(&(k2, l2, order_2))
    2262           94 :     }
    2263              : 
    2264           20 :     pub(crate) async fn produce_delta_layer(
    2265           20 :         tenant: &Tenant,
    2266           20 :         tline: &Arc<Timeline>,
    2267           20 :         mut deltas: Vec<(Key, Lsn, Value)>,
    2268           20 :         ctx: &RequestContext,
    2269           20 :     ) -> anyhow::Result<ResidentLayer> {
    2270           20 :         deltas.sort_by(sort_delta);
    2271           20 :         let (key_start, _, _) = deltas.first().unwrap();
    2272           20 :         let (key_max, _, _) = deltas.last().unwrap();
    2273         8040 :         let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
    2274         8040 :         let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
    2275           20 :         let lsn_end = Lsn(lsn_max.0 + 1);
    2276           20 :         let mut writer = DeltaLayerWriter::new(
    2277           20 :             tenant.conf,
    2278           20 :             tline.timeline_id,
    2279           20 :             tenant.tenant_shard_id,
    2280           20 :             *key_start,
    2281           20 :             (*lsn_min)..lsn_end,
    2282           20 :             ctx,
    2283           20 :         )
    2284           10 :         .await?;
    2285           20 :         let key_end = key_max.next();
    2286              : 
    2287         8060 :         for (key, lsn, value) in deltas {
    2288         8040 :             writer.put_value(key, lsn, value, ctx).await?;
    2289              :         }
    2290           58 :         let delta_layer = writer.finish(key_end, tline, ctx).await?;
    2291              : 
    2292           20 :         Ok::<_, anyhow::Error>(delta_layer)
    2293           20 :     }
    2294              : 
    2295           28 :     async fn assert_delta_iter_equal(
    2296           28 :         delta_iter: &mut DeltaLayerIterator<'_>,
    2297           28 :         expect: &[(Key, Lsn, Value)],
    2298           28 :     ) {
    2299           28 :         let mut expect_iter = expect.iter();
    2300              :         loop {
    2301        28028 :             let o1 = delta_iter.next().await.unwrap();
    2302        28028 :             let o2 = expect_iter.next();
    2303        28028 :             assert_eq!(o1.is_some(), o2.is_some());
    2304        28028 :             if o1.is_none() && o2.is_none() {
    2305           28 :                 break;
    2306        28000 :             }
    2307        28000 :             let (k1, l1, v1) = o1.unwrap();
    2308        28000 :             let (k2, l2, v2) = o2.unwrap();
    2309        28000 :             assert_eq!(&k1, k2);
    2310        28000 :             assert_eq!(l1, *l2);
    2311        28000 :             assert_eq!(&v1, v2);
    2312              :         }
    2313           28 :     }
    2314              : 
    2315              :     #[tokio::test]
    2316            2 :     async fn delta_layer_iterator() {
    2317            2 :         let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
    2318            8 :         let (tenant, ctx) = harness.load().await;
    2319            2 : 
    2320            2 :         let tline = tenant
    2321            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2322            6 :             .await
    2323            2 :             .unwrap();
    2324            2 : 
    2325         2000 :         fn get_key(id: u32) -> Key {
    2326         2000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    2327         2000 :             key.field6 = id;
    2328         2000 :             key
    2329         2000 :         }
    2330            2 :         const N: usize = 1000;
    2331            2 :         let test_deltas = (0..N)
    2332         2000 :             .map(|idx| {
    2333         2000 :                 (
    2334         2000 :                     get_key(idx as u32 / 10),
    2335         2000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
    2336         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
    2337         2000 :                 )
    2338         2000 :             })
    2339            2 :             .collect_vec();
    2340            2 :         let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
    2341            8 :             .await
    2342            2 :             .unwrap();
    2343            2 :         let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
    2344            6 :         for max_read_size in [1, 1024] {
    2345           32 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    2346           28 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    2347           28 :                 // Test if the batch size is correctly determined
    2348           28 :                 let mut iter = delta_layer.iter(&ctx);
    2349           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    2350           28 :                 let mut num_items = 0;
    2351          112 :                 for _ in 0..3 {
    2352           84 :                     iter.next_batch().await.unwrap();
    2353           84 :                     num_items += iter.key_values_batch.len();
    2354           84 :                     if max_read_size == 1 {
    2355            2 :                         // every key should be a batch b/c the value is larger than max_read_size
    2356           42 :                         assert_eq!(iter.key_values_batch.len(), 1);
    2357            2 :                     } else {
    2358           42 :                         assert_eq!(iter.key_values_batch.len(), batch_size);
    2359            2 :                     }
    2360           84 :                     if num_items >= N {
    2361            2 :                         break;
    2362           84 :                     }
    2363           84 :                     iter.key_values_batch.clear();
    2364            2 :                 }
    2365            2 :                 // Test if the result is correct
    2366           28 :                 let mut iter = delta_layer.iter(&ctx);
    2367           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    2368         9577 :                 assert_delta_iter_equal(&mut iter, &test_deltas).await;
    2369            2 :             }
    2370            2 :         }
    2371            2 :     }
    2372              : }
        

Generated by: LCOV version 2.1-beta