LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: 6c6fe25ecc82be7eef3e957667d85acf2b969737.info Lines: 86.8 % 1642 1426
Test Date: 2025-04-16 17:36:39 Functions: 71.8 % 163 117

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
//! against a base LSN. However, if a relation is extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "values", and
      24              : //! "index". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use std::collections::{HashMap, VecDeque};
      31              : use std::fs::File;
      32              : use std::io::SeekFrom;
      33              : use std::ops::Range;
      34              : use std::os::unix::fs::FileExt;
      35              : use std::str::FromStr;
      36              : use std::sync::Arc;
      37              : 
      38              : use anyhow::{Context, Result, bail, ensure};
      39              : use camino::{Utf8Path, Utf8PathBuf};
      40              : use futures::StreamExt;
      41              : use itertools::Itertools;
      42              : use pageserver_api::config::MaxVectoredReadBytes;
      43              : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
      44              : use pageserver_api::keyspace::KeySpace;
      45              : use pageserver_api::models::ImageCompressionAlgorithm;
      46              : use pageserver_api::shard::TenantShardId;
      47              : use pageserver_api::value::Value;
      48              : use rand::Rng;
      49              : use rand::distributions::Alphanumeric;
      50              : use serde::{Deserialize, Serialize};
      51              : use tokio::sync::OnceCell;
      52              : use tokio_epoll_uring::IoBuf;
      53              : use tokio_util::sync::CancellationToken;
      54              : use tracing::*;
      55              : use utils::bin_ser::BeSer;
      56              : use utils::id::{TenantId, TimelineId};
      57              : use utils::lsn::Lsn;
      58              : 
      59              : use super::{
      60              :     AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
      61              :     ValuesReconstructState,
      62              : };
      63              : use crate::config::PageServerConf;
      64              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      65              : use crate::page_cache::{self, FileId, PAGE_SZ};
      66              : use crate::tenant::blob_io::BlobWriter;
      67              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      68              : use crate::tenant::disk_btree::{
      69              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      70              : };
      71              : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
      72              : use crate::tenant::timeline::GetVectoredError;
      73              : use crate::tenant::vectored_blob_io::{
      74              :     BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      75              :     VectoredReadPlanner,
      76              : };
      77              : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
      78              : use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
      79              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      80              : 
/// Header stored at the beginning of the file.
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
    pub magic: u16,
    /// On-disk format version; the code in this file always fills in
    /// STORAGE_FORMAT_VERSION.
    pub format_version: u16,

    /// Tenant the layer belongs to.
    pub tenant_id: TenantId,
    /// Timeline the layer belongs to.
    pub timeline_id: TimelineId,
    /// Range of keys covered by the layer.
    pub key_range: Range<Key>,
    /// Range of LSNs covered by the layer.
    pub lsn_range: Range<Lsn>,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index', where the B-tree root page is stored
    pub index_root_blk: u32,
}
     103              : 
     104              : impl From<&DeltaLayer> for Summary {
     105            0 :     fn from(layer: &DeltaLayer) -> Self {
     106            0 :         Self::expected(
     107            0 :             layer.desc.tenant_shard_id.tenant_id,
     108            0 :             layer.desc.timeline_id,
     109            0 :             layer.desc.key_range.clone(),
     110            0 :             layer.desc.lsn_range.clone(),
     111            0 :         )
     112            0 :     }
     113              : }
     114              : 
     115              : impl Summary {
     116         2204 :     pub(super) fn expected(
     117         2204 :         tenant_id: TenantId,
     118         2204 :         timeline_id: TimelineId,
     119         2204 :         keys: Range<Key>,
     120         2204 :         lsns: Range<Lsn>,
     121         2204 :     ) -> Self {
     122         2204 :         Self {
     123         2204 :             magic: DELTA_FILE_MAGIC,
     124         2204 :             format_version: STORAGE_FORMAT_VERSION,
     125         2204 : 
     126         2204 :             tenant_id,
     127         2204 :             timeline_id,
     128         2204 :             key_range: keys,
     129         2204 :             lsn_range: lsns,
     130         2204 : 
     131         2204 :             index_start_blk: 0,
     132         2204 :             index_root_blk: 0,
     133         2204 :         }
     134         2204 :     }
     135              : }
     136              : 
/// Flag bit of a [`BlobRef`] indicating that this version initializes the page.
const WILL_INIT: u64 = 1;
     139              : 
/// Struct representing a reference to a BLOB in a layer.
///
/// The reference contains the BLOB offset, and for WAL records it also contains
/// the `will_init` flag. The flag helps to determine the range of records
/// that need to be applied, without reading/deserializing the records themselves.
///
/// Encoding of the wrapped `u64`: the lowest bit is the `will_init` flag, the
/// remaining bits hold the offset shifted left by one.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);
     147              : 
     148              : impl BlobRef {
     149      9608367 :     pub fn will_init(&self) -> bool {
     150      9608367 :         (self.0 & WILL_INIT) != 0
     151      9608367 :     }
     152              : 
     153     15477128 :     pub fn pos(&self) -> u64 {
     154     15477128 :         self.0 >> 1
     155     15477128 :     }
     156              : 
     157     12984164 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     158     12984164 :         let mut blob_ref = pos << 1;
     159     12984164 :         if will_init {
     160     12940332 :             blob_ref |= WILL_INIT;
     161     12940332 :         }
     162     12984164 :         BlobRef(blob_ref)
     163     12984164 :     }
     164              : }
     165              : 
     166              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     167              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     168              : 
     169              : /// This is the key of the B-tree index stored in the delta layer. It consists
     170              : /// of the serialized representation of a Key and LSN.
     171              : impl DeltaKey {
     172      4128396 :     fn from_slice(buf: &[u8]) -> Self {
     173      4128396 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     174      4128396 :         bytes.copy_from_slice(buf);
     175      4128396 :         DeltaKey(bytes)
     176      4128396 :     }
     177              : 
     178     13465657 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     179     13465657 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     180     13465657 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     181     13465657 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     182     13465657 :         DeltaKey(bytes)
     183     13465657 :     }
     184              : 
     185      4128396 :     fn key(&self) -> Key {
     186      4128396 :         Key::from_slice(&self.0)
     187      4128396 :     }
     188              : 
     189      4128396 :     fn lsn(&self) -> Lsn {
     190      4128396 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     191      4128396 :     }
     192              : 
     193     11348612 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     194     11348612 :         let mut lsn_buf = [0u8; 8];
     195     11348612 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     196     11348612 :         Lsn(u64::from_be_bytes(lsn_buf))
     197     11348612 :     }
     198              : }
     199              : 
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
pub struct DeltaLayer {
    /// Path of the layer file, as given to `new_for_path`.
    path: Utf8PathBuf,
    /// Layer descriptor built from the summary found in the file header.
    pub desc: PersistentLayerDesc,
    /// In-memory state of the file; filled in on the first call to `load`.
    inner: OnceCell<Arc<DeltaLayerInner>>,
}
     207              : 
     208              : impl std::fmt::Debug for DeltaLayer {
     209            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     210              :         use super::RangeDisplayDebug;
     211              : 
     212            0 :         f.debug_struct("DeltaLayer")
     213            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     214            0 :             .field("lsn_range", &self.desc.lsn_range)
     215            0 :             .field("file_size", &self.desc.file_size)
     216            0 :             .field("inner", &self.inner)
     217            0 :             .finish()
     218            0 :     }
     219              : }
     220              : 
/// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
/// file.
pub struct DeltaLayerInner {
    // values copied from summary
    index_start_blk: u32,
    index_root_blk: u32,

    // Handle to the layer file. NOTE(review): `file_id` presumably identifies
    // `file` towards the page cache (the type comes from `crate::page_cache`) — confirm.
    file: Arc<VirtualFile>,
    file_id: FileId,

    // Key and LSN ranges of the layer.
    layer_key_range: Range<Key>,
    layer_lsn_range: Range<Lsn>,

    // NOTE(review): presumably an upper bound on the size of a single vectored
    // read, with `None` meaning "not configured" — confirm against the readers.
    max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
}
     236              : 
     237              : impl DeltaLayerInner {
     238            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     239            0 :         format!(
     240            0 :             "delta {}..{} {}..{}",
     241            0 :             self.key_range().start,
     242            0 :             self.key_range().end,
     243            0 :             self.lsn_range().start,
     244            0 :             self.lsn_range().end
     245            0 :         )
     246            0 :     }
     247              : }
     248              : 
     249              : impl std::fmt::Debug for DeltaLayerInner {
     250            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     251            0 :         f.debug_struct("DeltaLayerInner")
     252            0 :             .field("index_start_blk", &self.index_start_blk)
     253            0 :             .field("index_root_blk", &self.index_root_blk)
     254            0 :             .finish()
     255            0 :     }
     256              : }
     257              : 
     258              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     259              : impl std::fmt::Display for DeltaLayer {
     260            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     261            0 :         write!(f, "{}", self.layer_desc().short_id())
     262            0 :     }
     263              : }
     264              : 
     265              : impl AsLayerDesc for DeltaLayer {
     266            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     267            0 :         &self.desc
     268            0 :     }
     269              : }
     270              : 
     271              : impl DeltaLayer {
     272            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     273            0 :         self.desc.dump();
     274            0 : 
     275            0 :         if !verbose {
     276            0 :             return Ok(());
     277            0 :         }
     278              : 
     279            0 :         let inner = self.load(ctx).await?;
     280              : 
     281            0 :         inner.dump(ctx).await
     282            0 :     }
     283              : 
     284         2952 :     fn temp_path_for(
     285         2952 :         conf: &PageServerConf,
     286         2952 :         tenant_shard_id: &TenantShardId,
     287         2952 :         timeline_id: &TimelineId,
     288         2952 :         key_start: Key,
     289         2952 :         lsn_range: &Range<Lsn>,
     290         2952 :     ) -> Utf8PathBuf {
     291         2952 :         let rand_string: String = rand::thread_rng()
     292         2952 :             .sample_iter(&Alphanumeric)
     293         2952 :             .take(8)
     294         2952 :             .map(char::from)
     295         2952 :             .collect();
     296         2952 : 
     297         2952 :         conf.timeline_path(tenant_shard_id, timeline_id)
     298         2952 :             .join(format!(
     299         2952 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     300         2952 :                 key_start,
     301         2952 :                 u64::from(lsn_range.start),
     302         2952 :                 u64::from(lsn_range.end),
     303         2952 :                 rand_string,
     304         2952 :                 TEMP_FILE_SUFFIX,
     305         2952 :             ))
     306         2952 :     }
     307              : 
     308              :     ///
     309              :     /// Open the underlying file and read the metadata into memory, if it's
     310              :     /// not loaded already.
     311              :     ///
     312            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
     313            0 :         // Quick exit if already loaded
     314            0 :         self.inner
     315            0 :             .get_or_try_init(|| self.load_inner(ctx))
     316            0 :             .await
     317            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     318            0 :     }
     319              : 
     320            0 :     async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
     321            0 :         let path = self.path();
     322              : 
     323            0 :         let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
     324              : 
     325              :         // not production code
     326            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     327            0 :         let expected_layer_name = self.layer_desc().layer_name();
     328            0 : 
     329            0 :         if actual_layer_name != expected_layer_name {
     330            0 :             println!("warning: filename does not match what is expected from in-file summary");
     331            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     332            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     333            0 :         }
     334              : 
     335            0 :         Ok(Arc::new(loaded))
     336            0 :     }
     337              : 
     338              :     /// Create a DeltaLayer struct representing an existing file on disk.
     339              :     ///
     340              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     341            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     342            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     343            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     344            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     345              : 
     346            0 :         let metadata = file
     347            0 :             .metadata()
     348            0 :             .context("get file metadata to determine size")?;
     349              : 
     350              :         // This function is never used for constructing layers in a running pageserver,
     351              :         // so it does not need an accurate TenantShardId.
     352            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     353            0 : 
     354            0 :         Ok(DeltaLayer {
     355            0 :             path: path.to_path_buf(),
     356            0 :             desc: PersistentLayerDesc::new_delta(
     357            0 :                 tenant_shard_id,
     358            0 :                 summary.timeline_id,
     359            0 :                 summary.key_range,
     360            0 :                 summary.lsn_range,
     361            0 :                 metadata.len(),
     362            0 :             ),
     363            0 :             inner: OnceCell::new(),
     364            0 :         })
     365            0 :     }
     366              : 
     367              :     /// Path to the layer file in pageserver workdir.
     368            0 :     fn path(&self) -> Utf8PathBuf {
     369            0 :         self.path.clone()
     370            0 :     }
     371              : }
     372              : 
