LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 85.9 % 1613 1386
Test Date: 2024-09-24 13:57:57 Functions: 71.8 % 163 117

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "values", and
      24              : //! "index". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use crate::config::PageServerConf;
      31              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache::{self, FileId, PAGE_SZ};
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::BlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{
      37              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      38              : };
      39              : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
      40              : use crate::tenant::timeline::GetVectoredError;
      41              : use crate::tenant::vectored_blob_io::{
      42              :     BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      43              :     VectoredReadCoalesceMode, VectoredReadPlanner,
      44              : };
      45              : use crate::tenant::PageReconstructError;
      46              : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
      47              : use crate::virtual_file::{self, VirtualFile};
      48              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      49              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      50              : use anyhow::{anyhow, bail, ensure, Context, Result};
      51              : use bytes::BytesMut;
      52              : use camino::{Utf8Path, Utf8PathBuf};
      53              : use futures::StreamExt;
      54              : use itertools::Itertools;
      55              : use pageserver_api::config::MaxVectoredReadBytes;
      56              : use pageserver_api::keyspace::KeySpace;
      57              : use pageserver_api::models::ImageCompressionAlgorithm;
      58              : use pageserver_api::shard::TenantShardId;
      59              : use rand::{distributions::Alphanumeric, Rng};
      60              : use serde::{Deserialize, Serialize};
      61              : use std::collections::VecDeque;
      62              : use std::fs::File;
      63              : use std::io::SeekFrom;
      64              : use std::ops::Range;
      65              : use std::os::unix::fs::FileExt;
      66              : use std::str::FromStr;
      67              : use std::sync::Arc;
      68              : use tokio::sync::OnceCell;
      69              : use tokio_epoll_uring::IoBuf;
      70              : use tracing::*;
      71              : 
      72              : use utils::{
      73              :     bin_ser::BeSer,
      74              :     id::{TenantId, TimelineId},
      75              :     lsn::Lsn,
      76              : };
      77              : 
      78              : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
      79              : 
      80              : ///
      81              : /// Header stored in the beginning of the file
      82              : ///
      83              : /// After this comes the 'values' part, starting on block 1. After that,
      84              : /// the 'index' starts at the block indicated by 'index_start_blk'
      85              : ///
      86         3138 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      87              : pub struct Summary {
      88              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      89              :     pub magic: u16,
      90              :     pub format_version: u16,
      91              : 
      92              :     pub tenant_id: TenantId,
      93              :     pub timeline_id: TimelineId,
      94              :     pub key_range: Range<Key>,
      95              :     pub lsn_range: Range<Lsn>,
      96              : 
      97              :     /// Block number where the 'index' part of the file begins.
      98              :     pub index_start_blk: u32,
      99              :     /// Block within the 'index', where the B-tree root page is stored
     100              :     pub index_root_blk: u32,
     101              : }
     102              : 
     103              : impl From<&DeltaLayer> for Summary {
     104            0 :     fn from(layer: &DeltaLayer) -> Self {
     105            0 :         Self::expected(
     106            0 :             layer.desc.tenant_shard_id.tenant_id,
     107            0 :             layer.desc.timeline_id,
     108            0 :             layer.desc.key_range.clone(),
     109            0 :             layer.desc.lsn_range.clone(),
     110            0 :         )
     111            0 :     }
     112              : }
     113              : 
     114              : impl Summary {
     115         3138 :     pub(super) fn expected(
     116         3138 :         tenant_id: TenantId,
     117         3138 :         timeline_id: TimelineId,
     118         3138 :         keys: Range<Key>,
     119         3138 :         lsns: Range<Lsn>,
     120         3138 :     ) -> Self {
     121         3138 :         Self {
     122         3138 :             magic: DELTA_FILE_MAGIC,
     123         3138 :             format_version: STORAGE_FORMAT_VERSION,
     124         3138 : 
     125         3138 :             tenant_id,
     126         3138 :             timeline_id,
     127         3138 :             key_range: keys,
     128         3138 :             lsn_range: lsns,
     129         3138 : 
     130         3138 :             index_start_blk: 0,
     131         3138 :             index_root_blk: 0,
     132         3138 :         }
     133         3138 :     }
     134              : }
     135              : 
     136              : // Flag indicating that this version initialize the page
     137              : const WILL_INIT: u64 = 1;
     138              : 
     139              : /// Struct representing reference to BLOB in layers.
     140              : ///
     141              : /// Reference contains BLOB offset, and for WAL records it also contains
     142              : /// `will_init` flag. The flag helps to determine the range of records
     143              : /// that needs to be applied, without reading/deserializing records themselves.
     144            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     145              : pub struct BlobRef(pub u64);
     146              : 
     147              : impl BlobRef {
     148      1530138 :     pub fn will_init(&self) -> bool {
     149      1530138 :         (self.0 & WILL_INIT) != 0
     150      1530138 :     }
     151              : 
     152     14238648 :     pub fn pos(&self) -> u64 {
     153     14238648 :         self.0 >> 1
     154     14238648 :     }
     155              : 
     156     19413966 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     157     19413966 :         let mut blob_ref = pos << 1;
     158     19413966 :         if will_init {
     159     19409610 :             blob_ref |= WILL_INIT;
     160     19409610 :         }
     161     19413966 :         BlobRef(blob_ref)
     162     19413966 :     }
     163              : }
     164              : 
     165              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     166              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     167              : 
     168              : /// This is the key of the B-tree index stored in the delta layer. It consists
     169              : /// of the serialized representation of a Key and LSN.
     170              : impl DeltaKey {
     171      6192594 :     fn from_slice(buf: &[u8]) -> Self {
     172      6192594 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     173      6192594 :         bytes.copy_from_slice(buf);
     174      6192594 :         DeltaKey(bytes)
     175      6192594 :     }
     176              : 
     177     20153029 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     178     20153029 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     179     20153029 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     180     20153029 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     181     20153029 :         DeltaKey(bytes)
     182     20153029 :     }
     183              : 
     184      6192594 :     fn key(&self) -> Key {
     185      6192594 :         Key::from_slice(&self.0)
     186      6192594 :     }
     187              : 
     188      6192594 :     fn lsn(&self) -> Lsn {
     189      6192594 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     190      6192594 :     }
     191              : 
     192      8045874 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     193      8045874 :         let mut lsn_buf = [0u8; 8];
     194      8045874 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     195      8045874 :         Lsn(u64::from_be_bytes(lsn_buf))
     196      8045874 :     }
     197              : }
     198              : 
     199              : /// This is used only from `pagectl`. Within pageserver, all layers are
     200              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     201              : pub struct DeltaLayer {
     202              :     path: Utf8PathBuf,
     203              :     pub desc: PersistentLayerDesc,
     204              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     205              : }
     206              : 
     207              : impl std::fmt::Debug for DeltaLayer {
     208            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     209              :         use super::RangeDisplayDebug;
     210              : 
     211            0 :         f.debug_struct("DeltaLayer")
     212            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     213            0 :             .field("lsn_range", &self.desc.lsn_range)
     214            0 :             .field("file_size", &self.desc.file_size)
     215            0 :             .field("inner", &self.inner)
     216            0 :             .finish()
     217            0 :     }
     218              : }
     219              : 
     220              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     221              : /// file.
     222              : pub struct DeltaLayerInner {
     223              :     // values copied from summary
     224              :     index_start_blk: u32,
     225              :     index_root_blk: u32,
     226              : 
     227              :     file: VirtualFile,
     228              :     file_id: FileId,
     229              : 
     230              :     layer_key_range: Range<Key>,
     231              :     layer_lsn_range: Range<Lsn>,
     232              : 
     233              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     234              : }
     235              : 
     236              : impl DeltaLayerInner {
     237            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     238            0 :         format!(
     239            0 :             "delta {}..{} {}..{}",
     240            0 :             self.key_range().start,
     241            0 :             self.key_range().end,
     242            0 :             self.lsn_range().start,
     243            0 :             self.lsn_range().end
     244            0 :         )
     245            0 :     }
     246              : }
     247              : 
     248              : impl std::fmt::Debug for DeltaLayerInner {
     249            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     250            0 :         f.debug_struct("DeltaLayerInner")
     251            0 :             .field("index_start_blk", &self.index_start_blk)
     252            0 :             .field("index_root_blk", &self.index_root_blk)
     253            0 :             .finish()
     254            0 :     }
     255              : }
     256              : 
     257              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     258              : impl std::fmt::Display for DeltaLayer {
     259            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     260            0 :         write!(f, "{}", self.layer_desc().short_id())
     261            0 :     }
     262              : }
     263              : 
     264              : impl AsLayerDesc for DeltaLayer {
     265            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     266            0 :         &self.desc
     267            0 :     }
     268              : }
     269              : 
     270              : impl DeltaLayer {
     271            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     272            0 :         self.desc.dump();
     273            0 : 
     274            0 :         if !verbose {
     275            0 :             return Ok(());
     276            0 :         }
     277              : 
     278            0 :         let inner = self.load(ctx).await?;
     279              : 
     280            0 :         inner.dump(ctx).await
     281            0 :     }
     282              : 
     283         4248 :     fn temp_path_for(
     284         4248 :         conf: &PageServerConf,
     285         4248 :         tenant_shard_id: &TenantShardId,
     286         4248 :         timeline_id: &TimelineId,
     287         4248 :         key_start: Key,
     288         4248 :         lsn_range: &Range<Lsn>,
     289         4248 :     ) -> Utf8PathBuf {
     290         4248 :         let rand_string: String = rand::thread_rng()
     291         4248 :             .sample_iter(&Alphanumeric)
     292         4248 :             .take(8)
     293         4248 :             .map(char::from)
     294         4248 :             .collect();
     295         4248 : 
     296         4248 :         conf.timeline_path(tenant_shard_id, timeline_id)
     297         4248 :             .join(format!(
     298         4248 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     299         4248 :                 key_start,
     300         4248 :                 u64::from(lsn_range.start),
     301         4248 :                 u64::from(lsn_range.end),
     302         4248 :                 rand_string,
     303         4248 :                 TEMP_FILE_SUFFIX,
     304         4248 :             ))
     305         4248 :     }
     306              : 
     307              :     ///
     308              :     /// Open the underlying file and read the metadata into memory, if it's
     309              :     /// not loaded already.
     310              :     ///
     311            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
     312            0 :         // Quick exit if already loaded
     313            0 :         self.inner
     314            0 :             .get_or_try_init(|| self.load_inner(ctx))
     315            0 :             .await
     316            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     317            0 :     }
     318              : 
     319            0 :     async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
     320            0 :         let path = self.path();
     321              : 
     322            0 :         let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
     323              : 
     324              :         // not production code
     325            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     326            0 :         let expected_layer_name = self.layer_desc().layer_name();
     327            0 : 
     328            0 :         if actual_layer_name != expected_layer_name {
     329            0 :             println!("warning: filename does not match what is expected from in-file summary");
     330            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     331            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     332            0 :         }
     333              : 
     334            0 :         Ok(Arc::new(loaded))
     335            0 :     }
     336              : 
     337              :     /// Create a DeltaLayer struct representing an existing file on disk.
     338              :     ///
     339              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     340            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     341            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     342            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     343            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     344              : 
     345            0 :         let metadata = file
     346            0 :             .metadata()
     347            0 :             .context("get file metadata to determine size")?;
     348              : 
     349              :         // This function is never used for constructing layers in a running pageserver,
     350              :         // so it does not need an accurate TenantShardId.
     351            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     352            0 : 
     353            0 :         Ok(DeltaLayer {
     354            0 :             path: path.to_path_buf(),
     355            0 :             desc: PersistentLayerDesc::new_delta(
     356            0 :                 tenant_shard_id,
     357            0 :                 summary.timeline_id,
     358            0 :                 summary.key_range,
     359            0 :                 summary.lsn_range,
     360            0 :                 metadata.len(),
     361            0 :             ),
     362            0 :             inner: OnceCell::new(),
     363            0 :         })
     364            0 :     }
     365              : 
     366              :     /// Path to the layer file in pageserver workdir.
     367            0 :     fn path(&self) -> Utf8PathBuf {
     368            0 :         self.path.clone()
     369            0 :     }
     370              : }
     371              : 
     372              : /// A builder object for constructing a new delta layer.
     373              : ///
     374              : /// Usage:
     375              : ///
     376              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     377              : ///
     378              : /// 2. Write the contents by calling `put_value` for every page
     379              : ///    version to store in the layer.
     380              : ///
     381              : /// 3. Call `finish`.
     382              : ///
     383              : struct DeltaLayerWriterInner {
     384              :     pub path: Utf8PathBuf,
     385              :     timeline_id: TimelineId,
     386              :     tenant_shard_id: TenantShardId,
     387              : 
     388              :     key_start: Key,
     389              :     lsn_range: Range<Lsn>,
     390              : 
     391              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     392              : 
     393              :     blob_writer: BlobWriter<true>,
     394              : 
     395              :     // Number of key-lsns in the layer.
     396              :     num_keys: usize,
     397              : }
     398              : 
     399              : impl DeltaLayerWriterInner {
     400              :     ///
     401              :     /// Start building a new delta layer.
     402              :     ///
     403         4248 :     async fn new(
     404         4248 :         conf: &'static PageServerConf,
     405         4248 :         timeline_id: TimelineId,
     406         4248 :         tenant_shard_id: TenantShardId,
     407         4248 :         key_start: Key,
     408         4248 :         lsn_range: Range<Lsn>,
     409         4248 :         ctx: &RequestContext,
     410         4248 :     ) -> anyhow::Result<Self> {
     411         4248 :         // Create the file initially with a temporary filename. We don't know
     412         4248 :         // the end key yet, so we cannot form the final filename yet. We will
     413         4248 :         // rename it when we're done.
     414         4248 :         //
     415         4248 :         // Note: This overwrites any existing file. There shouldn't be any.
     416         4248 :         // FIXME: throw an error instead?
     417         4248 :         let path =
     418         4248 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     419              : 
     420         4248 :         let mut file = VirtualFile::create(&path, ctx).await?;
     421              :         // make room for the header block
     422         4248 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     423         4248 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     424         4248 : 
     425         4248 :         // Initialize the b-tree index builder
     426         4248 :         let block_buf = BlockBuf::new();
     427         4248 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     428         4248 : 
     429         4248 :         Ok(Self {
     430         4248 :             path,
     431         4248 :             timeline_id,
     432         4248 :             tenant_shard_id,
     433         4248 :             key_start,
     434         4248 :             lsn_range,
     435         4248 :             tree: tree_builder,
     436         4248 :             blob_writer,
     437         4248 :             num_keys: 0,
     438         4248 :         })
     439         4248 :     }
     440              : 
     441              :     ///
     442              :     /// Append a key-value pair to the file.
     443              :     ///
     444              :     /// The values must be appended in key, lsn order.
     445              :     ///
     446      6253674 :     async fn put_value(
     447      6253674 :         &mut self,
     448      6253674 :         key: Key,
     449      6253674 :         lsn: Lsn,
     450      6253674 :         val: Value,
     451      6253674 :         ctx: &RequestContext,
     452      6253674 :     ) -> anyhow::Result<()> {
     453      6253674 :         let (_, res) = self
     454      6253674 :             .put_value_bytes(
     455      6253674 :                 key,
     456      6253674 :                 lsn,
     457      6253674 :                 Value::ser(&val)?.slice_len(),
     458      6253674 :                 val.will_init(),
     459      6253674 :                 ctx,
     460              :             )
     461         5946 :             .await;
     462      6253674 :         res
     463      6253674 :     }
     464              : 
     465     19413654 :     async fn put_value_bytes<Buf>(
     466     19413654 :         &mut self,
     467     19413654 :         key: Key,
     468     19413654 :         lsn: Lsn,
     469     19413654 :         val: FullSlice<Buf>,
     470     19413654 :         will_init: bool,
     471     19413654 :         ctx: &RequestContext,
     472     19413654 :     ) -> (FullSlice<Buf>, anyhow::Result<()>)
     473     19413654 :     where
     474     19413654 :         Buf: IoBuf + Send,
     475     19413654 :     {
     476     19413654 :         assert!(
     477     19413654 :             self.lsn_range.start <= lsn,
     478            0 :             "lsn_start={}, lsn={}",
     479              :             self.lsn_range.start,
     480              :             lsn
     481              :         );
     482              :         // We don't want to use compression in delta layer creation
     483     19413654 :         let compression = ImageCompressionAlgorithm::Disabled;
     484     19413654 :         let (val, res) = self
     485     19413654 :             .blob_writer
     486     19413654 :             .write_blob_maybe_compressed(val, ctx, compression)
     487        14793 :             .await;
     488     19413654 :         let off = match res {
     489     19413654 :             Ok((off, _)) => off,
     490            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     491              :         };
     492              : 
     493     19413654 :         let blob_ref = BlobRef::new(off, will_init);
     494     19413654 : 
     495     19413654 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     496     19413654 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     497     19413654 : 
     498     19413654 :         self.num_keys += 1;
     499     19413654 : 
     500     19413654 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     501     19413654 :     }
     502              : 
     503      6071916 :     fn size(&self) -> u64 {
     504      6071916 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     505      6071916 :     }
     506              : 
     507              :     ///
     508              :     /// Finish writing the delta layer.
     509              :     ///
     510         4212 :     async fn finish(
     511         4212 :         self,
     512         4212 :         key_end: Key,
     513         4212 :         ctx: &RequestContext,
     514         4212 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     515         4212 :         let temp_path = self.path.clone();
     516        29007 :         let result = self.finish0(key_end, ctx).await;
     517         4212 :         if result.is_err() {
     518            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
     519            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     520            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     521            0 :             }
     522         4212 :         }
     523         4212 :         result
     524         4212 :     }
     525              : 
     526         4212 :     async fn finish0(
     527         4212 :         self,
     528         4212 :         key_end: Key,
     529         4212 :         ctx: &RequestContext,
     530         4212 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     531         4212 :         let index_start_blk =
     532         4212 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     533              : 
     534         4212 :         let mut file = self.blob_writer.into_inner(ctx).await?;
     535              : 
     536              :         // Write out the index
     537         4212 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     538         4212 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     539            0 :             .await?;
     540        45063 :         for buf in block_buf.blocks {
     541        40851 :             let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     542        40851 :             res?;
     543              :         }
     544         4212 :         assert!(self.lsn_range.start < self.lsn_range.end);
     545              :         // Fill in the summary on blk 0
     546         4212 :         let summary = Summary {
     547         4212 :             magic: DELTA_FILE_MAGIC,
     548         4212 :             format_version: STORAGE_FORMAT_VERSION,
     549         4212 :             tenant_id: self.tenant_shard_id.tenant_id,
     550         4212 :             timeline_id: self.timeline_id,
     551         4212 :             key_range: self.key_start..key_end,
     552         4212 :             lsn_range: self.lsn_range.clone(),
     553         4212 :             index_start_blk,
     554         4212 :             index_root_blk,
     555         4212 :         };
     556         4212 : 
     557         4212 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     558         4212 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     559         4212 :         Summary::ser_into(&summary, &mut buf)?;
     560         4212 :         file.seek(SeekFrom::Start(0)).await?;
     561         4212 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     562         4212 :         res?;
     563              : 
     564         4212 :         let metadata = file
     565         4212 :             .metadata()
     566         2112 :             .await
     567         4212 :             .context("get file metadata to determine size")?;
     568              : 
     569              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     570              :         // Make it a little bit below to account for differing GB units
     571              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     572         4212 :         ensure!(
     573         4212 :             metadata.len() <= S3_UPLOAD_LIMIT,
     574            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     575            0 :             file.path,
     576            0 :             metadata.len()
     577              :         );
     578              : 
     579              :         // Note: Because we opened the file in write-only mode, we cannot
     580              :         // reuse the same VirtualFile for reading later. That's why we don't
     581              :         // set inner.file here. The first read will have to re-open it.
     582              : 
     583         4212 :         let desc = PersistentLayerDesc::new_delta(
     584         4212 :             self.tenant_shard_id,
     585         4212 :             self.timeline_id,
     586         4212 :             self.key_start..key_end,
     587         4212 :             self.lsn_range.clone(),
     588         4212 :             metadata.len(),
     589         4212 :         );
     590         4212 : 
     591         4212 :         // fsync the file
     592         4212 :         file.sync_all().await?;
     593              : 
     594         4212 :         trace!("created delta layer {}", self.path);
     595              : 
     596         4212 :         Ok((desc, self.path))
     597         4212 :     }
     598              : }
     599              : 
     600              : /// A builder object for constructing a new delta layer.
     601              : ///
     602              : /// Usage:
     603              : ///
     604              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     605              : ///
     606              : /// 2. Write the contents by calling `put_value` for every page
     607              : ///    version to store in the layer.
     608              : ///
     609              : /// 3. Call `finish`.
     610              : ///
     611              : /// # Note
     612              : ///
     613              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     614              : /// possible for the writer to drop before `finish` is actually called. So this
     615              : /// could lead to odd temporary files in the directory, exhausting file system.
     616              : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     617              : /// implementation that cleans up the temporary file in failure. It's not
     618              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     619              : /// out some fields, making it impossible to implement `Drop`.
     620              : ///
     621              : #[must_use]
     622              : pub struct DeltaLayerWriter {
     623              :     inner: Option<DeltaLayerWriterInner>,
     624              : }
     625              : 
     626              : impl DeltaLayerWriter {
     627              :     ///
     628              :     /// Start building a new delta layer.
     629              :     ///
     630         4248 :     pub async fn new(
     631         4248 :         conf: &'static PageServerConf,
     632         4248 :         timeline_id: TimelineId,
     633         4248 :         tenant_shard_id: TenantShardId,
     634         4248 :         key_start: Key,
     635         4248 :         lsn_range: Range<Lsn>,
     636         4248 :         ctx: &RequestContext,
     637         4248 :     ) -> anyhow::Result<Self> {
     638         4248 :         Ok(Self {
     639         4248 :             inner: Some(
     640         4248 :                 DeltaLayerWriterInner::new(
     641         4248 :                     conf,
     642         4248 :                     timeline_id,
     643         4248 :                     tenant_shard_id,
     644         4248 :                     key_start,
     645         4248 :                     lsn_range,
     646         4248 :                     ctx,
     647         4248 :                 )
     648         2161 :                 .await?,
     649              :             ),
     650              :         })
     651         4248 :     }
     652              : 
     653              :     ///
     654              :     /// Append a key-value pair to the file.
     655              :     ///
     656              :     /// The values must be appended in key, lsn order.
     657              :     ///
     658      6253674 :     pub async fn put_value(
     659      6253674 :         &mut self,
     660      6253674 :         key: Key,
     661      6253674 :         lsn: Lsn,
     662      6253674 :         val: Value,
     663      6253674 :         ctx: &RequestContext,
     664      6253674 :     ) -> anyhow::Result<()> {
     665      6253674 :         self.inner
     666      6253674 :             .as_mut()
     667      6253674 :             .unwrap()
     668      6253674 :             .put_value(key, lsn, val, ctx)
     669         5946 :             .await
     670      6253674 :     }
     671              : 
     672     13159980 :     pub async fn put_value_bytes<Buf>(
     673     13159980 :         &mut self,
     674     13159980 :         key: Key,
     675     13159980 :         lsn: Lsn,
     676     13159980 :         val: FullSlice<Buf>,
     677     13159980 :         will_init: bool,
     678     13159980 :         ctx: &RequestContext,
     679     13159980 :     ) -> (FullSlice<Buf>, anyhow::Result<()>)
     680     13159980 :     where
     681     13159980 :         Buf: IoBuf + Send,
     682     13159980 :     {
     683     13159980 :         self.inner
     684     13159980 :             .as_mut()
     685     13159980 :             .unwrap()
     686     13159980 :             .put_value_bytes(key, lsn, val, will_init, ctx)
     687         8847 :             .await
     688     13159980 :     }
     689              : 
     690      6071916 :     pub fn size(&self) -> u64 {
     691      6071916 :         self.inner.as_ref().unwrap().size()
     692      6071916 :     }
     693              : 
     694              :     ///
     695              :     /// Finish writing the delta layer.
     696              :     ///
     697         4212 :     pub(crate) async fn finish(
     698         4212 :         mut self,
     699         4212 :         key_end: Key,
     700         4212 :         ctx: &RequestContext,
     701         4212 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     702        29007 :         self.inner.take().unwrap().finish(key_end, ctx).await
     703         4212 :     }
     704              : 
     705        36486 :     pub(crate) fn num_keys(&self) -> usize {
     706        36486 :         self.inner.as_ref().unwrap().num_keys
     707        36486 :     }
     708              : 
     709        45252 :     pub(crate) fn estimated_size(&self) -> u64 {
     710        45252 :         let inner = self.inner.as_ref().unwrap();
     711        45252 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
     712        45252 :     }
     713              : }
     714              : 
     715              : impl Drop for DeltaLayerWriter {
     716         4248 :     fn drop(&mut self) {
     717         4248 :         if let Some(inner) = self.inner.take() {
     718           36 :             // We want to remove the virtual file here, so it's fine to not
     719           36 :             // having completely flushed unwritten data.
     720           36 :             let vfile = inner.blob_writer.into_inner_no_flush();
     721           36 :             vfile.remove();
     722         4212 :         }
     723         4248 :     }
     724              : }
     725              : 
     726            0 : #[derive(thiserror::Error, Debug)]
     727              : pub enum RewriteSummaryError {
     728              :     #[error("magic mismatch")]
     729              :     MagicMismatch,
     730              :     #[error(transparent)]
     731              :     Other(#[from] anyhow::Error),
     732              : }
     733              : 
     734              : impl From<std::io::Error> for RewriteSummaryError {
     735            0 :     fn from(e: std::io::Error) -> Self {
     736            0 :         Self::Other(anyhow::anyhow!(e))
     737            0 :     }
     738              : }
     739              : 
     740              : impl DeltaLayer {
     741            0 :     pub async fn rewrite_summary<F>(
     742            0 :         path: &Utf8Path,
     743            0 :         rewrite: F,
     744            0 :         ctx: &RequestContext,
     745            0 :     ) -> Result<(), RewriteSummaryError>
     746            0 :     where
     747            0 :         F: Fn(Summary) -> Summary,
     748            0 :     {
     749            0 :         let mut file = VirtualFile::open_with_options(
     750            0 :             path,
     751            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     752            0 :             ctx,
     753            0 :         )
     754            0 :         .await
     755            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     756            0 :         let file_id = page_cache::next_file_id();
     757            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     758            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     759            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     760            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     761            0 :             return Err(RewriteSummaryError::MagicMismatch);
     762            0 :         }
     763            0 : 
     764            0 :         let new_summary = rewrite(actual_summary);
     765            0 : 
     766            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     767            0 :         // TODO: could use smallvec here, but it's a pain with Slice<T>
     768            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     769            0 :         file.seek(SeekFrom::Start(0)).await?;
     770            0 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     771            0 :         res?;
     772            0 :         Ok(())
     773            0 :     }
     774              : }
     775              : 
     776              : impl DeltaLayerInner {
     777         1434 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     778         1434 :         &self.layer_key_range
     779         1434 :     }
     780              : 
     781         1434 :     pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
     782         1434 :         &self.layer_lsn_range
     783         1434 :     }
     784              : 
     785         3138 :     pub(super) async fn load(
     786         3138 :         path: &Utf8Path,
     787         3138 :         summary: Option<Summary>,
     788         3138 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     789         3138 :         ctx: &RequestContext,
     790         3138 :     ) -> anyhow::Result<Self> {
     791         3138 :         let file = VirtualFile::open(path, ctx)
     792         1569 :             .await
     793         3138 :             .context("open layer file")?;
     794              : 
     795         3138 :         let file_id = page_cache::next_file_id();
     796         3138 : 
     797         3138 :         let block_reader = FileBlockReader::new(&file, file_id);
     798              : 
     799         3138 :         let summary_blk = block_reader
     800         3138 :             .read_blk(0, ctx)
     801         1585 :             .await
     802         3138 :             .context("read first block")?;
     803              : 
     804              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     805         3138 :         let actual_summary =
     806         3138 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     807              : 
     808         3138 :         if let Some(mut expected_summary) = summary {
     809              :             // production code path
     810         3138 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     811         3138 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     812         3138 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     813         3138 :             expected_summary.timeline_id = actual_summary.timeline_id;
     814         3138 : 
     815         3138 :             if actual_summary != expected_summary {
     816            0 :                 bail!(
     817            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     818            0 :                     actual_summary,
     819            0 :                     expected_summary
     820            0 :                 );
     821         3138 :             }
     822            0 :         }
     823              : 
     824         3138 :         Ok(DeltaLayerInner {
     825         3138 :             file,
     826         3138 :             file_id,
     827         3138 :             index_start_blk: actual_summary.index_start_blk,
     828         3138 :             index_root_blk: actual_summary.index_root_blk,
     829         3138 :             max_vectored_read_bytes,
     830         3138 :             layer_key_range: actual_summary.key_range,
     831         3138 :             layer_lsn_range: actual_summary.lsn_range,
     832         3138 :         })
     833         3138 :     }
     834              : 
     835              :     // Look up the keys in the provided keyspace and update
     836              :     // the reconstruct state with whatever is found.
     837              :     //
     838              :     // If the key is cached, go no further than the cached Lsn.
     839              :     //
     840              :     // Currently, the index is visited for each range, but this
     841              :     // can be further optimised to visit the index only once.
     842       611933 :     pub(super) async fn get_values_reconstruct_data(
     843       611933 :         &self,
     844       611933 :         keyspace: KeySpace,
     845       611933 :         lsn_range: Range<Lsn>,
     846       611933 :         reconstruct_state: &mut ValuesReconstructState,
     847       611933 :         ctx: &RequestContext,
     848       611933 :     ) -> Result<(), GetVectoredError> {
     849       611933 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     850       611933 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     851       611933 :             self.index_start_blk,
     852       611933 :             self.index_root_blk,
     853       611933 :             block_reader,
     854       611933 :         );
     855       611933 : 
     856       611933 :         let planner = VectoredReadPlanner::new(
     857       611933 :             self.max_vectored_read_bytes
     858       611933 :                 .expect("Layer is loaded with max vectored bytes config")
     859       611933 :                 .0
     860       611933 :                 .into(),
     861       611933 :         );
     862       611933 : 
     863       611933 :         let data_end_offset = self.index_start_offset();
     864              : 
     865       611933 :         let reads = Self::plan_reads(
     866       611933 :             &keyspace,
     867       611933 :             lsn_range.clone(),
     868       611933 :             data_end_offset,
     869       611933 :             index_reader,
     870       611933 :             planner,
     871       611933 :             reconstruct_state,
     872       611933 :             ctx,
     873       611933 :         )
     874        62351 :         .await
     875       611933 :         .map_err(GetVectoredError::Other)?;
     876              : 
     877       611933 :         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
     878       205482 :             .await;
     879              : 
     880       611933 :         reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
     881       611933 : 
     882       611933 :         Ok(())
     883       611933 :     }
     884              : 
     885       612539 :     async fn plan_reads<Reader>(
     886       612539 :         keyspace: &KeySpace,
     887       612539 :         lsn_range: Range<Lsn>,
     888       612539 :         data_end_offset: u64,
     889       612539 :         index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
     890       612539 :         mut planner: VectoredReadPlanner,
     891       612539 :         reconstruct_state: &mut ValuesReconstructState,
     892       612539 :         ctx: &RequestContext,
     893       612539 :     ) -> anyhow::Result<Vec<VectoredRead>>
     894       612539 :     where
     895       612539 :         Reader: BlockReader + Clone,
     896       612539 :     {
     897       612539 :         let ctx = RequestContextBuilder::extend(ctx)
     898       612539 :             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     899       612539 :             .build();
     900              : 
     901       739273 :         for range in keyspace.ranges.iter() {
     902       739273 :             let mut range_end_handled = false;
     903       739273 : 
     904       739273 :             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
     905       739273 :             let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
     906       739273 :             let mut index_stream = std::pin::pin!(index_stream);
     907              : 
     908      1775620 :             while let Some(index_entry) = index_stream.next().await {
     909      1748996 :                 let (raw_key, value) = index_entry?;
     910      1748996 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     911      1748996 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
     912      1748996 :                 let blob_ref = BlobRef(value);
     913      1748996 : 
     914      1748996 :                 // Lsns are not monotonically increasing across keys, so we don't assert on them.
     915      1748996 :                 assert!(key >= range.start);
     916              : 
     917      1748996 :                 let outside_lsn_range = !lsn_range.contains(&lsn);
     918      1748996 :                 let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
     919              : 
     920      1748996 :                 let flag = {
     921      1748996 :                     if outside_lsn_range || below_cached_lsn {
     922       218858 :                         BlobFlag::Ignore
     923      1530138 :                     } else if blob_ref.will_init() {
     924      1355268 :                         BlobFlag::ReplaceAll
     925              :                     } else {
     926              :                         // Usual path: add blob to the read
     927       174870 :                         BlobFlag::None
     928              :                     }
     929              :                 };
     930              : 
     931      1748996 :                 if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
     932       712649 :                     planner.handle_range_end(blob_ref.pos());
     933       712649 :                     range_end_handled = true;
     934       712649 :                     break;
     935      1036347 :                 } else {
     936      1036347 :                     planner.handle(key, lsn, blob_ref.pos(), flag);
     937      1036347 :                 }
     938              :             }
     939              : 
     940       739273 :             if !range_end_handled {
     941        26624 :                 tracing::debug!("Handling range end fallback at {}", data_end_offset);
     942        26624 :                 planner.handle_range_end(data_end_offset);
     943       712649 :             }
     944              :         }
     945              : 
     946       612539 :         Ok(planner.finish())
     947       612539 :     }
     948              : 
     949       612533 :     fn get_min_read_buffer_size(
     950       612533 :         planned_reads: &[VectoredRead],
     951       612533 :         read_size_soft_max: usize,
     952       612533 :     ) -> usize {
     953       612533 :         let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
     954       246382 :             return read_size_soft_max;
     955              :         };
     956              : 
     957       366151 :         let largest_read_size = largest_read.size();
     958       366151 :         if largest_read_size > read_size_soft_max {
     959              :             // If the read is oversized, it should only contain one key.
     960          600 :             let offenders = largest_read
     961          600 :                 .blobs_at
     962          600 :                 .as_slice()
     963          600 :                 .iter()
     964          600 :                 .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
     965          600 :                 .join(", ");
     966          600 :             tracing::warn!(
     967            0 :                 "Oversized vectored read ({} > {}) for keys {}",
     968              :                 largest_read_size,
     969              :                 read_size_soft_max,
     970              :                 offenders
     971              :             );
     972       365551 :         }
     973              : 
     974       366151 :         largest_read_size
     975       612533 :     }
     976              : 
     977       611933 :     async fn do_reads_and_update_state(
     978       611933 :         &self,
     979       611933 :         reads: Vec<VectoredRead>,
     980       611933 :         reconstruct_state: &mut ValuesReconstructState,
     981       611933 :         ctx: &RequestContext,
     982       611933 :     ) {
     983       611933 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     984       611933 :         let mut ignore_key_with_err = None;
     985       611933 : 
     986       611933 :         let max_vectored_read_bytes = self
     987       611933 :             .max_vectored_read_bytes
     988       611933 :             .expect("Layer is loaded with max vectored bytes config")
     989       611933 :             .0
     990       611933 :             .into();
     991       611933 :         let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
     992       611933 :         let mut buf = Some(BytesMut::with_capacity(buf_size));
     993              : 
     994              :         // Note that reads are processed in reverse order (from highest key+lsn).
     995              :         // This is the order that `ReconstructState` requires such that it can
     996              :         // track when a key is done.
     997       611933 :         for read in reads.into_iter().rev() {
     998       403329 :             let res = vectored_blob_reader
     999       403329 :                 .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
    1000       205482 :                 .await;
    1001              : 
    1002       403329 :             let blobs_buf = match res {
    1003       403329 :                 Ok(blobs_buf) => blobs_buf,
    1004            0 :                 Err(err) => {
    1005            0 :                     let kind = err.kind();
    1006            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
    1007            0 :                         reconstruct_state.on_key_error(
    1008            0 :                             blob_meta.key,
    1009            0 :                             PageReconstructError::Other(anyhow!(
    1010            0 :                                 "Failed to read blobs from virtual file {}: {}",
    1011            0 :                                 self.file.path,
    1012            0 :                                 kind
    1013            0 :                             )),
    1014            0 :                         );
    1015            0 :                     }
    1016              : 
    1017              :                     // We have "lost" the buffer since the lower level IO api
    1018              :                     // doesn't return the buffer on error. Allocate a new one.
    1019            0 :                     buf = Some(BytesMut::with_capacity(buf_size));
    1020            0 : 
    1021            0 :                     continue;
    1022              :                 }
    1023              :             };
    1024              : 
    1025       449749 :             for meta in blobs_buf.blobs.iter().rev() {
    1026       449749 :                 if Some(meta.meta.key) == ignore_key_with_err {
    1027            0 :                     continue;
    1028       449749 :                 }
    1029       449749 : 
    1030       449749 :                 let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
    1031       449749 :                 let value = match value {
    1032       449749 :                     Ok(v) => v,
    1033            0 :                     Err(e) => {
    1034            0 :                         reconstruct_state.on_key_error(
    1035            0 :                             meta.meta.key,
    1036            0 :                             PageReconstructError::Other(anyhow!(e).context(format!(
    1037            0 :                                 "Failed to deserialize blob from virtual file {}",
    1038            0 :                                 self.file.path,
    1039            0 :                             ))),
    1040            0 :                         );
    1041            0 : 
    1042            0 :                         ignore_key_with_err = Some(meta.meta.key);
    1043            0 :                         continue;
    1044              :                     }
    1045              :                 };
    1046              : 
    1047              :                 // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
    1048              :                 // state, no further updates shall be made to it. The call below will
    1049              :                 // panic if the invariant is violated.
    1050       449749 :                 reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
    1051              :             }
    1052              : 
    1053       403329 :             buf = Some(blobs_buf.buf);
    1054              :         }
    1055       611933 :     }
    1056              : 
    1057         1218 :     pub(super) async fn load_keys<'a>(
    1058         1218 :         &'a self,
    1059         1218 :         ctx: &RequestContext,
    1060         1218 :     ) -> Result<Vec<DeltaEntry<'a>>> {
    1061         1218 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1062         1218 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1063         1218 :             self.index_start_blk,
    1064         1218 :             self.index_root_blk,
    1065         1218 :             block_reader,
    1066         1218 :         );
    1067         1218 : 
    1068         1218 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
    1069         1218 : 
    1070         1218 :         tree_reader
    1071         1218 :             .visit(
    1072         1218 :                 &[0u8; DELTA_KEY_SIZE],
    1073         1218 :                 VisitDirection::Forwards,
    1074      6192138 :                 |key, value| {
    1075      6192138 :                     let delta_key = DeltaKey::from_slice(key);
    1076      6192138 :                     let val_ref = ValueRef {
    1077      6192138 :                         blob_ref: BlobRef(value),
    1078      6192138 :                         layer: self,
    1079      6192138 :                     };
    1080      6192138 :                     let pos = BlobRef(value).pos();
    1081      6192138 :                     if let Some(last) = all_keys.last_mut() {
    1082      6190920 :                         // subtract offset of the current and last entries to get the size
    1083      6190920 :                         // of the value associated with this (key, lsn) tuple
    1084      6190920 :                         let first_pos = last.size;
    1085      6190920 :                         last.size = pos - first_pos;
    1086      6190920 :                     }
    1087      6192138 :                     let entry = DeltaEntry {
    1088      6192138 :                         key: delta_key.key(),
    1089      6192138 :                         lsn: delta_key.lsn(),
    1090      6192138 :                         size: pos,
    1091      6192138 :                         val: val_ref,
    1092      6192138 :                     };
    1093      6192138 :                     all_keys.push(entry);
    1094      6192138 :                     true
    1095      6192138 :                 },
    1096         1218 :                 &RequestContextBuilder::extend(ctx)
    1097         1218 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1098         1218 :                     .build(),
    1099         1218 :             )
    1100         6357 :             .await?;
    1101         1218 :         if let Some(last) = all_keys.last_mut() {
    1102         1218 :             // Last key occupies all space till end of value storage,
    1103         1218 :             // which corresponds to beginning of the index
    1104         1218 :             last.size = self.index_start_offset() - last.size;
    1105         1218 :         }
    1106         1218 :         Ok(all_keys)
    1107         1218 :     }
    1108              : 
    1109              :     /// Using the given writer, write out a version which has the earlier Lsns than `until`.
    1110              :     ///
    1111              :     /// Return the amount of key value records pushed to the writer.
    1112           30 :     pub(super) async fn copy_prefix(
    1113           30 :         &self,
    1114           30 :         writer: &mut DeltaLayerWriter,
    1115           30 :         until: Lsn,
    1116           30 :         ctx: &RequestContext,
    1117           30 :     ) -> anyhow::Result<usize> {
    1118              :         use crate::tenant::vectored_blob_io::{
    1119              :             BlobMeta, VectoredReadBuilder, VectoredReadExtended,
    1120              :         };
    1121              :         use futures::stream::TryStreamExt;
    1122              : 
    1123              :         #[derive(Debug)]
    1124              :         enum Item {
    1125              :             Actual(Key, Lsn, BlobRef),
    1126              :             Sentinel,
    1127              :         }
    1128              : 
    1129              :         impl From<Item> for Option<(Key, Lsn, BlobRef)> {
    1130          210 :             fn from(value: Item) -> Self {
    1131          210 :                 match value {
    1132          180 :                     Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
    1133           30 :                     Item::Sentinel => None,
    1134              :                 }
    1135          210 :             }
    1136              :         }
    1137              : 
    1138              :         impl Item {
    1139          210 :             fn offset(&self) -> Option<BlobRef> {
    1140          210 :                 match self {
    1141          180 :                     Item::Actual(_, _, blob) => Some(*blob),
    1142           30 :                     Item::Sentinel => None,
    1143              :                 }
    1144          210 :             }
    1145              : 
    1146          210 :             fn is_last(&self) -> bool {
    1147          210 :                 matches!(self, Item::Sentinel)
    1148          210 :             }
    1149              :         }
    1150              : 
    1151           30 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1152           30 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1153           30 :             self.index_start_blk,
    1154           30 :             self.index_root_blk,
    1155           30 :             block_reader,
    1156           30 :         );
    1157           30 : 
    1158           30 :         let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
    1159          180 :         let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
    1160           30 :         // put in a sentinel value for getting the end offset for last item, and not having to
    1161           30 :         // repeat the whole read part
    1162           30 :         let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
    1163           30 :             Item::Sentinel,
    1164           30 :         ))));
    1165           30 :         let mut stream = std::pin::pin!(stream);
    1166           30 : 
    1167           30 :         let mut prev: Option<(Key, Lsn, BlobRef)> = None;
    1168           30 : 
    1169           30 :         let mut read_builder: Option<VectoredReadBuilder> = None;
    1170           30 :         let read_mode = VectoredReadCoalesceMode::get();
    1171           30 : 
    1172           30 :         let max_read_size = self
    1173           30 :             .max_vectored_read_bytes
    1174           30 :             .map(|x| x.0.get())
    1175           30 :             .unwrap_or(8192);
    1176           30 : 
    1177           30 :         let mut buffer = Some(BytesMut::with_capacity(max_read_size));
    1178           30 : 
    1179           30 :         // FIXME: buffering of DeltaLayerWriter
    1180           30 :         let mut per_blob_copy = Vec::new();
    1181           30 : 
    1182           30 :         let mut records = 0;
    1183              : 
    1184          240 :         while let Some(item) = stream.try_next().await? {
    1185          210 :             tracing::debug!(?item, "popped");
    1186          210 :             let offset = item
    1187          210 :                 .offset()
    1188          210 :                 .unwrap_or(BlobRef::new(self.index_start_offset(), false));
    1189              : 
    1190          210 :             let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
    1191          180 :                 let end_offset = offset;
    1192          180 : 
    1193          180 :                 Some((BlobMeta { key, lsn }, start_offset..end_offset))
    1194              :             } else {
    1195           30 :                 None
    1196              :             };
    1197              : 
    1198          210 :             let is_last = item.is_last();
    1199          210 : 
    1200          210 :             prev = Option::from(item);
    1201          210 : 
    1202          210 :             let actionable = actionable.filter(|x| x.0.lsn < until);
    1203              : 
    1204          210 :             let builder = if let Some((meta, offsets)) = actionable {
    1205              :                 // extend or create a new builder
    1206           96 :                 if read_builder
    1207           96 :                     .as_mut()
    1208           96 :                     .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
    1209           96 :                     .unwrap_or(VectoredReadExtended::No)
    1210           96 :                     == VectoredReadExtended::Yes
    1211              :                 {
    1212           48 :                     None
    1213              :                 } else {
    1214           48 :                     read_builder.replace(VectoredReadBuilder::new(
    1215           48 :                         offsets.start.pos(),
    1216           48 :                         offsets.end.pos(),
    1217           48 :                         meta,
    1218           48 :                         max_read_size,
    1219           48 :                         read_mode,
    1220           48 :                     ))
    1221              :                 }
    1222              :             } else {
    1223              :                 // nothing to do, except perhaps flush any existing for the last element
    1224          114 :                 None
    1225              :             };
    1226              : 
    1227              :             // flush the possible older builder and also the new one if the item was the last one
    1228          210 :             let builders = builder.into_iter();
    1229          210 :             let builders = if is_last {
    1230           30 :                 builders.chain(read_builder.take())
    1231              :             } else {
    1232          180 :                 builders.chain(None)
    1233              :             };
    1234              : 
    1235          258 :             for builder in builders {
    1236           48 :                 let read = builder.build();
    1237           48 : 
    1238           48 :                 let reader = VectoredBlobReader::new(&self.file);
    1239           48 : 
    1240           48 :                 let mut buf = buffer.take().unwrap();
    1241           48 : 
    1242           48 :                 buf.clear();
    1243           48 :                 buf.reserve(read.size());
    1244           48 :                 let res = reader.read_blobs(&read, buf, ctx).await?;
    1245              : 
    1246          144 :                 for blob in res.blobs {
    1247           96 :                     let key = blob.meta.key;
    1248           96 :                     let lsn = blob.meta.lsn;
    1249           96 :                     let data = &res.buf[blob.start..blob.end];
    1250           96 : 
    1251           96 :                     #[cfg(debug_assertions)]
    1252           96 :                     Value::des(data)
    1253           96 :                         .with_context(|| {
    1254            0 :                             format!(
    1255            0 :                                 "blob failed to deserialize for {}@{}, {}..{}: {:?}",
    1256            0 :                                 blob.meta.key,
    1257            0 :                                 blob.meta.lsn,
    1258            0 :                                 blob.start,
    1259            0 :                                 blob.end,
    1260            0 :                                 utils::Hex(data)
    1261            0 :                             )
    1262           96 :                         })
    1263           96 :                         .unwrap();
    1264           96 : 
    1265           96 :                     // is it an image or will_init walrecord?
    1266           96 :                     // FIXME: this could be handled by threading the BlobRef to the
    1267           96 :                     // VectoredReadBuilder
    1268           96 :                     let will_init = crate::repository::ValueBytes::will_init(data)
    1269           96 :                         .inspect_err(|_e| {
    1270            0 :                             #[cfg(feature = "testing")]
    1271            0 :                             tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
    1272           96 :                         })
    1273           96 :                         .unwrap_or(false);
    1274           96 : 
    1275           96 :                     per_blob_copy.clear();
    1276           96 :                     per_blob_copy.extend_from_slice(data);
    1277              : 
    1278           96 :                     let (tmp, res) = writer
    1279           96 :                         .put_value_bytes(
    1280           96 :                             key,
    1281           96 :                             lsn,
    1282           96 :                             std::mem::take(&mut per_blob_copy).slice_len(),
    1283           96 :                             will_init,
    1284           96 :                             ctx,
    1285           96 :                         )
    1286           12 :                         .await;
    1287           96 :                     per_blob_copy = tmp.into_raw_slice().into_inner();
    1288           96 : 
    1289           96 :                     res?;
    1290              : 
    1291           96 :                     records += 1;
    1292              :                 }
    1293              : 
    1294           48 :                 buffer = Some(res.buf);
    1295              :             }
    1296              :         }
    1297              : 
    1298           30 :         assert!(
    1299           30 :             read_builder.is_none(),
    1300            0 :             "with the sentinel above loop should had handled all"
    1301              :         );
    1302              : 
    1303           30 :         Ok(records)
    1304           30 :     }
    1305              : 
    1306           12 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1307           12 :         println!(
    1308           12 :             "index_start_blk: {}, root {}",
    1309           12 :             self.index_start_blk, self.index_root_blk
    1310           12 :         );
    1311           12 : 
    1312           12 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1313           12 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1314           12 :             self.index_start_blk,
    1315           12 :             self.index_root_blk,
    1316           12 :             block_reader,
    1317           12 :         );
    1318           12 : 
    1319           12 :         tree_reader.dump().await?;
    1320              : 
    1321           12 :         let keys = self.load_keys(ctx).await?;
    1322              : 
    1323           24 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1324           24 :             let buf = val.load_raw(ctx).await?;
    1325           24 :             let val = Value::des(&buf)?;
    1326           24 :             let desc = match val {
    1327           24 :                 Value::Image(img) => {
    1328           24 :                     format!(" img {} bytes", img.len())
    1329              :                 }
    1330            0 :                 Value::WalRecord(rec) => {
    1331            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
    1332            0 :                     format!(
    1333            0 :                         " rec {} bytes will_init: {} {}",
    1334            0 :                         buf.len(),
    1335            0 :                         rec.will_init(),
    1336            0 :                         wal_desc
    1337            0 :                     )
    1338              :                 }
    1339              :             };
    1340           24 :             Ok(desc)
    1341           24 :         }
    1342              : 
    1343           36 :         for entry in keys {
    1344           24 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1345           24 :             let desc = match dump_blob(&val, ctx).await {
    1346           24 :                 Ok(desc) => desc,
    1347            0 :                 Err(err) => {
    1348            0 :                     format!("ERROR: {err}")
    1349              :                 }
    1350              :             };
    1351           24 :             println!("  key {key} at {lsn}: {desc}");
    1352              : 
    1353              :             // Print more details about CHECKPOINT records. Would be nice to print details
    1354              :             // of many other record types too, but these are particularly interesting, as
    1355              :             // have a lot of special processing for them in walingest.rs.
    1356           12 :             use pageserver_api::key::CHECKPOINT_KEY;
    1357           12 :             use postgres_ffi::CheckPoint;
    1358           24 :             if key == CHECKPOINT_KEY {
    1359            0 :                 let val = val.load(ctx).await?;
    1360            0 :                 match val {
    1361            0 :                     Value::Image(img) => {
    1362            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1363            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
    1364              :                     }
    1365            0 :                     Value::WalRecord(_rec) => {
    1366            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1367            0 :                     }
    1368              :                 }
    1369           24 :             }
    1370              :         }
    1371              : 
    1372           12 :         Ok(())
    1373           12 :     }
    1374              : 
    1375           90 :     fn stream_index_forwards<'a, R>(
    1376           90 :         &'a self,
    1377           90 :         reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
    1378           90 :         start: &'a [u8; DELTA_KEY_SIZE],
    1379           90 :         ctx: &'a RequestContext,
    1380           90 :     ) -> impl futures::stream::Stream<
    1381           90 :         Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
    1382           90 :     > + 'a
    1383           90 :     where
    1384           90 :         R: BlockReader + 'a,
    1385           90 :     {
    1386              :         use futures::stream::TryStreamExt;
    1387           90 :         let stream = reader.into_stream(start, ctx);
    1388          456 :         stream.map_ok(|(key, value)| {
    1389          456 :             let key = DeltaKey::from_slice(&key);
    1390          456 :             let (key, lsn) = (key.key(), key.lsn());
    1391          456 :             let offset = BlobRef(value);
    1392          456 : 
    1393          456 :             (key, lsn, offset)
    1394          456 :         })
    1395           90 :     }
    1396              : 
    1397              :     /// The file offset to the first block of index.
    1398              :     ///
    1399              :     /// The file structure is summary, values, and index. We often need this for the size of last blob.
    1400       614879 :     fn index_start_offset(&self) -> u64 {
    1401       614879 :         let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
    1402       614879 :         let bref = BlobRef(offset);
    1403       614879 :         tracing::debug!(
    1404              :             index_start_blk = self.index_start_blk,
    1405              :             offset,
    1406            0 :             pos = bref.pos(),
    1407            0 :             "index_start_offset"
    1408              :         );
    1409       614879 :         offset
    1410       614879 :     }
    1411              : 
    1412         1602 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
    1413         1602 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1414         1602 :         let tree_reader =
    1415         1602 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
    1416         1602 :         DeltaLayerIterator {
    1417         1602 :             delta_layer: self,
    1418         1602 :             ctx,
    1419         1602 :             index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
    1420         1602 :             key_values_batch: std::collections::VecDeque::new(),
    1421         1602 :             is_end: false,
    1422         1602 :             planner: StreamingVectoredReadPlanner::new(
    1423         1602 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
    1424         1602 :                 1024,        // The default value. Unit tests might use a different value
    1425         1602 :             ),
    1426         1602 :         }
    1427         1602 :     }
    1428              : }
    1429              : 
    1430              : /// A set of data associated with a delta layer key and its value
    1431              : pub struct DeltaEntry<'a> {
    1432              :     pub key: Key,
    1433              :     pub lsn: Lsn,
    1434              :     /// Size of the stored value
    1435              :     pub size: u64,
    1436              :     /// Reference to the on-disk value
    1437              :     pub val: ValueRef<'a>,
    1438              : }
    1439              : 
    1440              : /// Reference to an on-disk value
    1441              : pub struct ValueRef<'a> {
    1442              :     blob_ref: BlobRef,
    1443              :     layer: &'a DeltaLayerInner,
    1444              : }
    1445              : 
    1446              : impl<'a> ValueRef<'a> {
    1447              :     /// Loads the value from disk
    1448            0 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1449            0 :         let buf = self.load_raw(ctx).await?;
    1450            0 :         let val = Value::des(&buf)?;
    1451            0 :         Ok(val)
    1452            0 :     }
    1453              : 
    1454           24 :     async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
    1455           24 :         let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
    1456           24 :             self.layer,
    1457           24 :         )));
    1458           24 :         let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1459           24 :         Ok(buf)
    1460           24 :     }
    1461              : }
    1462              : 
    1463              : pub(crate) struct Adapter<T>(T);
    1464              : 
    1465              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1466           24 :     pub(crate) async fn read_blk(
    1467           24 :         &self,
    1468           24 :         blknum: u32,
    1469           24 :         ctx: &RequestContext,
    1470           24 :     ) -> Result<BlockLease, std::io::Error> {
    1471           24 :         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
    1472           24 :         block_reader.read_blk(blknum, ctx).await
    1473           24 :     }
    1474              : }
    1475              : 
    1476              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1477           48 :     fn as_ref(&self) -> &DeltaLayerInner {
    1478           48 :         self
    1479           48 :     }
    1480              : }
    1481              : 
    1482              : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    1483            0 :     fn key(&self) -> Key {
    1484            0 :         self.key
    1485            0 :     }
    1486            0 :     fn lsn(&self) -> Lsn {
    1487            0 :         self.lsn
    1488            0 :     }
    1489            0 :     fn size(&self) -> u64 {
    1490            0 :         self.size
    1491            0 :     }
    1492              : }
    1493              : 
    1494              : pub struct DeltaLayerIterator<'a> {
    1495              :     delta_layer: &'a DeltaLayerInner,
    1496              :     ctx: &'a RequestContext,
    1497              :     planner: StreamingVectoredReadPlanner,
    1498              :     index_iter: DiskBtreeIterator<'a>,
    1499              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1500              :     is_end: bool,
    1501              : }
    1502              : 
    1503              : impl<'a> DeltaLayerIterator<'a> {
    1504            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1505            0 :         self.delta_layer.layer_dbg_info()
    1506            0 :     }
    1507              : 
    1508              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1509        63812 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1510        63812 :         assert!(self.key_values_batch.is_empty());
    1511        63812 :         assert!(!self.is_end);
    1512              : 
    1513        63812 :         let plan = loop {
    1514      6298396 :             if let Some(res) = self.index_iter.next().await {
    1515      6296878 :                 let (raw_key, value) = res?;
    1516      6296878 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
    1517      6296878 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
    1518      6296878 :                 let blob_ref = BlobRef(value);
    1519      6296878 :                 let offset = blob_ref.pos();
    1520      6296878 :                 if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
    1521        62294 :                     break batch_plan;
    1522      6234584 :                 }
    1523              :             } else {
    1524         1518 :                 self.is_end = true;
    1525         1518 :                 let data_end_offset = self.delta_layer.index_start_offset();
    1526         1518 :                 if let Some(item) = self.planner.handle_range_end(data_end_offset) {
    1527         1518 :                     break item;
    1528              :                 } else {
    1529            0 :                     return Ok(()); // TODO: test empty iterator
    1530              :                 }
    1531              :             }
    1532              :         };
    1533        63812 :         let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
    1534        63812 :         let mut next_batch = std::collections::VecDeque::new();
    1535        63812 :         let buf_size = plan.size();
    1536        63812 :         let buf = BytesMut::with_capacity(buf_size);
    1537        63812 :         let blobs_buf = vectored_blob_reader
    1538        63812 :             .read_blobs(&plan, buf, self.ctx)
    1539        32622 :             .await?;
    1540        63812 :         let frozen_buf = blobs_buf.buf.freeze();
    1541      6296794 :         for meta in blobs_buf.blobs.iter() {
    1542      6296794 :             let value = Value::des(&frozen_buf[meta.start..meta.end])?;
    1543      6296794 :             next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
    1544              :         }
    1545        63812 :         self.key_values_batch = next_batch;
    1546        63812 :         Ok(())
    1547        63812 :     }
    1548              : 
    1549      6298908 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1550      6298908 :         if self.key_values_batch.is_empty() {
    1551        66500 :             if self.is_end {
    1552         2940 :                 return Ok(None);
    1553        63560 :             }
    1554        63560 :             self.next_batch().await?;
    1555      6232408 :         }
    1556      6295968 :         Ok(Some(
    1557      6295968 :             self.key_values_batch
    1558      6295968 :                 .pop_front()
    1559      6295968 :                 .expect("should not be empty"),
    1560      6295968 :         ))
    1561      6298908 :     }
    1562              : }
    1563              : 
    1564              : #[cfg(test)]
    1565              : pub(crate) mod test {
    1566              :     use std::collections::BTreeMap;
    1567              : 
    1568              :     use itertools::MinMaxResult;
    1569              :     use rand::prelude::{SeedableRng, SliceRandom, StdRng};
    1570              :     use rand::RngCore;
    1571              : 
    1572              :     use super::*;
    1573              :     use crate::repository::Value;
    1574              :     use crate::tenant::harness::TIMELINE_ID;
    1575              :     use crate::tenant::storage_layer::{Layer, ResidentLayer};
    1576              :     use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
    1577              :     use crate::tenant::{Tenant, Timeline};
    1578              :     use crate::{
    1579              :         context::DownloadBehavior,
    1580              :         task_mgr::TaskKind,
    1581              :         tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
    1582              :         DEFAULT_PG_VERSION,
    1583              :     };
    1584              :     use bytes::Bytes;
    1585              : 
    1586              :     /// Construct an index for a fictional delta layer and and then
    1587              :     /// traverse in order to plan vectored reads for a query. Finally,
    1588              :     /// verify that the traversal fed the right index key and value
    1589              :     /// pairs into the planner.
    1590              :     #[tokio::test]
    1591            6 :     async fn test_delta_layer_index_traversal() {
    1592            6 :         let base_key = Key {
    1593            6 :             field1: 0,
    1594            6 :             field2: 1663,
    1595            6 :             field3: 12972,
    1596            6 :             field4: 16396,
    1597            6 :             field5: 0,
    1598            6 :             field6: 246080,
    1599            6 :         };
    1600            6 : 
    1601            6 :         // Populate the index with some entries
    1602            6 :         let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
    1603            6 :             (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
    1604            6 :             (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1605            6 :             (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1606            6 :             (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
    1607            6 :         ]);
    1608            6 : 
    1609            6 :         let mut disk = TestDisk::default();
    1610            6 :         let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
    1611            6 : 
    1612            6 :         let mut disk_offset = 0;
    1613           30 :         for (key, lsns) in &entries {
    1614          126 :             for lsn in lsns {
    1615          102 :                 let index_key = DeltaKey::from_key_lsn(key, *lsn);
    1616          102 :                 let blob_ref = BlobRef::new(disk_offset, false);
    1617          102 :                 writer
    1618          102 :                     .append(&index_key.0, blob_ref.0)
    1619          102 :                     .expect("In memory disk append should never fail");
    1620          102 : 
    1621          102 :                 disk_offset += 1;
    1622          102 :             }
    1623            6 :         }
    1624            6 : 
    1625            6 :         // Prepare all the arguments for the call into `plan_reads` below
    1626            6 :         let (root_offset, _writer) = writer
    1627            6 :             .finish()
    1628            6 :             .expect("In memory disk finish should never fail");
    1629            6 :         let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
    1630            6 :         let planner = VectoredReadPlanner::new(100);
    1631            6 :         let mut reconstruct_state = ValuesReconstructState::new();
    1632            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1633            6 : 
    1634            6 :         let keyspace = KeySpace {
    1635            6 :             ranges: vec![
    1636            6 :                 base_key..base_key.add(3),
    1637            6 :                 base_key.add(3)..base_key.add(100),
    1638            6 :             ],
    1639            6 :         };
    1640            6 :         let lsn_range = Lsn(2)..Lsn(40);
    1641            6 : 
    1642            6 :         // Plan and validate
    1643            6 :         let vectored_reads = DeltaLayerInner::plan_reads(
    1644            6 :             &keyspace,
    1645            6 :             lsn_range.clone(),
    1646            6 :             disk_offset,
    1647            6 :             reader,
    1648            6 :             planner,
    1649            6 :             &mut reconstruct_state,
    1650            6 :             &ctx,
    1651            6 :         )
    1652            6 :         .await
    1653            6 :         .expect("Read planning should not fail");
    1654            6 : 
    1655            6 :         validate(keyspace, lsn_range, vectored_reads, entries);
    1656            6 :     }
    1657              : 
    1658            6 :     fn validate(
    1659            6 :         keyspace: KeySpace,
    1660            6 :         lsn_range: Range<Lsn>,
    1661            6 :         vectored_reads: Vec<VectoredRead>,
    1662            6 :         index_entries: BTreeMap<Key, Vec<Lsn>>,
    1663            6 :     ) {
    1664              :         #[derive(Debug, PartialEq, Eq)]
    1665              :         struct BlobSpec {
    1666              :             key: Key,
    1667              :             lsn: Lsn,
    1668              :             at: u64,
    1669              :         }
    1670              : 
    1671            6 :         let mut planned_blobs = Vec::new();
    1672           46 :         for read in vectored_reads {
    1673           84 :             for (at, meta) in read.blobs_at.as_slice() {
    1674           84 :                 planned_blobs.push(BlobSpec {
    1675           84 :                     key: meta.key,
    1676           84 :                     lsn: meta.lsn,
    1677           84 :                     at: *at,
    1678           84 :                 });
    1679           84 :             }
    1680              :         }
    1681              : 
    1682            6 :         let mut expected_blobs = Vec::new();
    1683            6 :         let mut disk_offset = 0;
    1684           30 :         for (key, lsns) in index_entries {
    1685          126 :             for lsn in lsns {
    1686          126 :                 let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
    1687          102 :                 let lsn_included = lsn_range.contains(&lsn);
    1688          102 : 
    1689          102 :                 if key_included && lsn_included {
    1690           84 :                     expected_blobs.push(BlobSpec {
    1691           84 :                         key,
    1692           84 :                         lsn,
    1693           84 :                         at: disk_offset,
    1694           84 :                     });
    1695           84 :                 }
    1696              : 
    1697          102 :                 disk_offset += 1;
    1698              :             }
    1699              :         }
    1700              : 
    1701            6 :         assert_eq!(planned_blobs, expected_blobs);
    1702            6 :     }
    1703              : 
    1704              :     mod constants {
    1705              :         use utils::lsn::Lsn;
    1706              : 
    1707              :         /// Offset used by all lsns in this test
    1708              :         pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
    1709              :         /// Number of unique keys including in the test data
    1710              :         pub(super) const KEY_COUNT: u8 = 60;
    1711              :         /// Max number of different lsns for each key
    1712              :         pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    1713              :         /// Possible value sizes for each key along with a probability weight
    1714              :         pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    1715              :         /// Probability that there will be a gap between the current key and the next one (33.3%)
    1716              :         pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    1717              :         /// The minimum size of a key range in all the generated reads
    1718              :         pub(super) const MIN_RANGE_SIZE: i128 = 10;
    1719              :         /// The number of ranges included in each vectored read
    1720              :         pub(super) const RANGES_COUNT: u8 = 2;
    1721              :         /// The number of vectored reads performed
    1722              :         pub(super) const READS_COUNT: u8 = 100;
    1723              :         /// Soft max size of a vectored read. Will be violated if we have to read keys
    1724              :         /// with values larger than the limit
    1725              :         pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    1726              :     }
    1727              : 
    1728              :     struct Entry {
    1729              :         key: Key,
    1730              :         lsn: Lsn,
    1731              :         value: Vec<u8>,
    1732              :     }
    1733              : 
    1734            6 :     fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
    1735            6 :         let mut current_key = Key::MIN;
    1736            6 : 
    1737            6 :         let mut entries = Vec::new();
    1738          366 :         for _ in 0..constants::KEY_COUNT {
    1739          360 :             let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
    1740          360 :             let mut lsns_iter =
    1741         6780 :                 std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
    1742         6780 :                     Some(Lsn(lsn.0 + 0x08))
    1743         6780 :                 });
    1744          360 :             let mut lsns = Vec::new();
    1745         7140 :             while lsns.len() < count as usize {
    1746         6780 :                 let take = rng.gen_bool(0.5);
    1747         6780 :                 let lsn = lsns_iter.next().unwrap();
    1748         6780 :                 if take {
    1749         3336 :                     lsns.push(lsn);
    1750         3444 :                 }
    1751              :             }
    1752              : 
    1753         3696 :             for lsn in lsns {
    1754         3336 :                 let size = constants::VALUE_SIZES
    1755        10008 :                     .choose_weighted(rng, |item| item.1)
    1756         3336 :                     .unwrap()
    1757         3336 :                     .0;
    1758         3336 :                 let mut buf = vec![0; size];
    1759         3336 :                 rng.fill_bytes(&mut buf);
    1760         3336 : 
    1761         3336 :                 entries.push(Entry {
    1762         3336 :                     key: current_key,
    1763         3336 :                     lsn,
    1764         3336 :                     value: buf,
    1765         3336 :                 })
    1766              :             }
    1767              : 
    1768          360 :             let gap = constants::KEY_GAP_CHANGES
    1769          720 :                 .choose_weighted(rng, |item| item.1)
    1770          360 :                 .unwrap()
    1771          360 :                 .0;
    1772          360 :             if gap {
    1773          114 :                 current_key = current_key.add(2);
    1774          246 :             } else {
    1775          246 :                 current_key = current_key.add(1);
    1776          246 :             }
    1777              :         }
    1778              : 
    1779            6 :         entries
    1780            6 :     }
    1781              : 
    1782              :     struct EntriesMeta {
    1783              :         key_range: Range<Key>,
    1784              :         lsn_range: Range<Lsn>,
    1785              :         index: BTreeMap<(Key, Lsn), Vec<u8>>,
    1786              :     }
    1787              : 
    1788            6 :     fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
    1789         3336 :         let key_range = match entries.iter().minmax_by_key(|e| e.key) {
    1790            6 :             MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
    1791            0 :             _ => panic!("More than one entry is always expected"),
    1792              :         };
    1793              : 
    1794         3336 :         let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
    1795            6 :             MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
    1796            0 :             _ => panic!("More than one entry is always expected"),
    1797              :         };
    1798              : 
    1799            6 :         let mut index = BTreeMap::new();
    1800         3336 :         for entry in entries.iter() {
    1801         3336 :             index.insert((entry.key, entry.lsn), entry.value.clone());
    1802         3336 :         }
    1803              : 
    1804            6 :         EntriesMeta {
    1805            6 :             key_range,
    1806            6 :             lsn_range,
    1807            6 :             index,
    1808            6 :         }
    1809            6 :     }
    1810              : 
    1811          600 :     fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
    1812          600 :         let start = key_range.start.to_i128();
    1813          600 :         let end = key_range.end.to_i128();
    1814          600 : 
    1815          600 :         let mut keyspace = KeySpace::default();
    1816              : 
    1817         1800 :         for _ in 0..constants::RANGES_COUNT {
    1818         1200 :             let mut range: Option<Range<Key>> = Option::default();
    1819         3732 :             while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
    1820         2532 :                 let range_start = rng.gen_range(start..end);
    1821         2532 :                 let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
    1822         2532 :                 if range_end_offset >= end {
    1823          300 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(end));
    1824         2232 :                 } else {
    1825         2232 :                     let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
    1826         2232 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
    1827         2232 :                 }
    1828              :             }
    1829         1200 :             keyspace.ranges.push(range.unwrap());
    1830              :         }
    1831              : 
    1832          600 :         keyspace
    1833          600 :     }
    1834              : 
    1835              :     #[tokio::test]
    1836            6 :     async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
    1837            6 :         let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
    1838           24 :         let (tenant, ctx) = harness.load().await;
    1839            6 : 
    1840            6 :         let timeline_id = TimelineId::generate();
    1841            6 :         let timeline = tenant
    1842            6 :             .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
    1843           12 :             .await?;
    1844            6 : 
    1845            6 :         tracing::info!("Generating test data ...");
    1846            6 : 
    1847            6 :         let rng = &mut StdRng::seed_from_u64(0);
    1848            6 :         let entries = generate_entries(rng);
    1849            6 :         let entries_meta = get_entries_meta(&entries);
    1850            6 : 
    1851            6 :         tracing::info!("Done generating {} entries", entries.len());
    1852            6 : 
    1853            6 :         tracing::info!("Writing test data to delta layer ...");
    1854            6 :         let mut writer = DeltaLayerWriter::new(
    1855            6 :             harness.conf,
    1856            6 :             timeline_id,
    1857            6 :             harness.tenant_shard_id,
    1858            6 :             entries_meta.key_range.start,
    1859            6 :             entries_meta.lsn_range.clone(),
    1860            6 :             &ctx,
    1861            6 :         )
    1862            6 :         .await?;
    1863            6 : 
    1864         3342 :         for entry in entries {
    1865         3336 :             let (_, res) = writer
    1866         3336 :                 .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
    1867          645 :                 .await;
    1868         3336 :             res?;
    1869            6 :         }
    1870            6 : 
    1871           15 :         let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
    1872            6 :         let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
    1873            6 : 
    1874            6 :         let inner = resident.get_as_delta(&ctx).await?;
    1875            6 : 
    1876            6 :         let file_size = inner.file.metadata().await?.len();
    1877            6 :         tracing::info!(
    1878            6 :             "Done writing test data to delta layer. Resulting file size is: {}",
    1879            6 :             file_size
    1880            6 :         );
    1881            6 : 
    1882          606 :         for i in 0..constants::READS_COUNT {
    1883          600 :             tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
    1884            6 : 
    1885          600 :             let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
    1886          600 :             let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1887          600 :                 inner.index_start_blk,
    1888          600 :                 inner.index_root_blk,
    1889          600 :                 block_reader,
    1890          600 :             );
    1891          600 : 
    1892          600 :             let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
    1893          600 :             let mut reconstruct_state = ValuesReconstructState::new();
    1894          600 :             let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
    1895          600 :             let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
    1896            6 : 
    1897          600 :             let vectored_reads = DeltaLayerInner::plan_reads(
    1898          600 :                 &keyspace,
    1899          600 :                 entries_meta.lsn_range.clone(),
    1900          600 :                 data_end_offset,
    1901          600 :                 index_reader,
    1902          600 :                 planner,
    1903          600 :                 &mut reconstruct_state,
    1904          600 :                 &ctx,
    1905          600 :             )
    1906           12 :             .await?;
    1907            6 : 
    1908          600 :             let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
    1909          600 :             let buf_size = DeltaLayerInner::get_min_read_buffer_size(
    1910          600 :                 &vectored_reads,
    1911          600 :                 constants::MAX_VECTORED_READ_BYTES,
    1912          600 :             );
    1913          600 :             let mut buf = Some(BytesMut::with_capacity(buf_size));
    1914            6 : 
    1915        59772 :             for read in vectored_reads {
    1916        59172 :                 let blobs_buf = vectored_blob_reader
    1917        59172 :                     .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
    1918        30048 :                     .await?;
    1919       171912 :                 for meta in blobs_buf.blobs.iter() {
    1920       171912 :                     let value = &blobs_buf.buf[meta.start..meta.end];
    1921       171912 :                     assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
    1922            6 :                 }
    1923            6 : 
    1924        59172 :                 buf = Some(blobs_buf.buf);
    1925            6 :             }
    1926            6 :         }
    1927            6 : 
    1928            6 :         Ok(())
    1929            6 :     }
    1930              : 
    1931              :     #[tokio::test]
    1932            6 :     async fn copy_delta_prefix_smoke() {
    1933            6 :         use crate::walrecord::NeonWalRecord;
    1934            6 :         use bytes::Bytes;
    1935            6 : 
    1936            6 :         let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
    1937            6 :             .await
    1938            6 :             .unwrap();
    1939           24 :         let (tenant, ctx) = h.load().await;
    1940            6 :         let ctx = &ctx;
    1941            6 :         let timeline = tenant
    1942            6 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
    1943           12 :             .await
    1944            6 :             .unwrap();
    1945            6 : 
    1946            6 :         let initdb_layer = timeline
    1947            6 :             .layers
    1948            6 :             .read()
    1949            6 :             .await
    1950            6 :             .likely_resident_layers()
    1951            6 :             .next()
    1952            6 :             .cloned()
    1953            6 :             .unwrap();
    1954            6 : 
    1955            6 :         {
    1956            6 :             let mut writer = timeline.writer().await;
    1957            6 : 
    1958            6 :             let data = [
    1959            6 :                 (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
    1960            6 :                 (
    1961            6 :                     0x30,
    1962            6 :                     12,
    1963            6 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1964            6 :                         will_init: false,
    1965            6 :                         rec: Bytes::from_static(b"1"),
    1966            6 :                     }),
    1967            6 :                 ),
    1968            6 :                 (
    1969            6 :                     0x40,
    1970            6 :                     12,
    1971            6 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1972            6 :                         will_init: true,
    1973            6 :                         rec: Bytes::from_static(b"2"),
    1974            6 :                     }),
    1975            6 :                 ),
    1976            6 :                 // build an oversized value so we cannot extend and existing read over
    1977            6 :                 // this
    1978            6 :                 (
    1979            6 :                     0x50,
    1980            6 :                     12,
    1981            6 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1982            6 :                         will_init: true,
    1983            6 :                         rec: {
    1984            6 :                             let mut buf =
    1985            6 :                                 vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
    1986            6 :                             buf.iter_mut()
    1987            6 :                                 .enumerate()
    1988       792576 :                                 .for_each(|(i, slot)| *slot = (i % 256) as u8);
    1989            6 :                             Bytes::from(buf)
    1990            6 :                         },
    1991            6 :                     }),
    1992            6 :                 ),
    1993            6 :                 // because the oversized read cannot be extended further, we are sure to exercise the
    1994            6 :                 // builder created on the last round with this:
    1995            6 :                 (
    1996            6 :                     0x60,
    1997            6 :                     12,
    1998            6 :                     Value::WalRecord(NeonWalRecord::Postgres {
    1999            6 :                         will_init: true,
    2000            6 :                         rec: Bytes::from_static(b"3"),
    2001            6 :                     }),
    2002            6 :                 ),
    2003            6 :                 (
    2004            6 :                     0x60,
    2005            6 :                     9,
    2006            6 :                     Value::Image(Bytes::from_static(b"something for a different key")),
    2007            6 :                 ),
    2008            6 :             ];
    2009            6 : 
    2010            6 :             let mut last_lsn = None;
    2011            6 : 
    2012           42 :             for (lsn, key, value) in data {
    2013           36 :                 let key = Key::from_i128(key);
    2014           36 :                 writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
    2015           36 :                 last_lsn = Some(lsn);
    2016            6 :             }
    2017            6 : 
    2018            6 :             writer.finish_write(Lsn(last_lsn.unwrap()));
    2019            6 :         }
    2020            6 :         timeline.freeze_and_flush().await.unwrap();
    2021            6 : 
    2022            6 :         let new_layer = timeline
    2023            6 :             .layers
    2024            6 :             .read()
    2025            6 :             .await
    2026            6 :             .likely_resident_layers()
    2027            7 :             .find(|&x| x != &initdb_layer)
    2028            6 :             .cloned()
    2029            6 :             .unwrap();
    2030            6 : 
    2031            6 :         // create a copy for the timeline, so we don't overwrite the file
    2032            6 :         let branch = tenant
    2033            6 :             .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
    2034            6 :             .await
    2035            6 :             .unwrap();
    2036            6 : 
    2037            6 :         assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
    2038            6 : 
    2039            6 :         // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
    2040            6 :         // a single key
    2041            6 : 
    2042           36 :         for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
    2043           30 :             let truncate_at = Lsn(truncate_at);
    2044            6 : 
    2045           30 :             let mut writer = DeltaLayerWriter::new(
    2046           30 :                 tenant.conf,
    2047           30 :                 branch.timeline_id,
    2048           30 :                 tenant.tenant_shard_id,
    2049           30 :                 Key::MIN,
    2050           30 :                 Lsn(0x11)..truncate_at,
    2051           30 :                 ctx,
    2052           30 :             )
    2053           15 :             .await
    2054           30 :             .unwrap();
    2055            6 : 
    2056           30 :             let new_layer = new_layer.download_and_keep_resident().await.unwrap();
    2057           30 : 
    2058           30 :             new_layer
    2059           30 :                 .copy_delta_prefix(&mut writer, truncate_at, ctx)
    2060           45 :                 .await
    2061           30 :                 .unwrap();
    2062            6 : 
    2063           72 :             let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
    2064           30 :             let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
    2065           30 : 
    2066           30 :             copied_layer.get_as_delta(ctx).await.unwrap();
    2067           30 : 
    2068           30 :             assert_keys_and_values_eq(
    2069           30 :                 new_layer.get_as_delta(ctx).await.unwrap(),
    2070           30 :                 copied_layer.get_as_delta(ctx).await.unwrap(),
    2071           30 :                 truncate_at,
    2072           30 :                 ctx,
    2073            6 :             )
    2074          153 :             .await;
    2075            6 :         }
    2076            6 :     }
    2077              : 
    2078           30 :     async fn assert_keys_and_values_eq(
    2079           30 :         source: &DeltaLayerInner,
    2080           30 :         truncated: &DeltaLayerInner,
    2081           30 :         truncated_at: Lsn,
    2082           30 :         ctx: &RequestContext,
    2083           30 :     ) {
    2084              :         use futures::future::ready;
    2085              :         use futures::stream::TryStreamExt;
    2086              : 
    2087           30 :         let start_key = [0u8; DELTA_KEY_SIZE];
    2088           30 : 
    2089           30 :         let source_reader = FileBlockReader::new(&source.file, source.file_id);
    2090           30 :         let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2091           30 :             source.index_start_blk,
    2092           30 :             source.index_root_blk,
    2093           30 :             &source_reader,
    2094           30 :         );
    2095           30 :         let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
    2096          180 :         let source_stream = source_stream.filter(|res| match res {
    2097          180 :             Ok((_, lsn, _)) => ready(lsn < &truncated_at),
    2098            0 :             _ => ready(true),
    2099          180 :         });
    2100           30 :         let mut source_stream = std::pin::pin!(source_stream);
    2101           30 : 
    2102           30 :         let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
    2103           30 :         let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2104           30 :             truncated.index_start_blk,
    2105           30 :             truncated.index_root_blk,
    2106           30 :             &truncated_reader,
    2107           30 :         );
    2108           30 :         let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
    2109           30 :         let mut truncated_stream = std::pin::pin!(truncated_stream);
    2110           30 : 
    2111           30 :         let mut scratch_left = Vec::new();
    2112           30 :         let mut scratch_right = Vec::new();
    2113              : 
    2114              :         loop {
    2115          126 :             let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
    2116          126 :             let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
    2117          126 : 
    2118          126 :             if src.is_none() {
    2119           30 :                 assert!(truncated.is_none());
    2120           30 :                 break;
    2121           96 :             }
    2122           96 : 
    2123           96 :             let (src, truncated) = (src.unwrap(), truncated.unwrap());
    2124           96 : 
    2125           96 :             // because we've filtered the source with Lsn, we should always have the same keys from both.
    2126           96 :             assert_eq!(src.0, truncated.0);
    2127           96 :             assert_eq!(src.1, truncated.1);
    2128              : 
    2129              :             // if this is needed for something else, just drop this assert.
    2130           96 :             assert!(
    2131           96 :                 src.2.pos() >= truncated.2.pos(),
    2132            0 :                 "value position should not go backwards {} vs. {}",
    2133            0 :                 src.2.pos(),
    2134            0 :                 truncated.2.pos()
    2135              :             );
    2136              : 
    2137           96 :             scratch_left.clear();
    2138           96 :             let src_cursor = source_reader.block_cursor();
    2139           96 :             let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
    2140           96 :             scratch_right.clear();
    2141           96 :             let trunc_cursor = truncated_reader.block_cursor();
    2142           96 :             let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
    2143           96 : 
    2144           96 :             tokio::try_join!(left, right).unwrap();
    2145           96 : 
    2146           96 :             assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
    2147              :         }
    2148           30 :     }
    2149              : 
    2150        54672 :     pub(crate) fn sort_delta(
    2151        54672 :         (k1, l1, _): &(Key, Lsn, Value),
    2152        54672 :         (k2, l2, _): &(Key, Lsn, Value),
    2153        54672 :     ) -> std::cmp::Ordering {
    2154        54672 :         (k1, l1).cmp(&(k2, l2))
    2155        54672 :     }
    2156              : 
    2157          282 :     pub(crate) fn sort_delta_value(
    2158          282 :         (k1, l1, v1): &(Key, Lsn, Value),
    2159          282 :         (k2, l2, v2): &(Key, Lsn, Value),
    2160          282 :     ) -> std::cmp::Ordering {
    2161          282 :         let order_1 = if v1.is_image() { 0 } else { 1 };
    2162          282 :         let order_2 = if v2.is_image() { 0 } else { 1 };
    2163          282 :         (k1, l1, order_1).cmp(&(k2, l2, order_2))
    2164          282 :     }
    2165              : 
    2166           66 :     pub(crate) async fn produce_delta_layer(
    2167           66 :         tenant: &Tenant,
    2168           66 :         tline: &Arc<Timeline>,
    2169           66 :         mut deltas: Vec<(Key, Lsn, Value)>,
    2170           66 :         ctx: &RequestContext,
    2171           66 :     ) -> anyhow::Result<ResidentLayer> {
    2172           66 :         deltas.sort_by(sort_delta);
    2173           66 :         let (key_start, _, _) = deltas.first().unwrap();
    2174           66 :         let (key_max, _, _) = deltas.last().unwrap();
    2175        24720 :         let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
    2176        24720 :         let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
    2177           66 :         let lsn_end = Lsn(lsn_max.0 + 1);
    2178           66 :         let mut writer = DeltaLayerWriter::new(
    2179           66 :             tenant.conf,
    2180           66 :             tline.timeline_id,
    2181           66 :             tenant.tenant_shard_id,
    2182           66 :             *key_start,
    2183           66 :             (*lsn_min)..lsn_end,
    2184           66 :             ctx,
    2185           66 :         )
    2186           33 :         .await?;
    2187           66 :         let key_end = key_max.next();
    2188              : 
    2189        24786 :         for (key, lsn, value) in deltas {
    2190        24720 :             writer.put_value(key, lsn, value, ctx).await?;
    2191              :         }
    2192              : 
    2193          189 :         let (desc, path) = writer.finish(key_end, ctx).await?;
    2194           66 :         let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    2195              : 
    2196           66 :         Ok::<_, anyhow::Error>(delta_layer)
    2197           66 :     }
    2198              : 
    2199           84 :     async fn assert_delta_iter_equal(
    2200           84 :         delta_iter: &mut DeltaLayerIterator<'_>,
    2201           84 :         expect: &[(Key, Lsn, Value)],
    2202           84 :     ) {
    2203           84 :         let mut expect_iter = expect.iter();
    2204              :         loop {
    2205        84084 :             let o1 = delta_iter.next().await.unwrap();
    2206        84084 :             let o2 = expect_iter.next();
    2207        84084 :             assert_eq!(o1.is_some(), o2.is_some());
    2208        84084 :             if o1.is_none() && o2.is_none() {
    2209           84 :                 break;
    2210        84000 :             }
    2211        84000 :             let (k1, l1, v1) = o1.unwrap();
    2212        84000 :             let (k2, l2, v2) = o2.unwrap();
    2213        84000 :             assert_eq!(&k1, k2);
    2214        84000 :             assert_eq!(l1, *l2);
    2215        84000 :             assert_eq!(&v1, v2);
    2216              :         }
    2217           84 :     }
    2218              : 
    2219              :     #[tokio::test]
    2220            6 :     async fn delta_layer_iterator() {
    2221            6 :         let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
    2222           24 :         let (tenant, ctx) = harness.load().await;
    2223            6 : 
    2224            6 :         let tline = tenant
    2225            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2226           12 :             .await
    2227            6 :             .unwrap();
    2228            6 : 
    2229         6000 :         fn get_key(id: u32) -> Key {
    2230         6000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    2231         6000 :             key.field6 = id;
    2232         6000 :             key
    2233         6000 :         }
    2234            6 :         const N: usize = 1000;
    2235            6 :         let test_deltas = (0..N)
    2236         6000 :             .map(|idx| {
    2237         6000 :                 (
    2238         6000 :                     get_key(idx as u32 / 10),
    2239         6000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
    2240         6000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
    2241         6000 :                 )
    2242         6000 :             })
    2243            6 :             .collect_vec();
    2244            6 :         let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
    2245           24 :             .await
    2246            6 :             .unwrap();
    2247            6 :         let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
    2248           18 :         for max_read_size in [1, 1024] {
    2249           96 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    2250           84 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    2251           84 :                 // Test if the batch size is correctly determined
    2252           84 :                 let mut iter = delta_layer.iter(&ctx);
    2253           84 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    2254           84 :                 let mut num_items = 0;
    2255          336 :                 for _ in 0..3 {
    2256          252 :                     iter.next_batch().await.unwrap();
    2257          252 :                     num_items += iter.key_values_batch.len();
    2258          252 :                     if max_read_size == 1 {
    2259            6 :                         // every key should be a batch b/c the value is larger than max_read_size
    2260          126 :                         assert_eq!(iter.key_values_batch.len(), 1);
    2261            6 :                     } else {
    2262          126 :                         assert!(iter.key_values_batch.len() <= batch_size);
    2263            6 :                     }
    2264          252 :                     if num_items >= N {
    2265            6 :                         break;
    2266          252 :                     }
    2267          252 :                     iter.key_values_batch.clear();
    2268            6 :                 }
    2269            6 :                 // Test if the result is correct
    2270           84 :                 let mut iter = delta_layer.iter(&ctx);
    2271           84 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    2272        28803 :                 assert_delta_iter_equal(&mut iter, &test_deltas).await;
    2273            6 :             }
    2274            6 :         }
    2275            6 :     }
    2276              : }
        

Generated by: LCOV version 2.1-beta