LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions)
Test:      36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info
Test Date: 2024-04-08 10:22:05

              Coverage    Total    Hit
Lines:          79.3 %     1071    849
Functions:      63.2 %      125     79

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation is extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in the `timelines/<timeline_id>` directory. Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "index", and
      24              : //! "values". The summary is a fixed-size header at the beginning of the file;
      25              : //! it contains basic information about the layer and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
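
As a concrete illustration of the naming scheme described above, the standalone sketch below splits a delta layer file name into its four components. This is illustrative code only, not the pageserver's own layer-name parsing.

```rust
// Standalone sketch: split "<key start>-<key end>__<start LSN>-<end LSN>"
// into its four components. Illustrative only; the crate has its own
// layer file name parsing.
fn split_delta_file_name(name: &str) -> Option<(&str, &str, &str, &str)> {
    let (keys, lsns) = name.split_once("__")?;
    let (key_start, key_end) = keys.split_once('-')?;
    let (lsn_start, lsn_end) = lsns.split_once('-')?;
    Some((key_start, key_end, lsn_start, lsn_end))
}

fn main() {
    let name = "000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051";
    let (ks, ke, ls, le) = split_delta_file_name(name).unwrap();
    assert_eq!(ks, "000000067F000032BE0000400000000020B6");
    assert_eq!(ke, "000000067F000032BE0000400000000030B6");
    assert_eq!((ls, le), ("000000578C6B29", "0000000057A50051"));
}
```
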
      30              : use crate::config::PageServerConf;
      31              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache::{self, FileId, PAGE_SZ};
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::BlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37              : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
      38              : use crate::tenant::timeline::GetVectoredError;
      39              : use crate::tenant::vectored_blob_io::{
      40              :     BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
      41              : };
      42              : use crate::tenant::{PageReconstructError, Timeline};
      43              : use crate::virtual_file::{self, VirtualFile};
      44              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      45              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      46              : use anyhow::{anyhow, bail, ensure, Context, Result};
      47              : use bytes::BytesMut;
      48              : use camino::{Utf8Path, Utf8PathBuf};
      49              : use futures::StreamExt;
      50              : use itertools::Itertools;
      51              : use pageserver_api::keyspace::KeySpace;
      52              : use pageserver_api::models::LayerAccessKind;
      53              : use pageserver_api::shard::TenantShardId;
      54              : use rand::{distributions::Alphanumeric, Rng};
      55              : use serde::{Deserialize, Serialize};
      56              : use std::fs::File;
      57              : use std::io::SeekFrom;
      58              : use std::ops::Range;
      59              : use std::os::unix::fs::FileExt;
      60              : use std::sync::Arc;
      61              : use tokio::sync::OnceCell;
      62              : use tracing::*;
      63              : 
      64              : use utils::{
      65              :     bin_ser::BeSer,
      66              :     id::{TenantId, TimelineId},
      67              :     lsn::Lsn,
      68              : };
      69              : 
      70              : use super::{
      71              :     AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
      72              : };
      73              : 
      74              : ///
      75              : /// Header stored in the beginning of the file
      76              : ///
      77              : /// After this comes the 'values' part, starting on block 1. After that,
      78              : /// the 'index' starts at the block indicated by 'index_start_blk'
      79              : ///
      80          594 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      81              : pub struct Summary {
      82              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      83              :     pub magic: u16,
      84              :     pub format_version: u16,
      85              : 
      86              :     pub tenant_id: TenantId,
      87              :     pub timeline_id: TimelineId,
      88              :     pub key_range: Range<Key>,
      89              :     pub lsn_range: Range<Lsn>,
      90              : 
      91              :     /// Block number where the 'index' part of the file begins.
      92              :     pub index_start_blk: u32,
      93              :     /// Block within the 'index', where the B-tree root page is stored
      94              :     pub index_root_blk: u32,
      95              : }
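
A minimal sketch of how the block numbers in the summary map onto the file layout described in the comments above, assuming the crate's 8 KiB page size (the real code uses `page_cache::PAGE_SZ`):

```rust
// Block layout of a delta file, per the comments above:
//
//   block 0                       -> Summary (fixed-size header)
//   blocks 1 .. index_start_blk   -> "values" (page images / WAL records)
//   blocks index_start_blk ..     -> "index"  B-tree, root page at index_root_blk
//
// Assumed page size for illustration; the crate uses page_cache::PAGE_SZ.
const PAGE_SZ: u64 = 8192;

fn block_byte_offset(blk: u32) -> u64 {
    blk as u64 * PAGE_SZ
}
```
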
      96              : 
      97              : impl From<&DeltaLayer> for Summary {
      98            0 :     fn from(layer: &DeltaLayer) -> Self {
      99            0 :         Self::expected(
     100            0 :             layer.desc.tenant_shard_id.tenant_id,
     101            0 :             layer.desc.timeline_id,
     102            0 :             layer.desc.key_range.clone(),
     103            0 :             layer.desc.lsn_range.clone(),
     104            0 :         )
     105            0 :     }
     106              : }
     107              : 
     108              : impl Summary {
     109          594 :     pub(super) fn expected(
     110          594 :         tenant_id: TenantId,
     111          594 :         timeline_id: TimelineId,
     112          594 :         keys: Range<Key>,
     113          594 :         lsns: Range<Lsn>,
     114          594 :     ) -> Self {
     115          594 :         Self {
     116          594 :             magic: DELTA_FILE_MAGIC,
     117          594 :             format_version: STORAGE_FORMAT_VERSION,
     118          594 : 
     119          594 :             tenant_id,
     120          594 :             timeline_id,
     121          594 :             key_range: keys,
     122          594 :             lsn_range: lsns,
     123          594 : 
     124          594 :             index_start_blk: 0,
     125          594 :             index_root_blk: 0,
     126          594 :         }
     127          594 :     }
     128              : }
     129              : 
     130              : // Flag indicating that this version initializes the page
     131              : const WILL_INIT: u64 = 1;
     132              : 
     133              : /// A reference to a BLOB stored in a layer. The reference contains the BLOB
     134              : /// offset, and for WAL records it also carries the `will_init` flag. The flag
     135              : /// helps to determine the range of records that need to be applied, without
     136              : /// reading/deserializing the records themselves.
     137            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     138              : pub struct BlobRef(pub u64);
     139              : 
     140              : impl BlobRef {
     141       130136 :     pub fn will_init(&self) -> bool {
     142       130136 :         (self.0 & WILL_INIT) != 0
     143       130136 :     }
     144              : 
     145      6374152 :     pub fn pos(&self) -> u64 {
     146      6374152 :         self.0 >> 1
     147      6374152 :     }
     148              : 
     149      6352610 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     150      6352610 :         let mut blob_ref = pos << 1;
     151      6352610 :         if will_init {
     152      6351464 :             blob_ref |= WILL_INIT;
     153      6351464 :         }
     154      6352610 :         BlobRef(blob_ref)
     155      6352610 :     }
     156              : }
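
A short usage sketch of the packing implemented above: the blob offset lives in the upper 63 bits and `will_init` in the low bit. This is a doc-test-style fragment using the names from the impl above.

```rust
// Round-trip sketch for BlobRef.
let r = BlobRef::new(4096, true);
assert_eq!(r.0, (4096 << 1) | WILL_INIT);
assert_eq!(r.pos(), 4096);
assert!(r.will_init());
```
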
     157              : 
     158              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     159              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     160              : 
     161              : /// This is the key of the B-tree index stored in the delta layer. It consists
     162              : /// of the serialized representation of a Key and LSN.
     163              : impl DeltaKey {
     164      3122006 :     fn from_slice(buf: &[u8]) -> Self {
     165      3122006 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     166      3122006 :         bytes.copy_from_slice(buf);
     167      3122006 :         DeltaKey(bytes)
     168      3122006 :     }
     169              : 
     170      6477048 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     171      6477048 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     172      6477048 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     173      6477048 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     174      6477048 :         DeltaKey(bytes)
     175      6477048 :     }
     176              : 
     177      3122006 :     fn key(&self) -> Key {
     178      3122006 :         Key::from_slice(&self.0)
     179      3122006 :     }
     180              : 
     181      3122006 :     fn lsn(&self) -> Lsn {
     182      3122006 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     183      3122006 :     }
     184              : 
     185       130140 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     186       130140 :         let mut lsn_buf = [0u8; 8];
     187       130140 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     188       130140 :         Lsn(u64::from_be_bytes(lsn_buf))
     189       130140 :     }
     190              : }
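
The index key is simply the serialized Key (`KEY_SIZE` bytes) followed by the big-endian LSN, as built by `from_key_lsn` above. A standalone sketch (plain Rust, no crate types) of why big-endian matters: byte-wise comparison of such keys sorts entries by key first and LSN second, which matches the (key, lsn) order in which the writer appends them.

```rust
// Standalone sketch: big-endian integer encodings preserve numeric order
// under lexicographic byte comparison, so "key bytes ++ be(lsn)" orders
// entries by key, then by LSN.
fn index_key(key: &[u8], lsn: u64) -> Vec<u8> {
    let mut k = key.to_vec();
    k.extend_from_slice(&lsn.to_be_bytes());
    k
}

fn main() {
    let (key_a, key_b) = ([0u8, 1], [0u8, 2]);
    assert!(index_key(&key_a, 0x10) < index_key(&key_a, 0x200)); // same key: lower LSN first
    assert!(index_key(&key_a, u64::MAX) < index_key(&key_b, 0)); // key ordering dominates
}
```
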
     191              : 
     192              : /// This is used only from `pagectl`. Within pageserver, all layers are
     193              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     194              : pub struct DeltaLayer {
     195              :     path: Utf8PathBuf,
     196              :     pub desc: PersistentLayerDesc,
     197              :     access_stats: LayerAccessStats,
     198              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     199              : }
     200              : 
     201              : impl std::fmt::Debug for DeltaLayer {
     202            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     203            0 :         use super::RangeDisplayDebug;
     204            0 : 
     205            0 :         f.debug_struct("DeltaLayer")
     206            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     207            0 :             .field("lsn_range", &self.desc.lsn_range)
     208            0 :             .field("file_size", &self.desc.file_size)
     209            0 :             .field("inner", &self.inner)
     210            0 :             .finish()
     211            0 :     }
     212              : }
     213              : 
     214              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     215              : /// file.
     216              : pub struct DeltaLayerInner {
     217              :     // values copied from summary
     218              :     index_start_blk: u32,
     219              :     index_root_blk: u32,
     220              : 
     221              :     file: VirtualFile,
     222              :     file_id: FileId,
     223              : 
     224              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     225              : }
     226              : 
     227              : impl std::fmt::Debug for DeltaLayerInner {
     228            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     229            0 :         f.debug_struct("DeltaLayerInner")
     230            0 :             .field("index_start_blk", &self.index_start_blk)
     231            0 :             .field("index_root_blk", &self.index_root_blk)
     232            0 :             .finish()
     233            0 :     }
     234              : }
     235              : 
     236              : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
     237              : impl std::fmt::Display for DeltaLayer {
     238            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     239            0 :         write!(f, "{}", self.layer_desc().short_id())
     240            0 :     }
     241              : }
     242              : 
     243              : impl AsLayerDesc for DeltaLayer {
     244            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     245            0 :         &self.desc
     246            0 :     }
     247              : }
     248              : 
     249              : impl DeltaLayer {
     250            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     251            0 :         self.desc.dump();
     252            0 : 
     253            0 :         if !verbose {
     254            0 :             return Ok(());
     255            0 :         }
     256              : 
     257            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     258              : 
     259            0 :         inner.dump(ctx).await
     260            0 :     }
     261              : 
     262          838 :     fn temp_path_for(
     263          838 :         conf: &PageServerConf,
     264          838 :         tenant_shard_id: &TenantShardId,
     265          838 :         timeline_id: &TimelineId,
     266          838 :         key_start: Key,
     267          838 :         lsn_range: &Range<Lsn>,
     268          838 :     ) -> Utf8PathBuf {
     269          838 :         let rand_string: String = rand::thread_rng()
     270          838 :             .sample_iter(&Alphanumeric)
     271          838 :             .take(8)
     272          838 :             .map(char::from)
     273          838 :             .collect();
     274          838 : 
     275          838 :         conf.timeline_path(tenant_shard_id, timeline_id)
     276          838 :             .join(format!(
     277          838 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     278          838 :                 key_start,
     279          838 :                 u64::from(lsn_range.start),
     280          838 :                 u64::from(lsn_range.end),
     281          838 :                 rand_string,
     282          838 :                 TEMP_FILE_SUFFIX,
     283          838 :             ))
     284          838 :     }
     285              : 
     286              :     ///
     287              :     /// Open the underlying file and read the metadata into memory, if it's
     288              :     /// not loaded already.
     289              :     ///
     290            0 :     async fn load(
     291            0 :         &self,
     292            0 :         access_kind: LayerAccessKind,
     293            0 :         ctx: &RequestContext,
     294            0 :     ) -> Result<&Arc<DeltaLayerInner>> {
     295            0 :         self.access_stats.record_access(access_kind, ctx);
     296            0 :         // Quick exit if already loaded
     297            0 :         self.inner
     298            0 :             .get_or_try_init(|| self.load_inner(ctx))
     299            0 :             .await
     300            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     301            0 :     }
     302              : 
     303            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     304            0 :         let path = self.path();
     305              : 
     306            0 :         let loaded = DeltaLayerInner::load(&path, None, None, ctx)
     307            0 :             .await
     308            0 :             .and_then(|res| res)?;
     309              : 
     310              :         // not production code
     311            0 :         let actual_filename = path.file_name().unwrap().to_owned();
     312            0 :         let expected_filename = self.layer_desc().filename().file_name();
     313            0 : 
     314            0 :         if actual_filename != expected_filename {
     315            0 :             println!("warning: filename does not match what is expected from in-file summary");
     316            0 :             println!("actual: {:?}", actual_filename);
     317            0 :             println!("expected: {:?}", expected_filename);
     318            0 :         }
     319              : 
     320            0 :         Ok(Arc::new(loaded))
     321            0 :     }
     322              : 
     323              :     /// Create a DeltaLayer struct representing an existing file on disk.
     324              :     ///
     325              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     326            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     327            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     328            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     329            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     330              : 
     331            0 :         let metadata = file
     332            0 :             .metadata()
     333            0 :             .context("get file metadata to determine size")?;
     334              : 
     335              :         // This function is never used for constructing layers in a running pageserver,
     336              :         // so it does not need an accurate TenantShardId.
     337            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     338            0 : 
     339            0 :         Ok(DeltaLayer {
     340            0 :             path: path.to_path_buf(),
     341            0 :             desc: PersistentLayerDesc::new_delta(
     342            0 :                 tenant_shard_id,
     343            0 :                 summary.timeline_id,
     344            0 :                 summary.key_range,
     345            0 :                 summary.lsn_range,
     346            0 :                 metadata.len(),
     347            0 :             ),
     348            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     349            0 :             inner: OnceCell::new(),
     350            0 :         })
     351            0 :     }
     352              : 
     353              :     /// Path to the layer file in pageserver workdir.
     354            0 :     fn path(&self) -> Utf8PathBuf {
     355            0 :         self.path.clone()
     356            0 :     }
     357              : }
     358              : 
     359              : /// A builder object for constructing a new delta layer.
     360              : ///
     361              : /// Usage:
     362              : ///
     363              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     364              : ///
     365              : /// 2. Write the contents by calling `put_value` for every page
     366              : ///    version to store in the layer.
     367              : ///
     368              : /// 3. Call `finish`.
     369              : ///
     370              : struct DeltaLayerWriterInner {
     371              :     conf: &'static PageServerConf,
     372              :     pub path: Utf8PathBuf,
     373              :     timeline_id: TimelineId,
     374              :     tenant_shard_id: TenantShardId,
     375              : 
     376              :     key_start: Key,
     377              :     lsn_range: Range<Lsn>,
     378              : 
     379              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     380              : 
     381              :     blob_writer: BlobWriter<true>,
     382              : }
     383              : 
     384              : impl DeltaLayerWriterInner {
     385              :     ///
     386              :     /// Start building a new delta layer.
     387              :     ///
     388          838 :     async fn new(
     389          838 :         conf: &'static PageServerConf,
     390          838 :         timeline_id: TimelineId,
     391          838 :         tenant_shard_id: TenantShardId,
     392          838 :         key_start: Key,
     393          838 :         lsn_range: Range<Lsn>,
     394          838 :     ) -> anyhow::Result<Self> {
     395          838 :         // Create the file initially with a temporary filename. We don't know
     396          838 :         // the end key yet, so we cannot form the final filename yet. We will
     397          838 :         // rename it when we're done.
     398          838 :         //
     399          838 :         // Note: This overwrites any existing file. There shouldn't be any.
     400          838 :         // FIXME: throw an error instead?
     401          838 :         let path =
     402          838 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     403              : 
     404          838 :         let mut file = VirtualFile::create(&path).await?;
     405              :         // make room for the header block
     406          838 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     407          838 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     408          838 : 
     409          838 :         // Initialize the b-tree index builder
     410          838 :         let block_buf = BlockBuf::new();
     411          838 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     412          838 : 
     413          838 :         Ok(Self {
     414          838 :             conf,
     415          838 :             path,
     416          838 :             timeline_id,
     417          838 :             tenant_shard_id,
     418          838 :             key_start,
     419          838 :             lsn_range,
     420          838 :             tree: tree_builder,
     421          838 :             blob_writer,
     422          838 :         })
     423          838 :     }
     424              : 
     425              :     ///
     426              :     /// Append a key-value pair to the file.
     427              :     ///
     428              :     /// The values must be appended in key, lsn order.
     429              :     ///
     430      3121998 :     async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     431      3121998 :         let (_, res) = self
     432      3121998 :             .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
     433        14791 :             .await;
     434      3121998 :         res
     435      3121998 :     }
     436              : 
     437      6352576 :     async fn put_value_bytes(
     438      6352576 :         &mut self,
     439      6352576 :         key: Key,
     440      6352576 :         lsn: Lsn,
     441      6352576 :         val: Vec<u8>,
     442      6352576 :         will_init: bool,
     443      6352576 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     444      6352576 :         assert!(self.lsn_range.start <= lsn);
     445      6352576 :         let (val, res) = self.blob_writer.write_blob(val).await;
     446      6352576 :         let off = match res {
     447      6352576 :             Ok(off) => off,
     448            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     449              :         };
     450              : 
     451      6352576 :         let blob_ref = BlobRef::new(off, will_init);
     452      6352576 : 
     453      6352576 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     454      6352576 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     455      6352576 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     456      6352576 :     }
     457              : 
     458      3029956 :     fn size(&self) -> u64 {
     459      3029956 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     460      3029956 :     }
     461              : 
     462              :     ///
     463              :     /// Finish writing the delta layer.
     464              :     ///
     465          838 :     async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
     466          838 :         let index_start_blk =
     467          838 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     468              : 
     469          838 :         let mut file = self.blob_writer.into_inner().await?;
     470              : 
     471              :         // Write out the index
     472          838 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     473          838 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     474            0 :             .await?;
     475        13575 :         for buf in block_buf.blocks {
     476        12737 :             let (_buf, res) = file.write_all(buf).await;
     477        12737 :             res?;
     478              :         }
     479          838 :         assert!(self.lsn_range.start < self.lsn_range.end);
     480              :         // Fill in the summary on blk 0
     481          838 :         let summary = Summary {
     482          838 :             magic: DELTA_FILE_MAGIC,
     483          838 :             format_version: STORAGE_FORMAT_VERSION,
     484          838 :             tenant_id: self.tenant_shard_id.tenant_id,
     485          838 :             timeline_id: self.timeline_id,
     486          838 :             key_range: self.key_start..key_end,
     487          838 :             lsn_range: self.lsn_range.clone(),
     488          838 :             index_start_blk,
     489          838 :             index_root_blk,
     490          838 :         };
     491          838 : 
     492          838 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     493          838 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     494          838 :         Summary::ser_into(&summary, &mut buf)?;
     495          838 :         file.seek(SeekFrom::Start(0)).await?;
     496          838 :         let (_buf, res) = file.write_all(buf).await;
     497          838 :         res?;
     498              : 
     499          838 :         let metadata = file
     500          838 :             .metadata()
     501          421 :             .await
     502          838 :             .context("get file metadata to determine size")?;
     503              : 
     504              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     505              :         // Make it a little bit below to account for differing GB units
     506              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     507              :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     508          838 :         ensure!(
     509          838 :             metadata.len() <= S3_UPLOAD_LIMIT,
     510            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     511            0 :             file.path,
     512            0 :             metadata.len()
     513              :         );
     514              : 
     515              :         // Note: Because we opened the file in write-only mode, we cannot
     516              :         // reuse the same VirtualFile for reading later. That's why we don't
     517              :         // set inner.file here. The first read will have to re-open it.
     518              : 
     519          838 :         let desc = PersistentLayerDesc::new_delta(
     520          838 :             self.tenant_shard_id,
     521          838 :             self.timeline_id,
     522          838 :             self.key_start..key_end,
     523          838 :             self.lsn_range.clone(),
     524          838 :             metadata.len(),
     525          838 :         );
     526          838 : 
     527          838 :         // fsync the file
     528          838 :         file.sync_all().await?;
     529              : 
     530          838 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     531              : 
     532          838 :         trace!("created delta layer {}", layer.local_path());
     533              : 
     534          838 :         Ok(layer)
     535          838 :     }
     536              : }
     537              : 
     538              : /// A builder object for constructing a new delta layer.
     539              : ///
     540              : /// Usage:
     541              : ///
     542              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     543              : ///
     544              : /// 2. Write the contents by calling `put_value` for every page
     545              : ///    version to store in the layer.
     546              : ///
     547              : /// 3. Call `finish`.
     548              : ///
     549              : /// # Note
     550              : ///
     551              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     552              : /// possible for the writer to be dropped before `finish` is actually called. That
     553              : /// would leave stray temporary files in the directory, eventually exhausting the
     554              : /// file system. This structure wraps `DeltaLayerWriterInner` and also contains a
     555              : /// `Drop` implementation that cleans up the temporary file on failure. It's not
     556              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     557              : /// out some fields, making it impossible to implement `Drop`.
     558              : ///
     559              : #[must_use]
     560              : pub struct DeltaLayerWriter {
     561              :     inner: Option<DeltaLayerWriterInner>,
     562              : }
     563              : 
     564              : impl DeltaLayerWriter {
     565              :     ///
     566              :     /// Start building a new delta layer.
     567              :     ///
     568          838 :     pub async fn new(
     569          838 :         conf: &'static PageServerConf,
     570          838 :         timeline_id: TimelineId,
     571          838 :         tenant_shard_id: TenantShardId,
     572          838 :         key_start: Key,
     573          838 :         lsn_range: Range<Lsn>,
     574          838 :     ) -> anyhow::Result<Self> {
     575          838 :         Ok(Self {
     576          838 :             inner: Some(
     577          838 :                 DeltaLayerWriterInner::new(
     578          838 :                     conf,
     579          838 :                     timeline_id,
     580          838 :                     tenant_shard_id,
     581          838 :                     key_start,
     582          838 :                     lsn_range,
     583          838 :                 )
     584          428 :                 .await?,
     585              :             ),
     586              :         })
     587          838 :     }
     588              : 
     589              :     ///
     590              :     /// Append a key-value pair to the file.
     591              :     ///
     592              :     /// The values must be appended in key, lsn order.
     593              :     ///
     594      3121998 :     pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     595      3121998 :         self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     596      3121998 :     }
     597              : 
     598      3230578 :     pub async fn put_value_bytes(
     599      3230578 :         &mut self,
     600      3230578 :         key: Key,
     601      3230578 :         lsn: Lsn,
     602      3230578 :         val: Vec<u8>,
     603      3230578 :         will_init: bool,
     604      3230578 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     605      3230578 :         self.inner
     606      3230578 :             .as_mut()
     607      3230578 :             .unwrap()
     608      3230578 :             .put_value_bytes(key, lsn, val, will_init)
     609        18106 :             .await
     610      3230578 :     }
     611              : 
     612      3029956 :     pub fn size(&self) -> u64 {
     613      3029956 :         self.inner.as_ref().unwrap().size()
     614      3029956 :     }
     615              : 
     616              :     ///
     617              :     /// Finish writing the delta layer.
     618              :     ///
     619          838 :     pub(crate) async fn finish(
     620          838 :         mut self,
     621          838 :         key_end: Key,
     622          838 :         timeline: &Arc<Timeline>,
     623          838 :     ) -> anyhow::Result<ResidentLayer> {
     624          838 :         let inner = self.inner.take().unwrap();
     625          838 :         let temp_path = inner.path.clone();
     626         8107 :         let result = inner.finish(key_end, timeline).await;
     627              :         // The delta layer files can sometimes be really large. Clean them up.
     628          838 :         if result.is_err() {
     629            0 :             tracing::warn!(
     630            0 :                 "Cleaning up temporary delta file {temp_path} after error during writing"
     631            0 :             );
     632            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     633            0 :                 tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
     634            0 :             }
     635          838 :         }
     636          838 :         result
     637          838 :     }
     638              : }
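
A crate-internal usage sketch mirroring the three steps from the doc comment above. The caller is assumed to supply `conf`, `timeline`, the ids, the key/LSN bounds, and values already sorted by (key, lsn); this is illustrative only and not a function that exists in the module.

```rust
// Sketch: write one delta layer from pre-sorted (key, lsn, value) triples.
async fn write_delta_layer_sketch(
    conf: &'static PageServerConf,
    timeline: &Arc<Timeline>,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    key_start: Key,
    key_end: Key,
    lsn_range: Range<Lsn>,
    values: Vec<(Key, Lsn, Value)>, // must already be sorted by (key, lsn)
) -> anyhow::Result<ResidentLayer> {
    // 1. Create the writer; it writes into a temporary file first.
    let mut writer =
        DeltaLayerWriter::new(conf, timeline_id, tenant_shard_id, key_start, lsn_range).await?;

    // 2. Append every value in (key, lsn) order.
    for (key, lsn, val) in values {
        writer.put_value(key, lsn, val).await?;
    }

    // 3. Finish: writes the index and summary blocks, fsyncs, and turns the
    //    temporary file into a resident Layer.
    writer.finish(key_end, timeline).await
}
```
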
     639              : 
     640              : impl Drop for DeltaLayerWriter {
     641          838 :     fn drop(&mut self) {
     642          838 :         if let Some(inner) = self.inner.take() {
     643            0 :             // We want to remove the virtual file here, so it's fine not to
     644            0 :             // have completely flushed unwritten data.
     645            0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     646            0 :             vfile.remove();
     647          838 :         }
     648          838 :     }
     649              : }
     650              : 
     651            0 : #[derive(thiserror::Error, Debug)]
     652              : pub enum RewriteSummaryError {
     653              :     #[error("magic mismatch")]
     654              :     MagicMismatch,
     655              :     #[error(transparent)]
     656              :     Other(#[from] anyhow::Error),
     657              : }
     658              : 
     659              : impl From<std::io::Error> for RewriteSummaryError {
     660            0 :     fn from(e: std::io::Error) -> Self {
     661            0 :         Self::Other(anyhow::anyhow!(e))
     662            0 :     }
     663              : }
     664              : 
     665              : impl DeltaLayer {
     666            0 :     pub async fn rewrite_summary<F>(
     667            0 :         path: &Utf8Path,
     668            0 :         rewrite: F,
     669            0 :         ctx: &RequestContext,
     670            0 :     ) -> Result<(), RewriteSummaryError>
     671            0 :     where
     672            0 :         F: Fn(Summary) -> Summary,
     673            0 :     {
     674            0 :         let mut file = VirtualFile::open_with_options(
     675            0 :             path,
     676            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     677            0 :         )
     678            0 :         .await
     679            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     680            0 :         let file_id = page_cache::next_file_id();
     681            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     682            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     683            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     684            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     685            0 :             return Err(RewriteSummaryError::MagicMismatch);
     686            0 :         }
     687            0 : 
     688            0 :         let new_summary = rewrite(actual_summary);
     689            0 : 
     690            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     691            0 :         // TODO: could use smallvec here, but it's a pain with Slice<T>
     692            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     693            0 :         file.seek(SeekFrom::Start(0)).await?;
     694            0 :         let (_buf, res) = file.write_all(buf).await;
     695            0 :         res?;
     696            0 :         Ok(())
     697            0 :     }
     698              : }
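
A hedged usage sketch for `rewrite_summary`: patching the tenant id recorded in a layer file's summary, e.g. from a `pagectl`-style tool. `path`, `ctx`, and `new_tenant_id` are assumed to be in scope; all `Summary` fields are public, so struct-update syntax works in the rewrite closure.

```rust
// Sketch: rewrite the tenant id stored in the on-disk summary block.
DeltaLayer::rewrite_summary(
    path,
    |summary| Summary {
        tenant_id: new_tenant_id,
        ..summary
    },
    ctx,
)
.await?;
```
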
     699              : 
     700              : impl DeltaLayerInner {
     701              : /// Returns a nested result following `Result<Result<_, OpErr>, Critical>`:
     702              :     /// - inner has the success or transient failure
     703              :     /// - outer has the permanent failure
     704          594 :     pub(super) async fn load(
     705          594 :         path: &Utf8Path,
     706          594 :         summary: Option<Summary>,
     707          594 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     708          594 :         ctx: &RequestContext,
     709          594 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     710          594 :         let file = match VirtualFile::open(path).await {
     711          594 :             Ok(file) => file,
     712            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     713              :         };
     714          594 :         let file_id = page_cache::next_file_id();
     715          594 : 
     716          594 :         let block_reader = FileBlockReader::new(&file, file_id);
     717              : 
     718          594 :         let summary_blk = match block_reader.read_blk(0, ctx).await {
     719          594 :             Ok(blk) => blk,
     720            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     721              :         };
     722              : 
     723              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     724          594 :         let actual_summary =
     725          594 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     726              : 
     727          594 :         if let Some(mut expected_summary) = summary {
     728              :             // production code path
     729          594 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     730          594 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     731          594 :             if actual_summary != expected_summary {
     732            0 :                 bail!(
     733            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     734            0 :                     actual_summary,
     735            0 :                     expected_summary
     736            0 :                 );
     737          594 :             }
     738            0 :         }
     739              : 
     740          594 :         Ok(Ok(DeltaLayerInner {
     741          594 :             file,
     742          594 :             file_id,
     743          594 :             index_start_blk: actual_summary.index_start_blk,
     744          594 :             index_root_blk: actual_summary.index_root_blk,
     745          594 :             max_vectored_read_bytes,
     746          594 :         }))
     747          594 :     }
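
A usage sketch of the nested-result convention documented above, with `path`, `expected_summary`, `max_read_bytes`, and `ctx` assumed to be in scope:

```rust
// Outer error: permanent (e.g. summary mismatch, corrupt header).
// Inner error: transient (e.g. failed to open or read the file).
match DeltaLayerInner::load(path, Some(expected_summary), max_read_bytes, ctx).await {
    Ok(Ok(inner)) => { /* loaded; proceed to serve reads */ }
    Ok(Err(transient)) => { /* log and possibly retry */ }
    Err(permanent) => { /* treat the layer as broken */ }
}
```
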
     748              : 
     749       124016 :     pub(super) async fn get_value_reconstruct_data(
     750       124016 :         &self,
     751       124016 :         key: Key,
     752       124016 :         lsn_range: Range<Lsn>,
     753       124016 :         reconstruct_state: &mut ValueReconstructState,
     754       124016 :         ctx: &RequestContext,
     755       124016 :     ) -> anyhow::Result<ValueReconstructResult> {
     756       124016 :         let mut need_image = true;
     757       124016 :         // Scan the page versions backwards, starting from `lsn`.
     758       124016 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     759       124016 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     760       124016 :             self.index_start_blk,
     761       124016 :             self.index_root_blk,
     762       124016 :             &block_reader,
     763       124016 :         );
     764       124016 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     765       124016 : 
     766       124016 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     767       124016 : 
     768       124016 :         tree_reader
     769       124016 :             .visit(
     770       124016 :                 &search_key.0,
     771       124016 :                 VisitDirection::Backwards,
     772       124016 :                 |key, value| {
     773       118772 :                     let blob_ref = BlobRef(value);
     774       118772 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     775        46658 :                         return false;
     776        72114 :                     }
     777        72114 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     778        72114 :                     if entry_lsn < lsn_range.start {
     779            0 :                         return false;
     780        72114 :                     }
     781        72114 :                     offsets.push((entry_lsn, blob_ref.pos()));
     782        72114 : 
     783        72114 :                     !blob_ref.will_init()
     784       124016 :                 },
     785       124016 :                 &RequestContextBuilder::extend(ctx)
     786       124016 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     787       124016 :                     .build(),
     788       124016 :             )
     789        17292 :             .await?;
     790              : 
     791       124016 :         let ctx = &RequestContextBuilder::extend(ctx)
     792       124016 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     793       124016 :             .build();
     794       124016 : 
     795       124016 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     796       124016 :         let cursor = block_reader.block_cursor();
     797       124016 :         let mut buf = Vec::new();
     798       124016 :         for (entry_lsn, pos) in offsets {
     799        72114 :             cursor
     800        72114 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     801         5573 :                 .await
     802        72114 :                 .with_context(|| {
     803            0 :                     format!("Failed to read blob from virtual file {}", self.file.path)
     804        72114 :                 })?;
     805        72114 :             let val = Value::des(&buf).with_context(|| {
     806            0 :                 format!(
     807            0 :                     "Failed to deserialize file blob from virtual file {}",
     808            0 :                     self.file.path
     809            0 :                 )
     810        72114 :             })?;
     811        72114 :             match val {
     812        72114 :                 Value::Image(img) => {
     813        72114 :                     reconstruct_state.img = Some((entry_lsn, img));
     814        72114 :                     need_image = false;
     815        72114 :                     break;
     816              :                 }
     817            0 :                 Value::WalRecord(rec) => {
     818            0 :                     let will_init = rec.will_init();
     819            0 :                     reconstruct_state.records.push((entry_lsn, rec));
     820            0 :                     if will_init {
     821              :                         // This WAL record initializes the page, so no need to go further back
     822            0 :                         need_image = false;
     823            0 :                         break;
     824            0 :                     }
     825              :                 }
     826              :             }
     827              :         }
     828              : 
     829              :         // If an older page image is needed to reconstruct the page, let the
     830              :         // caller know.
     831       124016 :         if need_image {
     832        51902 :             Ok(ValueReconstructResult::Continue)
     833              :         } else {
     834        72114 :             Ok(ValueReconstructResult::Complete)
     835              :         }
     836       124016 :     }
     837              : 
     838              :     // Look up the keys in the provided keyspace and update
     839              :     // the reconstruct state with whatever is found.
     840              :     //
     841              :     // If the key is cached, go no further than the cached Lsn.
     842              :     //
     843              :     // Currently, the index is visited for each range, but this
     844              :     // can be further optimised to visit the index only once.
     845           18 :     pub(super) async fn get_values_reconstruct_data(
     846           18 :         &self,
     847           18 :         keyspace: KeySpace,
     848           18 :         lsn_range: Range<Lsn>,
     849           18 :         reconstruct_state: &mut ValuesReconstructState,
     850           18 :         ctx: &RequestContext,
     851           18 :     ) -> Result<(), GetVectoredError> {
     852           18 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     853           18 :         let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     854           18 :             self.index_start_blk,
     855           18 :             self.index_root_blk,
     856           18 :             block_reader,
     857           18 :         );
     858           18 : 
     859           18 :         let planner = VectoredReadPlanner::new(
     860           18 :             self.max_vectored_read_bytes
     861           18 :                 .expect("Layer is loaded with max vectored bytes config")
     862           18 :                 .0
     863           18 :                 .into(),
     864           18 :         );
     865           18 : 
     866           18 :         let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
     867              : 
     868           18 :         let reads = Self::plan_reads(
     869           18 :             keyspace,
     870           18 :             lsn_range,
     871           18 :             data_end_offset,
     872           18 :             index_reader,
     873           18 :             planner,
     874           18 :             reconstruct_state,
     875           18 :             ctx,
     876           18 :         )
     877           15 :         .await
     878           18 :         .map_err(GetVectoredError::Other)?;
     879              : 
     880           18 :         self.do_reads_and_update_state(reads, reconstruct_state)
     881            9 :             .await;
     882              : 
     883           18 :         Ok(())
     884           18 :     }
     885              : 
     886          220 :     async fn plan_reads<Reader>(
     887          220 :         keyspace: KeySpace,
     888          220 :         lsn_range: Range<Lsn>,
     889          220 :         data_end_offset: u64,
     890          220 :         index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
     891          220 :         mut planner: VectoredReadPlanner,
     892          220 :         reconstruct_state: &mut ValuesReconstructState,
     893          220 :         ctx: &RequestContext,
     894          220 :     ) -> anyhow::Result<Vec<VectoredRead>>
     895          220 :     where
     896          220 :         Reader: BlockReader,
     897          220 :     {
     898          220 :         let ctx = RequestContextBuilder::extend(ctx)
     899          220 :             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     900          220 :             .build();
     901              : 
     902          422 :         for range in keyspace.ranges.iter() {
     903          422 :             let mut range_end_handled = false;
     904          422 : 
     905          422 :             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
     906          422 :             let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
     907          422 :             let mut index_stream = std::pin::pin!(index_stream);
     908              : 
     909        58118 :             while let Some(index_entry) = index_stream.next().await {
     910        58026 :                 let (raw_key, value) = index_entry?;
     911        58026 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     912        58026 :                 let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
     913        58026 :                 let blob_ref = BlobRef(value);
     914        58026 : 
     915        58026 :                 // Lsns are not monotonically increasing across keys, so we don't assert on them.
     916        58026 :                 assert!(key >= range.start);
     917              : 
     918        58026 :                 let outside_lsn_range = !lsn_range.contains(&lsn);
     919        58026 :                 let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
     920              : 
     921        58026 :                 let flag = {
     922        58026 :                     if outside_lsn_range || below_cached_lsn {
     923            4 :                         BlobFlag::Ignore
     924        58022 :                     } else if blob_ref.will_init() {
     925          374 :                         BlobFlag::ReplaceAll
     926              :                     } else {
     927              :                         // Usual path: add blob to the read
     928        57648 :                         BlobFlag::None
     929              :                     }
     930              :                 };
     931              : 
     932        58026 :                 if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
     933          330 :                     planner.handle_range_end(blob_ref.pos());
     934          330 :                     range_end_handled = true;
     935          330 :                     break;
     936        57696 :                 } else {
     937        57696 :                     planner.handle(key, lsn, blob_ref.pos(), flag);
     938        57696 :                 }
     939              :             }
     940              : 
     941          422 :             if !range_end_handled {
     942           92 :                 tracing::info!("Handling range end fallback at {}", data_end_offset);
     943           92 :                 planner.handle_range_end(data_end_offset);
     944          330 :             }
     945              :         }
     946              : 
     947          220 :         Ok(planner.finish())
     948          220 :     }
     949              : 
     950          218 :     fn get_min_read_buffer_size(
     951          218 :         planned_reads: &[VectoredRead],
     952          218 :         read_size_soft_max: usize,
     953          218 :     ) -> usize {
     954        19742 :         let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
     955            0 :             return read_size_soft_max;
     956              :         };
     957              : 
     958          218 :         let largest_read_size = largest_read.size();
     959          218 :         if largest_read_size > read_size_soft_max {
     960              :             // If the read is oversized, it should only contain one key.
     961          200 :             let offenders = largest_read
     962          200 :                 .blobs_at
     963          200 :                 .as_slice()
     964          200 :                 .iter()
     965          200 :                 .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
     966          200 :                 .join(", ");
     967          200 :             tracing::warn!(
     968          200 :                 "Oversized vectored read ({} > {}) for keys {}",
     969          200 :                 largest_read_size,
     970          200 :                 read_size_soft_max,
     971          200 :                 offenders
     972          200 :             );
     973           18 :         }
     974              : 
     975          218 :         largest_read_size
     976          218 :     }
     977              : 
     978           18 :     async fn do_reads_and_update_state(
     979           18 :         &self,
     980           18 :         reads: Vec<VectoredRead>,
     981           18 :         reconstruct_state: &mut ValuesReconstructState,
     982           18 :     ) {
     983           18 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     984           18 :         let mut ignore_key_with_err = None;
     985           18 : 
     986           18 :         let max_vectored_read_bytes = self
     987           18 :             .max_vectored_read_bytes
     988           18 :             .expect("Layer is loaded with max vectored bytes config")
     989           18 :             .0
     990           18 :             .into();
     991           18 :         let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
     992           18 :         let mut buf = Some(BytesMut::with_capacity(buf_size));
     993              : 
     994              :         // Note that reads are processed in reverse order (from highest key+lsn).
     995              :         // This is the order that `ReconstructState` requires such that it can
     996              :         // track when a key is done.
     997           18 :         for read in reads.into_iter().rev() {
     998           18 :             let res = vectored_blob_reader
     999           18 :                 .read_blobs(&read, buf.take().expect("Should have a buffer"))
    1000            9 :                 .await;
    1001              : 
    1002           18 :             let blobs_buf = match res {
    1003           18 :                 Ok(blobs_buf) => blobs_buf,
    1004            0 :                 Err(err) => {
    1005            0 :                     let kind = err.kind();
    1006            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
    1007            0 :                         reconstruct_state.on_key_error(
    1008            0 :                             blob_meta.key,
    1009            0 :                             PageReconstructError::from(anyhow!(
    1010            0 :                                 "Failed to read blobs from virtual file {}: {}",
    1011            0 :                                 self.file.path,
    1012            0 :                                 kind
    1013            0 :                             )),
    1014            0 :                         );
    1015            0 :                     }
    1016              : 
    1017              :                     // We have "lost" the buffer since the lower level IO api
    1018              :                     // doesn't return the buffer on error. Allocate a new one.
    1019            0 :                     buf = Some(BytesMut::with_capacity(buf_size));
    1020            0 : 
    1021            0 :                     continue;
    1022              :                 }
    1023              :             };
    1024              : 
    1025          362 :             for meta in blobs_buf.blobs.iter().rev() {
    1026          362 :                 if Some(meta.meta.key) == ignore_key_with_err {
    1027            0 :                     continue;
    1028          362 :                 }
    1029          362 : 
    1030          362 :                 let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
    1031          362 :                 let value = match value {
    1032          362 :                     Ok(v) => v,
    1033            0 :                     Err(e) => {
    1034            0 :                         reconstruct_state.on_key_error(
    1035            0 :                             meta.meta.key,
    1036            0 :                             PageReconstructError::from(anyhow!(e).context(format!(
    1037            0 :                                 "Failed to deserialize blob from virtual file {}",
    1038            0 :                                 self.file.path,
    1039            0 :                             ))),
    1040            0 :                         );
    1041            0 : 
    1042            0 :                         ignore_key_with_err = Some(meta.meta.key);
    1043            0 :                         continue;
    1044              :                     }
    1045              :                 };
    1046              : 
    1047              :                 // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
    1048              :                 // state, no further updates shall be made to it. The call below will
    1049              :                 // panic if the invariant is violated.
    1050          362 :                 reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
    1051              :             }
    1052              : 
    1053           18 :             buf = Some(blobs_buf.buf);
    1054              :         }
    1055           18 :     }
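
// A minimal sketch (not part of this crate) of the buffer round trip used
// above: a single `BytesMut` is handed to each read via `Option::take` and put
// back on success, so one allocation is reused across all reads; on error the
// buffer is gone and a fresh one must be allocated. `read_into` is a made-up
// stand-in for `VectoredBlobReader::read_blobs`.
use bytes::BytesMut;

fn read_into(mut buf: BytesMut, fill: &[u8]) -> Result<BytesMut, std::io::Error> {
    buf.clear();
    buf.extend_from_slice(fill);
    Ok(buf)
}

#[test]
fn buffer_reuse_example() {
    let buf_size = 1024;
    let mut buf = Some(BytesMut::with_capacity(buf_size));
    for chunk in [&b"first"[..], &b"second"[..]] {
        match read_into(buf.take().expect("Should have a buffer"), chunk) {
            Ok(filled) => {
                assert_eq!(&filled[..], chunk);
                buf = Some(filled); // hand the buffer back for the next read
            }
            Err(_) => {
                // The lower-level API consumed the buffer; allocate a new one.
                buf = Some(BytesMut::with_capacity(buf_size));
            }
        }
    }
}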
    1056              : 
    1057          446 :     pub(super) async fn load_keys<'a>(
    1058          446 :         &'a self,
    1059          446 :         ctx: &RequestContext,
    1060          446 :     ) -> Result<Vec<DeltaEntry<'a>>> {
    1061          446 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1062          446 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1063          446 :             self.index_start_blk,
    1064          446 :             self.index_root_blk,
    1065          446 :             block_reader,
    1066          446 :         );
    1067          446 : 
    1068          446 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
    1069          446 : 
    1070          446 :         tree_reader
    1071          446 :             .visit(
    1072          446 :                 &[0u8; DELTA_KEY_SIZE],
    1073          446 :                 VisitDirection::Forwards,
    1074      3122006 :                 |key, value| {
    1075      3122006 :                     let delta_key = DeltaKey::from_slice(key);
    1076      3122006 :                     let val_ref = ValueRef {
    1077      3122006 :                         blob_ref: BlobRef(value),
    1078      3122006 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
    1079      3122006 :                             Adapter(self),
    1080      3122006 :                         )),
    1081      3122006 :                     };
    1082      3122006 :                     let pos = BlobRef(value).pos();
    1083      3122006 :                     if let Some(last) = all_keys.last_mut() {
     1084      3121560 :                         // Subtracting the previous entry's offset from the current one gives
     1085      3121560 :                         // the size of the value stored for the previous (key, lsn) tuple
    1086      3121560 :                         let first_pos = last.size;
    1087      3121560 :                         last.size = pos - first_pos;
    1088      3121560 :                     }
    1089      3122006 :                     let entry = DeltaEntry {
    1090      3122006 :                         key: delta_key.key(),
    1091      3122006 :                         lsn: delta_key.lsn(),
    1092      3122006 :                         size: pos,
    1093      3122006 :                         val: val_ref,
    1094      3122006 :                     };
    1095      3122006 :                     all_keys.push(entry);
    1096      3122006 :                     true
    1097      3122006 :                 },
    1098          446 :                 &RequestContextBuilder::extend(ctx)
    1099          446 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1100          446 :                     .build(),
    1101          446 :             )
    1102         3244 :             .await?;
    1103          446 :         if let Some(last) = all_keys.last_mut() {
     1104          446 :             // The last key occupies all the space up to the end of the value
     1105          446 :             // storage, which corresponds to the beginning of the index
    1106          446 :             last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
    1107          446 :         }
    1108          446 :         Ok(all_keys)
    1109          446 :     }
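
// A minimal sketch (not part of this crate) of the size bookkeeping above:
// each value's size is the distance from its offset to the next value's
// offset, and the last value extends to the start of the index (`data_end`).
// The offsets and `data_end` in the example are made-up numbers.
fn sizes_from_offsets(offsets: &[u64], data_end: u64) -> Vec<u64> {
    offsets
        .iter()
        .enumerate()
        .map(|(i, &pos)| offsets.get(i + 1).copied().unwrap_or(data_end) - pos)
        .collect()
}

#[test]
fn sizes_from_offsets_example() {
    // Values stored at offsets 0, 100 and 250; the index starts at offset 400.
    assert_eq!(sizes_from_offsets(&[0, 100, 250], 400), vec![100, 150, 150]);
}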
    1110              : 
    1111            4 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1112            4 :         println!(
    1113            4 :             "index_start_blk: {}, root {}",
    1114            4 :             self.index_start_blk, self.index_root_blk
    1115            4 :         );
    1116            4 : 
    1117            4 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1118            4 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1119            4 :             self.index_start_blk,
    1120            4 :             self.index_root_blk,
    1121            4 :             block_reader,
    1122            4 :         );
    1123            4 : 
    1124            4 :         tree_reader.dump().await?;
    1125              : 
    1126            4 :         let keys = self.load_keys(ctx).await?;
    1127              : 
    1128            8 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1129            8 :             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1130            8 :             let val = Value::des(&buf)?;
    1131            8 :             let desc = match val {
    1132            8 :                 Value::Image(img) => {
    1133            8 :                     format!(" img {} bytes", img.len())
    1134              :                 }
    1135            0 :                 Value::WalRecord(rec) => {
    1136            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
    1137            0 :                     format!(
    1138            0 :                         " rec {} bytes will_init: {} {}",
    1139            0 :                         buf.len(),
    1140            0 :                         rec.will_init(),
    1141            0 :                         wal_desc
    1142            0 :                     )
    1143              :                 }
    1144              :             };
    1145            8 :             Ok(desc)
    1146            8 :         }
    1147              : 
    1148           12 :         for entry in keys {
    1149            8 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1150            8 :             let desc = match dump_blob(&val, ctx).await {
    1151            8 :                 Ok(desc) => desc,
    1152            0 :                 Err(err) => {
    1153            0 :                     format!("ERROR: {err}")
    1154              :                 }
    1155              :             };
    1156            8 :             println!("  key {key} at {lsn}: {desc}");
    1157            8 : 
    1158            8 :             // Print more details about CHECKPOINT records. Would be nice to print details
    1159            8 :             // of many other record types too, but these are particularly interesting, as
     1160            8 :             // walingest.rs has a lot of special processing for them.
    1161            8 :             use pageserver_api::key::CHECKPOINT_KEY;
    1162            8 :             use postgres_ffi::CheckPoint;
    1163            8 :             if key == CHECKPOINT_KEY {
    1164            0 :                 let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1165            0 :                 let val = Value::des(&buf)?;
    1166            0 :                 match val {
    1167            0 :                     Value::Image(img) => {
    1168            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1169            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
    1170              :                     }
    1171            0 :                     Value::WalRecord(_rec) => {
    1172            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1173            0 :                     }
    1174              :                 }
    1175            8 :             }
    1176              :         }
    1177              : 
    1178            4 :         Ok(())
    1179            4 :     }
    1180              : }
    1181              : 
    1182              : /// A set of data associated with a delta layer key and its value
    1183              : pub struct DeltaEntry<'a> {
    1184              :     pub key: Key,
    1185              :     pub lsn: Lsn,
    1186              :     /// Size of the stored value
    1187              :     pub size: u64,
    1188              :     /// Reference to the on-disk value
    1189              :     pub val: ValueRef<'a>,
    1190              : }
    1191              : 
    1192              : /// Reference to an on-disk value
    1193              : pub struct ValueRef<'a> {
    1194              :     blob_ref: BlobRef,
    1195              :     reader: BlockCursor<'a>,
    1196              : }
    1197              : 
    1198              : impl<'a> ValueRef<'a> {
    1199              :     /// Loads the value from disk
    1200      3121998 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1201              :         // theoretically we *could* record an access time for each, but it does not really matter
    1202      3121998 :         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1203      3121998 :         let val = Value::des(&buf)?;
    1204      3121998 :         Ok(val)
    1205      3121998 :     }
    1206              : }
    1207              : 
    1208              : pub(crate) struct Adapter<T>(T);
    1209              : 
    1210              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1211      3150684 :     pub(crate) async fn read_blk(
    1212      3150684 :         &self,
    1213      3150684 :         blknum: u32,
    1214      3150684 :         ctx: &RequestContext,
    1215      3150684 :     ) -> Result<BlockLease, std::io::Error> {
    1216      3150684 :         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
    1217      3150684 :         block_reader.read_blk(blknum, ctx).await
    1218      3150684 :     }
    1219              : }
    1220              : 
    1221              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1222      6301368 :     fn as_ref(&self) -> &DeltaLayerInner {
    1223      6301368 :         self
    1224      6301368 :     }
    1225              : }
    1226              : 
    1227              : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    1228            0 :     fn key(&self) -> Key {
    1229            0 :         self.key
    1230            0 :     }
    1231            0 :     fn lsn(&self) -> Lsn {
    1232            0 :         self.lsn
    1233            0 :     }
    1234            0 :     fn size(&self) -> u64 {
    1235            0 :         self.size
    1236            0 :     }
    1237              : }
    1238              : 
    1239              : #[cfg(test)]
    1240              : mod test {
    1241              :     use std::collections::BTreeMap;
    1242              : 
    1243              :     use itertools::MinMaxResult;
    1244              :     use rand::prelude::{SeedableRng, SliceRandom, StdRng};
    1245              :     use rand::RngCore;
    1246              : 
    1247              :     use super::*;
    1248              :     use crate::{
    1249              :         context::DownloadBehavior,
    1250              :         task_mgr::TaskKind,
    1251              :         tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
    1252              :         DEFAULT_PG_VERSION,
    1253              :     };
    1254              : 
     1255              :     /// Construct an index for a fictional delta layer and then traverse it
     1256              :     /// in order to plan vectored reads for a query. Finally,
    1257              :     /// verify that the traversal fed the right index key and value
    1258              :     /// pairs into the planner.
    1259              :     #[tokio::test]
    1260            2 :     async fn test_delta_layer_index_traversal() {
    1261            2 :         let base_key = Key {
    1262            2 :             field1: 0,
    1263            2 :             field2: 1663,
    1264            2 :             field3: 12972,
    1265            2 :             field4: 16396,
    1266            2 :             field5: 0,
    1267            2 :             field6: 246080,
    1268            2 :         };
    1269            2 : 
    1270            2 :         // Populate the index with some entries
    1271            2 :         let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
    1272            2 :             (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
    1273            2 :             (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1274            2 :             (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
    1275            2 :             (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
    1276            2 :         ]);
    1277            2 : 
    1278            2 :         let mut disk = TestDisk::default();
    1279            2 :         let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
    1280            2 : 
    1281            2 :         let mut disk_offset = 0;
    1282           10 :         for (key, lsns) in &entries {
    1283           42 :             for lsn in lsns {
    1284           34 :                 let index_key = DeltaKey::from_key_lsn(key, *lsn);
    1285           34 :                 let blob_ref = BlobRef::new(disk_offset, false);
    1286           34 :                 writer
    1287           34 :                     .append(&index_key.0, blob_ref.0)
    1288           34 :                     .expect("In memory disk append should never fail");
    1289           34 : 
    1290           34 :                 disk_offset += 1;
    1291           34 :             }
    1292            2 :         }
    1293            2 : 
    1294            2 :         // Prepare all the arguments for the call into `plan_reads` below
    1295            2 :         let (root_offset, _writer) = writer
    1296            2 :             .finish()
    1297            2 :             .expect("In memory disk finish should never fail");
    1298            2 :         let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
    1299            2 :         let planner = VectoredReadPlanner::new(100);
    1300            2 :         let mut reconstruct_state = ValuesReconstructState::new();
    1301            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1302            2 : 
    1303            2 :         let keyspace = KeySpace {
    1304            2 :             ranges: vec![
    1305            2 :                 base_key..base_key.add(3),
    1306            2 :                 base_key.add(3)..base_key.add(100),
    1307            2 :             ],
    1308            2 :         };
    1309            2 :         let lsn_range = Lsn(2)..Lsn(40);
    1310            2 : 
    1311            2 :         // Plan and validate
    1312            2 :         let vectored_reads = DeltaLayerInner::plan_reads(
    1313            2 :             keyspace.clone(),
    1314            2 :             lsn_range.clone(),
    1315            2 :             disk_offset,
    1316            2 :             reader,
    1317            2 :             planner,
    1318            2 :             &mut reconstruct_state,
    1319            2 :             &ctx,
    1320            2 :         )
    1321            2 :         .await
    1322            2 :         .expect("Read planning should not fail");
    1323            2 : 
    1324            2 :         validate(keyspace, lsn_range, vectored_reads, entries);
    1325            2 :     }
    1326              : 
    1327            2 :     fn validate(
    1328            2 :         keyspace: KeySpace,
    1329            2 :         lsn_range: Range<Lsn>,
    1330            2 :         vectored_reads: Vec<VectoredRead>,
    1331            2 :         index_entries: BTreeMap<Key, Vec<Lsn>>,
    1332            2 :     ) {
    1333            2 :         #[derive(Debug, PartialEq, Eq)]
    1334            2 :         struct BlobSpec {
    1335            2 :             key: Key,
    1336            2 :             lsn: Lsn,
    1337            2 :             at: u64,
    1338            2 :         }
    1339            2 : 
    1340            2 :         let mut planned_blobs = Vec::new();
    1341            8 :         for read in vectored_reads {
    1342           28 :             for (at, meta) in read.blobs_at.as_slice() {
    1343           28 :                 planned_blobs.push(BlobSpec {
    1344           28 :                     key: meta.key,
    1345           28 :                     lsn: meta.lsn,
    1346           28 :                     at: *at,
    1347           28 :                 });
    1348           28 :             }
    1349              :         }
    1350              : 
    1351            2 :         let mut expected_blobs = Vec::new();
    1352            2 :         let mut disk_offset = 0;
    1353           10 :         for (key, lsns) in index_entries {
    1354           42 :             for lsn in lsns {
    1355           42 :                 let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
    1356           34 :                 let lsn_included = lsn_range.contains(&lsn);
    1357           34 : 
    1358           34 :                 if key_included && lsn_included {
    1359           28 :                     expected_blobs.push(BlobSpec {
    1360           28 :                         key,
    1361           28 :                         lsn,
    1362           28 :                         at: disk_offset,
    1363           28 :                     });
    1364           28 :                 }
    1365              : 
    1366           34 :                 disk_offset += 1;
    1367              :             }
    1368              :         }
    1369              : 
    1370            2 :         assert_eq!(planned_blobs, expected_blobs);
    1371            2 :     }
    1372              : 
    1373              :     mod constants {
    1374              :         use utils::lsn::Lsn;
    1375              : 
     1376              :         /// Offset used by all LSNs in this test
     1377              :         pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
     1378              :         /// Number of unique keys included in the test data
     1379              :         pub(super) const KEY_COUNT: u8 = 60;
     1380              :         /// Max number of different LSNs for each key
    1381              :         pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    1382              :         /// Possible value sizes for each key along with a probability weight
    1383              :         pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    1384              :         /// Probability that there will be a gap between the current key and the next one (33.3%)
    1385              :         pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    1386              :         /// The minimum size of a key range in all the generated reads
    1387              :         pub(super) const MIN_RANGE_SIZE: i128 = 10;
    1388              :         /// The number of ranges included in each vectored read
    1389              :         pub(super) const RANGES_COUNT: u8 = 2;
    1390              :         /// The number of vectored reads performed
    1391              :         pub(super) const READS_COUNT: u8 = 100;
    1392              :         /// Soft max size of a vectored read. Will be violated if we have to read keys
    1393              :         /// with values larger than the limit
    1394              :         pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    1395              :     }
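
// A minimal sketch (not part of this crate) of how the weighted constants
// above drive sampling via rand's `choose_weighted`: with weights (2, 2, 1)
// the three value sizes are drawn with probabilities of roughly 40%, 40% and
// 20%. The draw count and the final assertion are illustrative only.
#[test]
fn weighted_value_size_example() {
    use rand::prelude::{SeedableRng, SliceRandom, StdRng};
    use std::collections::HashMap;

    let value_sizes: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    let mut rng = StdRng::seed_from_u64(0);
    let mut counts: HashMap<usize, u32> = HashMap::new();
    for _ in 0..1000 {
        let size = value_sizes
            .choose_weighted(&mut rng, |item| item.1)
            .unwrap()
            .0;
        *counts.entry(size).or_insert(0) += 1;
    }
    // Every size shows up; the exact split depends on the seed.
    assert_eq!(counts.len(), 3);
}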
    1396              : 
    1397              :     struct Entry {
    1398              :         key: Key,
    1399              :         lsn: Lsn,
    1400              :         value: Vec<u8>,
    1401              :     }
    1402              : 
    1403            2 :     fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
    1404            2 :         let mut current_key = Key::MIN;
    1405            2 : 
    1406            2 :         let mut entries = Vec::new();
    1407          122 :         for _ in 0..constants::KEY_COUNT {
    1408          120 :             let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
    1409          120 :             let mut lsns_iter =
    1410         2260 :                 std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
    1411         2260 :                     Some(Lsn(lsn.0 + 0x08))
    1412         2260 :                 });
    1413          120 :             let mut lsns = Vec::new();
    1414         2380 :             while lsns.len() < count as usize {
    1415         2260 :                 let take = rng.gen_bool(0.5);
    1416         2260 :                 let lsn = lsns_iter.next().unwrap();
    1417         2260 :                 if take {
    1418         1112 :                     lsns.push(lsn);
    1419         1148 :                 }
    1420              :             }
    1421              : 
    1422         1232 :             for lsn in lsns {
    1423         1112 :                 let size = constants::VALUE_SIZES
    1424         3336 :                     .choose_weighted(rng, |item| item.1)
    1425         1112 :                     .unwrap()
    1426         1112 :                     .0;
    1427         1112 :                 let mut buf = vec![0; size];
    1428         1112 :                 rng.fill_bytes(&mut buf);
    1429         1112 : 
    1430         1112 :                 entries.push(Entry {
    1431         1112 :                     key: current_key,
    1432         1112 :                     lsn,
    1433         1112 :                     value: buf,
    1434         1112 :                 })
    1435              :             }
    1436              : 
    1437          120 :             let gap = constants::KEY_GAP_CHANGES
    1438          240 :                 .choose_weighted(rng, |item| item.1)
    1439          120 :                 .unwrap()
    1440          120 :                 .0;
    1441          120 :             if gap {
    1442           38 :                 current_key = current_key.add(2);
    1443           82 :             } else {
    1444           82 :                 current_key = current_key.add(1);
    1445           82 :             }
    1446              :         }
    1447              : 
    1448            2 :         entries
    1449            2 :     }
    1450              : 
    1451              :     struct EntriesMeta {
    1452              :         key_range: Range<Key>,
    1453              :         lsn_range: Range<Lsn>,
    1454              :         index: BTreeMap<(Key, Lsn), Vec<u8>>,
    1455              :     }
    1456              : 
    1457            2 :     fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
    1458         1112 :         let key_range = match entries.iter().minmax_by_key(|e| e.key) {
    1459            2 :             MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
    1460            0 :             _ => panic!("More than one entry is always expected"),
    1461              :         };
    1462              : 
    1463         1112 :         let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
    1464            2 :             MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
    1465            0 :             _ => panic!("More than one entry is always expected"),
    1466              :         };
    1467              : 
    1468            2 :         let mut index = BTreeMap::new();
    1469         1112 :         for entry in entries.iter() {
    1470         1112 :             index.insert((entry.key, entry.lsn), entry.value.clone());
    1471         1112 :         }
    1472              : 
    1473            2 :         EntriesMeta {
    1474            2 :             key_range,
    1475            2 :             lsn_range,
    1476            2 :             index,
    1477            2 :         }
    1478            2 :     }
    1479              : 
    1480          200 :     fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
    1481          200 :         let start = key_range.start.to_i128();
    1482          200 :         let end = key_range.end.to_i128();
    1483          200 : 
    1484          200 :         let mut keyspace = KeySpace::default();
    1485              : 
    1486          600 :         for _ in 0..constants::RANGES_COUNT {
    1487          400 :             let mut range: Option<Range<Key>> = Option::default();
    1488         1244 :             while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
    1489          844 :                 let range_start = rng.gen_range(start..end);
    1490          844 :                 let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
    1491          844 :                 if range_end_offset >= end {
    1492          100 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(end));
    1493          744 :                 } else {
    1494          744 :                     let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
    1495          744 :                     range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
    1496          744 :                 }
    1497              :             }
    1498          400 :             keyspace.ranges.push(range.unwrap());
    1499              :         }
    1500              : 
    1501          200 :         keyspace
    1502          200 :     }
    1503              : 
    1504              :     #[tokio::test]
    1505            2 :     async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
    1506            2 :         let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
    1507            2 :         let (tenant, ctx) = harness.load().await;
    1508            2 : 
    1509            2 :         let timeline_id = TimelineId::generate();
    1510            2 :         let timeline = tenant
    1511            2 :             .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
    1512            6 :             .await?;
    1513            2 : 
    1514            2 :         tracing::info!("Generating test data ...");
    1515            2 : 
    1516            2 :         let rng = &mut StdRng::seed_from_u64(0);
    1517            2 :         let entries = generate_entries(rng);
    1518            2 :         let entries_meta = get_entries_meta(&entries);
    1519            2 : 
    1520            2 :         tracing::info!("Done generating {} entries", entries.len());
    1521            2 : 
    1522            2 :         tracing::info!("Writing test data to delta layer ...");
    1523            2 :         let mut writer = DeltaLayerWriter::new(
    1524            2 :             harness.conf,
    1525            2 :             timeline_id,
    1526            2 :             harness.tenant_shard_id,
    1527            2 :             entries_meta.key_range.start,
    1528            2 :             entries_meta.lsn_range.clone(),
    1529            2 :         )
    1530            2 :         .await?;
    1531            2 : 
    1532         1114 :         for entry in entries {
    1533         1112 :             let (_, res) = writer
    1534         1112 :                 .put_value_bytes(entry.key, entry.lsn, entry.value, false)
    1535          220 :                 .await;
    1536         1112 :             res?;
    1537            2 :         }
    1538            2 : 
    1539            5 :         let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
    1540            2 : 
    1541            2 :         let inner = resident.get_inner_delta(&ctx).await?;
    1542            2 : 
    1543            2 :         let file_size = inner.file.metadata().await?.len();
    1544            2 :         tracing::info!(
    1545            2 :             "Done writing test data to delta layer. Resulting file size is: {}",
    1546            2 :             file_size
    1547            2 :         );
    1548            2 : 
    1549          202 :         for i in 0..constants::READS_COUNT {
    1550          200 :             tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
    1551            2 : 
    1552          200 :             let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
    1553          200 :             let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1554          200 :                 inner.index_start_blk,
    1555          200 :                 inner.index_root_blk,
    1556          200 :                 block_reader,
    1557          200 :             );
    1558          200 : 
    1559          200 :             let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
    1560          200 :             let mut reconstruct_state = ValuesReconstructState::new();
    1561          200 :             let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
    1562          200 :             let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
    1563            2 : 
    1564          200 :             let vectored_reads = DeltaLayerInner::plan_reads(
    1565          200 :                 keyspace.clone(),
    1566          200 :                 entries_meta.lsn_range.clone(),
    1567          200 :                 data_end_offset,
    1568          200 :                 index_reader,
    1569          200 :                 planner,
    1570          200 :                 &mut reconstruct_state,
    1571          200 :                 &ctx,
    1572          200 :             )
    1573            3 :             .await?;
    1574            2 : 
    1575          200 :             let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
    1576          200 :             let buf_size = DeltaLayerInner::get_min_read_buffer_size(
    1577          200 :                 &vectored_reads,
    1578          200 :                 constants::MAX_VECTORED_READ_BYTES,
    1579          200 :             );
    1580          200 :             let mut buf = Some(BytesMut::with_capacity(buf_size));
    1581            2 : 
    1582        19924 :             for read in vectored_reads {
    1583        19724 :                 let blobs_buf = vectored_blob_reader
    1584        19724 :                     .read_blobs(&read, buf.take().expect("Should have a buffer"))
    1585        10017 :                     .await?;
    1586        57304 :                 for meta in blobs_buf.blobs.iter() {
    1587        57304 :                     let value = &blobs_buf.buf[meta.start..meta.end];
    1588        57304 :                     assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
    1589            2 :                 }
    1590            2 : 
    1591        19724 :                 buf = Some(blobs_buf.buf);
    1592            2 :             }
    1593            2 :         }
    1594            2 : 
    1595            2 :         Ok(())
    1596            2 :     }
    1597              : }
        

Generated by: LCOV version 2.1-beta