/// A builder object for constructing a new delta layer.
///
/// Usage:
///
/// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_value` for every page
///    version to store in the layer.
///
/// 3. Call `finish`.
///
struct DeltaLayerWriterInner {
    /// Path of the temporary file the layer is written to.
    pub path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,

    /// Start of the layer's key range; the end key is only supplied to `finish`.
    key_start: Key,
    /// LSN range of the layer; `put_value_bytes` asserts that every LSN is at
    /// or above its start.
    lsn_range: Range<Lsn>,

    /// Builder of the B-tree index, mapping serialized key+LSN to a blob reference.
    tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,

    /// Writer for the 'values' part of the file.
    blob_writer: BlobWriter<true>,

    // Number of key-lsns in the layer.
    num_keys: usize,
}
     399              : 
     400              : impl DeltaLayerWriterInner {
     401              :     ///
     402              :     /// Start building a new delta layer.
     403              :     ///
     404              :     #[allow(clippy::too_many_arguments)]
     405         2952 :     async fn new(
     406         2952 :         conf: &'static PageServerConf,
     407         2952 :         timeline_id: TimelineId,
     408         2952 :         tenant_shard_id: TenantShardId,
     409         2952 :         key_start: Key,
     410         2952 :         lsn_range: Range<Lsn>,
     411         2952 :         gate: &utils::sync::gate::Gate,
     412         2952 :         cancel: CancellationToken,
     413         2952 :         ctx: &RequestContext,
     414         2952 :     ) -> anyhow::Result<Self> {
     415         2952 :         // Create the file initially with a temporary filename. We don't know
     416         2952 :         // the end key yet, so we cannot form the final filename yet. We will
     417         2952 :         // rename it when we're done.
     418         2952 :         //
     419         2952 :         // Note: This overwrites any existing file. There shouldn't be any.
     420         2952 :         // FIXME: throw an error instead?
     421         2952 :         let path =
     422         2952 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     423              : 
     424         2952 :         let mut file = VirtualFile::create(&path, ctx).await?;
     425              :         // make room for the header block
     426         2952 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     427         2952 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
     428         2952 : 
     429         2952 :         // Initialize the b-tree index builder
     430         2952 :         let block_buf = BlockBuf::new();
     431         2952 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     432         2952 : 
     433         2952 :         Ok(Self {
     434         2952 :             path,
     435         2952 :             timeline_id,
     436         2952 :             tenant_shard_id,
     437         2952 :             key_start,
     438         2952 :             lsn_range,
     439         2952 :             tree: tree_builder,
     440         2952 :             blob_writer,
     441         2952 :             num_keys: 0,
     442         2952 :         })
     443         2952 :     }
     444              : 
     445              :     ///
     446              :     /// Append a key-value pair to the file.
     447              :     ///
     448              :     /// The values must be appended in key, lsn order.
     449              :     ///
     450      4210636 :     async fn put_value(
     451      4210636 :         &mut self,
     452      4210636 :         key: Key,
     453      4210636 :         lsn: Lsn,
     454      4210636 :         val: Value,
     455      4210636 :         ctx: &RequestContext,
     456      4210636 :     ) -> anyhow::Result<()> {
     457      4210636 :         let (_, res) = self
     458      4210636 :             .put_value_bytes(
     459      4210636 :                 key,
     460      4210636 :                 lsn,
     461      4210636 :                 Value::ser(&val)?.slice_len(),
     462      4210636 :                 val.will_init(),
     463      4210636 :                 ctx,
     464      4210636 :             )
     465      4210636 :             .await;
     466      4210636 :         res
     467      4210636 :     }
     468              : 
     469     12983956 :     async fn put_value_bytes<Buf>(
     470     12983956 :         &mut self,
     471     12983956 :         key: Key,
     472     12983956 :         lsn: Lsn,
     473     12983956 :         val: FullSlice<Buf>,
     474     12983956 :         will_init: bool,
     475     12983956 :         ctx: &RequestContext,
     476     12983956 :     ) -> (FullSlice<Buf>, anyhow::Result<()>)
     477     12983956 :     where
     478     12983956 :         Buf: IoBuf + Send,
     479     12983956 :     {
     480     12983956 :         assert!(
     481     12983956 :             self.lsn_range.start <= lsn,
     482            0 :             "lsn_start={}, lsn={}",
     483              :             self.lsn_range.start,
     484              :             lsn
     485              :         );
     486              :         // We don't want to use compression in delta layer creation
     487     12983956 :         let compression = ImageCompressionAlgorithm::Disabled;
     488     12983956 :         let (val, res) = self
     489     12983956 :             .blob_writer
     490     12983956 :             .write_blob_maybe_compressed(val, ctx, compression)
     491     12983956 :             .await;
     492     12983956 :         let off = match res {
     493     12983956 :             Ok((off, _)) => off,
     494            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     495              :         };
     496              : 
     497     12983956 :         let blob_ref = BlobRef::new(off, will_init);
     498     12983956 : 
     499     12983956 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     500     12983956 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     501     12983956 : 
     502     12983956 :         self.num_keys += 1;
     503     12983956 : 
     504     12983956 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     505     12983956 :     }
     506              : 
     507      4047944 :     fn size(&self) -> u64 {
     508      4047944 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     509      4047944 :     }
     510              : 
     511              :     ///
     512              :     /// Finish writing the delta layer.
     513              :     ///
     514         2900 :     async fn finish(
     515         2900 :         self,
     516         2900 :         key_end: Key,
     517         2900 :         ctx: &RequestContext,
     518         2900 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     519         2900 :         let temp_path = self.path.clone();
     520         2900 :         let result = self.finish0(key_end, ctx).await;
     521         2900 :         if let Err(ref e) = result {
     522            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
     523            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     524            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     525            0 :             }
     526         2900 :         }
     527         2900 :         result
     528         2900 :     }
     529              : 
     530         2900 :     async fn finish0(
     531         2900 :         self,
     532         2900 :         key_end: Key,
     533         2900 :         ctx: &RequestContext,
     534         2900 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     535         2900 :         let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
     536              : 
     537         2900 :         let mut file = self.blob_writer.into_inner(ctx).await?;
     538              : 
     539              :         // Write out the index
     540         2900 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     541         2900 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     542         2900 :             .await?;
     543        30298 :         for buf in block_buf.blocks {
     544        27398 :             let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     545        27398 :             res?;
     546              :         }
     547         2900 :         assert!(self.lsn_range.start < self.lsn_range.end);
     548              :         // Fill in the summary on blk 0
     549         2900 :         let summary = Summary {
     550         2900 :             magic: DELTA_FILE_MAGIC,
     551         2900 :             format_version: STORAGE_FORMAT_VERSION,
     552         2900 :             tenant_id: self.tenant_shard_id.tenant_id,
     553         2900 :             timeline_id: self.timeline_id,
     554         2900 :             key_range: self.key_start..key_end,
     555         2900 :             lsn_range: self.lsn_range.clone(),
     556         2900 :             index_start_blk,
     557         2900 :             index_root_blk,
     558         2900 :         };
     559         2900 : 
     560         2900 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     561         2900 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     562         2900 :         Summary::ser_into(&summary, &mut buf)?;
     563         2900 :         file.seek(SeekFrom::Start(0)).await?;
     564         2900 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     565         2900 :         res?;
     566              : 
     567         2900 :         let metadata = file
     568         2900 :             .metadata()
     569         2900 :             .await
     570         2900 :             .context("get file metadata to determine size")?;
     571              : 
     572              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     573              :         // Make it a little bit below to account for differing GB units
     574              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     575         2900 :         ensure!(
     576         2900 :             metadata.len() <= S3_UPLOAD_LIMIT,
     577            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     578            0 :             file.path(),
     579            0 :             metadata.len()
     580              :         );
     581              : 
     582              :         // Note: Because we opened the file in write-only mode, we cannot
     583              :         // reuse the same VirtualFile for reading later. That's why we don't
     584              :         // set inner.file here. The first read will have to re-open it.
     585              : 
     586         2900 :         let desc = PersistentLayerDesc::new_delta(
     587         2900 :             self.tenant_shard_id,
     588         2900 :             self.timeline_id,
     589         2900 :             self.key_start..key_end,
     590         2900 :             self.lsn_range.clone(),
     591         2900 :             metadata.len(),
     592         2900 :         );
     593         2900 : 
     594         2900 :         // fsync the file
     595         2900 :         file.sync_all()
     596         2900 :             .await
     597         2900 :             .maybe_fatal_err("delta_layer sync_all")?;
     598              : 
     599         2900 :         trace!("created delta layer {}", self.path);
     600              : 
     601         2900 :         Ok((desc, self.path))
     602         2900 :     }
     603              : }
     604              : 
