LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 86.7 % 1483 1286
Test Date: 2025-07-16 12:29:03 Functions: 73.6 % 148 109

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation is extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "values", and
      24              : //! "index". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use std::collections::{HashMap, VecDeque};
      31              : use std::fs::File;
      32              : use std::ops::Range;
      33              : use std::os::unix::fs::FileExt;
      34              : use std::str::FromStr;
      35              : use std::sync::Arc;
      36              : use std::sync::atomic::AtomicU64;
      37              : 
      38              : use anyhow::{Context, Result, bail, ensure};
      39              : use camino::{Utf8Path, Utf8PathBuf};
      40              : use futures::StreamExt;
      41              : use itertools::Itertools;
      42              : use pageserver_api::config::MaxVectoredReadBytes;
      43              : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
      44              : use pageserver_api::keyspace::KeySpace;
      45              : use pageserver_api::models::ImageCompressionAlgorithm;
      46              : use pageserver_api::shard::TenantShardId;
      47              : use serde::{Deserialize, Serialize};
      48              : use tokio::sync::OnceCell;
      49              : use tokio_epoll_uring::IoBuf;
      50              : use tokio_util::sync::CancellationToken;
      51              : use tracing::*;
      52              : use utils::bin_ser::BeSer;
      53              : use utils::bin_ser::SerializeError;
      54              : use utils::id::{TenantId, TimelineId};
      55              : use utils::lsn::Lsn;
      56              : use wal_decoder::models::value::Value;
      57              : 
      58              : use super::errors::PutError;
      59              : use super::{
      60              :     AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
      61              :     ValuesReconstructState,
      62              : };
      63              : use crate::config::PageServerConf;
      64              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      65              : use crate::page_cache::{self, FileId, PAGE_SZ};
      66              : use crate::tenant::blob_io::BlobWriter;
      67              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      68              : use crate::tenant::disk_btree::{
      69              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      70              : };
      71              : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
      72              : use crate::tenant::timeline::GetVectoredError;
      73              : use crate::tenant::vectored_blob_io::{
      74              :     BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      75              :     VectoredReadPlanner,
      76              : };
      77              : use crate::virtual_file::TempVirtualFile;
      78              : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
      79              : use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
      80              : use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
      81              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      82              : 
      83              : ///
      84              : /// Header stored in the beginning of the file
      85              : ///
      86              : /// After this comes the 'values' part, starting on block 1. After that,
      87              : /// the 'index' starts at the block indicated by 'index_start_blk'
      88              : ///
      89            0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      90              : pub struct Summary {
      91              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      92              :     pub magic: u16,
      93              :     pub format_version: u16,
      94              : 
      95              :     pub tenant_id: TenantId,
      96              :     pub timeline_id: TimelineId,
      97              :     pub key_range: Range<Key>,
      98              :     pub lsn_range: Range<Lsn>,
      99              : 
     100              :     /// Block number where the 'index' part of the file begins.
     101              :     pub index_start_blk: u32,
     102              :     /// Block within the 'index', where the B-tree root page is stored
     103              :     pub index_root_blk: u32,
     104              : }
     105              : 
     106              : impl From<&DeltaLayer> for Summary {
     107            0 :     fn from(layer: &DeltaLayer) -> Self {
     108            0 :         Self::expected(
     109            0 :             layer.desc.tenant_shard_id.tenant_id,
     110            0 :             layer.desc.timeline_id,
     111            0 :             layer.desc.key_range.clone(),
     112            0 :             layer.desc.lsn_range.clone(),
     113              :         )
     114            0 :     }
     115              : }
     116              : 
     117              : impl Summary {
     118              :     /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
     119          736 :     pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
     120          736 :         let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
     121          736 :         Self::ser_into(self, &mut buf)?;
     122              :         // Pad zeroes to the buffer so the length is a multiple of the alignment.
     123          736 :         buf.extend_with(0, buf.capacity() - buf.len());
     124          736 :         Ok(buf.freeze())
     125          736 :     }
     126              : 
     127          560 :     pub(super) fn expected(
     128          560 :         tenant_id: TenantId,
     129          560 :         timeline_id: TimelineId,
     130          560 :         keys: Range<Key>,
     131          560 :         lsns: Range<Lsn>,
     132          560 :     ) -> Self {
     133          560 :         Self {
     134          560 :             magic: DELTA_FILE_MAGIC,
     135          560 :             format_version: STORAGE_FORMAT_VERSION,
     136          560 : 
     137          560 :             tenant_id,
     138          560 :             timeline_id,
     139          560 :             key_range: keys,
     140          560 :             lsn_range: lsns,
     141          560 : 
     142          560 :             index_start_blk: 0,
     143          560 :             index_root_blk: 0,
     144          560 :         }
     145          560 :     }
     146              : }
     147              : 
     148              : // Flag indicating that this version initializes the page
     149              : const WILL_INIT: u64 = 1;
     150              : 
     151              : /// Struct representing reference to BLOB in layers.
     152              : ///
     153              : /// Reference contains BLOB offset, and for WAL records it also contains
     154              : /// `will_init` flag. The flag helps to determine the range of records
     155              : /// that needs to be applied, without reading/deserializing records themselves.
     156            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     157              : pub struct BlobRef(pub u64);
     158              : 
     159              : impl BlobRef {
     160      2421170 :     pub fn will_init(&self) -> bool {
     161      2421170 :         (self.0 & WILL_INIT) != 0
     162      2421170 :     }
     163              : 
     164      3882368 :     pub fn pos(&self) -> u64 {
     165      3882368 :         self.0 >> 1
     166      3882368 :     }
     167              : 
     168      3246043 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     169      3246043 :         let mut blob_ref = pos << 1;
     170      3246043 :         if will_init {
     171      3235085 :             blob_ref |= WILL_INIT;
     172      3235085 :         }
     173      3246043 :         BlobRef(blob_ref)
     174      3246043 :     }
     175              : }
     176              : 
     177              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     178              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     179              : 
     180              : /// This is the key of the B-tree index stored in the delta layer. It consists
     181              : /// of the serialized representation of a Key and LSN.
     182              : impl DeltaKey {
     183      1032099 :     fn from_slice(buf: &[u8]) -> Self {
     184      1032099 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     185      1032099 :         bytes.copy_from_slice(buf);
     186      1032099 :         DeltaKey(bytes)
     187      1032099 :     }
     188              : 
     189      3373437 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     190      3373437 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     191      3373437 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     192      3373437 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     193      3373437 :         DeltaKey(bytes)
     194      3373437 :     }
     195              : 
     196      1032099 :     fn key(&self) -> Key {
     197      1032099 :         Key::from_slice(&self.0)
     198      1032099 :     }
     199              : 
     200      1032099 :     fn lsn(&self) -> Lsn {
     201      1032099 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     202      1032099 :     }
     203              : 
     204      2850239 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     205      2850239 :         let mut lsn_buf = [0u8; 8];
     206      2850239 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     207      2850239 :         Lsn(u64::from_be_bytes(lsn_buf))
     208      2850239 :     }
     209              : }
     210              : 
     211              : /// This is used only from `pagectl`. Within pageserver, all layers are
     212              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     213              : pub struct DeltaLayer {
     214              :     path: Utf8PathBuf,
     215              :     pub desc: PersistentLayerDesc,
     216              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     217              : }
     218              : 
     219              : impl std::fmt::Debug for DeltaLayer {
     220            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     221              :         use super::RangeDisplayDebug;
     222              : 
     223            0 :         f.debug_struct("DeltaLayer")
     224            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     225            0 :             .field("lsn_range", &self.desc.lsn_range)
     226            0 :             .field("file_size", &self.desc.file_size)
     227            0 :             .field("inner", &self.inner)
     228            0 :             .finish()
     229            0 :     }
     230              : }
     231              : 
     232              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     233              : /// file.
     234              : pub struct DeltaLayerInner {
     235              :     // values copied from summary
     236              :     index_start_blk: u32,
     237              :     index_root_blk: u32,
     238              : 
     239              :     file: Arc<VirtualFile>,
     240              :     file_id: FileId,
     241              : 
     242              :     layer_key_range: Range<Key>,
     243              :     layer_lsn_range: Range<Lsn>,
     244              : 
     245              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     246              : }
     247              : 
     248              : impl DeltaLayerInner {
     249            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     250            0 :         format!(
     251            0 :             "delta {}..{} {}..{}",
     252            0 :             self.key_range().start,
     253            0 :             self.key_range().end,
     254            0 :             self.lsn_range().start,
     255            0 :             self.lsn_range().end
     256              :         )
     257            0 :     }
     258              : }
     259              : 
     260              : impl std::fmt::Debug for DeltaLayerInner {
     261            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     262            0 :         f.debug_struct("DeltaLayerInner")
     263            0 :             .field("index_start_blk", &self.index_start_blk)
     264            0 :             .field("index_root_blk", &self.index_root_blk)
     265            0 :             .finish()
     266            0 :     }
     267              : }
     268              : 
     269              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     270              : impl std::fmt::Display for DeltaLayer {
     271            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     272            0 :         write!(f, "{}", self.layer_desc().short_id())
     273            0 :     }
     274              : }
     275              : 
     276              : impl AsLayerDesc for DeltaLayer {
     277            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     278            0 :         &self.desc
     279            0 :     }
     280              : }
     281              : 
     282              : impl DeltaLayer {
     283            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     284            0 :         self.desc.dump();
     285              : 
     286            0 :         if !verbose {
     287            0 :             return Ok(());
     288            0 :         }
     289              : 
     290            0 :         let inner = self.load(ctx).await?;
     291              : 
     292            0 :         inner.dump(ctx).await
     293            0 :     }
     294              : 
     295          749 :     fn temp_path_for(
     296          749 :         conf: &PageServerConf,
     297          749 :         tenant_shard_id: &TenantShardId,
     298          749 :         timeline_id: &TimelineId,
     299          749 :         key_start: Key,
     300          749 :         lsn_range: &Range<Lsn>,
     301          749 :     ) -> Utf8PathBuf {
     302              :         // TempVirtualFile requires us to never reuse a filename while an old
     303              :         // instance of TempVirtualFile created with that filename is not done dropping yet.
     304              :         // So, we use a monotonic counter to disambiguate the filenames.
     305              :         static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
     306          749 :         let filename_disambiguator =
     307          749 :             NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     308              : 
     309          749 :         conf.timeline_path(tenant_shard_id, timeline_id)
     310          749 :             .join(format!(
     311          749 :                 "{}-XXX__{:016X}-{:016X}.{:x}.{}",
     312          749 :                 key_start,
     313          749 :                 u64::from(lsn_range.start),
     314          749 :                 u64::from(lsn_range.end),
     315          749 :                 filename_disambiguator,
     316          749 :                 TEMP_FILE_SUFFIX,
     317          749 :             ))
     318          749 :     }
     319              : 
     320              :     ///
     321              :     /// Open the underlying file and read the metadata into memory, if it's
     322              :     /// not loaded already.
     323              :     ///
     324            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
     325              :         // Quick exit if already loaded
     326            0 :         self.inner
     327            0 :             .get_or_try_init(|| self.load_inner(ctx))
     328            0 :             .await
     329            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     330            0 :     }
     331              : 
     332            0 :     async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
     333            0 :         let path = self.path();
     334              : 
     335            0 :         let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
     336              : 
     337              :         // not production code
     338            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     339            0 :         let expected_layer_name = self.layer_desc().layer_name();
     340              : 
     341            0 :         if actual_layer_name != expected_layer_name {
     342            0 :             println!("warning: filename does not match what is expected from in-file summary");
     343            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     344            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     345            0 :         }
     346              : 
     347            0 :         Ok(Arc::new(loaded))
     348            0 :     }
     349              : 
     350              :     /// Create a DeltaLayer struct representing an existing file on disk.
     351              :     ///
     352              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     353            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     354            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     355            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     356            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     357              : 
     358            0 :         let metadata = file
     359            0 :             .metadata()
     360            0 :             .context("get file metadata to determine size")?;
     361              : 
     362              :         // This function is never used for constructing layers in a running pageserver,
     363              :         // so it does not need an accurate TenantShardId.
     364            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     365              : 
     366            0 :         Ok(DeltaLayer {
     367            0 :             path: path.to_path_buf(),
     368            0 :             desc: PersistentLayerDesc::new_delta(
     369            0 :                 tenant_shard_id,
     370            0 :                 summary.timeline_id,
     371            0 :                 summary.key_range,
     372            0 :                 summary.lsn_range,
     373            0 :                 metadata.len(),
     374            0 :             ),
     375            0 :             inner: OnceCell::new(),
     376            0 :         })
     377            0 :     }
     378              : 
     379              :     /// Path to the layer file in pageserver workdir.
     380            0 :     fn path(&self) -> Utf8PathBuf {
     381            0 :         self.path.clone()
     382            0 :     }
     383              : }
     384              : 
     385              : /// A builder object for constructing a new delta layer.
     386              : ///
     387              : /// Usage:
     388              : ///
     389              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     390              : ///
     391              : /// 2. Write the contents by calling `put_value` for every page
     392              : ///    version to store in the layer.
     393              : ///
     394              : /// 3. Call `finish`.
     395              : ///
     396              : struct DeltaLayerWriterInner {
     397              :     pub path: Utf8PathBuf,
     398              :     timeline_id: TimelineId,
     399              :     tenant_shard_id: TenantShardId,
     400              : 
     401              :     key_start: Key,
     402              :     lsn_range: Range<Lsn>,
     403              : 
     404              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     405              : 
     406              :     blob_writer: BlobWriter<TempVirtualFile>,
     407              : 
     408              :     // Number of key-lsns in the layer.
     409              :     num_keys: usize,
     410              : }
     411              : 
     412              : impl DeltaLayerWriterInner {
     413              :     ///
     414              :     /// Start building a new delta layer.
     415              :     ///
     416              :     #[allow(clippy::too_many_arguments)]
     417          749 :     async fn new(
     418          749 :         conf: &'static PageServerConf,
     419          749 :         timeline_id: TimelineId,
     420          749 :         tenant_shard_id: TenantShardId,
     421          749 :         key_start: Key,
     422          749 :         lsn_range: Range<Lsn>,
     423          749 :         gate: &utils::sync::gate::Gate,
     424          749 :         cancel: CancellationToken,
     425          749 :         ctx: &RequestContext,
     426          749 :     ) -> anyhow::Result<Self> {
     427              :         // Create the file initially with a temporary filename. We don't know
     428              :         // the end key yet, so we cannot form the final filename yet. We will
     429              :         // rename it when we're done.
     430          749 :         let path =
     431          749 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     432          749 :         let file = TempVirtualFile::new(
     433          749 :             VirtualFile::open_with_options_v2(
     434          749 :                 &path,
     435          749 :                 virtual_file::OpenOptions::new()
     436          749 :                     .create_new(true)
     437          749 :                     .write(true),
     438          749 :                 ctx,
     439          749 :             )
     440          749 :             .await?,
     441          749 :             gate.enter()?,
     442              :         );
     443              : 
     444              :         // Start at PAGE_SZ, make room for the header block
     445          749 :         let blob_writer = BlobWriter::new(
     446          749 :             file,
     447          749 :             PAGE_SZ as u64,
     448          749 :             gate,
     449          749 :             cancel,
     450          749 :             ctx,
     451          749 :             info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
     452            0 :         )?;
     453              : 
     454              :         // Initialize the b-tree index builder
     455          749 :         let block_buf = BlockBuf::new();
     456          749 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     457              : 
     458          749 :         Ok(Self {
     459          749 :             path,
     460          749 :             timeline_id,
     461          749 :             tenant_shard_id,
     462          749 :             key_start,
     463          749 :             lsn_range,
     464          749 :             tree: tree_builder,
     465          749 :             blob_writer,
     466          749 :             num_keys: 0,
     467          749 :         })
     468          749 :     }
     469              : 
     470              :     ///
     471              :     /// Append a key-value pair to the file.
     472              :     ///
     473              :     /// The values must be appended in key, lsn order.
     474              :     ///
     475      1052659 :     async fn put_value(
     476      1052659 :         &mut self,
     477      1052659 :         key: Key,
     478      1052659 :         lsn: Lsn,
     479      1052659 :         val: Value,
     480      1052659 :         ctx: &RequestContext,
     481      1052659 :     ) -> Result<(), PutError> {
     482      1052659 :         let (_, res) = self
     483      1052659 :             .put_value_bytes(
     484      1052659 :                 key,
     485      1052659 :                 lsn,
     486      1052659 :                 Value::ser(&val)
     487      1052659 :                     .map_err(anyhow::Error::new)
     488      1052659 :                     .map_err(PutError::Other)?
     489      1052659 :                     .slice_len(),
     490      1052659 :                 val.will_init(),
     491      1052659 :                 ctx,
     492              :             )
     493      1052659 :             .await;
     494      1052659 :         res
     495      1052659 :     }
     496              : 
     497      3245991 :     async fn put_value_bytes<Buf>(
     498      3245991 :         &mut self,
     499      3245991 :         key: Key,
     500      3245991 :         lsn: Lsn,
     501      3245991 :         val: FullSlice<Buf>,
     502      3245991 :         will_init: bool,
     503      3245991 :         ctx: &RequestContext,
     504      3245991 :     ) -> (FullSlice<Buf>, Result<(), PutError>)
     505      3245991 :     where
     506      3245991 :         Buf: IoBuf + Send,
     507      3245991 :     {
     508      3245991 :         assert!(
     509      3245991 :             self.lsn_range.start <= lsn,
     510            0 :             "lsn_start={}, lsn={}",
     511              :             self.lsn_range.start,
     512              :             lsn
     513              :         );
     514              :         // We don't want to use compression in delta layer creation
     515      3245991 :         let compression = ImageCompressionAlgorithm::Disabled;
     516      3245991 :         let (val, res) = self
     517      3245991 :             .blob_writer
     518      3245991 :             .write_blob_maybe_compressed(val, ctx, compression)
     519      3245991 :             .await;
     520      3245991 :         let res = res.map_err(PutError::WriteBlob);
     521      3245991 :         let off = match res {
     522      3245991 :             Ok((off, _)) => off,
     523            0 :             Err(e) => return (val, Err(e)),
     524              :         };
     525              : 
     526      3245991 :         let blob_ref = BlobRef::new(off, will_init);
     527              : 
     528      3245991 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     529      3245991 :         let res = self
     530      3245991 :             .tree
     531      3245991 :             .append(&delta_key.0, blob_ref.0)
     532      3245991 :             .map_err(anyhow::Error::new)
     533      3245991 :             .map_err(PutError::Other);
     534              : 
     535      3245991 :         self.num_keys += 1;
     536              : 
     537      3245991 :         (val, res)
     538      3245991 :     }
     539              : 
     540      1017680 :     fn size(&self) -> u64 {
     541      1017680 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     542      1017680 :     }
     543              : 
     544              :     ///
     545              :     /// Finish writing the delta layer.
     546              :     ///
     547          736 :     async fn finish(
     548          736 :         self,
     549          736 :         key_end: Key,
     550          736 :         ctx: &RequestContext,
     551          736 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     552          736 :         let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
     553              : 
     554          736 :         let file = self
     555          736 :             .blob_writer
     556          736 :             .shutdown(
     557          736 :                 BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
     558          736 :                 ctx,
     559          736 :             )
     560          736 :             .await?;
     561              : 
     562              :         // Write out the index
     563          736 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     564          736 :         let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
     565              : 
     566              :         // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
     567              :         // Should we just replace BlockBuf::blocks with one big buffer?
     568         7600 :         for buf in block_buf.blocks {
     569         6864 :             let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
     570         6864 :             res?;
     571         6864 :             offset += PAGE_SZ as u64;
     572              :         }
     573          736 :         assert!(self.lsn_range.start < self.lsn_range.end);
     574              :         // Fill in the summary on blk 0
     575          736 :         let summary = Summary {
     576          736 :             magic: DELTA_FILE_MAGIC,
     577          736 :             format_version: STORAGE_FORMAT_VERSION,
     578          736 :             tenant_id: self.tenant_shard_id.tenant_id,
     579          736 :             timeline_id: self.timeline_id,
     580          736 :             key_range: self.key_start..key_end,
     581          736 :             lsn_range: self.lsn_range.clone(),
     582          736 :             index_start_blk,
     583          736 :             index_root_blk,
     584          736 :         };
     585              : 
     586              :         // Writes summary at the first block (offset 0).
     587          736 :         let buf = summary.ser_into_page()?;
     588          736 :         let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
     589          736 :         res?;
     590              : 
     591          736 :         let metadata = file
     592          736 :             .metadata()
     593          736 :             .await
     594          736 :             .context("get file metadata to determine size")?;
     595              : 
     596              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     597              :         // Make it a little bit below to account for differing GB units
     598              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     599          736 :         ensure!(
     600          736 :             metadata.len() <= S3_UPLOAD_LIMIT,
     601            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     602            0 :             file.path(),
     603            0 :             metadata.len()
     604              :         );
     605              : 
     606              :         // Note: Because we opened the file in write-only mode, we cannot
     607              :         // reuse the same VirtualFile for reading later. That's why we don't
     608              :         // set inner.file here. The first read will have to re-open it.
     609              : 
     610          736 :         let desc = PersistentLayerDesc::new_delta(
     611          736 :             self.tenant_shard_id,
     612          736 :             self.timeline_id,
     613          736 :             self.key_start..key_end,
     614          736 :             self.lsn_range.clone(),
     615          736 :             metadata.len(),
     616              :         );
     617              : 
     618              :         // fsync the file
     619          736 :         file.sync_all()
     620          736 :             .await
     621          736 :             .maybe_fatal_err("delta_layer sync_all")?;
     622              : 
     623          736 :         trace!("created delta layer {}", self.path);
     624              : 
     625              :         // The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction)
     626              :         // keep the gate open also, so that it's safe for them to rename the file to its final destination.
     627          736 :         file.disarm_into_inner();
     628              : 
     629          736 :         Ok((desc, self.path))
     630          736 :     }
     631              : }
     632              : 
     633              : /// A builder object for constructing a new delta layer.
     634              : ///
     635              : /// Usage:
     636              : ///
     637              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     638              : ///
     639              : /// 2. Write the contents by calling `put_value` for every page
     640              : ///    version to store in the layer.
     641              : ///
     642              : /// 3. Call `finish`.
     643              : ///
     644              : /// # Note
     645              : ///
     646              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     647              : /// possible for the writer to drop before `finish` is actually called. So this
     648              : /// could lead to odd temporary files in the directory, exhausting file system.
     649              : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     650              : /// implementation that cleans up the temporary file in failure. It's not
     651              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     652              : /// out some fields, making it impossible to implement `Drop`.
     653              : ///
     654              : #[must_use]
     655              : pub struct DeltaLayerWriter {
     656              :     inner: Option<DeltaLayerWriterInner>,
     657              : }
     658              : 
     659              : impl DeltaLayerWriter {
     660              :     ///
     661              :     /// Start building a new delta layer.
     662              :     ///
     663              :     #[allow(clippy::too_many_arguments)]
     664          749 :     pub async fn new(
     665          749 :         conf: &'static PageServerConf,
     666          749 :         timeline_id: TimelineId,
     667          749 :         tenant_shard_id: TenantShardId,
     668          749 :         key_start: Key,
     669          749 :         lsn_range: Range<Lsn>,
     670          749 :         gate: &utils::sync::gate::Gate,
     671          749 :         cancel: CancellationToken,
     672          749 :         ctx: &RequestContext,
     673          749 :     ) -> anyhow::Result<Self> {
     674              :         Ok(Self {
     675              :             inner: Some(
     676          749 :                 DeltaLayerWriterInner::new(
     677          749 :                     conf,
     678          749 :                     timeline_id,
     679          749 :                     tenant_shard_id,
     680          749 :                     key_start,
     681          749 :                     lsn_range,
     682          749 :                     gate,
     683          749 :                     cancel,
     684          749 :                     ctx,
     685          749 :                 )
     686          749 :                 .await?,
     687              :             ),
     688              :         })
     689          749 :     }
     690              : 
     691            0 :     pub fn is_empty(&self) -> bool {
     692            0 :         self.inner.as_ref().unwrap().num_keys == 0
     693            0 :     }
     694              : 
     695              :     ///
     696              :     /// Append a key-value pair to the file.
     697              :     ///
     698              :     /// The values must be appended in key, lsn order.
     699              :     ///
     700      1052659 :     pub async fn put_value(
     701      1052659 :         &mut self,
     702      1052659 :         key: Key,
     703      1052659 :         lsn: Lsn,
     704      1052659 :         val: Value,
     705      1052659 :         ctx: &RequestContext,
     706      1052659 :     ) -> Result<(), PutError> {
     707      1052659 :         self.inner
     708      1052659 :             .as_mut()
     709      1052659 :             .unwrap()
     710      1052659 :             .put_value(key, lsn, val, ctx)
     711      1052659 :             .await
     712      1052659 :     }
     713              : 
     714      2193332 :     pub async fn put_value_bytes<Buf>(
     715      2193332 :         &mut self,
     716      2193332 :         key: Key,
     717      2193332 :         lsn: Lsn,
     718      2193332 :         val: FullSlice<Buf>,
     719      2193332 :         will_init: bool,
     720      2193332 :         ctx: &RequestContext,
     721      2193332 :     ) -> (FullSlice<Buf>, Result<(), PutError>)
     722      2193332 :     where
     723      2193332 :         Buf: IoBuf + Send,
     724      2193332 :     {
     725      2193332 :         self.inner
     726      2193332 :             .as_mut()
     727      2193332 :             .unwrap()
     728      2193332 :             .put_value_bytes(key, lsn, val, will_init, ctx)
     729      2193332 :             .await
     730      2193332 :     }
     731              : 
     732      1017680 :     pub fn size(&self) -> u64 {
     733      1017680 :         self.inner.as_ref().unwrap().size()
     734      1017680 :     }
     735              : 
     736              :     ///
     737              :     /// Finish writing the delta layer.
     738              :     ///
     739          736 :     pub(crate) async fn finish(
     740          736 :         mut self,
     741          736 :         key_end: Key,
     742          736 :         ctx: &RequestContext,
     743          736 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     744          736 :         self.inner.take().unwrap().finish(key_end, ctx).await
     745          736 :     }
     746              : 
     747         6127 :     pub(crate) fn num_keys(&self) -> usize {
     748         6127 :         self.inner.as_ref().unwrap().num_keys
     749         6127 :     }
     750              : 
     751         7570 :     pub(crate) fn estimated_size(&self) -> u64 {
     752         7570 :         let inner = self.inner.as_ref().unwrap();
     753         7570 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
     754         7570 :     }
     755              : }
     756              : 
     757              : #[derive(thiserror::Error, Debug)]
     758              : pub enum RewriteSummaryError {
     759              :     #[error("magic mismatch")]
     760              :     MagicMismatch,
     761              :     #[error(transparent)]
     762              :     Other(#[from] anyhow::Error),
     763              : }
     764              : 
     765              : impl From<std::io::Error> for RewriteSummaryError {
     766            0 :     fn from(e: std::io::Error) -> Self {
     767            0 :         Self::Other(anyhow::anyhow!(e))
     768            0 :     }
     769              : }
     770              : 
     771              : impl DeltaLayer {
     772            0 :     pub async fn rewrite_summary<F>(
     773            0 :         path: &Utf8Path,
     774            0 :         rewrite: F,
     775            0 :         ctx: &RequestContext,
     776            0 :     ) -> Result<(), RewriteSummaryError>
     777            0 :     where
     778            0 :         F: Fn(Summary) -> Summary,
     779            0 :     {
     780            0 :         let file = VirtualFile::open_with_options_v2(
     781            0 :             path,
     782            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     783            0 :             ctx,
     784            0 :         )
     785            0 :         .await
     786            0 :         .with_context(|| format!("Failed to open file '{path}'"))?;
     787            0 :         let file_id = page_cache::next_file_id();
     788            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     789            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     790            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     791            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     792            0 :             return Err(RewriteSummaryError::MagicMismatch);
     793            0 :         }
     794              : 
     795            0 :         let new_summary = rewrite(actual_summary);
     796              : 
     797            0 :         let buf = new_summary.ser_into_page().context("serialize")?;
     798            0 :         let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
     799            0 :         res?;
     800            0 :         Ok(())
     801            0 :     }
     802              : }
     803              : 
     804              : impl DeltaLayerInner {
     805          520 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     806          520 :         &self.layer_key_range
     807          520 :     }
     808              : 
     809          520 :     pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
     810          520 :         &self.layer_lsn_range
     811          520 :     }
     812              : 
     813          560 :     pub(super) async fn load(
     814          560 :         path: &Utf8Path,
     815          560 :         summary: Option<Summary>,
     816          560 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     817          560 :         ctx: &RequestContext,
     818          560 :     ) -> anyhow::Result<Self> {
     819          560 :         let file = Arc::new(
     820          560 :             VirtualFile::open_v2(path, ctx)
     821          560 :                 .await
     822          560 :                 .context("open layer file")?,
     823              :         );
     824              : 
     825          560 :         let file_id = page_cache::next_file_id();
     826              : 
     827          560 :         let block_reader = FileBlockReader::new(&file, file_id);
     828              : 
     829          560 :         let summary_blk = block_reader
     830          560 :             .read_blk(0, ctx)
     831          560 :             .await
     832          560 :             .context("read first block")?;
     833              : 
     834              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     835          560 :         let actual_summary =
     836          560 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     837              : 
     838          560 :         if let Some(mut expected_summary) = summary {
     839              :             // production code path
     840          560 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     841          560 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     842              :             // mask out the timeline_id, but still require the layers to be from the same tenant
     843          560 :             expected_summary.timeline_id = actual_summary.timeline_id;
     844              : 
     845          560 :             if actual_summary != expected_summary {
     846            0 :                 bail!(
     847            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     848              :                     actual_summary,
     849              :                     expected_summary
     850              :                 );
     851          560 :             }
     852            0 :         }
     853              : 
     854          560 :         Ok(DeltaLayerInner {
     855          560 :             file,
     856          560 :             file_id,
     857          560 :             index_start_blk: actual_summary.index_start_blk,
     858          560 :             index_root_blk: actual_summary.index_root_blk,
     859          560 :             max_vectored_read_bytes,
     860          560 :             layer_key_range: actual_summary.key_range,
     861          560 :             layer_lsn_range: actual_summary.lsn_range,
     862          560 :         })
     863          560 :     }
     864              : 
     865              :     // Look up the keys in the provided keyspace and update
     866              :     // the reconstruct state with whatever is found.
     867              :     //
     868              :     // Currently, the index is visited for each range, but this
     869              :     // can be further optimised to visit the index only once.
     870       124723 :     pub(super) async fn get_values_reconstruct_data(
     871       124723 :         &self,
     872       124723 :         this: ResidentLayer,
     873       124723 :         keyspace: KeySpace,
     874       124723 :         lsn_range: Range<Lsn>,
     875       124723 :         reconstruct_state: &mut ValuesReconstructState,
     876       124723 :         ctx: &RequestContext,
     877       124723 :     ) -> Result<(), GetVectoredError> {
     878       124723 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     879       124723 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     880       124723 :             self.index_start_blk,
     881       124723 :             self.index_root_blk,
     882       124723 :             block_reader,
     883              :         );
     884              : 
     885       124723 :         let planner = VectoredReadPlanner::new(
     886       124723 :             self.max_vectored_read_bytes
     887       124723 :                 .expect("Layer is loaded with max vectored bytes config")
     888       124723 :                 .0
     889       124723 :                 .into(),
     890              :         );
     891              : 
     892       124723 :         let data_end_offset = self.index_start_offset();
     893              : 
     894       124723 :         let reads = Self::plan_reads(
     895       124723 :             &keyspace,
     896       124723 :             lsn_range.clone(),
     897       124723 :             data_end_offset,
     898       124723 :             index_reader,
     899       124723 :             planner,
     900       124723 :             ctx,
     901       124723 :         )
     902       124723 :         .await
     903       124723 :         .map_err(GetVectoredError::Other)?;
     904              : 
     905       124723 :         self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
     906       124723 :             .await;
     907              : 
     908       124723 :         Ok(())
     909       124723 :     }
     910              : 
     911       124824 :     async fn plan_reads<Reader>(
     912       124824 :         keyspace: &KeySpace,
     913       124824 :         lsn_range: Range<Lsn>,
     914       124824 :         data_end_offset: u64,
     915       124824 :         index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
     916       124824 :         mut planner: VectoredReadPlanner,
     917       124824 :         ctx: &RequestContext,
     918       124824 :     ) -> anyhow::Result<Vec<VectoredRead>>
     919       124824 :     where
     920       124824 :         Reader: BlockReader + Clone,
     921       124824 :     {
     922       124824 :         let ctx = RequestContextBuilder::from(ctx)
     923       124824 :             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     924       124824 :             .attached_child();
     925              : 
     926       127429 :         for range in keyspace.ranges.iter() {
     927       127429 :             let mut range_end_handled = false;
     928              : 
     929       127429 :             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
     930       127429 :             let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
     931       127429 :             let mut index_stream = std::pin::pin!(index_stream);
     932              : 
     933      1806369 :             while let Some(index_entry) = index_stream.next().await {
     934      1800698 :                 let (raw_key, value) = index_entry?;
     935      1800698 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     936      1800698 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
     937      1800698 :                 let blob_ref = BlobRef(value);
     938              : 
     939              :                 // Lsns are not monotonically increasing across keys, so we don't assert on them.
     940      1800698 :                 assert!(key >= range.start);
     941              : 
     942      1800698 :                 let outside_lsn_range = !lsn_range.contains(&lsn);
     943              : 
     944      1800698 :                 let flag = {
     945      1800698 :                     if outside_lsn_range {
     946       429069 :                         BlobFlag::Ignore
     947      1371629 :                     } else if blob_ref.will_init() {
     948       292986 :                         BlobFlag::ReplaceAll
     949              :                     } else {
     950              :                         // Usual path: add blob to the read
     951      1078643 :                         BlobFlag::None
     952              :                     }
     953              :                 };
     954              : 
     955      1800698 :                 if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
     956       121758 :                     planner.handle_range_end(blob_ref.pos());
     957       121758 :                     range_end_handled = true;
     958       121758 :                     break;
     959      1678940 :                 } else {
     960      1678940 :                     planner.handle(key, lsn, blob_ref.pos(), flag);
     961      1678940 :                 }
     962              :             }
     963              : 
     964       127429 :             if !range_end_handled {
     965         5671 :                 tracing::debug!("Handling range end fallback at {}", data_end_offset);
     966         5671 :                 planner.handle_range_end(data_end_offset);
     967       121758 :             }
     968              :         }
     969              : 
     970       124824 :         Ok(planner.finish())
     971       124824 :     }
     972              : 
     973       124823 :     fn get_min_read_buffer_size(
     974       124823 :         planned_reads: &[VectoredRead],
     975       124823 :         read_size_soft_max: usize,
     976       124823 :     ) -> usize {
     977       124823 :         let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
     978        45880 :             return read_size_soft_max;
     979              :         };
     980              : 
     981        78943 :         let largest_read_size = largest_read.size();
     982        78943 :         if largest_read_size > read_size_soft_max {
     983              :             // If the read is oversized, it should only contain one key.
     984          100 :             let offenders = largest_read
     985          100 :                 .blobs_at
     986          100 :                 .as_slice()
     987          100 :                 .iter()
     988          100 :                 .filter_map(|(_, blob_meta)| {
     989          100 :                     if blob_meta.key.is_rel_dir_key()
     990          100 :                         || blob_meta.key == DBDIR_KEY
     991          100 :                         || blob_meta.key.is_aux_file_key()
     992              :                     {
     993              :                         // The size of values for these keys is unbounded and can
     994              :                         // grow very large in pathological cases.
     995            0 :                         None
     996              :                     } else {
     997          100 :                         Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
     998              :                     }
     999          100 :                 })
    1000          100 :                 .join(", ");
    1001              : 
    1002          100 :             if !offenders.is_empty() {
    1003          100 :                 tracing::warn!(
    1004            0 :                     "Oversized vectored read ({} > {}) for keys {}",
    1005              :                     largest_read_size,
    1006              :                     read_size_soft_max,
    1007              :                     offenders
    1008              :                 );
    1009            0 :             }
    1010        78843 :         }
    1011              : 
    1012        78943 :         largest_read_size
    1013       124823 :     }
    1014              : 
    1015       124723 :     async fn do_reads_and_update_state(
    1016       124723 :         &self,
    1017       124723 :         this: ResidentLayer,
    1018       124723 :         reads: Vec<VectoredRead>,
    1019       124723 :         reconstruct_state: &mut ValuesReconstructState,
    1020       124723 :         ctx: &RequestContext,
    1021       124723 :     ) {
    1022       124723 :         let max_vectored_read_bytes = self
    1023       124723 :             .max_vectored_read_bytes
    1024       124723 :             .expect("Layer is loaded with max vectored bytes config")
    1025       124723 :             .0
    1026       124723 :             .into();
    1027       124723 :         let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
    1028              : 
    1029              :         // Note that reads are processed in reverse order (from highest key+lsn).
    1030              :         // This is the order that `ReconstructState` requires such that it can
    1031              :         // track when a key is done.
    1032       124723 :         for read in reads.into_iter().rev() {
    1033        88817 :             let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
    1034       797352 :             for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
    1035       797352 :                 let io = reconstruct_state.update_key(
    1036       797352 :                     &blob_meta.key,
    1037       797352 :                     blob_meta.lsn,
    1038       797352 :                     blob_meta.will_init,
    1039       797352 :                 );
    1040       797352 :                 ios.insert((blob_meta.key, blob_meta.lsn), io);
    1041       797352 :             }
    1042              : 
    1043        88817 :             let read_extend_residency = this.clone();
    1044        88817 :             let read_from = self.file.clone();
    1045        88817 :             let read_ctx = ctx.attached_child();
    1046        88817 :             reconstruct_state
    1047        88817 :                 .spawn_io(async move {
    1048        88817 :                     let vectored_blob_reader = VectoredBlobReader::new(&read_from);
    1049        88817 :                     let buf = IoBufferMut::with_capacity(buf_size);
    1050              : 
    1051        88817 :                     let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
    1052        88817 :                     match res {
    1053        88817 :                         Ok(blobs_buf) => {
    1054        88817 :                             let view = BufView::new_slice(&blobs_buf.buf);
    1055       797352 :                             for meta in blobs_buf.blobs.iter().rev() {
    1056       797352 :                                 let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
    1057              : 
    1058       797352 :                                 let blob_read = meta.read(&view).await;
    1059       797352 :                                 let blob_read = match blob_read {
    1060       797352 :                                     Ok(buf) => buf,
    1061            0 :                                     Err(e) => {
    1062            0 :                                         io.complete(Err(e));
    1063            0 :                                         continue;
    1064              :                                     }
    1065              :                                 };
    1066              : 
    1067       797352 :                                 io.complete(Ok(OnDiskValue::WalRecordOrImage(
    1068       797352 :                                     blob_read.into_bytes(),
    1069       797352 :                                 )));
    1070              :                             }
    1071              : 
    1072        88817 :                             assert!(ios.is_empty());
    1073              :                         }
    1074            0 :                         Err(err) => {
    1075            0 :                             for (_, sender) in ios {
    1076            0 :                                 sender.complete(Err(std::io::Error::new(
    1077            0 :                                     err.kind(),
    1078            0 :                                     "vec read failed",
    1079            0 :                                 )));
    1080            0 :                             }
    1081              :                         }
    1082              :                     }
    1083              : 
    1084              :                     // keep layer resident until this IO is done; this spawned IO future generally outlives the
    1085              :                     // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
    1086        88817 :                     drop(read_extend_residency);
    1087        88817 :                 })
    1088        88817 :                 .await;
    1089              :         }
    1090       124723 :     }
    1091              : 
    1092          203 :     pub(crate) async fn index_entries<'a>(
    1093          203 :         &'a self,
    1094          203 :         ctx: &RequestContext,
    1095          203 :     ) -> Result<Vec<DeltaEntry<'a>>> {
    1096          203 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1097          203 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1098          203 :             self.index_start_blk,
    1099          203 :             self.index_root_blk,
    1100          203 :             block_reader,
    1101              :         );
    1102              : 
    1103          203 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
    1104              : 
    1105          203 :         tree_reader
    1106          203 :             .visit(
    1107          203 :                 &[0u8; DELTA_KEY_SIZE],
    1108          203 :                 VisitDirection::Forwards,
    1109      1032023 :                 |key, value| {
    1110      1032023 :                     let delta_key = DeltaKey::from_slice(key);
    1111      1032023 :                     let val_ref = ValueRef {
    1112      1032023 :                         blob_ref: BlobRef(value),
    1113      1032023 :                         layer: self,
    1114      1032023 :                     };
    1115      1032023 :                     let pos = BlobRef(value).pos();
    1116      1032023 :                     if let Some(last) = all_keys.last_mut() {
    1117      1031820 :                         // subtract offset of the current and last entries to get the size
    1118      1031820 :                         // of the value associated with this (key, lsn) tuple
    1119      1031820 :                         let first_pos = last.size;
    1120      1031820 :                         last.size = pos - first_pos;
    1121      1031820 :                     }
    1122      1032023 :                     let entry = DeltaEntry {
    1123      1032023 :                         key: delta_key.key(),
    1124      1032023 :                         lsn: delta_key.lsn(),
    1125      1032023 :                         size: pos,
    1126      1032023 :                         val: val_ref,
    1127      1032023 :                     };
    1128      1032023 :                     all_keys.push(entry);
    1129      1032023 :                     true
    1130      1032023 :                 },
    1131          203 :                 &RequestContextBuilder::from(ctx)
    1132          203 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1133          203 :                     .attached_child(),
    1134              :             )
    1135          203 :             .await?;
    1136          203 :         if let Some(last) = all_keys.last_mut() {
    1137          203 :             // Last key occupies all space till end of value storage,
    1138          203 :             // which corresponds to beginning of the index
    1139          203 :             last.size = self.index_start_offset() - last.size;
    1140          203 :         }
    1141          203 :         Ok(all_keys)
    1142          203 :     }
    1143              : 
    1144              :     /// Using the given writer, write out a version containing only the entries with LSNs earlier than `until`.
    1145              :     ///
    1146              :     /// Returns the number of key-value records pushed to the writer.
    1147            5 :     pub(super) async fn copy_prefix(
    1148            5 :         &self,
    1149            5 :         writer: &mut DeltaLayerWriter,
    1150            5 :         until: Lsn,
    1151            5 :         ctx: &RequestContext,
    1152            5 :     ) -> anyhow::Result<usize> {
    1153              :         use futures::stream::TryStreamExt;
    1154              : 
    1155              :         use crate::tenant::vectored_blob_io::{
    1156              :             BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
    1157              :         };
    1158              : 
    1159              :         #[derive(Debug)]
    1160              :         enum Item {
    1161              :             Actual(Key, Lsn, BlobRef),
    1162              :             Sentinel,
    1163              :         }
    1164              : 
    1165              :         impl From<Item> for Option<(Key, Lsn, BlobRef)> {
    1166           35 :             fn from(value: Item) -> Self {
    1167           35 :                 match value {
    1168           30 :                     Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
    1169            5 :                     Item::Sentinel => None,
    1170              :                 }
    1171           35 :             }
    1172              :         }
    1173              : 
    1174              :         impl Item {
    1175           35 :             fn offset(&self) -> Option<BlobRef> {
    1176           35 :                 match self {
    1177           30 :                     Item::Actual(_, _, blob) => Some(*blob),
    1178            5 :                     Item::Sentinel => None,
    1179              :                 }
    1180           35 :             }
    1181              : 
    1182           35 :             fn is_last(&self) -> bool {
    1183           35 :                 matches!(self, Item::Sentinel)
    1184           35 :             }
    1185              :         }
    1186              : 
    1187            5 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1188            5 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1189            5 :             self.index_start_blk,
    1190            5 :             self.index_root_blk,
    1191            5 :             block_reader,
    1192              :         );
    1193              : 
    1194            5 :         let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
    1195           30 :         let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
    1196              :         // put in a sentinel value for getting the end offset for last item, and not having to
    1197              :         // repeat the whole read part
    1198            5 :         let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
    1199            5 :             Item::Sentinel,
    1200            5 :         ))));
    1201            5 :         let mut stream = std::pin::pin!(stream);
    1202              : 
    1203            5 :         let mut prev: Option<(Key, Lsn, BlobRef)> = None;
    1204              : 
    1205            5 :         let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
    1206              : 
    1207            5 :         let max_read_size = self
    1208            5 :             .max_vectored_read_bytes
    1209            5 :             .map(|x| x.0.get())
    1210            5 :             .unwrap_or(8192);
    1211              : 
    1212            5 :         let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
    1213              : 
    1214              :         // FIXME: buffering of DeltaLayerWriter
    1215            5 :         let mut per_blob_copy = Vec::new();
    1216              : 
    1217            5 :         let mut records = 0;
    1218              : 
    1219           40 :         while let Some(item) = stream.try_next().await? {
    1220           35 :             tracing::debug!(?item, "popped");
    1221           35 :             let offset = item
    1222           35 :                 .offset()
    1223           35 :                 .unwrap_or(BlobRef::new(self.index_start_offset(), false));
    1224              : 
    1225           35 :             let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
    1226           30 :                 let end_offset = offset;
    1227              : 
    1228           30 :                 Some((
    1229           30 :                     BlobMeta {
    1230           30 :                         key,
    1231           30 :                         lsn,
    1232           30 :                         will_init: false,
    1233           30 :                     },
    1234           30 :                     start_offset..end_offset,
    1235           30 :                 ))
    1236              :             } else {
    1237            5 :                 None
    1238              :             };
    1239              : 
    1240           35 :             let is_last = item.is_last();
    1241              : 
    1242           35 :             prev = Option::from(item);
    1243              : 
    1244           35 :             let actionable = actionable.filter(|x| x.0.lsn < until);
    1245              : 
    1246           35 :             let builder = if let Some((meta, offsets)) = actionable {
    1247              :                 // extend or create a new builder
    1248           16 :                 if read_builder
    1249           16 :                     .as_mut()
    1250           16 :                     .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
    1251           16 :                     .unwrap_or(VectoredReadExtended::No)
    1252           16 :                     == VectoredReadExtended::Yes
    1253              :                 {
    1254            8 :                     None
    1255              :                 } else {
    1256            8 :                     read_builder.replace(ChunkedVectoredReadBuilder::new(
    1257            8 :                         offsets.start.pos(),
    1258            8 :                         offsets.end.pos(),
    1259            8 :                         meta,
    1260            8 :                         max_read_size,
    1261              :                     ))
    1262              :                 }
    1263              :             } else {
    1264              :                 // nothing to do, except perhaps flush any existing for the last element
    1265           19 :                 None
    1266              :             };
    1267              : 
    1268              :             // flush the possible older builder and also the new one if the item was the last one
    1269           35 :             let builders = builder.into_iter();
    1270           35 :             let builders = if is_last {
    1271            5 :                 builders.chain(read_builder.take())
    1272              :             } else {
    1273           30 :                 builders.chain(None)
    1274              :             };
    1275              : 
    1276           43 :             for builder in builders {
    1277            8 :                 let read = builder.build();
    1278              : 
    1279            8 :                 let reader = VectoredBlobReader::new(&self.file);
    1280              : 
    1281            8 :                 let mut buf = buffer.take().unwrap();
    1282              : 
    1283            8 :                 buf.clear();
    1284            8 :                 buf.reserve(read.size());
    1285            8 :                 let res = reader.read_blobs(&read, buf, ctx).await?;
    1286              : 
    1287            8 :                 let view = BufView::new_slice(&res.buf);
    1288              : 
    1289           24 :                 for blob in res.blobs {
    1290           16 :                     let key = blob.meta.key;
    1291           16 :                     let lsn = blob.meta.lsn;
    1292              : 
    1293           16 :                     let data = blob.read(&view).await?;
    1294              : 
    1295              :                     #[cfg(debug_assertions)]
    1296           16 :                     Value::des(&data)
    1297           16 :                         .with_context(|| {
    1298            0 :                             format!(
    1299            0 :                                 "blob failed to deserialize for {}: {:?}",
    1300              :                                 blob,
    1301            0 :                                 utils::Hex(&data)
    1302              :                             )
    1303            0 :                         })
    1304           16 :                         .unwrap();
    1305              : 
    1306              :                     // is it an image or will_init walrecord?
    1307              :                     // FIXME: this could be handled by threading the BlobRef to the
    1308              :                     // VectoredReadBuilder
    1309           16 :                     let will_init = wal_decoder::models::value::ValueBytes::will_init(&data)
    1310           16 :                         .inspect_err(|_e| {
    1311              :                             #[cfg(feature = "testing")]
    1312            0 :                             tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
    1313            0 :                         })
    1314           16 :                         .unwrap_or(false);
    1315              : 
    1316           16 :                     per_blob_copy.clear();
    1317           16 :                     per_blob_copy.extend_from_slice(&data);
    1318              : 
    1319           16 :                     let (tmp, res) = writer
    1320           16 :                         .put_value_bytes(
    1321           16 :                             key,
    1322           16 :                             lsn,
    1323           16 :                             std::mem::take(&mut per_blob_copy).slice_len(),
    1324           16 :                             will_init,
    1325           16 :                             ctx,
    1326              :                         )
    1327           16 :                         .await;
    1328           16 :                     per_blob_copy = tmp.into_raw_slice().into_inner();
    1329              : 
    1330           16 :                     res?;
    1331              : 
    1332           16 :                     records += 1;
    1333              :                 }
    1334              : 
    1335            8 :                 buffer = Some(res.buf);
    1336              :             }
    1337              :         }
    1338              : 
    1339            5 :         assert!(
    1340            5 :             read_builder.is_none(),
    1341            0 :             "with the sentinel above loop should had handled all"
    1342              :         );
    1343              : 
    1344            5 :         Ok(records)
    1345            5 :     }
    1346              : 
    1347            2 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1348            2 :         println!(
    1349            2 :             "index_start_blk: {}, root {}",
    1350              :             self.index_start_blk, self.index_root_blk
    1351              :         );
    1352              : 
    1353            2 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1354            2 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1355            2 :             self.index_start_blk,
    1356            2 :             self.index_root_blk,
    1357            2 :             block_reader,
    1358              :         );
    1359              : 
    1360            2 :         tree_reader.dump(ctx).await?;
    1361              : 
    1362            2 :         let keys = self.index_entries(ctx).await?;
    1363              : 
    1364            4 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1365            4 :             let buf = val.load_raw(ctx).await?;
    1366            4 :             let val = Value::des(&buf)?;
    1367            4 :             let desc = match val {
    1368            4 :                 Value::Image(img) => {
    1369            4 :                     format!(" img {} bytes", img.len())
    1370              :                 }
    1371            0 :                 Value::WalRecord(rec) => {
    1372            0 :                     let wal_desc = wal_decoder::models::record::describe_wal_record(&rec)?;
    1373            0 :                     format!(
    1374            0 :                         " rec {} bytes will_init: {} {}",
    1375            0 :                         buf.len(),
    1376            0 :                         rec.will_init(),
    1377              :                         wal_desc
    1378              :                     )
    1379              :                 }
    1380              :             };
    1381            4 :             Ok(desc)
    1382            4 :         }
    1383              : 
    1384            6 :         for entry in keys {
    1385            4 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1386            4 :             let desc = match dump_blob(&val, ctx).await {
    1387            4 :                 Ok(desc) => desc,
    1388            0 :                 Err(err) => {
    1389            0 :                     format!("ERROR: {err}")
    1390              :                 }
    1391              :             };
    1392            4 :             println!("  key {key} at {lsn}: {desc}");
    1393              : 
    1394              :             // Print more details about CHECKPOINT records. Would be nice to print details
    1395              :             // of many other record types too, but these are particularly interesting, as
    1396              :             // we have a lot of special processing for them in walingest.rs.
    1397              :             use pageserver_api::key::CHECKPOINT_KEY;
    1398              :             use postgres_ffi::CheckPoint;
    1399            4 :             if key == CHECKPOINT_KEY {
    1400            0 :                 let val = val.load(ctx).await?;
    1401            0 :                 match val {
    1402            0 :                     Value::Image(img) => {
    1403            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1404            0 :                         println!("   CHECKPOINT: {checkpoint:?}");
    1405              :                     }
    1406            0 :                     Value::WalRecord(_rec) => {
    1407            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1408            0 :                     }
    1409              :                 }
    1410            4 :             }
    1411              :         }
    1412              : 
    1413            2 :         Ok(())
    1414            2 :     }
    1415              : 
    1416           15 :     fn stream_index_forwards<'a, R>(
    1417           15 :         &'a self,
    1418           15 :         reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
    1419           15 :         start: &'a [u8; DELTA_KEY_SIZE],
    1420           15 :         ctx: &'a RequestContext,
    1421           15 :     ) -> impl futures::stream::Stream<
    1422           15 :         Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
    1423           15 :     > + 'a
    1424           15 :     where
    1425           15 :         R: BlockReader + 'a,
    1426              :     {
    1427              :         use futures::stream::TryStreamExt;
    1428           15 :         let stream = reader.into_stream(start, ctx);
    1429           76 :         stream.map_ok(|(key, value)| {
    1430           76 :             let key = DeltaKey::from_slice(&key);
    1431           76 :             let (key, lsn) = (key.key(), key.lsn());
    1432           76 :             let offset = BlobRef(value);
    1433              : 
    1434           76 :             (key, lsn, offset)
    1435           76 :         })
    1436           15 :     }
    1437              : 
    1438              :     /// The file offset of the first block of the index.
    1439              :     ///
    1440              :     /// The file structure is summary, values, and index. We often need this to compute the size of the last blob.
    1441       125235 :     fn index_start_offset(&self) -> u64 {
    1442       125235 :         let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
    1443       125235 :         let bref = BlobRef(offset);
    1444       125235 :         tracing::debug!(
    1445              :             index_start_blk = self.index_start_blk,
    1446              :             offset,
    1447            0 :             pos = bref.pos(),
    1448            0 :             "index_start_offset"
    1449              :         );
    1450       125235 :         offset
    1451       125235 :     }
    1452              : 
    1453          288 :     pub fn iter_with_options<'a>(
    1454          288 :         &'a self,
    1455          288 :         ctx: &'a RequestContext,
    1456          288 :         max_read_size: u64,
    1457          288 :         max_batch_size: usize,
    1458          288 :     ) -> DeltaLayerIterator<'a> {
    1459          288 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1460          288 :         let tree_reader =
    1461          288 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
    1462          288 :         DeltaLayerIterator {
    1463          288 :             delta_layer: self,
    1464          288 :             ctx,
    1465          288 :             index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
    1466          288 :             key_values_batch: std::collections::VecDeque::new(),
    1467          288 :             is_end: false,
    1468          288 :             planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
    1469          288 :         }
    1470          288 :     }
    1471              : 
    1472              :     /// NB: not super efficient, but not terrible either. Should prob be an iterator.
    1473              :     //
    1474              :     // We're reusing the index traversal logic in plan_reads; would be nice to
    1475              :     // factor that out.
    1476            0 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
    1477            0 :         self.index_entries(ctx)
    1478            0 :             .await
    1479            0 :             .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
    1480            0 :     }
    1481              : }
    1482              : 
    1483              : /// A set of data associated with a delta layer key and its value
    1484              : pub struct DeltaEntry<'a> {
    1485              :     pub key: Key,
    1486              :     pub lsn: Lsn,
    1487              :     /// Size of the stored value
    1488              :     pub size: u64,
    1489              :     /// Reference to the on-disk value
    1490              :     pub val: ValueRef<'a>,
    1491              : }
    1492              : 
    1493              : /// Reference to an on-disk value
    1494              : pub struct ValueRef<'a> {
    1495              :     blob_ref: BlobRef,
    1496              :     layer: &'a DeltaLayerInner,
    1497              : }
    1498              : 
    1499              : impl ValueRef<'_> {
    1500              :     /// Loads the value from disk
    1501            0 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1502            0 :         let buf = self.load_raw(ctx).await?;
    1503            0 :         let val = Value::des(&buf)?;
    1504            0 :         Ok(val)
    1505            0 :     }
    1506              : 
    1507            4 :     async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
    1508            4 :         let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
    1509            4 :             self.layer,
    1510            4 :         )));
    1511            4 :         let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1512            4 :         Ok(buf)
    1513            4 :     }
    1514              : }
    1515              : 
    1516              : pub(crate) struct Adapter<T>(T);
    1517              : 
    1518              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1519            4 :     pub(crate) async fn read_blk(
    1520            4 :         &self,
    1521            4 :         blknum: u32,
    1522            4 :         ctx: &RequestContext,
    1523            4 :     ) -> Result<BlockLease, std::io::Error> {
    1524            4 :         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
    1525            4 :         block_reader.read_blk(blknum, ctx).await
    1526            4 :     }
    1527              : }
    1528              : 
    1529              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1530            8 :     fn as_ref(&self) -> &DeltaLayerInner {
    1531            8 :         self
    1532            8 :     }
    1533              : }
    1534              : 
    1535              : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    1536            0 :     fn key(&self) -> Key {
    1537            0 :         self.key
    1538            0 :     }
    1539            0 :     fn lsn(&self) -> Lsn {
    1540            0 :         self.lsn
    1541            0 :     }
    1542            0 :     fn size(&self) -> u64 {
    1543            0 :         self.size
    1544            0 :     }
    1545              : }
    1546              : 
    1547              : pub struct DeltaLayerIterator<'a> {
    1548              :     delta_layer: &'a DeltaLayerInner,
    1549              :     ctx: &'a RequestContext,
    1550              :     planner: StreamingVectoredReadPlanner,
    1551              :     index_iter: DiskBtreeIterator<'a>,
    1552              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1553              :     is_end: bool,
    1554              : }
    1555              : 
    1556              : impl DeltaLayerIterator<'_> {
    1557            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1558            0 :         self.delta_layer.layer_dbg_info()
    1559            0 :     }
    1560              : 
    1561              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1562        10703 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1563        10703 :         assert!(self.key_values_batch.is_empty());
    1564        10703 :         assert!(!self.is_end);
    1565              : 
    1566        10703 :         let plan = loop {
    1567      1049815 :             if let Some(res) = self.index_iter.next().await {
    1568      1049541 :                 let (raw_key, value) = res?;
    1569      1049541 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
    1570      1049541 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
    1571      1049541 :                 let blob_ref = BlobRef(value);
    1572      1049541 :                 let offset = blob_ref.pos();
    1573        10429 :                 if let Some(batch_plan) =
    1574      1049541 :                     self.planner.handle(key, lsn, offset, blob_ref.will_init())
    1575              :                 {
    1576        10429 :                     break batch_plan;
    1577      1039112 :                 }
    1578              :             } else {
    1579          274 :                 self.is_end = true;
    1580          274 :                 let data_end_offset = self.delta_layer.index_start_offset();
    1581          274 :                 if let Some(item) = self.planner.handle_range_end(data_end_offset) {
    1582          274 :                     break item;
    1583              :                 } else {
    1584            0 :                     return Ok(()); // TODO: test empty iterator
    1585              :                 }
    1586              :             }
    1587              :         };
    1588        10703 :         let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
    1589        10703 :         let mut next_batch = std::collections::VecDeque::new();
    1590        10703 :         let buf_size = plan.size();
    1591        10703 :         let buf = IoBufferMut::with_capacity(buf_size);
    1592        10703 :         let blobs_buf = vectored_blob_reader
    1593        10703 :             .read_blobs(&plan, buf, self.ctx)
    1594        10703 :             .await?;
    1595        10703 :         let view = BufView::new_slice(&blobs_buf.buf);
    1596      1049527 :         for meta in blobs_buf.blobs.iter() {
    1597      1049527 :             let blob_read = meta.read(&view).await?;
    1598      1049527 :             let value = Value::des(&blob_read)?;
    1599              : 
    1600      1049527 :             next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
    1601              :         }
    1602        10703 :         self.key_values_batch = next_batch;
    1603        10703 :         Ok(())
    1604        10703 :     }
    1605              : 
    1606      1049921 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1607      1049921 :         if self.key_values_batch.is_empty() {
    1608        11192 :             if self.is_end {
    1609          531 :                 return Ok(None);
    1610        10661 :             }
    1611        10661 :             self.next_batch().await?;
    1612      1038729 :         }
    1613      1049390 :         Ok(Some(
    1614      1049390 :             self.key_values_batch
    1615      1049390 :                 .pop_front()
    1616      1049390 :                 .expect("should not be empty"),
    1617      1049390 :         ))
    1618      1049921 :     }
    1619              : }
    1620              : 
    1621              : #[cfg(test)]
    1622              : pub(crate) mod test {
    1623              :     use std::collections::BTreeMap;
    1624              : 
    1625              :     use super::*;
    1626              :     use crate::DEFAULT_PG_VERSION;
    1627              :     use crate::context::DownloadBehavior;
    1628              :     use crate::task_mgr::TaskKind;
    1629              :     use crate::tenant::disk_btree::tests::TestDisk;
    1630              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    1631              :     use crate::tenant::storage_layer::{Layer, ResidentLayer};
    1632              :     use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
    1633              :     use crate::tenant::{TenantShard, Timeline};
    1634              :     use bytes::Bytes;
    1635              :     use itertools::MinMaxResult;
    1636              :     use postgres_ffi::PgMajorVersion;
    1637              :     use rand::prelude::{SeedableRng, SliceRandom, StdRng};
    1638              :     use rand::{Rng, RngCore};
    1639              : 
    1640              :     /// Construct an index for a fictional delta layer and then
    1641              :     /// traverse in order to plan vectored reads for a query. Finally,
    1642              :     /// verify that the traversal fed the right index key and value
    1643              :     /// pairs into the planner.
    1644              :     #[tokio::test]
    1645            1 :     async fn test_delta_layer_index_traversal() {
    1646            1 :         let base_key = Key {
    1647            1 :             field1: 0,
    1648            1 :             field2: 1663,
    1649            1 :             field3: 12972,
    1650            1 :             field4: 16396,
    1651            1 :             field5: 0,
    1652            1 :             field6: 246080,
    1653            1 :         };
    1654              : 
    1655              :         // Populate the index with some entries
    1656            1 :         let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
    1657            1 :             (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
    1658            1 :             (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1659            1 :             (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1660            1 :             (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
    1661            1 :         ]);
    1662              : 
    1663            1 :         let mut disk = TestDisk::default();
    1664            1 :         let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
    1665              : 
    1666            1 :         let mut disk_offset = 0;
    1667            5 :         for (key, lsns) in &entries {
    1668           21 :             for lsn in lsns {
    1669           17 :                 let index_key = DeltaKey::from_key_lsn(key, *lsn);
    1670           17 :                 let blob_ref = BlobRef::new(disk_offset, false);
    1671           17 :                 writer
    1672           17 :                     .append(&index_key.0, blob_ref.0)
    1673           17 :                     .expect("In memory disk append should never fail");
    1674           17 : 
    1675           17 :                 disk_offset += 1;
    1676           17 :             }
    1677              :         }
    1678              : 
    1679              :         // Prepare all the arguments for the call into `plan_reads` below
    1680            1 :         let (root_offset, _writer) = writer
    1681            1 :             .finish()
    1682            1 :             .expect("In memory disk finish should never fail");
    1683            1 :         let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
    1684            1 :         let planner = VectoredReadPlanner::new(100);
    1685            1 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1686              : 
    1687            1 :         let keyspace = KeySpace {
    1688            1 :             ranges: vec![
    1689            1 :                 base_key..base_key.add(3),
    1690            1 :                 base_key.add(3)..base_key.add(100),
    1691            1 :             ],
    1692            1 :         };
    1693            1 :         let lsn_range = Lsn(2)..Lsn(40);
    1694              : 
    1695              :         // Plan and validate
    1696            1 :         let vectored_reads = DeltaLayerInner::plan_reads(
    1697            1 :             &keyspace,
    1698            1 :             lsn_range.clone(),
    1699            1 :             disk_offset,
    1700            1 :             reader,
    1701            1 :             planner,
    1702            1 :             &ctx,
    1703            1 :         )
    1704            1 :         .await
    1705            1 :         .expect("Read planning should not fail");
    1706              : 
    1707            1 :         validate(keyspace, lsn_range, vectored_reads, entries);
    1708            1 :     }
    1709              : 
    1710            1 :     fn validate(
    1711            1 :         keyspace: KeySpace,
    1712            1 :         lsn_range: Range<Lsn>,
    1713            1 :         vectored_reads: Vec<VectoredRead>,
    1714            1 :         index_entries: BTreeMap<Key, Vec<Lsn>>,
    1715            1 :     ) {
    1716              :         #[derive(Debug, PartialEq, Eq)]
    1717              :         struct BlobSpec {
    1718              :             key: Key,
    1719              :             lsn: Lsn,
    1720              :             at: u64,
    1721              :         }
    1722              : 
    1723            1 :         let mut planned_blobs = Vec::new();
    1724           15 :         for read in vectored_reads {
    1725           14 :             for (at, meta) in read.blobs_at.as_slice() {
    1726           14 :                 planned_blobs.push(BlobSpec {
    1727           14 :                     key: meta.key,
    1728           14 :                     lsn: meta.lsn,
    1729           14 :                     at: *at,
    1730           14 :                 });
    1731           14 :             }
    1732              :         }
    1733              : 
    1734            1 :         let mut expected_blobs = Vec::new();
    1735            1 :         let mut disk_offset = 0;
    1736            5 :         for (key, lsns) in index_entries {
    1737           21 :             for lsn in lsns {
    1738           21 :                 let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
    1739           17 :                 let lsn_included = lsn_range.contains(&lsn);
    1740              : 
    1741           17 :                 if key_included && lsn_included {
    1742           14 :                     expected_blobs.push(BlobSpec {
    1743           14 :                         key,
    1744           14 :                         lsn,
    1745           14 :                         at: disk_offset,
    1746           14 :                     });
    1747           14 :                 }
    1748              : 
    1749           17 :                 disk_offset += 1;
    1750              :             }
    1751              :         }
    1752              : 
    1753            1 :         assert_eq!(planned_blobs, expected_blobs);
    1754            1 :     }
    1755              : 
    1756              :     mod constants {
    1757              :         use utils::lsn::Lsn;
    1758              : 
    1759              :         /// Offset used by all lsns in this test
    1760              :         pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
    1761              :         /// Number of unique keys included in the test data
    1762              :         pub(super) const KEY_COUNT: u8 = 60;
    1763              :         /// Max number of different lsns for each key
    1764              :         pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    1765              :         /// Possible value sizes for each key along with a probability weight
    1766              :         pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    1767              :         /// Probability that there will be a gap between the current key and the next one (33.3%)
    1768              :         pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    1769              :         /// The minimum size of a key range in all the generated reads
    1770              :         pub(super) const MIN_RANGE_SIZE: i128 = 10;
    1771              :         /// The number of ranges included in each vectored read
    1772              :         pub(super) const RANGES_COUNT: u8 = 2;
    1773              :         /// The number of vectored reads performed
    1774              :         pub(super) const READS_COUNT: u8 = 100;
    1775              :         /// Soft max size of a vectored read. Will be violated if we have to read keys
    1776              :         /// with values larger than the limit
    1777              :         pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    1778              :     }
    1779              : 
    1780              :     struct Entry {
    1781              :         key: Key,
    1782              :         lsn: Lsn,
    1783              :         value: Vec<u8>,
    1784              :     }
    1785              : 
    1786            1 :     fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
    1787            1 :         let mut current_key = Key::MIN;
    1788              : 
    1789            1 :         let mut entries = Vec::new();
    1790           61 :         for _ in 0..constants::KEY_COUNT {
    1791           60 :             let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
    1792           60 :             let mut lsns_iter =
    1793         1130 :                 std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
    1794         1130 :                     Some(Lsn(lsn.0 + 0x08))
    1795         1130 :                 });
    1796           60 :             let mut lsns = Vec::new();
    1797         1190 :             while lsns.len() < count as usize {
    1798         1130 :                 let take = rng.gen_bool(0.5);
    1799         1130 :                 let lsn = lsns_iter.next().unwrap();
    1800         1130 :                 if take {
    1801          556 :                     lsns.push(lsn);
    1802          574 :                 }
    1803              :             }
    1804              : 
    1805          616 :             for lsn in lsns {
    1806          556 :                 let size = constants::VALUE_SIZES
    1807          556 :                     .choose_weighted(rng, |item| item.1)
    1808          556 :                     .unwrap()
    1809              :                     .0;
    1810          556 :                 let mut buf = vec![0; size];
    1811          556 :                 rng.fill_bytes(&mut buf);
    1812              : 
    1813          556 :                 entries.push(Entry {
    1814          556 :                     key: current_key,
    1815          556 :                     lsn,
    1816          556 :                     value: buf,
    1817          556 :                 })
    1818              :             }
    1819              : 
    1820           60 :             let gap = constants::KEY_GAP_CHANGES
    1821           60 :                 .choose_weighted(rng, |item| item.1)
    1822           60 :                 .unwrap()
    1823              :                 .0;
    1824           60 :             if gap {
    1825           19 :                 current_key = current_key.add(2);
    1826           41 :             } else {
    1827           41 :                 current_key = current_key.add(1);
    1828           41 :             }
    1829              :         }
    1830              : 
    1831            1 :         entries
    1832            1 :     }
    1833              : 
    1834              :     struct EntriesMeta {
    1835              :         key_range: Range<Key>,
    1836              :         lsn_range: Range<Lsn>,
    1837              :         index: BTreeMap<(Key, Lsn), Vec<u8>>,
    1838              :     }
    1839              : 
    1840            1 :     fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
    1841            1 :         let key_range = match entries.iter().minmax_by_key(|e| e.key) {
    1842            1 :             MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
    1843            0 :             _ => panic!("More than one entry is always expected"),
    1844              :         };
    1845              : 
    1846            1 :         let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
    1847            1 :             MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
    1848            0 :             _ => panic!("More than one entry is always expected"),
    1849              :         };
    1850              : 
    1851            1 :         let mut index = BTreeMap::new();
    1852          556 :         for entry in entries.iter() {
    1853          556 :             index.insert((entry.key, entry.lsn), entry.value.clone());
    1854          556 :         }
    1855              : 
    1856            1 :         EntriesMeta {
    1857            1 :             key_range,
    1858            1 :             lsn_range,
    1859            1 :             index,
    1860            1 :         }
    1861            1 :     }
    1862              : 
    1863          100 :     fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
    1864          100 :         let start = key_range.start.to_i128();
    1865          100 :         let end = key_range.end.to_i128();
    1866              : 
    1867          100 :         let mut keyspace = KeySpace::default();
    1868              : 
    1869          300 :         for _ in 0..constants::RANGES_COUNT {
    1870          200 :             let mut range: Option<Range<Key>> = Option::default();
    1871          622 :             while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
    1872          422 :                 let range_start = rng.gen_range(start..end);
    1873          422 :                 let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
    1874          422 :                 if range_end_offset >= end {
    1875           50 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(end));
    1876          372 :                 } else {
    1877          372 :                     let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
    1878          372 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
    1879          372 :                 }
    1880              :             }
    1881          200 :             keyspace.ranges.push(range.unwrap());
    1882              :         }
    1883              : 
    1884          100 :         keyspace
    1885          100 :     }
    1886              : 
    1887              :     #[tokio::test]
    1888            1 :     async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
    1889            1 :         let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
    1890            1 :         let (tenant, ctx) = harness.load().await;
    1891              : 
    1892            1 :         let timeline_id = TimelineId::generate();
    1893            1 :         let timeline = tenant
    1894            1 :             .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
    1895            1 :             .await?;
    1896              : 
    1897            1 :         tracing::info!("Generating test data ...");
    1898              : 
    1899            1 :         let rng = &mut StdRng::seed_from_u64(0);
    1900            1 :         let entries = generate_entries(rng);
    1901            1 :         let entries_meta = get_entries_meta(&entries);
    1902              : 
    1903            1 :         tracing::info!("Done generating {} entries", entries.len());
    1904              : 
    1905            1 :         tracing::info!("Writing test data to delta layer ...");
    1906            1 :         let mut writer = DeltaLayerWriter::new(
    1907            1 :             harness.conf,
    1908            1 :             timeline_id,
    1909            1 :             harness.tenant_shard_id,
    1910            1 :             entries_meta.key_range.start,
    1911            1 :             entries_meta.lsn_range.clone(),
    1912            1 :             &timeline.gate,
    1913            1 :             timeline.cancel.clone(),
    1914            1 :             &ctx,
    1915            1 :         )
    1916            1 :         .await?;
    1917              : 
    1918          557 :         for entry in entries {
    1919          556 :             let (_, res) = writer
    1920          556 :                 .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
    1921          556 :                 .await;
    1922          556 :             res?;
    1923              :         }
    1924              : 
    1925            1 :         let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
    1926            1 :         let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
    1927              : 
    1928            1 :         let inner = resident.get_as_delta(&ctx).await?;
    1929              : 
    1930            1 :         let file_size = inner.file.metadata().await?.len();
    1931            1 :         tracing::info!(
    1932            0 :             "Done writing test data to delta layer. Resulting file size is: {}",
    1933              :             file_size
    1934              :         );
    1935              : 
    1936          101 :         for i in 0..constants::READS_COUNT {
    1937          100 :             tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
    1938            1 : 
    1939          100 :             let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
    1940          100 :             let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1941          100 :                 inner.index_start_blk,
    1942          100 :                 inner.index_root_blk,
    1943          100 :                 block_reader,
    1944            1 :             );
    1945            1 : 
    1946          100 :             let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
    1947          100 :             let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
    1948          100 :             let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
    1949            1 : 
    1950          100 :             let vectored_reads = DeltaLayerInner::plan_reads(
    1951          100 :                 &keyspace,
    1952          100 :                 entries_meta.lsn_range.clone(),
    1953          100 :                 data_end_offset,
    1954          100 :                 index_reader,
    1955          100 :                 planner,
    1956          100 :                 &ctx,
    1957          100 :             )
    1958          100 :             .await?;
    1959            1 : 
    1960          100 :             let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
    1961          100 :             let buf_size = DeltaLayerInner::get_min_read_buffer_size(
    1962          100 :                 &vectored_reads,
    1963            1 :                 constants::MAX_VECTORED_READ_BYTES,
    1964            1 :             );
    1965          100 :             let mut buf = Some(IoBufferMut::with_capacity(buf_size));
    1966            1 : 
    1967         9962 :             for read in vectored_reads {
    1968         9862 :                 let blobs_buf = vectored_blob_reader
    1969         9862 :                     .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
    1970         9862 :                     .await?;
    1971         9862 :                 let view = BufView::new_slice(&blobs_buf.buf);
    1972        28652 :                 for meta in blobs_buf.blobs.iter() {
    1973        28652 :                     let value = meta.read(&view).await?;
    1974        28652 :                     assert_eq!(
    1975        28652 :                         &value[..],
    1976        28652 :                         &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
    1977            1 :                     );
    1978            1 :                 }
    1979            1 : 
    1980         9862 :                 buf = Some(blobs_buf.buf);
    1981            1 :             }
    1982            1 :         }
    1983            1 : 
    1984            1 :         Ok(())
    1985            1 :     }
    1986              : 
    1987              :     #[tokio::test]
    1988            1 :     async fn copy_delta_prefix_smoke() {
    1989              :         use bytes::Bytes;
    1990              :         use wal_decoder::models::record::NeonWalRecord;
    1991              : 
    1992            1 :         let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
    1993            1 :             .await
    1994            1 :             .unwrap();
    1995            1 :         let (tenant, ctx) = h.load().await;
    1996            1 :         let ctx = &ctx;
    1997            1 :         let timeline = tenant
    1998            1 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), PgMajorVersion::PG14, ctx)
    1999            1 :             .await
    2000            1 :             .unwrap();
    2001            1 :         let ctx = &ctx.with_scope_timeline(&timeline);
    2002              : 
    2003            1 :         let initdb_layer = timeline
    2004            1 :             .layers
    2005            1 :             .read(crate::tenant::timeline::layer_manager::LayerManagerLockHolder::Testing)
    2006            1 :             .await
    2007            1 :             .likely_resident_layers()
    2008            1 :             .next()
    2009            1 :             .cloned()
    2010            1 :             .unwrap();
    2011              : 
    2012              :         {
    2013            1 :             let mut writer = timeline.writer().await;
    2014              : 
    2015            1 :             let data = [
    2016            1 :                 (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
    2017            1 :                 (
    2018            1 :                     0x30,
    2019            1 :                     12,
    2020            1 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2021            1 :                         will_init: false,
    2022            1 :                         rec: Bytes::from_static(b"1"),
    2023            1 :                     }),
    2024            1 :                 ),
    2025            1 :                 (
    2026            1 :                     0x40,
    2027            1 :                     12,
    2028            1 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2029            1 :                         will_init: true,
    2030            1 :                         rec: Bytes::from_static(b"2"),
    2031            1 :                     }),
    2032            1 :                 ),
    2033              :                 // build an oversized value so we cannot extend an existing read over
    2034              :                 // this
    2035              :                 (
    2036              :                     0x50,
    2037              :                     12,
    2038              :                     Value::WalRecord(NeonWalRecord::Postgres {
    2039              :                         will_init: true,
    2040              :                         rec: {
    2041            1 :                             let mut buf =
    2042            1 :                                 vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
    2043            1 :                             buf.iter_mut()
    2044            1 :                                 .enumerate()
    2045       134144 :                                 .for_each(|(i, slot)| *slot = (i % 256) as u8);
    2046            1 :                             Bytes::from(buf)
    2047              :                         },
    2048              :                     }),
    2049              :                 ),
    2050              :                 // because the oversized read cannot be extended further, we are sure to exercise the
    2051              :                 // builder created on the last round with this:
    2052            1 :                 (
    2053            1 :                     0x60,
    2054            1 :                     12,
    2055            1 :                     Value::WalRecord(NeonWalRecord::Postgres {
    2056            1 :                         will_init: true,
    2057            1 :                         rec: Bytes::from_static(b"3"),
    2058            1 :                     }),
    2059            1 :                 ),
    2060            1 :                 (
    2061            1 :                     0x60,
    2062            1 :                     9,
    2063            1 :                     Value::Image(Bytes::from_static(b"something for a different key")),
    2064            1 :                 ),
    2065              :             ];
    2066              : 
    2067            1 :             let mut last_lsn = None;
    2068              : 
    2069            7 :             for (lsn, key, value) in data {
    2070            6 :                 let key = Key::from_i128(key);
    2071            6 :                 writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
    2072            6 :                 last_lsn = Some(lsn);
    2073              :             }
    2074              : 
    2075            1 :             writer.finish_write(Lsn(last_lsn.unwrap()));
    2076              :         }
    2077            1 :         timeline.freeze_and_flush().await.unwrap();
    2078              : 
    2079            1 :         let new_layer = timeline
    2080            1 :             .layers
    2081            1 :             .read(LayerManagerLockHolder::Testing)
    2082            1 :             .await
    2083            1 :             .likely_resident_layers()
    2084            1 :             .find(|&x| x != &initdb_layer)
    2085            1 :             .cloned()
    2086            1 :             .unwrap();
    2087              : 
    2088              :         // create a copy for the timeline, so we don't overwrite the file
    2089            1 :         let branch = tenant
    2090            1 :             .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
    2091            1 :             .await
    2092            1 :             .unwrap();
    2093              : 
    2094            1 :         assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
    2095              : 
    2096              :         // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
    2097              :         // a single key
    2098              : 
    2099            6 :         for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
    2100            5 :             let truncate_at = Lsn(truncate_at);
    2101            1 : 
    2102            5 :             let mut writer = DeltaLayerWriter::new(
    2103            5 :                 tenant.conf,
    2104            5 :                 branch.timeline_id,
    2105            5 :                 tenant.tenant_shard_id,
    2106            5 :                 Key::MIN,
    2107            5 :                 Lsn(0x11)..truncate_at,
    2108            5 :                 &branch.gate,
    2109            5 :                 branch.cancel.clone(),
    2110            5 :                 ctx,
    2111            5 :             )
    2112            5 :             .await
    2113            5 :             .unwrap();
    2114            1 : 
    2115            5 :             let new_layer = new_layer.download_and_keep_resident(ctx).await.unwrap();
    2116            1 : 
    2117            5 :             new_layer
    2118            5 :                 .copy_delta_prefix(&mut writer, truncate_at, ctx)
    2119            5 :                 .await
    2120            5 :                 .unwrap();
    2121            1 : 
    2122            5 :             let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
    2123            5 :             let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
    2124            1 : 
    2125            5 :             copied_layer.get_as_delta(ctx).await.unwrap();
    2126            1 : 
    2127            5 :             assert_keys_and_values_eq(
    2128            5 :                 new_layer.get_as_delta(ctx).await.unwrap(),
    2129            5 :                 copied_layer.get_as_delta(ctx).await.unwrap(),
    2130            5 :                 truncate_at,
    2131            5 :                 ctx,
    2132            1 :             )
    2133            5 :             .await;
    2134            1 :         }
    2135            1 :     }
    2136              : 
    2137            5 :     async fn assert_keys_and_values_eq(
    2138            5 :         source: &DeltaLayerInner,
    2139            5 :         truncated: &DeltaLayerInner,
    2140            5 :         truncated_at: Lsn,
    2141            5 :         ctx: &RequestContext,
    2142            5 :     ) {
    2143              :         use futures::future::ready;
    2144              :         use futures::stream::TryStreamExt;
    2145              : 
    2146            5 :         let start_key = [0u8; DELTA_KEY_SIZE];
    2147              : 
    2148            5 :         let source_reader = FileBlockReader::new(&source.file, source.file_id);
    2149            5 :         let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2150            5 :             source.index_start_blk,
    2151            5 :             source.index_root_blk,
    2152            5 :             &source_reader,
    2153              :         );
    2154            5 :         let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
    2155           30 :         let source_stream = source_stream.filter(|res| match res {
    2156           30 :             Ok((_, lsn, _)) => ready(lsn < &truncated_at),
    2157            0 :             _ => ready(true),
    2158           30 :         });
    2159            5 :         let mut source_stream = std::pin::pin!(source_stream);
    2160              : 
    2161            5 :         let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
    2162            5 :         let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    2163            5 :             truncated.index_start_blk,
    2164            5 :             truncated.index_root_blk,
    2165            5 :             &truncated_reader,
    2166              :         );
    2167            5 :         let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
    2168            5 :         let mut truncated_stream = std::pin::pin!(truncated_stream);
    2169              : 
    2170            5 :         let mut scratch_left = Vec::new();
    2171            5 :         let mut scratch_right = Vec::new();
    2172              : 
    2173              :         loop {
    2174           21 :             let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
    2175           21 :             let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
    2176              : 
    2177           21 :             if src.is_none() {
    2178            5 :                 assert!(truncated.is_none());
    2179            5 :                 break;
    2180           16 :             }
    2181              : 
    2182           16 :             let (src, truncated) = (src.unwrap(), truncated.unwrap());
    2183              : 
    2184              :             // because we've filtered the source with Lsn, we should always have the same keys from both.
    2185           16 :             assert_eq!(src.0, truncated.0);
    2186           16 :             assert_eq!(src.1, truncated.1);
    2187              : 
    2188              :             // if this is needed for something else, just drop this assert.
    2189           16 :             assert!(
    2190           16 :                 src.2.pos() >= truncated.2.pos(),
    2191            0 :                 "value position should not go backwards {} vs. {}",
    2192            0 :                 src.2.pos(),
    2193            0 :                 truncated.2.pos()
    2194              :             );
    2195              : 
    2196           16 :             scratch_left.clear();
    2197           16 :             let src_cursor = source_reader.block_cursor();
    2198           16 :             let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
    2199           16 :             scratch_right.clear();
    2200           16 :             let trunc_cursor = truncated_reader.block_cursor();
    2201           16 :             let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
    2202              : 
    2203           16 :             tokio::try_join!(left, right).unwrap();
    2204              : 
    2205           16 :             assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
    2206              :         }
    2207            5 :     }
    2208              : 
    2209         9112 :     pub(crate) fn sort_delta(
    2210         9112 :         (k1, l1, _): &(Key, Lsn, Value),
    2211         9112 :         (k2, l2, _): &(Key, Lsn, Value),
    2212         9112 :     ) -> std::cmp::Ordering {
    2213         9112 :         (k1, l1).cmp(&(k2, l2))
    2214         9112 :     }
    2215              : 
    /// Comparator for delta entries: orders by key, then by LSN, and finally places
    /// image values ahead of non-image values that share the same key and LSN.
    #[cfg(feature = "testing")]
    pub(crate) fn sort_delta_value(
        lhs: &(Key, Lsn, Value),
        rhs: &(Key, Lsn, Value),
    ) -> std::cmp::Ordering {
        // `false < true`, so an image (non_image == false) sorts before anything else.
        let lhs_non_image = !lhs.2.is_image();
        let rhs_non_image = !rhs.2.is_image();

        lhs.0
            .cmp(&rhs.0)
            .then_with(|| lhs.1.cmp(&rhs.1))
            .then_with(|| lhs_non_image.cmp(&rhs_non_image))
    }
    2225              : 
    2226           11 :     pub(crate) async fn produce_delta_layer(
    2227           11 :         tenant: &TenantShard,
    2228           11 :         tline: &Arc<Timeline>,
    2229           11 :         mut deltas: Vec<(Key, Lsn, Value)>,
    2230           11 :         ctx: &RequestContext,
    2231           11 :     ) -> anyhow::Result<ResidentLayer> {
    2232           11 :         deltas.sort_by(sort_delta);
    2233           11 :         let (key_start, _, _) = deltas.first().unwrap();
    2234           11 :         let (key_max, _, _) = deltas.last().unwrap();
    2235           11 :         let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
    2236           11 :         let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
    2237           11 :         let lsn_end = Lsn(lsn_max.0 + 1);
    2238           11 :         let mut writer = DeltaLayerWriter::new(
    2239           11 :             tenant.conf,
    2240           11 :             tline.timeline_id,
    2241           11 :             tenant.tenant_shard_id,
    2242           11 :             *key_start,
    2243           11 :             (*lsn_min)..lsn_end,
    2244           11 :             &tline.gate,
    2245           11 :             tline.cancel.clone(),
    2246           11 :             ctx,
    2247           11 :         )
    2248           11 :         .await?;
    2249           11 :         let key_end = key_max.next();
    2250              : 
    2251         4131 :         for (key, lsn, value) in deltas {
    2252         4120 :             writer.put_value(key, lsn, value, ctx).await?;
    2253              :         }
    2254              : 
    2255           11 :         let (desc, path) = writer.finish(key_end, ctx).await?;
    2256           11 :         let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    2257              : 
    2258           11 :         Ok::<_, anyhow::Error>(delta_layer)
    2259           11 :     }
    2260              : 
    2261           14 :     async fn assert_delta_iter_equal(
    2262           14 :         delta_iter: &mut DeltaLayerIterator<'_>,
    2263           14 :         expect: &[(Key, Lsn, Value)],
    2264           14 :     ) {
    2265           14 :         let mut expect_iter = expect.iter();
    2266              :         loop {
    2267        14014 :             let o1 = delta_iter.next().await.unwrap();
    2268        14014 :             let o2 = expect_iter.next();
    2269        14014 :             assert_eq!(o1.is_some(), o2.is_some());
    2270        14014 :             if o1.is_none() && o2.is_none() {
    2271           14 :                 break;
    2272        14000 :             }
    2273        14000 :             let (k1, l1, v1) = o1.unwrap();
    2274        14000 :             let (k2, l2, v2) = o2.unwrap();
    2275        14000 :             assert_eq!(&k1, k2);
    2276        14000 :             assert_eq!(l1, *l2);
    2277        14000 :             assert_eq!(&v1, v2);
    2278              :         }
    2279           14 :     }
    2280              : 
    2281              :     #[tokio::test]
    2282            1 :     async fn delta_layer_iterator() {
    2283            1 :         let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
    2284            1 :         let (tenant, ctx) = harness.load().await;
    2285              : 
    2286            1 :         let tline = tenant
    2287            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2288            1 :             .await
    2289            1 :             .unwrap();
    2290              : 
    2291         1000 :         fn get_key(id: u32) -> Key {
    2292         1000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    2293         1000 :             key.field6 = id;
    2294         1000 :             key
    2295         1000 :         }
    2296              :         const N: usize = 1000;
    2297            1 :         let test_deltas = (0..N)
    2298         1000 :             .map(|idx| {
    2299         1000 :                 (
    2300         1000 :                     get_key(idx as u32 / 10),
    2301         1000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
    2302         1000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
    2303         1000 :                 )
    2304         1000 :             })
    2305            1 :             .collect_vec();
    2306            1 :         let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
    2307            1 :             .await
    2308            1 :             .unwrap();
    2309            1 :         let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
    2310            3 :         for max_read_size in [1, 1024] {
    2311           16 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    2312           14 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    2313            1 :                 // Test if the batch size is correctly determined
    2314           14 :                 let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
    2315           14 :                 let mut num_items = 0;
    2316           56 :                 for _ in 0..3 {
    2317           42 :                     iter.next_batch().await.unwrap();
    2318           42 :                     num_items += iter.key_values_batch.len();
    2319           42 :                     if max_read_size == 1 {
    2320            1 :                         // every key should be a batch b/c the value is larger than max_read_size
    2321           21 :                         assert_eq!(iter.key_values_batch.len(), 1);
    2322            1 :                     } else {
    2323           21 :                         assert!(iter.key_values_batch.len() <= batch_size);
    2324            1 :                     }
    2325           42 :                     if num_items >= N {
    2326            1 :                         break;
    2327           42 :                     }
    2328           42 :                     iter.key_values_batch.clear();
    2329            1 :                 }
    2330            1 :                 // Test if the result is correct
    2331           14 :                 let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
    2332           14 :                 assert_delta_iter_equal(&mut iter, &test_deltas).await;
    2333            1 :             }
    2334            1 :         }
    2335            1 :     }
    2336              : }
        

Generated by: LCOV version 2.1-beta