LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 87.7 % 666 584 82 584
Current Date: 2023-10-19 02:04:12 Functions: 77.1 % 105 81 24 81
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2                 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3                 : //!
       4                 : //! Usually a delta layer only contains differences, in the form of WAL records
       5                 : //! against a base LSN. However, if a relation extended or a whole new relation
       6                 : //! is created, there would be no base for the new pages. The entries for them
       7                 : //! must be page images or WAL records with the 'will_init' flag set, so that
       8                 : //! they can be replayed without referring to an older page version.
       9                 : //!
      10                 : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11                 : //! there are no subdirectories, and each delta file is named like this:
      12                 : //!
      13                 : //! ```text
      14                 : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15                 : //! ```
      16                 : //!
      17                 : //! For example:
      18                 : //!
      19                 : //! ```text
      20                 : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21                 : //! ```
      22                 : //!
      23                 : //! Every delta file consists of three parts: "summary", "index", and
      24                 : //! "values". The summary is a fixed size header at the beginning of the file,
      25                 : //! and it contains basic information about the layer, and offsets to the other
      26                 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27                 : //! "values" part.  The actual page images and WAL records are stored in the
      28                 : //! "values" part.
      29                 : //!
      30                 : use crate::config::PageServerConf;
      31                 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32                 : use crate::page_cache::PAGE_SZ;
      33                 : use crate::repository::{Key, Value, KEY_SIZE};
      34                 : use crate::tenant::blob_io::BlobWriter;
      35                 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36                 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37                 : use crate::tenant::storage_layer::{
      38                 :     PersistentLayer, ValueReconstructResult, ValueReconstructState,
      39                 : };
      40                 : use crate::virtual_file::VirtualFile;
      41                 : use crate::{walrecord, TEMP_FILE_SUFFIX};
      42                 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      43                 : use anyhow::{bail, ensure, Context, Result};
      44                 : use camino::{Utf8Path, Utf8PathBuf};
      45                 : use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
      46                 : use rand::{distributions::Alphanumeric, Rng};
      47                 : use serde::{Deserialize, Serialize};
      48                 : use std::fs::{self, File};
      49                 : use std::io::SeekFrom;
      50                 : use std::ops::Range;
      51                 : use std::os::unix::fs::FileExt;
      52                 : use std::sync::Arc;
      53                 : use tokio::sync::OnceCell;
      54                 : use tracing::*;
      55                 : 
      56                 : use utils::{
      57                 :     bin_ser::BeSer,
      58                 :     id::{TenantId, TimelineId},
      59                 :     lsn::Lsn,
      60                 : };
      61                 : 
      62                 : use super::{
      63                 :     AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf,
      64                 :     PersistentLayerDesc,
      65                 : };
      66                 : 
      67                 : ///
      68                 : /// Header stored in the beginning of the file
      69                 : ///
      70                 : /// After this comes the 'values' part, starting on block 1. After that,
      71                 : /// the 'index' starts at the block indicated by 'index_start_blk'
      72                 : ///
      73 CBC       15721 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      74                 : pub struct Summary {
      75                 :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      76                 :     magic: u16,
      77                 :     format_version: u16,
      78                 : 
      79                 :     tenant_id: TenantId,
      80                 :     timeline_id: TimelineId,
      81                 :     key_range: Range<Key>,
      82                 :     lsn_range: Range<Lsn>,
      83                 : 
      84                 :     /// Block number where the 'index' part of the file begins.
      85                 :     pub index_start_blk: u32,
      86                 :     /// Block within the 'index', where the B-tree root page is stored
      87                 :     pub index_root_blk: u32,
      88                 : }
      89                 : 
      90                 : impl From<&DeltaLayer> for Summary {
      91           10807 :     fn from(layer: &DeltaLayer) -> Self {
      92           10807 :         Self::expected(
      93           10807 :             layer.desc.tenant_id,
      94           10807 :             layer.desc.timeline_id,
      95           10807 :             layer.desc.key_range.clone(),
      96           10807 :             layer.desc.lsn_range.clone(),
      97           10807 :         )
      98           10807 :     }
      99                 : }
     100                 : 
     101                 : impl Summary {
     102           10807 :     pub(super) fn expected(
     103           10807 :         tenant_id: TenantId,
     104           10807 :         timeline_id: TimelineId,
     105           10807 :         keys: Range<Key>,
     106           10807 :         lsns: Range<Lsn>,
     107           10807 :     ) -> Self {
     108           10807 :         Self {
     109           10807 :             magic: DELTA_FILE_MAGIC,
     110           10807 :             format_version: STORAGE_FORMAT_VERSION,
     111           10807 : 
     112           10807 :             tenant_id,
     113           10807 :             timeline_id,
     114           10807 :             key_range: keys,
     115           10807 :             lsn_range: lsns,
     116           10807 : 
     117           10807 :             index_start_blk: 0,
     118           10807 :             index_root_blk: 0,
     119           10807 :         }
     120           10807 :     }
     121                 : }
     122                 : 
     123                 : // Flag indicating that this version initialize the page
     124                 : const WILL_INIT: u64 = 1;
     125                 : 
     126                 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
     127                 : /// offset, and for WAL records it also contains `will_init` flag. The flag
     128                 : /// helps to determine the range of records that needs to be applied, without
     129                 : /// reading/deserializing records themselves.
     130 UBC           0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     131                 : pub struct BlobRef(pub u64);
     132                 : 
     133                 : impl BlobRef {
     134 CBC   131433446 :     pub fn will_init(&self) -> bool {
     135       131433446 :         (self.0 & WILL_INIT) != 0
     136       131433446 :     }
     137                 : 
     138       191246763 :     pub fn pos(&self) -> u64 {
     139       191246763 :         self.0 >> 1
     140       191246763 :     }
     141                 : 
     142        82730190 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     143        82730190 :         let mut blob_ref = pos << 1;
     144        82730190 :         if will_init {
     145         8619558 :             blob_ref |= WILL_INIT;
     146        74110632 :         }
     147        82730190 :         BlobRef(blob_ref)
     148        82730190 :     }
     149                 : }
     150                 : 
     151                 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     152                 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     153                 : 
     154                 : /// This is the key of the B-tree index stored in the delta layer. It consists
     155                 : /// of the serialized representation of a Key and LSN.
     156                 : impl DeltaKey {
     157        30235446 :     fn from_slice(buf: &[u8]) -> Self {
     158        30235446 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     159        30235446 :         bytes.copy_from_slice(buf);
     160        30235446 :         DeltaKey(bytes)
     161        30235446 :     }
     162                 : 
     163       106149550 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     164       106149550 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     165       106149550 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     166       106149550 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     167       106149550 :         DeltaKey(bytes)
     168       106149550 :     }
     169                 : 
     170        30235446 :     fn key(&self) -> Key {
     171        30235446 :         Key::from_slice(&self.0)
     172        30235446 :     }
     173                 : 
     174        30235446 :     fn lsn(&self) -> Lsn {
     175        30235446 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     176        30235446 :     }
     177                 : 
     178       132123317 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     179       132123317 :         let mut lsn_buf = [0u8; 8];
     180       132123317 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     181       132123317 :         Lsn(u64::from_be_bytes(lsn_buf))
     182       132123317 :     }
     183                 : }
     184                 : 
     185                 : /// DeltaLayer is the in-memory data structure associated with an on-disk delta
     186                 : /// file.
     187                 : ///
     188                 : /// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
     189                 : /// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
     190                 : /// Otherwise the struct is just a placeholder for a file that exists on disk,
     191                 : /// and it needs to be loaded before using it in queries.
     192                 : pub struct DeltaLayer {
     193                 :     path_or_conf: PathOrConf,
     194                 : 
     195                 :     pub desc: PersistentLayerDesc,
     196                 : 
     197                 :     access_stats: LayerAccessStats,
     198                 : 
     199                 :     inner: OnceCell<Arc<DeltaLayerInner>>,
     200                 : }
     201                 : 
     202                 : impl std::fmt::Debug for DeltaLayer {
     203 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     204               0 :         use super::RangeDisplayDebug;
     205               0 : 
     206               0 :         f.debug_struct("DeltaLayer")
     207               0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     208               0 :             .field("lsn_range", &self.desc.lsn_range)
     209               0 :             .field("file_size", &self.desc.file_size)
     210               0 :             .field("inner", &self.inner)
     211               0 :             .finish()
     212               0 :     }
     213                 : }
     214                 : 
     215                 : pub struct DeltaLayerInner {
     216                 :     // values copied from summary
     217                 :     index_start_blk: u32,
     218                 :     index_root_blk: u32,
     219                 : 
     220                 :     /// Reader object for reading blocks from the file.
     221                 :     file: FileBlockReader,
     222                 : }
     223                 : 
     224                 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
     225 CBC    31066950 :     fn as_ref(&self) -> &DeltaLayerInner {
     226        31066950 :         self
     227        31066950 :     }
     228                 : }
     229                 : 
     230                 : impl std::fmt::Debug for DeltaLayerInner {
     231 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     232               0 :         f.debug_struct("DeltaLayerInner")
     233               0 :             .field("index_start_blk", &self.index_start_blk)
     234               0 :             .field("index_root_blk", &self.index_root_blk)
     235               0 :             .finish()
     236               0 :     }
     237                 : }
     238                 : 
     239                 : #[async_trait::async_trait]
     240                 : impl Layer for DeltaLayer {
     241 CBC    23419370 :     async fn get_value_reconstruct_data(
     242        23419370 :         &self,
     243        23419370 :         key: Key,
     244        23419370 :         lsn_range: Range<Lsn>,
     245        23419370 :         reconstruct_state: &mut ValueReconstructState,
     246        23419370 :         ctx: &RequestContext,
     247        23419370 :     ) -> anyhow::Result<ValueReconstructResult> {
     248        23419349 :         self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
     249         2118353 :             .await
     250        46838692 :     }
     251                 : }
     252                 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     253                 : impl std::fmt::Display for DeltaLayer {
     254            4571 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     255            4571 :         write!(f, "{}", self.layer_desc().short_id())
     256            4571 :     }
     257                 : }
     258                 : 
     259                 : impl AsLayerDesc for DeltaLayer {
     260          143635 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     261          143635 :         &self.desc
     262          143635 :     }
     263                 : }
     264                 : 
     265                 : impl PersistentLayer for DeltaLayer {
     266            4363 :     fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
     267            4363 :         Some(self)
     268            4363 :     }
     269                 : 
     270             218 :     fn local_path(&self) -> Option<Utf8PathBuf> {
     271             218 :         self.local_path()
     272             218 :     }
     273                 : 
     274            6951 :     fn delete_resident_layer_file(&self) -> Result<()> {
     275            6951 :         self.delete_resident_layer_file()
     276            6951 :     }
     277                 : 
     278            1370 :     fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
     279            1370 :         self.info(reset)
     280            1370 :     }
     281                 : 
     282            1455 :     fn access_stats(&self) -> &LayerAccessStats {
     283            1455 :         self.access_stats()
     284            1455 :     }
     285                 : }
     286                 : 
     287                 : impl DeltaLayer {
     288               4 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     289               4 :         println!(
     290               4 :             "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
     291               4 :             self.desc.tenant_id,
     292               4 :             self.desc.timeline_id,
     293               4 :             self.desc.key_range.start,
     294               4 :             self.desc.key_range.end,
     295               4 :             self.desc.lsn_range.start,
     296               4 :             self.desc.lsn_range.end,
     297               4 :             self.desc.file_size,
     298               4 :         );
     299               4 : 
     300               4 :         if !verbose {
     301               2 :             return Ok(());
     302               2 :         }
     303                 : 
     304               2 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     305                 : 
     306               2 :         println!(
     307               2 :             "index_start_blk: {}, root {}",
     308               2 :             inner.index_start_blk, inner.index_root_blk
     309               2 :         );
     310               2 : 
     311               2 :         let file = &inner.file;
     312               2 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     313               2 :             inner.index_start_blk,
     314               2 :             inner.index_root_blk,
     315               2 :             file,
     316               2 :         );
     317               2 : 
     318               2 :         tree_reader.dump().await?;
     319                 : 
     320               2 :         let keys = DeltaLayerInner::load_keys(&inner, ctx).await?;
     321                 : 
     322                 :         // A subroutine to dump a single blob
     323               4 :         async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> Result<String> {
     324               4 :             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
     325               4 :             let val = Value::des(&buf)?;
     326               4 :             let desc = match val {
     327               4 :                 Value::Image(img) => {
     328               4 :                     format!(" img {} bytes", img.len())
     329                 :                 }
     330 UBC           0 :                 Value::WalRecord(rec) => {
     331               0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
     332               0 :                     format!(
     333               0 :                         " rec {} bytes will_init: {} {}",
     334               0 :                         buf.len(),
     335               0 :                         rec.will_init(),
     336               0 :                         wal_desc
     337               0 :                     )
     338                 :                 }
     339                 :             };
     340 CBC           4 :             Ok(desc)
     341               4 :         }
     342                 : 
     343               6 :         for entry in keys {
     344               4 :             let DeltaEntry { key, lsn, val, .. } = entry;
     345               4 :             let desc = match dump_blob(val, ctx).await {
     346               4 :                 Ok(desc) => desc,
     347 UBC           0 :                 Err(err) => {
     348               0 :                     let err: anyhow::Error = err;
     349               0 :                     format!("ERROR: {err}")
     350                 :                 }
     351                 :             };
     352 CBC           4 :             println!("  key {key} at {lsn}: {desc}");
     353                 :         }
     354                 : 
     355               2 :         Ok(())
     356               4 :     }
     357                 : 
     358        23419370 :     pub(crate) async fn get_value_reconstruct_data(
     359        23419370 :         &self,
     360        23419370 :         key: Key,
     361        23419370 :         lsn_range: Range<Lsn>,
     362        23419370 :         reconstruct_state: &mut ValueReconstructState,
     363        23419370 :         ctx: &RequestContext,
     364        23419370 :     ) -> anyhow::Result<ValueReconstructResult> {
     365        23419340 :         ensure!(lsn_range.start >= self.desc.lsn_range.start);
     366                 : 
     367        23419340 :         ensure!(self.desc.key_range.contains(&key));
     368                 : 
     369        23419340 :         let inner = self
     370        23419340 :             .load(LayerAccessKind::GetValueReconstructData, ctx)
     371             309 :             .await?;
     372        23419330 :         inner
     373        23419330 :             .get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
     374         2118044 :             .await
     375        23419336 :     }
     376                 : 
     377             218 :     pub(crate) fn local_path(&self) -> Option<Utf8PathBuf> {
     378             218 :         Some(self.path())
     379             218 :     }
     380                 : 
     381                 :     pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
     382                 :         // delete underlying file
     383            6951 :         fs::remove_file(self.path())?;
     384            6951 :         Ok(())
     385            6951 :     }
     386                 : 
     387            1370 :     pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
     388            1370 :         let layer_file_name = self.layer_desc().filename().file_name();
     389            1370 :         let lsn_range = self.layer_desc().lsn_range.clone();
     390            1370 : 
     391            1370 :         let access_stats = self.access_stats.as_api_model(reset);
     392            1370 : 
     393            1370 :         HistoricLayerInfo::Delta {
     394            1370 :             layer_file_name,
     395            1370 :             layer_file_size: self.desc.file_size,
     396            1370 :             lsn_start: lsn_range.start,
     397            1370 :             lsn_end: lsn_range.end,
     398            1370 :             remote: false,
     399            1370 :             access_stats,
     400            1370 :         }
     401            1370 :     }
     402                 : 
     403           16653 :     pub(crate) fn access_stats(&self) -> &LayerAccessStats {
     404           16653 :         &self.access_stats
     405           16653 :     }
     406                 : 
     407           59302 :     fn path_for(
     408           59302 :         path_or_conf: &PathOrConf,
     409           59302 :         tenant_id: &TenantId,
     410           59302 :         timeline_id: &TimelineId,
     411           59302 :         fname: &DeltaFileName,
     412           59302 :     ) -> Utf8PathBuf {
     413           59302 :         match path_or_conf {
     414 UBC           0 :             PathOrConf::Path(path) => path.clone(),
     415 CBC       59302 :             PathOrConf::Conf(conf) => conf
     416           59302 :                 .timeline_path(tenant_id, timeline_id)
     417           59302 :                 .join(fname.to_string()),
     418                 :         }
     419           59302 :     }
     420                 : 
     421           15728 :     fn temp_path_for(
     422           15728 :         conf: &PageServerConf,
     423           15728 :         tenant_id: &TenantId,
     424           15728 :         timeline_id: &TimelineId,
     425           15728 :         key_start: Key,
     426           15728 :         lsn_range: &Range<Lsn>,
     427           15728 :     ) -> Utf8PathBuf {
     428           15728 :         let rand_string: String = rand::thread_rng()
     429           15728 :             .sample_iter(&Alphanumeric)
     430           15728 :             .take(8)
     431           15728 :             .map(char::from)
     432           15728 :             .collect();
     433           15728 : 
     434           15728 :         conf.timeline_path(tenant_id, timeline_id).join(format!(
     435           15728 :             "{}-XXX__{:016X}-{:016X}.{}.{}",
     436           15728 :             key_start,
     437           15728 :             u64::from(lsn_range.start),
     438           15728 :             u64::from(lsn_range.end),
     439           15728 :             rand_string,
     440           15728 :             TEMP_FILE_SUFFIX,
     441           15728 :         ))
     442           15728 :     }
     443                 : 
     444                 :     ///
     445                 :     /// Open the underlying file and read the metadata into memory, if it's
     446                 :     /// not loaded already.
     447                 :     ///
     448        23423705 :     async fn load(
     449        23423705 :         &self,
     450        23423705 :         access_kind: LayerAccessKind,
     451        23423705 :         ctx: &RequestContext,
     452        23423705 :     ) -> Result<&Arc<DeltaLayerInner>> {
     453        23423675 :         self.access_stats.record_access(access_kind, ctx);
     454        23423675 :         // Quick exit if already loaded
     455        23423675 :         self.inner
     456        23423675 :             .get_or_try_init(|| self.load_inner(ctx))
     457             311 :             .await
     458        23423675 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     459        23423675 :     }
     460                 : 
     461           10807 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     462           10807 :         let path = self.path();
     463                 : 
     464           10807 :         let summary = match &self.path_or_conf {
     465           10807 :             PathOrConf::Conf(_) => Some(Summary::from(self)),
     466 UBC           0 :             PathOrConf::Path(_) => None,
     467                 :         };
     468                 : 
     469 CBC       10807 :         let loaded = DeltaLayerInner::load(&path, summary, ctx).await?;
     470                 : 
     471           10797 :         if let PathOrConf::Path(ref path) = self.path_or_conf {
     472                 :             // not production code
     473                 : 
     474 UBC           0 :             let actual_filename = path.file_name().unwrap().to_owned();
     475               0 :             let expected_filename = self.filename().file_name();
     476               0 : 
     477               0 :             if actual_filename != expected_filename {
     478               0 :                 println!("warning: filename does not match what is expected from in-file summary");
     479               0 :                 println!("actual: {:?}", actual_filename);
     480               0 :                 println!("expected: {:?}", expected_filename);
     481               0 :             }
     482 CBC       10797 :         }
     483                 : 
     484           10797 :         Ok(Arc::new(loaded))
     485           10807 :     }
     486                 : 
     487                 :     /// Create a DeltaLayer struct representing an existing file on disk.
     488            2690 :     pub fn new(
     489            2690 :         conf: &'static PageServerConf,
     490            2690 :         timeline_id: TimelineId,
     491            2690 :         tenant_id: TenantId,
     492            2690 :         filename: &DeltaFileName,
     493            2690 :         file_size: u64,
     494            2690 :         access_stats: LayerAccessStats,
     495            2690 :     ) -> DeltaLayer {
     496            2690 :         DeltaLayer {
     497            2690 :             path_or_conf: PathOrConf::Conf(conf),
     498            2690 :             desc: PersistentLayerDesc::new_delta(
     499            2690 :                 tenant_id,
     500            2690 :                 timeline_id,
     501            2690 :                 filename.key_range.clone(),
     502            2690 :                 filename.lsn_range.clone(),
     503            2690 :                 file_size,
     504            2690 :             ),
     505            2690 :             access_stats,
     506            2690 :             inner: OnceCell::new(),
     507            2690 :         }
     508            2690 :     }
     509                 : 
     510                 :     /// Create a DeltaLayer struct representing an existing file on disk.
     511                 :     ///
     512                 :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     513 UBC           0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     514               0 :         let mut summary_buf = vec![0; PAGE_SZ];
     515               0 :         file.read_exact_at(&mut summary_buf, 0)?;
     516               0 :         let summary = Summary::des_prefix(&summary_buf)?;
     517                 : 
     518               0 :         let metadata = file
     519               0 :             .metadata()
     520               0 :             .context("get file metadata to determine size")?;
     521                 : 
     522               0 :         Ok(DeltaLayer {
     523               0 :             path_or_conf: PathOrConf::Path(path.to_path_buf()),
     524               0 :             desc: PersistentLayerDesc::new_delta(
     525               0 :                 summary.tenant_id,
     526               0 :                 summary.timeline_id,
     527               0 :                 summary.key_range,
     528               0 :                 summary.lsn_range,
     529               0 :                 metadata.len(),
     530               0 :             ),
     531               0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     532               0 :             inner: OnceCell::new(),
     533               0 :         })
     534               0 :     }
     535                 : 
     536 CBC       43581 :     fn layer_name(&self) -> DeltaFileName {
     537           43581 :         self.desc.delta_file_name()
     538           43581 :     }
     539                 :     /// Path to the layer file in pageserver workdir.
     540           43581 :     pub fn path(&self) -> Utf8PathBuf {
     541           43581 :         Self::path_for(
     542           43581 :             &self.path_or_conf,
     543           43581 :             &self.desc.tenant_id,
     544           43581 :             &self.desc.timeline_id,
     545           43581 :             &self.layer_name(),
     546           43581 :         )
     547           43581 :     }
     548                 :     /// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
     549                 :     ///
     550                 :     /// The value can be obtained via the [`ValueRef::load`] function.
     551            4333 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
     552            4333 :         let inner = self
     553            4333 :             .load(LayerAccessKind::KeyIter, ctx)
     554               2 :             .await
     555            4333 :             .context("load delta layer keys")?;
     556            4333 :         DeltaLayerInner::load_keys(inner, ctx)
     557            1504 :             .await
     558            4333 :             .context("Layer index is corrupted")
     559            4333 :     }
     560                 : }
     561                 : 
     562                 : /// A builder object for constructing a new delta layer.
     563                 : ///
     564                 : /// Usage:
     565                 : ///
     566                 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     567                 : ///
     568                 : /// 2. Write the contents by calling `put_value` for every page
     569                 : ///    version to store in the layer.
     570                 : ///
     571                 : /// 3. Call `finish`.
     572                 : ///
     573                 : struct DeltaLayerWriterInner {
     574                 :     conf: &'static PageServerConf,
     575                 :     pub path: Utf8PathBuf,
     576                 :     timeline_id: TimelineId,
     577                 :     tenant_id: TenantId,
     578                 : 
     579                 :     key_start: Key,
     580                 :     lsn_range: Range<Lsn>,
     581                 : 
     582                 :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     583                 : 
     584                 :     blob_writer: BlobWriter<true>,
     585                 : }
     586                 : 
     587                 : impl DeltaLayerWriterInner {
     588                 :     ///
     589                 :     /// Start building a new delta layer.
     590                 :     ///
     591           15728 :     async fn new(
     592           15728 :         conf: &'static PageServerConf,
     593           15728 :         timeline_id: TimelineId,
     594           15728 :         tenant_id: TenantId,
     595           15728 :         key_start: Key,
     596           15728 :         lsn_range: Range<Lsn>,
     597           15728 :     ) -> anyhow::Result<Self> {
     598           15728 :         // Create the file initially with a temporary filename. We don't know
     599           15728 :         // the end key yet, so we cannot form the final filename yet. We will
     600           15728 :         // rename it when we're done.
     601           15728 :         //
     602           15728 :         // Note: This overwrites any existing file. There shouldn't be any.
     603           15728 :         // FIXME: throw an error instead?
     604           15728 :         let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);
     605                 : 
     606           15728 :         let mut file = VirtualFile::create(&path).await?;
     607                 :         // make room for the header block
     608           15728 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     609           15728 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     610           15728 : 
     611           15728 :         // Initialize the b-tree index builder
     612           15728 :         let block_buf = BlockBuf::new();
     613           15728 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     614           15728 : 
     615           15728 :         Ok(Self {
     616           15728 :             conf,
     617           15728 :             path,
     618           15728 :             timeline_id,
     619           15728 :             tenant_id,
     620           15728 :             key_start,
     621           15728 :             lsn_range,
     622           15728 :             tree: tree_builder,
     623           15728 :             blob_writer,
     624           15728 :         })
     625           15728 :     }
     626                 : 
     627                 :     ///
     628                 :     /// Append a key-value pair to the file.
     629                 :     ///
     630                 :     /// The values must be appended in key, lsn order.
     631                 :     ///
     632        29577866 :     async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     633        29577757 :         self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
     634 UBC           0 :             .await
     635 CBC    29577757 :     }
     636                 : 
     637        82730191 :     async fn put_value_bytes(
     638        82730191 :         &mut self,
     639        82730191 :         key: Key,
     640        82730191 :         lsn: Lsn,
     641        82730191 :         val: &[u8],
     642        82730191 :         will_init: bool,
     643        82730191 :     ) -> anyhow::Result<()> {
     644        82729950 :         assert!(self.lsn_range.start <= lsn);
     645                 : 
     646        82729950 :         let off = self.blob_writer.write_blob(val).await?;
     647                 : 
     648        82729948 :         let blob_ref = BlobRef::new(off, will_init);
     649        82729948 : 
     650        82729948 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     651        82729948 :         self.tree.append(&delta_key.0, blob_ref.0)?;
     652                 : 
     653        82729948 :         Ok(())
     654        82729948 :     }
     655                 : 
     656         1870701 :     fn size(&self) -> u64 {
     657         1870701 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     658         1870701 :     }
     659                 : 
     660                 :     ///
     661                 :     /// Finish writing the delta layer.
     662                 :     ///
     663           15722 :     async fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
     664           15721 :         let index_start_blk =
     665           15721 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     666                 : 
     667           15721 :         let mut file = self.blob_writer.into_inner().await?;
     668                 : 
     669                 :         // Write out the index
     670           15721 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     671           15721 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     672 UBC           0 :             .await?;
     673 CBC      192976 :         for buf in block_buf.blocks {
     674          177255 :             file.write_all(buf.as_ref()).await?;
     675                 :         }
     676           15721 :         assert!(self.lsn_range.start < self.lsn_range.end);
     677                 :         // Fill in the summary on blk 0
     678           15721 :         let summary = Summary {
     679           15721 :             magic: DELTA_FILE_MAGIC,
     680           15721 :             format_version: STORAGE_FORMAT_VERSION,
     681           15721 :             tenant_id: self.tenant_id,
     682           15721 :             timeline_id: self.timeline_id,
     683           15721 :             key_range: self.key_start..key_end,
     684           15721 :             lsn_range: self.lsn_range.clone(),
     685           15721 :             index_start_blk,
     686           15721 :             index_root_blk,
     687           15721 :         };
     688           15721 : 
     689           15721 :         let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
     690           15721 :         Summary::ser_into(&summary, &mut buf)?;
     691           15721 :         if buf.spilled() {
     692                 :             // This is bad as we only have one free block for the summary
     693 UBC           0 :             warn!(
     694               0 :                 "Used more than one page size for summary buffer: {}",
     695               0 :                 buf.len()
     696               0 :             );
     697 CBC       15721 :         }
     698           15721 :         file.seek(SeekFrom::Start(0)).await?;
     699           15721 :         file.write_all(&buf).await?;
     700                 : 
     701           15721 :         let metadata = file
     702           15721 :             .metadata()
     703 UBC           0 :             .await
     704 CBC       15721 :             .context("get file metadata to determine size")?;
     705                 : 
     706                 :         // 5GB limit for objects without multipart upload (which we don't want to use)
     707                 :         // Make it a little bit below to account for differing GB units
     708                 :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     709                 :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     710           15721 :         ensure!(
     711           15721 :             metadata.len() <= S3_UPLOAD_LIMIT,
     712 UBC           0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     713               0 :             file.path,
     714               0 :             metadata.len()
     715                 :         );
     716                 : 
     717                 :         // Note: Because we opened the file in write-only mode, we cannot
     718                 :         // reuse the same VirtualFile for reading later. That's why we don't
     719                 :         // set inner.file here. The first read will have to re-open it.
     720 CBC       15721 :         let layer = DeltaLayer {
     721           15721 :             path_or_conf: PathOrConf::Conf(self.conf),
     722           15721 :             desc: PersistentLayerDesc::new_delta(
     723           15721 :                 self.tenant_id,
     724           15721 :                 self.timeline_id,
     725           15721 :                 self.key_start..key_end,
     726           15721 :                 self.lsn_range.clone(),
     727           15721 :                 metadata.len(),
     728           15721 :             ),
     729           15721 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     730           15721 :             inner: OnceCell::new(),
     731           15721 :         };
     732           15721 : 
     733           15721 :         // fsync the file
     734           15721 :         file.sync_all().await?;
     735                 :         // Rename the file to its final name
     736                 :         //
     737                 :         // Note: This overwrites any existing file. There shouldn't be any.
     738                 :         // FIXME: throw an error instead?
     739           15721 :         let final_path = DeltaLayer::path_for(
     740           15721 :             &PathOrConf::Conf(self.conf),
     741           15721 :             &self.tenant_id,
     742           15721 :             &self.timeline_id,
     743           15721 :             &DeltaFileName {
     744           15721 :                 key_range: self.key_start..key_end,
     745           15721 :                 lsn_range: self.lsn_range,
     746           15721 :             },
     747           15721 :         );
     748           15721 :         std::fs::rename(self.path, &final_path)?;
     749                 : 
     750 UBC           0 :         trace!("created delta layer {final_path}");
     751                 : 
     752 CBC       15721 :         Ok(layer)
     753           15721 :     }
     754                 : }
     755                 : 
     756                 : /// A builder object for constructing a new delta layer.
     757                 : ///
     758                 : /// Usage:
     759                 : ///
     760                 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     761                 : ///
     762                 : /// 2. Write the contents by calling `put_value` for every page
     763                 : ///    version to store in the layer.
     764                 : ///
     765                 : /// 3. Call `finish`.
     766                 : ///
     767                 : /// # Note
     768                 : ///
     769                 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     770                 : /// possible for the writer to drop before `finish` is actually called. So this
     771                 : /// could lead to odd temporary files in the directory, exhausting file system.
     772                 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     773                 : /// implementation that cleans up the temporary file in failure. It's not
     774                 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     775                 : /// out some fields, making it impossible to implement `Drop`.
     776                 : ///
     777                 : #[must_use]
     778                 : pub struct DeltaLayerWriter {
     779                 :     inner: Option<DeltaLayerWriterInner>,
     780                 : }
     781                 : 
     782                 : impl DeltaLayerWriter {
     783                 :     ///
     784                 :     /// Start building a new delta layer.
     785                 :     ///
     786           15728 :     pub async fn new(
     787           15728 :         conf: &'static PageServerConf,
     788           15728 :         timeline_id: TimelineId,
     789           15728 :         tenant_id: TenantId,
     790           15728 :         key_start: Key,
     791           15728 :         lsn_range: Range<Lsn>,
     792           15728 :     ) -> anyhow::Result<Self> {
     793                 :         Ok(Self {
     794                 :             inner: Some(
     795           15728 :                 DeltaLayerWriterInner::new(conf, timeline_id, tenant_id, key_start, lsn_range)
     796 UBC           0 :                     .await?,
     797                 :             ),
     798                 :         })
     799 CBC       15728 :     }
     800                 : 
     801                 :     ///
     802                 :     /// Append a key-value pair to the file.
     803                 :     ///
     804                 :     /// The values must be appended in key, lsn order.
     805                 :     ///
     806        29577866 :     pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     807        29577757 :         self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     808        29577757 :     }
     809                 : 
     810        53152325 :     pub async fn put_value_bytes(
     811        53152325 :         &mut self,
     812        53152325 :         key: Key,
     813        53152325 :         lsn: Lsn,
     814        53152325 :         val: &[u8],
     815        53152325 :         will_init: bool,
     816        53152325 :     ) -> anyhow::Result<()> {
     817        53152193 :         self.inner
     818        53152193 :             .as_mut()
     819        53152193 :             .unwrap()
     820        53152193 :             .put_value_bytes(key, lsn, val, will_init)
     821 UBC           0 :             .await
     822 CBC    53152191 :     }
     823                 : 
     824         1870701 :     pub fn size(&self) -> u64 {
     825         1870701 :         self.inner.as_ref().unwrap().size()
     826         1870701 :     }
     827                 : 
     828                 :     ///
     829                 :     /// Finish writing the delta layer.
     830                 :     ///
     831           15722 :     pub async fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
     832           15721 :         self.inner.take().unwrap().finish(key_end).await
     833           15721 :     }
     834                 : }
     835                 : 
     836                 : impl Drop for DeltaLayerWriter {
     837                 :     fn drop(&mut self) {
     838           15721 :         if let Some(inner) = self.inner.take() {
     839 UBC           0 :             // We want to remove the virtual file here, so it's fine to not
     840               0 :             // having completely flushed unwritten data.
     841               0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     842               0 :             vfile.remove();
     843 CBC       15721 :         }
     844           15721 :     }
     845                 : }
     846                 : 
     847                 : impl DeltaLayerInner {
     848           10807 :     pub(super) async fn load(
     849           10807 :         path: &Utf8Path,
     850           10807 :         summary: Option<Summary>,
     851           10807 :         ctx: &RequestContext,
     852           10807 :     ) -> anyhow::Result<Self> {
     853           10807 :         let file = VirtualFile::open(path)
     854 UBC           0 :             .await
     855 CBC       10807 :             .with_context(|| format!("Failed to open file '{path}'"))?;
     856           10807 :         let file = FileBlockReader::new(file);
     857                 : 
     858           10807 :         let summary_blk = file.read_blk(0, ctx).await?;
     859           10807 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
     860                 : 
     861           10807 :         if let Some(mut expected_summary) = summary {
     862                 :             // production code path
     863           10807 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     864           10807 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     865           10807 :             if actual_summary != expected_summary {
     866              10 :                 bail!(
     867              10 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     868              10 :                     actual_summary,
     869              10 :                     expected_summary
     870              10 :                 );
     871           10797 :             }
     872 UBC           0 :         }
     873                 : 
     874 CBC       10797 :         Ok(DeltaLayerInner {
     875           10797 :             file,
     876           10797 :             index_start_blk: actual_summary.index_start_blk,
     877           10797 :             index_root_blk: actual_summary.index_root_blk,
     878           10797 :         })
     879           10807 :     }
     880                 : 
     881        23419360 :     pub(super) async fn get_value_reconstruct_data(
     882        23419360 :         &self,
     883        23419360 :         key: Key,
     884        23419360 :         lsn_range: Range<Lsn>,
     885        23419360 :         reconstruct_state: &mut ValueReconstructState,
     886        23419360 :         ctx: &RequestContext,
     887        23419360 :     ) -> anyhow::Result<ValueReconstructResult> {
     888        23419330 :         let mut need_image = true;
     889        23419330 :         // Scan the page versions backwards, starting from `lsn`.
     890        23419330 :         let file = &self.file;
     891        23419330 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     892        23419330 :             self.index_start_blk,
     893        23419330 :             self.index_root_blk,
     894        23419330 :             file,
     895        23419330 :         );
     896        23419330 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     897        23419330 : 
     898        23419330 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     899        23419330 : 
     900        23419330 :         tree_reader
     901        23419330 :             .visit(
     902        23419330 :                 &search_key.0,
     903        23419330 :                 VisitDirection::Backwards,
     904       140907650 :                 |key, value| {
     905       140907650 :                     let blob_ref = BlobRef(value);
     906       140907650 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     907         8784530 :                         return false;
     908       132123120 :                     }
     909       132123120 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     910       132123120 :                     if entry_lsn < lsn_range.start {
     911          689871 :                         return false;
     912       131433249 :                     }
     913       131433249 :                     offsets.push((entry_lsn, blob_ref.pos()));
     914       131433249 : 
     915       131433249 :                     !blob_ref.will_init()
     916       140907650 :                 },
     917        23419330 :                 &RequestContextBuilder::extend(ctx)
     918        23419330 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     919        23419330 :                     .build(),
     920        23419330 :             )
     921          277660 :             .await?;
     922                 : 
     923        23419337 :         let ctx = &RequestContextBuilder::extend(ctx)
     924        23419337 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     925        23419337 :             .build();
     926        23419337 : 
     927        23419337 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     928        23419337 :         let cursor = file.block_cursor();
     929        23419337 :         let mut buf = Vec::new();
     930       152135744 :         for (entry_lsn, pos) in offsets {
     931       131432736 :             cursor
     932       131432736 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     933         1840384 :                 .await
     934       131432732 :                 .with_context(|| {
     935 UBC           0 :                     format!("Failed to read blob from virtual file {}", file.file.path)
     936 CBC   131432732 :                 })?;
     937       131432732 :             let val = Value::des(&buf).with_context(|| {
     938 UBC           0 :                 format!(
     939               0 :                     "Failed to deserialize file blob from virtual file {}",
     940               0 :                     file.file.path
     941               0 :                 )
     942 CBC   131432732 :             })?;
     943       131432732 :             match val {
     944         1434401 :                 Value::Image(img) => {
     945         1434401 :                     reconstruct_state.img = Some((entry_lsn, img));
     946         1434401 :                     need_image = false;
     947         1434401 :                     break;
     948                 :                 }
     949       129998331 :                 Value::WalRecord(rec) => {
     950       129998331 :                     let will_init = rec.will_init();
     951       129998331 :                     reconstruct_state.records.push((entry_lsn, rec));
     952       129998331 :                     if will_init {
     953                 :                         // This WAL record initializes the page, so no need to go further back
     954         1281924 :                         need_image = false;
     955         1281924 :                         break;
     956       128716407 :                     }
     957                 :                 }
     958                 :             }
     959                 :         }
     960                 : 
     961                 :         // If an older page image is needed to reconstruct the page, let the
     962                 :         // caller know.
     963        23419333 :         if need_image {
     964        20703008 :             Ok(ValueReconstructResult::Continue)
     965                 :         } else {
     966         2716325 :             Ok(ValueReconstructResult::Complete)
     967                 :         }
     968        23419333 :     }
     969                 : 
     970            4335 :     pub(super) async fn load_keys<'a, 'b, T: AsRef<DeltaLayerInner> + Clone>(
     971            4335 :         this: &'a T,
     972            4335 :         ctx: &'b RequestContext,
     973            4335 :     ) -> Result<Vec<DeltaEntry<'a>>> {
     974            4335 :         let dl = this.as_ref();
     975            4335 :         let file = &dl.file;
     976            4335 : 
     977            4335 :         let tree_reader =
     978            4335 :             DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
     979            4335 : 
     980            4335 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
     981            4335 : 
     982            4335 :         tree_reader
     983            4335 :             .visit(
     984            4335 :                 &[0u8; DELTA_KEY_SIZE],
     985            4335 :                 VisitDirection::Forwards,
     986        30235446 :                 |key, value| {
     987        30235446 :                     let delta_key = DeltaKey::from_slice(key);
     988        30235446 :                     let val_ref = ValueRef {
     989        30235446 :                         blob_ref: BlobRef(value),
     990        30235446 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
     991        30235446 :                             Adapter(dl),
     992        30235446 :                         )),
     993        30235446 :                     };
     994        30235446 :                     let pos = BlobRef(value).pos();
     995        30235446 :                     if let Some(last) = all_keys.last_mut() {
     996        30231111 :                         // subtract offset of the current and last entries to get the size
     997        30231111 :                         // of the value associated with this (key, lsn) tuple
     998        30231111 :                         let first_pos = last.size;
     999        30231111 :                         last.size = pos - first_pos;
    1000        30231111 :                     }
    1001        30235446 :                     let entry = DeltaEntry {
    1002        30235446 :                         key: delta_key.key(),
    1003        30235446 :                         lsn: delta_key.lsn(),
    1004        30235446 :                         size: pos,
    1005        30235446 :                         val: val_ref,
    1006        30235446 :                     };
    1007        30235446 :                     all_keys.push(entry);
    1008        30235446 :                     true
    1009        30235446 :                 },
    1010            4335 :                 &RequestContextBuilder::extend(ctx)
    1011            4335 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1012            4335 :                     .build(),
    1013            4335 :             )
    1014            1504 :             .await?;
    1015            4335 :         if let Some(last) = all_keys.last_mut() {
    1016            4335 :             // Last key occupies all space till end of value storage,
    1017            4335 :             // which corresponds to beginning of the index
    1018            4335 :             last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
    1019            4335 :         }
    1020            4335 :         Ok(all_keys)
    1021            4335 :     }
    1022                 : }
    1023                 : 
    1024                 : /// A set of data associated with a delta layer key and its value
    1025                 : pub struct DeltaEntry<'a> {
    1026                 :     pub key: Key,
    1027                 :     pub lsn: Lsn,
    1028                 :     /// Size of the stored value
    1029                 :     pub size: u64,
    1030                 :     /// Reference to the on-disk value
    1031                 :     pub val: ValueRef<'a>,
    1032                 : }
    1033                 : 
    1034                 : /// Reference to an on-disk value
    1035                 : pub struct ValueRef<'a> {
    1036                 :     blob_ref: BlobRef,
    1037                 :     reader: BlockCursor<'a>,
    1038                 : }
    1039                 : 
    1040                 : impl<'a> ValueRef<'a> {
    1041                 :     /// Loads the value from disk
    1042        29577867 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1043                 :         // theoretically we *could* record an access time for each, but it does not really matter
    1044        29577759 :         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1045        29577759 :         let val = Value::des(&buf)?;
    1046        29577759 :         Ok(val)
    1047        29577759 :     }
    1048                 : }
    1049                 : 
    1050                 : pub(crate) struct Adapter<T>(T);
    1051                 : 
    1052                 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1053        31066841 :     pub(crate) async fn read_blk(
    1054        31066841 :         &self,
    1055        31066841 :         blknum: u32,
    1056        31066841 :         ctx: &RequestContext,
    1057        31066841 :     ) -> Result<BlockLease, std::io::Error> {
    1058        31066841 :         self.0.as_ref().file.read_blk(blknum, ctx).await
    1059        31066841 :     }
    1060                 : }
        

Generated by: LCOV version 2.1-beta