/// A builder object for constructing a new delta layer.
///
/// Usage:
///
/// 1. Create the `DeltaLayerWriter` by calling `DeltaLayerWriter::new(...)`.
///
/// 2. Write the contents by calling `put_value` (or `put_value_bytes`) for
///    every page version to store in the layer. Values must be appended in
///    key, lsn order.
///
/// 3. Call `finish`.
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, the
/// writer can be dropped before `finish` is actually called. Without cleanup
/// this would leave stray temporary files in the directory and could
/// eventually exhaust the file system. This structure therefore wraps
/// `DeltaLayerWriterInner` and provides a `Drop` implementation that removes
/// the temporary file on failure. The cleanup cannot be implemented directly
/// on `DeltaLayerWriterInner`, because `finish` moves some fields out of it,
/// which makes implementing `Drop` there impossible.
#[must_use]
pub struct DeltaLayerWriter {
    /// `Some` while the layer is being built. `finish` takes the value out
    /// (leaving `None`), so the `Drop` implementation only removes the file
    /// of a writer that was never finished.
    inner: Option<DeltaLayerWriterInner>,
}
     630              : 
     631              : impl DeltaLayerWriter {
     632              :     ///
     633              :     /// Start building a new delta layer.
     634              :     ///
     635              :     #[allow(clippy::too_many_arguments)]
     636         2952 :     pub async fn new(
     637         2952 :         conf: &'static PageServerConf,
     638         2952 :         timeline_id: TimelineId,
     639         2952 :         tenant_shard_id: TenantShardId,
     640         2952 :         key_start: Key,
     641         2952 :         lsn_range: Range<Lsn>,
     642         2952 :         gate: &utils::sync::gate::Gate,
     643         2952 :         cancel: CancellationToken,
     644         2952 :         ctx: &RequestContext,
     645         2952 :     ) -> anyhow::Result<Self> {
     646         2952 :         Ok(Self {
     647         2952 :             inner: Some(
     648         2952 :                 DeltaLayerWriterInner::new(
     649         2952 :                     conf,
     650         2952 :                     timeline_id,
     651         2952 :                     tenant_shard_id,
     652         2952 :                     key_start,
     653         2952 :                     lsn_range,
     654         2952 :                     gate,
     655         2952 :                     cancel,
     656         2952 :                     ctx,
     657         2952 :                 )
     658         2952 :                 .await?,
     659              :             ),
     660              :         })
     661         2952 :     }
     662              : 
     663            0 :     pub fn is_empty(&self) -> bool {
     664            0 :         self.inner.as_ref().unwrap().num_keys == 0
     665            0 :     }
     666              : 
     667              :     ///
     668              :     /// Append a key-value pair to the file.
     669              :     ///
     670              :     /// The values must be appended in key, lsn order.
     671              :     ///
     672      4210636 :     pub async fn put_value(
     673      4210636 :         &mut self,
     674      4210636 :         key: Key,
     675      4210636 :         lsn: Lsn,
     676      4210636 :         val: Value,
     677      4210636 :         ctx: &RequestContext,
     678      4210636 :     ) -> anyhow::Result<()> {
     679      4210636 :         self.inner
     680      4210636 :             .as_mut()
     681      4210636 :             .unwrap()
     682      4210636 :             .put_value(key, lsn, val, ctx)
     683      4210636 :             .await
     684      4210636 :     }
     685              : 
     686      8773320 :     pub async fn put_value_bytes<Buf>(
     687      8773320 :         &mut self,
     688      8773320 :         key: Key,
     689      8773320 :         lsn: Lsn,
     690      8773320 :         val: FullSlice<Buf>,
     691      8773320 :         will_init: bool,
     692      8773320 :         ctx: &RequestContext,
     693      8773320 :     ) -> (FullSlice<Buf>, anyhow::Result<()>)
     694      8773320 :     where
     695      8773320 :         Buf: IoBuf + Send,
     696      8773320 :     {
     697      8773320 :         self.inner
     698      8773320 :             .as_mut()
     699      8773320 :             .unwrap()
     700      8773320 :             .put_value_bytes(key, lsn, val, will_init, ctx)
     701      8773320 :             .await
     702      8773320 :     }
     703              : 
     704      4047944 :     pub fn size(&self) -> u64 {
     705      4047944 :         self.inner.as_ref().unwrap().size()
     706      4047944 :     }
     707              : 
     708              :     ///
     709              :     /// Finish writing the delta layer.
     710              :     ///
     711         2900 :     pub(crate) async fn finish(
     712         2900 :         mut self,
     713         2900 :         key_end: Key,
     714         2900 :         ctx: &RequestContext,
     715         2900 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     716         2900 :         self.inner.take().unwrap().finish(key_end, ctx).await
     717         2900 :     }
     718              : 
     719        24508 :     pub(crate) fn num_keys(&self) -> usize {
     720        24508 :         self.inner.as_ref().unwrap().num_keys
     721        24508 :     }
     722              : 
     723        30280 :     pub(crate) fn estimated_size(&self) -> u64 {
     724        30280 :         let inner = self.inner.as_ref().unwrap();
     725        30280 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
     726        30280 :     }
     727              : }
     728              : 
     729              : impl Drop for DeltaLayerWriter {
     730         2952 :     fn drop(&mut self) {
     731         2952 :         if let Some(inner) = self.inner.take() {
     732           52 :             // We want to remove the virtual file here, so it's fine to not
     733           52 :             // having completely flushed unwritten data.
     734           52 :             let vfile = inner.blob_writer.into_inner_no_flush();
     735           52 :             vfile.remove();
     736         2900 :         }
     737         2952 :     }
     738              : }
     739              : 
/// Error returned by `DeltaLayer::rewrite_summary`.
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    /// The summary read from the file does not carry `DELTA_FILE_MAGIC`.
    #[error("magic mismatch")]
    MagicMismatch,
    /// Any other failure, carried as an `anyhow::Error` and displayed
    /// transparently.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
     747              : 
     748              : impl From<std::io::Error> for RewriteSummaryError {
     749            0 :     fn from(e: std::io::Error) -> Self {
     750            0 :         Self::Other(anyhow::anyhow!(e))
     751            0 :     }
     752              : }
     753              : 
     754              : impl DeltaLayer {
     755            0 :     pub async fn rewrite_summary<F>(
     756            0 :         path: &Utf8Path,
     757            0 :         rewrite: F,
     758            0 :         ctx: &RequestContext,
     759            0 :     ) -> Result<(), RewriteSummaryError>
     760            0 :     where
     761            0 :         F: Fn(Summary) -> Summary,
     762            0 :     {
     763            0 :         let mut file = VirtualFile::open_with_options(
     764            0 :             path,
     765            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     766            0 :             ctx,
     767            0 :         )
     768            0 :         .await
     769            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     770            0 :         let file_id = page_cache::next_file_id();
     771            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     772            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     773            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     774            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     775            0 :             return Err(RewriteSummaryError::MagicMismatch);
     776            0 :         }
     777            0 : 
     778            0 :         let new_summary = rewrite(actual_summary);
     779            0 : 
     780            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     781            0 :         // TODO: could use smallvec here, but it's a pain with Slice<T>
     782            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     783            0 :         file.seek(SeekFrom::Start(0)).await?;
     784            0 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     785            0 :         res?;
     786            0 :         Ok(())
     787            0 :     }
     788              : }
     789              : 
     790              : impl DeltaLayerInner {
    /// Key range of this layer, as stored in `layer_key_range` (populated by
    /// `load` from the in-file summary).
    pub(crate) fn key_range(&self) -> &Range<Key> {
        &self.layer_key_range
    }
     794              : 
    /// LSN range of this layer, as stored in `layer_lsn_range` (populated by
    /// `load` from the in-file summary).
    pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
        &self.layer_lsn_range
    }
     798              : 
     799         2204 :     pub(super) async fn load(
     800         2204 :         path: &Utf8Path,
     801         2204 :         summary: Option<Summary>,
     802         2204 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     803         2204 :         ctx: &RequestContext,
     804         2204 :     ) -> anyhow::Result<Self> {
     805         2204 :         let file = Arc::new(
     806         2204 :             VirtualFile::open_v2(path, ctx)
     807         2204 :                 .await
     808         2204 :                 .context("open layer file")?,
     809              :         );
     810              : 
     811         2204 :         let file_id = page_cache::next_file_id();
     812         2204 : 
     813         2204 :         let block_reader = FileBlockReader::new(&file, file_id);
     814              : 
     815         2204 :         let summary_blk = block_reader
     816         2204 :             .read_blk(0, ctx)
     817         2204 :             .await
     818         2204 :             .context("read first block")?;
     819              : 
     820              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     821         2204 :         let actual_summary =
     822         2204 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     823              : 
     824         2204 :         if let Some(mut expected_summary) = summary {
     825              :             // production code path
     826         2204 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     827         2204 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     828         2204 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     829         2204 :             expected_summary.timeline_id = actual_summary.timeline_id;
     830         2204 : 
     831         2204 :             if actual_summary != expected_summary {
     832            0 :                 bail!(
     833            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     834            0 :                     actual_summary,
     835            0 :                     expected_summary
     836            0 :                 );
     837         2204 :             }
     838            0 :         }
     839              : 
     840         2204 :         Ok(DeltaLayerInner {
     841         2204 :             file,
     842         2204 :             file_id,
     843         2204 :             index_start_blk: actual_summary.index_start_blk,
     844         2204 :             index_root_blk: actual_summary.index_root_blk,
     845         2204 :             max_vectored_read_bytes,
     846         2204 :             layer_key_range: actual_summary.key_range,
     847         2204 :             layer_lsn_range: actual_summary.lsn_range,
     848         2204 :         })
     849         2204 :     }
     850              : 
     851              :     // Look up the keys in the provided keyspace and update
     852              :     // the reconstruct state with whatever is found.
     853              :     //
     854              :     // Currently, the index is visited for each range, but this
     855              :     // can be further optimised to visit the index only once.
     856       470989 :     pub(super) async fn get_values_reconstruct_data(
     857       470989 :         &self,
     858       470989 :         this: ResidentLayer,
     859       470989 :         keyspace: KeySpace,
     860       470989 :         lsn_range: Range<Lsn>,
     861       470989 :         reconstruct_state: &mut ValuesReconstructState,
     862       470989 :         ctx: &RequestContext,
     863       470989 :     ) -> Result<(), GetVectoredError> {
     864       470989 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     865       470989 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     866       470989 :             self.index_start_blk,
     867       470989 :             self.index_root_blk,
     868       470989 :             block_reader,
     869       470989 :         );
     870       470989 : 
     871       470989 :         let planner = VectoredReadPlanner::new(
     872       470989 :             self.max_vectored_read_bytes
     873       470989 :                 .expect("Layer is loaded with max vectored bytes config")
     874       470989 :                 .0
     875       470989 :                 .into(),
     876       470989 :         );
     877       470989 : 
     878       470989 :         let data_end_offset = self.index_start_offset();
     879              : 
     880       470989 :         let reads = Self::plan_reads(
     881       470989 :             &keyspace,
     882       470989 :             lsn_range.clone(),
     883       470989 :             data_end_offset,
     884       470989 :             index_reader,
     885       470989 :             planner,
     886       470989 :             ctx,
     887       470989 :         )
     888       470989 :         .await
     889       470989 :         .map_err(GetVectoredError::Other)?;
     890              : 
     891       470989 :         self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
     892       470989 :             .await;
     893              : 
     894       470989 :         Ok(())
     895       470989 :     }
     896              : 
    /// Walks the layer index and plans the vectored reads for `keyspace`.
    ///
    /// For every key range in `keyspace`, index entries are streamed starting
    /// at `(range.start, lsn_range.start)` and passed to `planner` together
    /// with a `BlobFlag`:
    ///
    /// * `Ignore` when the entry's LSN is outside `lsn_range`,
    /// * `ReplaceAll` when the entry's blob reference reports `will_init()`,
    /// * `None` otherwise.
    ///
    /// Streaming of a range stops at the first entry whose key is at or past
    /// `range.end`, or whose key is the last one of the range while its LSN is
    /// at or past `lsn_range.end`; that entry's position is reported to the
    /// planner as the range end. If the stream is exhausted first,
    /// `data_end_offset` is reported as the range end instead.
    ///
    /// Returns the reads produced by `planner.finish()`.
    ///
    /// # Errors
    ///
    /// Propagates any error yielded by the index stream.
    ///
    /// # Panics
    ///
    /// Panics if the index stream yields a key below `range.start`.
    async fn plan_reads<Reader>(
        keyspace: &KeySpace,
        lsn_range: Range<Lsn>,
        data_end_offset: u64,
        index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
        mut planner: VectoredReadPlanner,
        ctx: &RequestContext,
    ) -> anyhow::Result<Vec<VectoredRead>>
    where
        Reader: BlockReader + Clone,
    {
        // Child context that tags page accesses as delta layer B-tree nodes.
        let ctx = RequestContextBuilder::from(ctx)
            .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
            .attached_child();

        for range in keyspace.ranges.iter() {
            let mut range_end_handled = false;

            // A fresh stream over the index is started for every key range.
            let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
            let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
            let mut index_stream = std::pin::pin!(index_stream);

            while let Some(index_entry) = index_stream.next().await {
                let (raw_key, value) = index_entry?;
                let key = Key::from_slice(&raw_key[..KEY_SIZE]);
                let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
                let blob_ref = BlobRef(value);

                // Lsns are not monotonically increasing across keys, so we don't assert on them.
                assert!(key >= range.start);

                let outside_lsn_range = !lsn_range.contains(&lsn);

                let flag = {
                    if outside_lsn_range {
                        BlobFlag::Ignore
                    } else if blob_ref.will_init() {
                        BlobFlag::ReplaceAll
                    } else {
                        // Usual path: add blob to the read
                        BlobFlag::None
                    }
                };

                // Past the key range, or on its last key but past the LSN range:
                // this entry's position marks the end of the range.
                if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
                    planner.handle_range_end(blob_ref.pos());
                    range_end_handled = true;
                    break;
                } else {
                    planner.handle(key, lsn, blob_ref.pos(), flag);
                }
            }

            // The index ran out before an entry beyond the range was seen.
            if !range_end_handled {
                tracing::debug!("Handling range end fallback at {}", data_end_offset);
                planner.handle_range_end(data_end_offset);
            }
        }

        Ok(planner.finish())
    }
     958              : 
     959       471389 :     fn get_min_read_buffer_size(
     960       471389 :         planned_reads: &[VectoredRead],
     961       471389 :         read_size_soft_max: usize,
     962       471389 :     ) -> usize {
     963       471389 :         let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
     964       172103 :             return read_size_soft_max;
     965              :         };
     966              : 
     967       299286 :         let largest_read_size = largest_read.size();
     968       299286 :         if largest_read_size > read_size_soft_max {
     969              :             // If the read is oversized, it should only contain one key.
     970          400 :             let offenders = largest_read
     971          400 :                 .blobs_at
     972          400 :                 .as_slice()
     973          400 :                 .iter()
     974          400 :                 .filter_map(|(_, blob_meta)| {
     975          400 :                     if blob_meta.key.is_rel_dir_key()
     976          400 :                         || blob_meta.key == DBDIR_KEY
     977          400 :                         || blob_meta.key.is_aux_file_key()
     978              :                     {
     979              :                         // The size of values for these keys is unbounded and can
     980              :                         // grow very large in pathological cases.
     981            0 :                         None
     982              :                     } else {
     983          400 :                         Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
     984              :                     }
     985          400 :                 })
     986          400 :                 .join(", ");
     987          400 : 
     988          400 :             if !offenders.is_empty() {
     989          400 :                 tracing::warn!(
     990            0 :                     "Oversized vectored read ({} > {}) for keys {}",
     991              :                     largest_read_size,
     992              :                     read_size_soft_max,
     993              :                     offenders
     994              :                 );
     995            0 :             }
     996       298886 :         }
     997              : 
     998       299286 :         largest_read_size
     999       471389 :     }
    1000              : 
    /// Executes the planned vectored `reads` and feeds the results into
    /// `reconstruct_state`.
    ///
    /// For every read (highest key+lsn first), each blob is registered with
    /// `reconstruct_state.update_key`, and the returned IO handles are
    /// collected keyed by `(key, lsn)`. The actual file read is then submitted
    /// through `reconstruct_state.spawn_io`; the spawned future completes every
    /// handle with either the blob bytes or an error.
    ///
    /// `this` is cloned into each spawned future so that the layer stays
    /// resident until that IO is done.
    ///
    /// # Panics
    ///
    /// Panics if the layer was loaded without a `max_vectored_read_bytes`
    /// configuration. The spawned future panics if a blob returned by the read
    /// has no registered IO handle, or if handles are left over after a
    /// successful read.
    async fn do_reads_and_update_state(
        &self,
        this: ResidentLayer,
        reads: Vec<VectoredRead>,
        reconstruct_state: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) {
        let max_vectored_read_bytes = self
            .max_vectored_read_bytes
            .expect("Layer is loaded with max vectored bytes config")
            .0
            .into();
        // One buffer size, large enough for the largest planned read, is used for all reads.
        let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);

        // Note that reads are processed in reverse order (from highest key+lsn).
        // This is the order that `ReconstructState` requires such that it can
        // track when a key is done.
        for read in reads.into_iter().rev() {
            // IO handles for the blobs of this read, looked up again once the read completes.
            let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
            for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
                let io = reconstruct_state.update_key(
                    &blob_meta.key,
                    blob_meta.lsn,
                    blob_meta.will_init,
                );
                ios.insert((blob_meta.key, blob_meta.lsn), io);
            }

            let read_extend_residency = this.clone();
            let read_from = self.file.clone();
            let read_ctx = ctx.attached_child();
            reconstruct_state
                .spawn_io(async move {
                    let vectored_blob_reader = VectoredBlobReader::new(&read_from);
                    let buf = IoBufferMut::with_capacity(buf_size);

                    let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
                    match res {
                        Ok(blobs_buf) => {
                            let view = BufView::new_slice(&blobs_buf.buf);
                            for meta in blobs_buf.blobs.iter().rev() {
                                let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();

                                let blob_read = meta.read(&view).await;
                                let blob_read = match blob_read {
                                    Ok(buf) => buf,
                                    Err(e) => {
                                        // Report the failure for this blob only and keep going.
                                        io.complete(Err(e));
                                        continue;
                                    }
                                };

                                io.complete(Ok(OnDiskValue::WalRecordOrImage(
                                    blob_read.into_bytes(),
                                )));
                            }

                            // Every registered handle must have been completed above.
                            assert!(ios.is_empty());
                        }
                        Err(err) => {
                            // The whole read failed: fail every pending handle with the same error kind.
                            for (_, sender) in ios {
                                sender.complete(Err(std::io::Error::new(
                                    err.kind(),
                                    "vec read failed",
                                )));
                            }
                        }
                    }

                    // keep layer resident until this IO is done; this spawned IO future generally outlives the
                    // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
                    drop(read_extend_residency);
                })
                .await;
        }
    }
    1077              : 
    /// Collects every entry of the layer index, in forward index order, as a
    /// `DeltaEntry` holding its key, LSN, value size and a reference to the
    /// value.
    ///
    /// The index only stores blob positions, so a value's size is computed as
    /// the distance to the next entry's position; for the last entry the
    /// distance to the start of the index is used.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while visiting the index.
    pub(crate) async fn index_entries<'a>(
        &'a self,
        ctx: &RequestContext,
    ) -> Result<Vec<DeltaEntry<'a>>> {
        let block_reader = FileBlockReader::new(&self.file, self.file_id);
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            self.index_start_blk,
            self.index_root_blk,
            block_reader,
        );

        let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();

        tree_reader
            .visit(
                // Start the visit from the all-zero key.
                &[0u8; DELTA_KEY_SIZE],
                VisitDirection::Forwards,
                |key, value| {
                    let delta_key = DeltaKey::from_slice(key);
                    let val_ref = ValueRef {
                        blob_ref: BlobRef(value),
                        layer: self,
                    };
                    let pos = BlobRef(value).pos();
                    if let Some(last) = all_keys.last_mut() {
                        // subtract offset of the current and last entries to get the size
                        // of the value associated with this (key, lsn) tuple
                        let first_pos = last.size;
                        last.size = pos - first_pos;
                    }
                    let entry = DeltaEntry {
                        key: delta_key.key(),
                        lsn: delta_key.lsn(),
                        // Temporarily holds the blob position; replaced by the real size
                        // once the next entry (or the end of the values) is known.
                        size: pos,
                        val: val_ref,
                    };
                    all_keys.push(entry);
                    // Returning `true` on every entry; presumably this tells `visit` to continue.
                    true
                },
                &RequestContextBuilder::from(ctx)
                    .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
                    .attached_child(),
            )
            .await?;
        if let Some(last) = all_keys.last_mut() {
            // Last key occupies all space till end of value storage,
            // which corresponds to beginning of the index
            last.size = self.index_start_offset() - last.size;
        }
        Ok(all_keys)
    }
    1129              : 
    1130              :     /// Using the given writer, write out a version which has the earlier Lsns than `until`.
    1131              :     ///
    1132              :     /// Return the amount of key value records pushed to the writer.
    1133           20 :     pub(super) async fn copy_prefix(
    1134           20 :         &self,
    1135           20 :         writer: &mut DeltaLayerWriter,
    1136           20 :         until: Lsn,
    1137           20 :         ctx: &RequestContext,
    1138           20 :     ) -> anyhow::Result<usize> {
    1139              :         use futures::stream::TryStreamExt;
    1140              : 
    1141              :         use crate::tenant::vectored_blob_io::{
    1142              :             BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
    1143              :         };
    1144              : 
    1145              :         #[derive(Debug)]
    1146              :         enum Item {
    1147              :             Actual(Key, Lsn, BlobRef),
    1148              :             Sentinel,
    1149              :         }
    1150              : 
    1151              :         impl From<Item> for Option<(Key, Lsn, BlobRef)> {
    1152          140 :             fn from(value: Item) -> Self {
    1153          140 :                 match value {
    1154          120 :                     Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
    1155           20 :                     Item::Sentinel => None,
    1156              :                 }
    1157          140 :             }
    1158              :         }
    1159              : 
    1160              :         impl Item {
    1161          140 :             fn offset(&self) -> Option<BlobRef> {
    1162          140 :                 match self {
    1163          120 :                     Item::Actual(_, _, blob) => Some(*blob),
    1164           20 :                     Item::Sentinel => None,
    1165              :                 }
    1166          140 :             }
    1167              : 
    1168          140 :             fn is_last(&self) -> bool {
    1169          140 :                 matches!(self, Item::Sentinel)
    1170          140 :             }
    1171              :         }
    1172              : 
    1173           20 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1174           20 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1175           20 :             self.index_start_blk,
    1176           20 :             self.index_root_blk,
    1177           20 :             block_reader,
    1178           20 :         );
    1179           20 : 
    1180           20 :         let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
    1181          120 :         let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
    1182           20 :         // put in a sentinel value for getting the end offset for last item, and not having to
    1183           20 :         // repeat the whole read part
    1184           20 :         let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
    1185           20 :             Item::Sentinel,
    1186           20 :         ))));
    1187           20 :         let mut stream = std::pin::pin!(stream);
    1188           20 : 
    1189           20 :         let mut prev: Option<(Key, Lsn, BlobRef)> = None;
    1190           20 : 
    1191           20 :         let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
    1192           20 : 
    1193           20 :         let max_read_size = self
    1194           20 :             .max_vectored_read_bytes
    1195           20 :             .map(|x| x.0.get())
    1196           20 :             .unwrap_or(8192);
    1197           20 : 
    1198           20 :         let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
    1199           20 : 
    1200           20 :         // FIXME: buffering of DeltaLayerWriter
    1201           20 :         let mut per_blob_copy = Vec::new();
    1202           20 : 
    1203           20 :         let mut records = 0;
    1204              : 
    1205          160 :         while let Some(item) = stream.try_next().await? {
    1206          140 :             tracing::debug!(?item, "popped");
    1207          140 :             let offset = item
    1208          140 :                 .offset()
    1209          140 :                 .unwrap_or(BlobRef::new(self.index_start_offset(), false));
    1210              : 
    1211          140 :             let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
    1212          120 :                 let end_offset = offset;
    1213          120 : 
    1214          120 :                 Some((
    1215          120 :                     BlobMeta {
    1216          120 :                         key,
    1217          120 :                         lsn,
    1218          120 :                         will_init: false,
    1219          120 :                     },
    1220          120 :                     start_offset..end_offset,
    1221          120 :                 ))
    1222              :             } else {
    1223           20 :                 None
    1224              :             };
    1225              : 
    1226          140 :             let is_last = item.is_last();
    1227          140 : 
    1228          140 :             prev = Option::from(item);
    1229          140 : 
    1230          140 :             let actionable = actionable.filter(|x| x.0.lsn < until);
    1231              : 
    1232          140 :             let builder = if let Some((meta, offsets)) = actionable {
    1233              :                 // extend or create a new builder
    1234           64 :                 if read_builder
    1235           64 :                     .as_mut()
    1236           64 :                     .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
    1237           64 :                     .unwrap_or(VectoredReadExtended::No)
    1238           64 :                     == VectoredReadExtended::Yes
    1239              :                 {
    1240           32 :                     None
    1241              :                 } else {
    1242           32 :                     read_builder.replace(ChunkedVectoredReadBuilder::new(
    1243           32 :                         offsets.start.pos(),
    1244           32 :                         offsets.end.pos(),
    1245           32 :                         meta,
    1246           32 :                         max_read_size,
    1247           32 :                     ))
    1248              :                 }
    1249              :             } else {
    1250              :                 // nothing to do, except perhaps flush any existing for the last element
    1251           76 :                 None
    1252              :             };
    1253              : 
    1254              :             // flush the possible older builder and also the new one if the item was the last one
    1255          140 :             let builders = builder.into_iter();
    1256          140 :             let builders = if is_last {
    1257           20 :                 builders.chain(read_builder.take())
    1258              :             } else {
    1259          120 :                 builders.chain(None)
    1260              :             };
    1261              : 
    1262          172 :             for builder in builders {
    1263           32 :                 let read = builder.build();
    1264           32 : 
    1265           32 :                 let reader = VectoredBlobReader::new(&self.file);
    1266           32 : 
    1267           32 :                 let mut buf = buffer.take().unwrap();
    1268           32 : 
    1269           32 :                 buf.clear();
    1270           32 :                 buf.reserve(read.size());
    1271           32 :                 let res = reader.read_blobs(&read, buf, ctx).await?;
    1272              : 
    1273           32 :                 let view = BufView::new_slice(&res.buf);
    1274              : 
    1275           96 :                 for blob in res.blobs {
    1276           64 :                     let key = blob.meta.key;
    1277           64 :                     let lsn = blob.meta.lsn;
    1278              : 
    1279           64 :                     let data = blob.read(&view).await?;
    1280              : 
    1281              :                     #[cfg(debug_assertions)]
    1282           64 :                     Value::des(&data)
    1283           64 :                         .with_context(|| {
    1284            0 :                             format!(
    1285            0 :                                 "blob failed to deserialize for {}: {:?}",
    1286            0 :                                 blob,
    1287            0 :                                 utils::Hex(&data)
    1288            0 :                             )
    1289           64 :                         })
    1290           64 :                         .unwrap();
    1291           64 : 
    1292           64 :                     // is it an image or will_init walrecord?
    1293           64 :                     // FIXME: this could be handled by threading the BlobRef to the
    1294           64 :                     // VectoredReadBuilder
    1295           64 :                     let will_init = pageserver_api::value::ValueBytes::will_init(&data)
    1296           64 :                         .inspect_err(|_e| {
    1297            0 :                             #[cfg(feature = "testing")]
    1298            0 :                             tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
    1299           64 :                         })
    1300           64 :                         .unwrap_or(false);
    1301           64 : 
    1302           64 :                     per_blob_copy.clear();
    1303           64 :                     per_blob_copy.extend_from_slice(&data);
    1304              : 
    1305           64 :                     let (tmp, res) = writer
    1306           64 :                         .put_value_bytes(
    1307           64 :                             key,
    1308           64 :                             lsn,
    1309           64 :                             std::mem::take(&mut per_blob_copy).slice_len(),
    1310           64 :                             will_init,
    1311           64 :                             ctx,
    1312           64 :                         )
    1313           64 :                         .await;
    1314           64 :                     per_blob_copy = tmp.into_raw_slice().into_inner();
    1315           64 : 
    1316           64 :                     res?;
    1317              : 
    1318           64 :                     records += 1;
    1319              :                 }
    1320              : 
    1321           32 :                 buffer = Some(res.buf);
    1322              :             }
    1323              :         }
    1324              : 
    1325           20 :         assert!(
    1326           20 :             read_builder.is_none(),
    1327            0 :             "with the sentinel above loop should had handled all"
    1328              :         );
    1329              : 
    1330           20 :         Ok(records)
    1331           20 :     }
    1332              : 
    1333            8 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1334            8 :         println!(
    1335            8 :             "index_start_blk: {}, root {}",
    1336            8 :             self.index_start_blk, self.index_root_blk
    1337            8 :         );
    1338            8 : 
    1339            8 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1340            8 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1341            8 :             self.index_start_blk,
    1342            8 :             self.index_root_blk,
    1343            8 :             block_reader,
    1344            8 :         );
    1345            8 : 
    1346            8 :         tree_reader.dump(ctx).await?;
    1347              : 
    1348            8 :         let keys = self.index_entries(ctx).await?;
    1349              : 
    1350           16 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1351           16 :             let buf = val.load_raw(ctx).await?;
    1352           16 :             let val = Value::des(&buf)?;
    1353           16 :             let desc = match val {
    1354           16 :                 Value::Image(img) => {
    1355           16 :                     format!(" img {} bytes", img.len())
    1356              :                 }
    1357            0 :                 Value::WalRecord(rec) => {
    1358            0 :                     let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
    1359            0 :                     format!(
    1360            0 :                         " rec {} bytes will_init: {} {}",
    1361            0 :                         buf.len(),
    1362            0 :                         rec.will_init(),
    1363            0 :                         wal_desc
    1364            0 :                     )
    1365              :                 }
    1366              :             };
    1367           16 :             Ok(desc)
    1368           16 :         }
    1369              : 
    1370           24 :         for entry in keys {
    1371           16 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1372           16 :             let desc = match dump_blob(&val, ctx).await {
    1373           16 :                 Ok(desc) => desc,
    1374            0 :                 Err(err) => {
    1375            0 :                     format!("ERROR: {err}")
    1376              :                 }
    1377              :             };
    1378           16 :             println!("  key {key} at {lsn}: {desc}");
    1379              : 
    1380              :             // Print more details about CHECKPOINT records. Would be nice to print details
    1381              :             // of many other record types too, but these are particularly interesting, as
    1382              :             // have a lot of special processing for them in walingest.rs.
    1383            8 :             use pageserver_api::key::CHECKPOINT_KEY;
    1384            8 :             use postgres_ffi::CheckPoint;
    1385           16 :             if key == CHECKPOINT_KEY {
    1386            0 :                 let val = val.load(ctx).await?;
    1387            0 :                 match val {
    1388            0 :                     Value::Image(img) => {
    1389            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1390            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
    1391              :                     }
    1392            0 :                     Value::WalRecord(_rec) => {
    1393            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1394            0 :                     }
    1395              :                 }
    1396           16 :             }
    1397              :         }
    1398              : 
    1399            8 :         Ok(())
    1400            8 :     }
    1401              : 
    1402           60 :     fn stream_index_forwards<'a, R>(
    1403           60 :         &'a self,
    1404           60 :         reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
    1405           60 :         start: &'a [u8; DELTA_KEY_SIZE],
    1406           60 :         ctx: &'a RequestContext,
    1407           60 :     ) -> impl futures::stream::Stream<
    1408           60 :         Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
    1409           60 :     > + 'a
    1410           60 :     where
    1411           60 :         R: BlockReader + 'a,
    1412           60 :     {
    1413              :         use futures::stream::TryStreamExt;
    1414           60 :         let stream = reader.into_stream(start, ctx);
    1415          304 :         stream.map_ok(|(key, value)| {
    1416          304 :             let key = DeltaKey::from_slice(&key);
    1417          304 :             let (key, lsn) = (key.key(), key.lsn());
    1418          304 :             let offset = BlobRef(value);
    1419          304 : 
    1420          304 :             (key, lsn, offset)
    1421          304 :         })
    1422           60 :     }
    1423              : 
    1424              :     /// The file offset to the first block of index.
    1425              :     ///
    1426              :     /// The file structure is summary, values, and index. We often need this for the size of last blob.
    1427       473037 :     fn index_start_offset(&self) -> u64 {
    1428       473037 :         let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
    1429       473037 :         let bref = BlobRef(offset);
    1430       473037 :         tracing::debug!(
    1431              :             index_start_blk = self.index_start_blk,
    1432              :             offset,
    1433            0 :             pos = bref.pos(),
    1434            0 :             "index_start_offset"
    1435              :         );
    1436       473037 :         offset
    1437       473037 :     }
    1438              : 
    1439         1152 :     pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
    1440         1152 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1441         1152 :         let tree_reader =
    1442         1152 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
    1443         1152 :         DeltaLayerIterator {
    1444         1152 :             delta_layer: self,
    1445         1152 :             ctx,
    1446         1152 :             index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
    1447         1152 :             key_values_batch: std::collections::VecDeque::new(),
    1448         1152 :             is_end: false,
    1449         1152 :             planner: StreamingVectoredReadPlanner::new(
    1450         1152 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
    1451         1152 :                 1024,        // The default value. Unit tests might use a different value
    1452         1152 :             ),
    1453         1152 :         }
    1454         1152 :     }
    1455              : 
    1456              :     /// NB: not super efficient, but not terrible either. Should prob be an iterator.
    1457              :     //
    1458              :     // We're reusing the index traversal logical in plan_reads; would be nice to
    1459              :     // factor that out.
    1460            0 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
    1461            0 :         self.index_entries(ctx)
    1462            0 :             .await
    1463            0 :             .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
    1464            0 :     }
    1465              : }
    1466              : 
    1467              : /// A set of data associated with a delta layer key and its value
    1468              : pub struct DeltaEntry<'a> {
    1469              :     pub key: Key,
    1470              :     pub lsn: Lsn,
    1471              :     /// Size of the stored value
    1472              :     pub size: u64,
    1473              :     /// Reference to the on-disk value
    1474              :     pub val: ValueRef<'a>,
    1475              : }
    1476              : 
    1477              : /// Reference to an on-disk value
    1478              : pub struct ValueRef<'a> {
    1479              :     blob_ref: BlobRef,
    1480              :     layer: &'a DeltaLayerInner,
    1481              : }
    1482              : 
    1483              : impl ValueRef<'_> {
    1484              :     /// Loads the value from disk
    1485            0 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1486            0 :         let buf = self.load_raw(ctx).await?;
    1487            0 :         let val = Value::des(&buf)?;
    1488            0 :         Ok(val)
    1489            0 :     }
    1490              : 
    1491           16 :     async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
    1492           16 :         let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
    1493           16 :             self.layer,
    1494           16 :         )));
    1495           16 :         let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1496           16 :         Ok(buf)
    1497           16 :     }
    1498              : }
    1499              : 
    1500              : pub(crate) struct Adapter<T>(T);
    1501              : 
    1502              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1503           16 :     pub(crate) async fn read_blk(
    1504           16 :         &self,
    1505           16 :         blknum: u32,
    1506           16 :         ctx: &RequestContext,
    1507           16 :     ) -> Result<BlockLease, std::io::Error> {
    1508           16 :         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
    1509           16 :         block_reader.read_blk(blknum, ctx).await
    1510           16 :     }
    1511              : }
    1512              : 
    1513              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1514           32 :     fn as_ref(&self) -> &DeltaLayerInner {
    1515           32 :         self
    1516           32 :     }
    1517              : }
    1518              : 
    1519              : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    1520            0 :     fn key(&self) -> Key {
    1521            0 :         self.key
    1522            0 :     }
    1523            0 :     fn lsn(&self) -> Lsn {
    1524            0 :         self.lsn
    1525            0 :     }
    1526            0 :     fn size(&self) -> u64 {
    1527            0 :         self.size
    1528            0 :     }
    1529              : }
    1530              : 
    1531              : pub struct DeltaLayerIterator<'a> {
    1532              :     delta_layer: &'a DeltaLayerInner,
    1533              :     ctx: &'a RequestContext,
    1534              :     planner: StreamingVectoredReadPlanner,
    1535              :     index_iter: DiskBtreeIterator<'a>,
    1536              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1537              :     is_end: bool,
    1538              : }
    1539              : 
    1540              : impl DeltaLayerIterator<'_> {
    1541            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1542            0 :         self.delta_layer.layer_dbg_info()
    1543            0 :     }
    1544              : 
    1545              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1546        42812 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1547        42812 :         assert!(self.key_values_batch.is_empty());
    1548        42812 :         assert!(!self.is_end);
    1549              : 
    1550        42812 :         let plan = loop {
    1551      4199260 :             if let Some(res) = self.index_iter.next().await {
    1552      4198164 :                 let (raw_key, value) = res?;
    1553      4198164 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
    1554      4198164 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
    1555      4198164 :                 let blob_ref = BlobRef(value);
    1556      4198164 :                 let offset = blob_ref.pos();
    1557        41716 :                 if let Some(batch_plan) =
    1558      4198164 :                     self.planner.handle(key, lsn, offset, blob_ref.will_init())
    1559              :                 {
    1560        41716 :                     break batch_plan;
    1561      4156448 :                 }
    1562              :             } else {
    1563         1096 :                 self.is_end = true;
    1564         1096 :                 let data_end_offset = self.delta_layer.index_start_offset();
    1565         1096 :                 if let Some(item) = self.planner.handle_range_end(data_end_offset) {
    1566         1096 :                     break item;
    1567              :                 } else {
    1568            0 :                     return Ok(()); // TODO: test empty iterator
    1569              :                 }
    1570              :             }
    1571              :         };
    1572        42812 :         let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
    1573        42812 :         let mut next_batch = std::collections::VecDeque::new();
    1574        42812 :         let buf_size = plan.size();
    1575        42812 :         let buf = IoBufferMut::with_capacity(buf_size);
    1576        42812 :         let blobs_buf = vectored_blob_reader
    1577        42812 :             .read_blobs(&plan, buf, self.ctx)
    1578        42812 :             .await?;
    1579        42812 :         let view = BufView::new_slice(&blobs_buf.buf);
    1580      4198108 :         for meta in blobs_buf.blobs.iter() {
    1581      4198108 :             let blob_read = meta.read(&view).await?;
    1582      4198108 :             let value = Value::des(&blob_read)?;
    1583              : 
    1584      4198108 :             next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
    1585              :         }
    1586        42812 :         self.key_values_batch = next_batch;
    1587        42812 :         Ok(())
    1588        42812 :     }
    1589              : 
    1590      4199684 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1591      4199684 :         if self.key_values_batch.is_empty() {
    1592        44768 :             if self.is_end {
    1593         2124 :                 return Ok(None);
    1594        42644 :             }
    1595        42644 :             self.next_batch().await?;
    1596      4154916 :         }
    1597      4197560 :         Ok(Some(
    1598      4197560 :             self.key_values_batch
    1599      4197560 :                 .pop_front()
    1600      4197560 :                 .expect("should not be empty"),
    1601      4197560 :         ))
    1602      4199684 :     }
    1603              : }
    1604              : 
    1605              : #[cfg(test)]
    1606              : pub(crate) mod test {
    1607              :     use std::collections::BTreeMap;
    1608              : 
    1609              :     use bytes::Bytes;
    1610              :     use itertools::MinMaxResult;
    1611              :     use pageserver_api::value::Value;
    1612              :     use rand::RngCore;
    1613              :     use rand::prelude::{SeedableRng, SliceRandom, StdRng};
    1614              : 
    1615              :     use super::*;
    1616              :     use crate::DEFAULT_PG_VERSION;
    1617              :     use crate::context::DownloadBehavior;
    1618              :     use crate::task_mgr::TaskKind;
    1619              :     use crate::tenant::disk_btree::tests::TestDisk;
    1620              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    1621              :     use crate::tenant::storage_layer::{Layer, ResidentLayer};
    1622              :     use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
    1623              :     use crate::tenant::{Tenant, Timeline};
    1624              : 
    1625              :     /// Construct an index for a fictional delta layer and and then
    1626              :     /// traverse in order to plan vectored reads for a query. Finally,
    1627              :     /// verify that the traversal fed the right index key and value
    1628              :     /// pairs into the planner.
    1629              :     #[tokio::test]
    1630            4 :     async fn test_delta_layer_index_traversal() {
    1631            4 :         let base_key = Key {
    1632            4 :             field1: 0,
    1633            4 :             field2: 1663,
    1634            4 :             field3: 12972,
    1635            4 :             field4: 16396,
    1636            4 :             field5: 0,
    1637            4 :             field6: 246080,
    1638            4 :         };
    1639            4 : 
    1640            4 :         // Populate the index with some entries
    1641            4 :         let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
    1642            4 :             (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
    1643            4 :             (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1644            4 :             (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1645            4 :             (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
    1646            4 :         ]);
    1647            4 : 
    1648            4 :         let mut disk = TestDisk::default();
    1649            4 :         let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
    1650            4 : 
    1651            4 :         let mut disk_offset = 0;
    1652           20 :         for (key, lsns) in &entries {
    1653           84 :             for lsn in lsns {
    1654           68 :                 let index_key = DeltaKey::from_key_lsn(key, *lsn);
    1655           68 :                 let blob_ref = BlobRef::new(disk_offset, false);
    1656           68 :                 writer
    1657           68 :                     .append(&index_key.0, blob_ref.0)
    1658           68 :                     .expect("In memory disk append should never fail");
    1659           68 : 
    1660           68 :                 disk_offset += 1;
    1661           68 :             }
    1662            4 :         }
    1663            4 : 
    1664            4 :         // Prepare all the arguments for the call into `plan_reads` below
    1665            4 :         let (root_offset, _writer) = writer
    1666            4 :             .finish()
    1667            4 :             .expect("In memory disk finish should never fail");
    1668            4 :         let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
    1669            4 :         let planner = VectoredReadPlanner::new(100);
    1670            4 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1671            4 : 
    1672            4 :         let keyspace = KeySpace {
    1673            4 :             ranges: vec![
    1674            4 :                 base_key..base_key.add(3),
    1675            4 :                 base_key.add(3)..base_key.add(100),
    1676            4 :             ],
    1677            4 :         };
    1678            4 :         let lsn_range = Lsn(2)..Lsn(40);
    1679            4 : 
    1680            4 :         // Plan and validate
    1681            4 :         let vectored_reads = DeltaLayerInner::plan_reads(
    1682            4 :             &keyspace,
    1683            4 :             lsn_range.clone(),
    1684            4 :             disk_offset,
    1685            4 :             reader,
    1686            4 :             planner,
    1687            4 :             &ctx,
    1688            4 :         )
    1689            4 :         .await
    1690            4 :         .expect("Read planning should not fail");
    1691            4 : 
    1692            4 :         validate(keyspace, lsn_range, vectored_reads, entries);
    1693            4 :     }
    1694              : 
    1695            4 :     fn validate(
    1696            4 :         keyspace: KeySpace,
    1697            4 :         lsn_range: Range<Lsn>,
    1698            4 :         vectored_reads: Vec<VectoredRead>,
    1699            4 :         index_entries: BTreeMap<Key, Vec<Lsn>>,
    1700            4 :     ) {
    1701              :         #[derive(Debug, PartialEq, Eq)]
    1702              :         struct BlobSpec {
    1703              :             key: Key,
    1704              :             lsn: Lsn,
    1705              :             at: u64,
    1706              :         }
    1707              : 
    1708            4 :         let mut planned_blobs = Vec::new();
    1709           60 :         for read in vectored_reads {
    1710           56 :             for (at, meta) in read.blobs_at.as_slice() {
    1711           56 :                 planned_blobs.push(BlobSpec {
    1712           56 :                     key: meta.key,
    1713           56 :                     lsn: meta.lsn,
    1714           56 :                     at: *at,
    1715           56 :                 });
    1716           56 :             }
    1717              :         }
    1718              : 
    1719            4 :         let mut expected_blobs = Vec::new();
    1720            4 :         let mut disk_offset = 0;
    1721           20 :         for (key, lsns) in index_entries {
    1722           84 :             for lsn in lsns {
    1723           84 :                 let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
    1724           68 :                 let lsn_included = lsn_range.contains(&lsn);
    1725           68 : 
    1726           68 :                 if key_included && lsn_included {
    1727           56 :                     expected_blobs.push(BlobSpec {
    1728           56 :                         key,
    1729           56 :                         lsn,
    1730           56 :                         at: disk_offset,
    1731           56 :                     });
    1732           56 :                 }
    1733              : 
    1734           68 :                 disk_offset += 1;
    1735              :             }
    1736              :         }
    1737              : 
    1738            4 :         assert_eq!(planned_blobs, expected_blobs);
    1739            4 :     }
    1740              : 
    1741              :     mod constants {
    1742              :         use utils::lsn::Lsn;
    1743              : 
    1744              :         /// Offset used by all lsns in this test
    1745              :         pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
    1746              :         /// Number of unique keys including in the test data
    1747              :         pub(super) const KEY_COUNT: u8 = 60;
    1748              :         /// Max number of different lsns for each key
    1749              :         pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    1750              :         /// Possible value sizes for each key along with a probability weight
    1751              :         pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    1752              :         /// Probability that there will be a gap between the current key and the next one (33.3%)
    1753              :         pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    1754              :         /// The minimum size of a key range in all the generated reads
    1755              :         pub(super) const MIN_RANGE_SIZE: i128 = 10;
    1756              :         /// The number of ranges included in each vectored read
    1757              :         pub(super) const RANGES_COUNT: u8 = 2;
    1758              :         /// The number of vectored reads performed
    1759              :         pub(super) const READS_COUNT: u8 = 100;
    1760              :         /// Soft max size of a vectored read. Will be violated if we have to read keys
    1761              :         /// with values larger than the limit
    1762              :         pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    1763              :     }
    1764              : 
    1765              :     struct Entry {
    1766              :         key: Key,
    1767              :         lsn: Lsn,
    1768              :         value: Vec<u8>,
    1769              :     }
    1770              : 
    1771            4 :     fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
    1772            4 :         let mut current_key = Key::MIN;
    1773            4 : 
    1774            4 :         let mut entries = Vec::new();
    1775          244 :         for _ in 0..constants::KEY_COUNT {
    1776          240 :             let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
    1777          240 :             let mut lsns_iter =
    1778         4520 :                 std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
    1779         4520 :                     Some(Lsn(lsn.0 + 0x08))
    1780         4520 :                 });
    1781          240 :             let mut lsns = Vec::new();
    1782         4760 :             while lsns.len() < count as usize {
    1783         4520 :                 let take = rng.gen_bool(0.5);
    1784         4520 :                 let lsn = lsns_iter.next().unwrap();
    1785         4520 :                 if take {
    1786         2224 :                     lsns.push(lsn);
    1787         2296 :                 }
    1788              :             }
    1789              : 
    1790         2464 :             for lsn in lsns {
    1791         2224 :                 let size = constants::VALUE_SIZES
    1792         6672 :                     .choose_weighted(rng, |item| item.1)
    1793         2224 :                     .unwrap()
    1794         2224 :                     .0;
    1795         2224 :                 let mut buf = vec![0; size];
    1796         2224 :                 rng.fill_bytes(&mut buf);
    1797         2224 : 
    1798         2224 :                 entries.push(Entry {
    1799         2224 :                     key: current_key,
    1800         2224 :                     lsn,
    1801         2224 :                     value: buf,
    1802         2224 :                 })
    1803              :             }
    1804              : 
    1805          240 :             let gap = constants::KEY_GAP_CHANGES
    1806          480 :                 .choose_weighted(rng, |item| item.1)
    1807          240 :                 .unwrap()
    1808          240 :                 .0;
    1809          240 :             if gap {
    1810           76 :                 current_key = current_key.add(2);
    1811          164 :             } else {
    1812          164 :                 current_key = current_key.add(1);
    1813          164 :             }
    1814              :         }
    1815              : 
    1816            4 :         entries
    1817            4 :     }
    1818              : 
    1819              :     struct EntriesMeta {
    1820              :         key_range: Range<Key>,
    1821              :         lsn_range: Range<Lsn>,
    1822              :         index: BTreeMap<(Key, Lsn), Vec<u8>>,
    1823              :     }
    1824              : 
    1825            4 :     fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
    1826         2224 :         let key_range = match entries.iter().minmax_by_key(|e| e.key) {
    1827            4 :             MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
    1828            0 :             _ => panic!("More than one entry is always expected"),
    1829              :         };
    1830              : 
    1831         2224 :         let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
    1832            4 :             MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
    1833            0 :             _ => panic!("More than one entry is always expected"),
    1834              :         };
    1835              : 
    1836            4 :         let mut index = BTreeMap::new();
    1837         2224 :         for entry in entries.iter() {
    1838         2224 :             index.insert((entry.key, entry.lsn), entry.value.clone());
    1839         2224 :         }
    1840              : 
    1841            4 :         EntriesMeta {
    1842            4 :             key_range,
    1843            4 :             lsn_range,
    1844            4 :             index,
    1845            4 :         }
    1846            4 :     }
    1847              : 
    1848          400 :     fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
    1849          400 :         let start = key_range.start.to_i128();
    1850          400 :         let end = key_range.end.to_i128();
    1851          400 : 
    1852          400 :         let mut keyspace = KeySpace::default();
    1853              : 
    1854         1200 :         for _ in 0..constants::RANGES_COUNT {
    1855          800 :             let mut range: Option<Range<Key>> = Option::default();
    1856         2488 :             while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
    1857         1688 :                 let range_start = rng.gen_range(start..end);
    1858         1688 :                 let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
    1859         1688 :                 if range_end_offset >= end {
    1860          200 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(end));
    1861         1488 :                 } else {
    1862         1488 :                     let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
    1863         1488 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
    1864         1488 :                 }
    1865              :             }
    1866          800 :             keyspace.ranges.push(range.unwrap());
    1867              :         }
    1868              : 
    1869          400 :         keyspace
    1870          400 :     }
    1871              : 
    1872              :     #[tokio::test]
    1873            4 :     async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
    1874            4 :         let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
    1875            4 :         let (tenant, ctx) = harness.load().await;
    1876            4 : 
    1877            4 :         let timeline_id = TimelineId::generate();
    1878            4 :         let timeline = tenant
    1879            4 :             .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
    1880            4 :             .await?;
    1881            4 : 
    1882            4 :         tracing::info!("Generating test data ...");
    1883            4 : 
    1884            4 :         let rng = &mut StdRng::seed_from_u64(0);
    1885            4 :         let entries = generate_entries(rng);
    1886            4 :         let entries_meta = get_entries_meta(&entries);
    1887            4 : 
    1888            4 :         tracing::info!("Done generating {} entries", entries.len());
    1889            4 : 
    1890            4 :         tracing::info!("Writing test data to delta layer ...");
    1891            4 :         let mut writer = DeltaLayerWriter::new(
    1892            4 :             harness.conf,
    1893            4 :             timeline_id,
    1894            4 :             harness.tenant_shard_id,
    1895            4 :             entries_meta.key_range.start,
    1896            4 :             entries_meta.lsn_range.clone(),
    1897            4 :             &timeline.gate,
    1898            4 :             timeline.cancel.clone(),
    1899            4 :             &ctx,
    1900            4 :         )
    1901            4 :         .await?;
    1902            4 : 
    1903         2228 :         for entry in entries {
    1904         2224 :             let (_, res) = writer
    1905         2224 :                 .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
    1906         2224 :                 .await;
    1907         2224 :             res?;
    1908            4 :         }
    1909            4 : 
    1910            4 :         let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
    1911            4 :         let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
    1912            4 : 
    1913            4 :         let inner = resident.get_as_delta(&ctx).await?;
    1914            4 : 
    1915            4 :         let file_size = inner.file.metadata().await?.len();
    1916            4 :         tracing::info!(
    1917            4 :             "Done writing test data to delta layer. Resulting file size is: {}",
    1918            4 :             file_size
    1919            4 :         );
    1920            4 : 
    1921          404 :         for i in 0..constants::READS_COUNT {
    1922          400 :             tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
    1923            4 : 
    1924          400 :             let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
    1925          400 :             let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1926          400 :                 inner.index_start_blk,
    1927          400 :                 inner.index_root_blk,
    1928          400 :                 block_reader,
    1929          400 :             );
    1930          400 : 
    1931          400 :             let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
    1932          400 :             let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
    1933          400 :             let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
    1934            4 : 
    1935          400 :             let vectored_reads = DeltaLayerInner::plan_reads(
    1936          400 :                 &keyspace,
    1937          400 :                 entries_meta.lsn_range.clone(),
    1938          400 :                 data_end_offset,
    1939          400 :                 index_reader,
    1940          400 :                 planner,
    1941          400 :                 &ctx,
    1942          400 :             )
    1943          400 :             .await?;
    1944            4 : 
    1945          400 :             let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
    1946          400 :             let buf_size = DeltaLayerInner::get_min_read_buffer_size(
    1947          400 :                 &vectored_reads,
    1948          400 :                 constants::MAX_VECTORED_READ_BYTES,
    1949          400 :             );
    1950          400 :             let mut buf = Some(IoBufferMut::with_capacity(buf_size));
    1951            4 : 
    1952        39848 :             for read in vectored_reads {
    1953        39448 :                 let blobs_buf = vectored_blob_reader
    1954        39448 :                     .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
    1955        39448 :                     .await?;
    1956        39448 :                 let view = BufView::new_slice(&blobs_buf.buf);
    1957       114608 :                 for meta in blobs_buf.blobs.iter() {
    1958       114608 :                     let value = meta.read(&view).await?;
    1959       114608 :                     assert_eq!(
    1960       114608 :                         &value[..],
    1961       114608 :                         &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
    1962       114608 :                     );
    1963            4 :                 }
    1964            4 : 
    1965        39448 :                 buf = Some(blobs_buf.buf);
    1966            4 :             }
    1967            4 :         }
    1968            4 : 
    1969            4 :         Ok(())
    1970            4 :     }
    1971              : 
    1972              :     #[tokio::test]
    1973            4 :     async fn copy_delta_prefix_smoke() {
    1974            4 :         use bytes::Bytes;
    1975            4 :         use pageserver_api::record::NeonWalRecord;
    1976            4 : 
    1977            4 :         let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
    1978            4 :             .await
    1979            4 :             .unwrap();
    1980            4 :         let (tenant, ctx) = h.load().await;
    1981            4 :         let ctx = &ctx;
    1982            4 :         let timeline = tenant
    1983            4 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
    1984            4 :             .await
    1985            4 :             .unwrap();
    1986            4 :         let ctx = &ctx.with_scope_timeline(&timeline);
    1987            4 : 
    1988            4 :         let initdb_layer = timeline
    1989            4 :             .layers
    1990            4 :             .read()
    1991            4 :             .await
    1992            4 :             .likely_resident_layers()
    1993            4 :             .next()
    1994            4 :             .cloned()
    1995            4 :             .unwrap();
    1996            4 : 
    1997            4 :         {
    1998            4 :             let mut writer = timeline.writer().await;
    1999            4 : 
    2000            4 :             let data = [
    2001            4 :                 (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
    2002            4 :                 (
    2003            4 :                     0x30,
    2004            4 :                     12,
    2005            4 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2006            4 :                         will_init: false,
    2007            4 :                         rec: Bytes::from_static(b"1"),
    2008            4 :                     }),
    2009            4 :                 ),
    2010            4 :                 (
    2011            4 :                     0x40,
    2012            4 :                     12,
    2013            4 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2014            4 :                         will_init: true,
    2015            4 :                         rec: Bytes::from_static(b"2"),
    2016            4 :                     }),
    2017            4 :                 ),
    2018            4 :                 // build an oversized value so we cannot extend and existing read over
    2019            4 :                 // this
    2020            4 :                 (
    2021            4 :                     0x50,
    2022            4 :                     12,
    2023            4 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2024            4 :                         will_init: true,
    2025            4 :                         rec: {
    2026            4 :                             let mut buf =
    2027            4 :                                 vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
    2028            4 :                             buf.iter_mut()
    2029            4 :                                 .enumerate()
    2030       536576 :                                 .for_each(|(i, slot)| *slot = (i % 256) as u8);
    2031            4 :                             Bytes::from(buf)
    2032            4 :                         },
    2033            4 :                     }),
    2034            4 :                 ),
    2035            4 :                 // because the oversized read cannot be extended further, we are sure to exercise the
    2036            4 :                 // builder created on the last round with this:
    2037            4 :                 (
    2038            4 :                     0x60,
    2039            4 :                     12,
    2040            4 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2041            4 :                         will_init: true,
    2042            4 :                         rec: Bytes::from_static(b"3"),
    2043            4 :                     }),
    2044            4 :                 ),
    2045            4 :                 (
    2046            4 :                     0x60,
    2047            4 :                     9,
    2048            4 :                     Value::Image(Bytes::from_static(b"something for a different key")),
    2049            4 :                 ),
    2050            4 :             ];
    2051            4 : 
    2052            4 :             let mut last_lsn = None;
    2053            4 : 
    2054           28 :             for (lsn, key, value) in data {
    2055           24 :                 let key = Key::from_i128(key);
    2056           24 :                 writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
    2057           24 :                 last_lsn = Some(lsn);
    2058            4 :             }
    2059            4 : 
    2060            4 :             writer.finish_write(Lsn(last_lsn.unwrap()));
    2061            4 :         }
    2062            4 :         timeline.freeze_and_flush().await.unwrap();
    2063            4 : 
    2064            4 :         let new_layer = timeline
    2065            4 :             .layers
    2066            4 :             .read()
    2067            4 :             .await
    2068            4 :             .likely_resident_layers()
    2069            6 :             .find(|&x| x != &initdb_layer)
    2070            4 :             .cloned()
    2071            4 :             .unwrap();
    2072            4 : 
    2073            4 :         // create a copy for the timeline, so we don't overwrite the file
    2074            4 :         let branch = tenant
    2075            4 :             .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
    2076            4 :             .await
    2077            4 :             .unwrap();
    2078            4 : 
    2079            4 :         assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
    2080            4 : 
    2081            4 :         // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
    2082            4 :         // a single key
    2083            4 : 
    2084           24 :         for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
    2085           20 :             let truncate_at = Lsn(truncate_at);
    2086            4 : 
    2087           20 :             let mut writer = DeltaLayerWriter::new(
    2088           20 :                 tenant.conf,
    2089           20 :                 branch.timeline_id,
    2090           20 :                 tenant.tenant_shard_id,
    2091           20 :                 Key::MIN,
    2092           20 :                 Lsn(0x11)..truncate_at,
    2093           20 :                 &branch.gate,
    2094           20 :                 branch.cancel.clone(),
    2095           20 :                 ctx,
    2096           20 :             )
    2097           20 :             .await
    2098           20 :             .unwrap();
    2099            4 : 
    2100           20 :             let new_layer = new_layer.download_and_keep_resident(ctx).await.unwrap();
    2101           20 : 
    2102           20 :             new_layer
    2103           20 :                 .copy_delta_prefix(&mut writer, truncate_at, ctx)
    2104           20 :                 .await
    2105           20 :                 .unwrap();
    2106            4 : 
    2107           20 :             let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
    2108           20 :             let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
    2109           20 : 
    2110           20 :             copied_layer.get_as_delta(ctx).await.unwrap();
    2111           20 : 
    2112           20 :             assert_keys_and_values_eq(
    2113           20 :                 new_layer.get_as_delta(ctx).await.unwrap(),
    2114           20 :                 copied_layer.get_as_delta(ctx).await.unwrap(),
    2115           20 :                 truncate_at,
    2116           20 :                 ctx,
    2117           20 :             )
    2118           20 :             .await;
    2119            4 :         }
    2120            4 :     }
    2121              : 
    2122           20 :     async fn assert_keys_and_values_eq(
    2123           20 :         source: &DeltaLayerInner,
    2124           20 :         truncated: &DeltaLayerInner,
    2125           20 :         truncated_at: Lsn,
    2126           20 :         ctx: &RequestContext,
    2127           20 :     ) {
    2128              :         use futures::future::ready;
    2129              :         use futures::stream::TryStreamExt;
    2130              : 
    2131           20 :         let start_key = [0u8; DELTA_KEY_SIZE];
    2132           20 : 
    2133           20 :         let source_reader = FileBlockReader::new(&source.file, source.file_id);
    2134           20 :         let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2135           20 :             source.index_start_blk,
    2136           20 :             source.index_root_blk,
    2137           20 :             &source_reader,
    2138           20 :         );
    2139           20 :         let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
    2140          120 :         let source_stream = source_stream.filter(|res| match res {
    2141          120 :             Ok((_, lsn, _)) => ready(lsn < &truncated_at),
    2142            0 :             _ => ready(true),
    2143          120 :         });
    2144           20 :         let mut source_stream = std::pin::pin!(source_stream);
    2145           20 : 
    2146           20 :         let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
    2147           20 :         let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2148           20 :             truncated.index_start_blk,
    2149           20 :             truncated.index_root_blk,
    2150           20 :             &truncated_reader,
    2151           20 :         );
    2152           20 :         let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
    2153           20 :         let mut truncated_stream = std::pin::pin!(truncated_stream);
    2154           20 : 
    2155           20 :         let mut scratch_left = Vec::new();
    2156           20 :         let mut scratch_right = Vec::new();
    2157              : 
    2158              :         loop {
    2159           84 :             let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
    2160           84 :             let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
    2161           84 : 
    2162           84 :             if src.is_none() {
    2163           20 :                 assert!(truncated.is_none());
    2164           20 :                 break;
    2165           64 :             }
    2166           64 : 
    2167           64 :             let (src, truncated) = (src.unwrap(), truncated.unwrap());
    2168           64 : 
    2169           64 :             // because we've filtered the source with Lsn, we should always have the same keys from both.
    2170           64 :             assert_eq!(src.0, truncated.0);
    2171           64 :             assert_eq!(src.1, truncated.1);
    2172              : 
    2173              :             // if this is needed for something else, just drop this assert.
    2174           64 :             assert!(
    2175           64 :                 src.2.pos() >= truncated.2.pos(),
    2176            0 :                 "value position should not go backwards {} vs. {}",
    2177            0 :                 src.2.pos(),
    2178            0 :                 truncated.2.pos()
    2179              :             );
    2180              : 
    2181           64 :             scratch_left.clear();
    2182           64 :             let src_cursor = source_reader.block_cursor();
    2183           64 :             let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
    2184           64 :             scratch_right.clear();
    2185           64 :             let trunc_cursor = truncated_reader.block_cursor();
    2186           64 :             let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
    2187           64 : 
    2188           64 :             tokio::try_join!(left, right).unwrap();
    2189           64 : 
    2190           64 :             assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
    2191              :         }
    2192           20 :     }
    2193              : 
    2194        36448 :     pub(crate) fn sort_delta(
    2195        36448 :         (k1, l1, _): &(Key, Lsn, Value),
    2196        36448 :         (k2, l2, _): &(Key, Lsn, Value),
    2197        36448 :     ) -> std::cmp::Ordering {
    2198        36448 :         (k1, l1).cmp(&(k2, l2))
    2199        36448 :     }
    2200              : 
    2201              :     #[cfg(feature = "testing")]
    2202          188 :     pub(crate) fn sort_delta_value(
    2203          188 :         (k1, l1, v1): &(Key, Lsn, Value),
    2204          188 :         (k2, l2, v2): &(Key, Lsn, Value),
    2205          188 :     ) -> std::cmp::Ordering {
    2206          188 :         let order_1 = if v1.is_image() { 0 } else { 1 };
    2207          188 :         let order_2 = if v2.is_image() { 0 } else { 1 };
    2208          188 :         (k1, l1, order_1).cmp(&(k2, l2, order_2))
    2209          188 :     }
    2210              : 
    //                :     /// Test helper: writes `deltas` into a new delta layer on `tline` and returns
    //                :     /// the resulting [`ResidentLayer`].
    //                :     ///
    //                :     /// The entries are sorted with [`sort_delta`] (key, then LSN) before being
    //                :     /// written, so callers may pass them in any order. The layer's key range
    //                :     /// starts at the smallest key and its LSN range is `min_lsn..max_lsn + 1`.
    //                :     ///
    //                :     /// # Errors
    //                :     ///
    //                :     /// Propagates any error from creating the writer, writing a value,
    //                :     /// finishing the writer, or `Layer::finish_creating`.
    //                :     ///
    //                :     /// # Panics
    //                :     ///
    //                :     /// Panics if `deltas` is empty, because the first/last entry and the
    //                :     /// min/max LSN are unwrapped.
    2211           44 :     pub(crate) async fn produce_delta_layer(
    2212           44 :         tenant: &Tenant,
    2213           44 :         tline: &Arc<Timeline>,
    2214           44 :         mut deltas: Vec<(Key, Lsn, Value)>,
    2215           44 :         ctx: &RequestContext,
    2216           44 :     ) -> anyhow::Result<ResidentLayer> {
    2217           44 :         deltas.sort_by(sort_delta);
    //                :         // After sorting by key, the first and last entries hold the key bounds.
    2218           44 :         let (key_start, _, _) = deltas.first().unwrap();
    2219           44 :         let (key_max, _, _) = deltas.last().unwrap();
    //                :         // LSNs are only ordered within a key, so the bounds need a full scan.
    2220        16480 :         let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
    2221        16480 :         let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
    //                :         // The LSN range below is half-open, so go one past the largest LSN.
    2222           44 :         let lsn_end = Lsn(lsn_max.0 + 1);
    2223           44 :         let mut writer = DeltaLayerWriter::new(
    2224           44 :             tenant.conf,
    2225           44 :             tline.timeline_id,
    2226           44 :             tenant.tenant_shard_id,
    2227           44 :             *key_start,
    2228           44 :             (*lsn_min)..lsn_end,
    2229           44 :             &tline.gate,
    2230           44 :             tline.cancel.clone(),
    2231           44 :             ctx,
    2232           44 :         )
    2233           44 :         .await?;
    //                :         // NOTE(review): presumably `next()` yields the successor key, making the
    //                :         // layer's end key exclusive -- confirm against `Key::next`.
    2234           44 :         let key_end = key_max.next();
    2235              : 
    2236        16524 :         for (key, lsn, value) in deltas {
    2237        16480 :             writer.put_value(key, lsn, value, ctx).await?;
    2238              :         }
    2239              : 
    2240           44 :         let (desc, path) = writer.finish(key_end, ctx).await?;
    2241           44 :         let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    2242              : 
    2243           44 :         Ok::<_, anyhow::Error>(delta_layer)
    2244           44 :     }
    2245              : 
    //                :     /// Asserts that `delta_iter` yields exactly the entries in `expect`, in the
    //                :     /// same order.
    //                :     ///
    //                :     /// Both sequences are advanced in lockstep; the function panics if one ends
    //                :     /// before the other, if any key, LSN or value differs, or if the iterator
    //                :     /// returns an error.
    2246           56 :     async fn assert_delta_iter_equal(
    2247           56 :         delta_iter: &mut DeltaLayerIterator<'_>,
    2248           56 :         expect: &[(Key, Lsn, Value)],
    2249           56 :     ) {
    2250           56 :         let mut expect_iter = expect.iter();
    2251              :         loop {
    2252        56056 :             let o1 = delta_iter.next().await.unwrap();
    2253        56056 :             let o2 = expect_iter.next();
    //                :             // Both sides must run out at the same step.
    2254        56056 :             assert_eq!(o1.is_some(), o2.is_some());
    2255        56056 :             if o1.is_none() && o2.is_none() {
    2256           56 :                 break;
    2257        56000 :             }
    //                :             // The iterator yields owned items while `expect` yields references,
    //                :             // hence the mixed `&`/`*` in the comparisons below.
    2258        56000 :             let (k1, l1, v1) = o1.unwrap();
    2259        56000 :             let (k2, l2, v2) = o2.unwrap();
    2260        56000 :             assert_eq!(&k1, k2);
    2261        56000 :             assert_eq!(l1, *l2);
    2262        56000 :             assert_eq!(&v1, v2);
    2263              :         }
    2264           56 :     }
    2265              : 
    //                :     /// Checks `DeltaLayerIterator` batching and output against a generated layer.
    //                :     ///
    //                :     /// The layer holds `N` = 1000 image values spread over 100 keys with 10 LSNs
    //                :     /// each. For every combination of `max_read_size` and `batch_size`, the test
    //                :     /// first inspects the size of up to three batches and then verifies that a
    //                :     /// full iteration returns exactly the entries that were written.
    2266              :     #[tokio::test]
    2267            4 :     async fn delta_layer_iterator() {
    2268            4 :         let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
    2269            4 :         let (tenant, ctx) = harness.load().await;
    2270            4 : 
    2271            4 :         let tline = tenant
    2272            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2273            4 :             .await
    2274            4 :             .unwrap();
    2275            4 : 
    //                :         // Builds a key from a fixed template, varying only `field6`.
    2276         4000 :         fn get_key(id: u32) -> Key {
    2277         4000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    2278         4000 :             key.field6 = id;
    2279         4000 :             key
    2280         4000 :         }
    2281            4 :         const N: usize = 1000;
    //                :         // `idx / 10` picks the key and `idx % 10` the LSN (0x10..=0xA0), so the
    //                :         // entries are generated already in ascending (key, LSN) order -- the same
    //                :         // order `produce_delta_layer` sorts them into before writing.
    2282            4 :         let test_deltas = (0..N)
    2283         4000 :             .map(|idx| {
    2284         4000 :                 (
    2285         4000 :                     get_key(idx as u32 / 10),
    2286         4000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
    2287         4000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
    2288         4000 :                 )
    2289         4000 :             })
    2290            4 :             .collect_vec();
    2291            4 :         let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
    2292            4 :             .await
    2293            4 :             .unwrap();
    2294            4 :         let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
    2295           12 :         for max_read_size in [1, 1024] {
    2296           64 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    2297           56 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    2298           56 :                 // Test if the batch size is correctly determined
    2299           56 :                 let mut iter = delta_layer.iter(&ctx);
    //                :                 // Replace the iterator's planner with one using the limits under test.
    2300           56 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    2301           56 :                 let mut num_items = 0;
    //                :                 // Only the first few batches are inspected here.
    2302          224 :                 for _ in 0..3 {
    2303          168 :                     iter.next_batch().await.unwrap();
    2304          168 :                     num_items += iter.key_values_batch.len();
    2305          168 :                     if max_read_size == 1 {
    2306            4 :                         // every key should be a batch b/c the value is larger than max_read_size
    2307           84 :                         assert_eq!(iter.key_values_batch.len(), 1);
    2308            4 :                     } else {
    2309           84 :                         assert!(iter.key_values_batch.len() <= batch_size);
    2310            4 :                     }
    2311          168 :                     if num_items >= N {
    2312            4 :                         break;
    2313          168 :                     }
    //                :                     // Empty the batch buffer before requesting the next batch.
    2314          168 :                     iter.key_values_batch.clear();
    2315            4 :                 }
    2316            4 :                 // Test if the result is correct
    //                :                 // Use a fresh iterator so the comparison starts from the first entry.
    2317           56 :                 let mut iter = delta_layer.iter(&ctx);
    2318           56 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    2319           56 :                 assert_delta_iter_equal(&mut iter, &test_deltas).await;
    2320            4 :             }
    2321            4 :         }
    2322            4 :     }
    2323              : }
        

Generated by: LCOV version 2.1-beta