LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info Lines: 70.5 % 723 510
Test Date: 2024-02-23 13:21:27 Functions: 51.5 % 103 53

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "index", and
      24              : //! "values". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use crate::config::PageServerConf;
      31              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache::PAGE_SZ;
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::BlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37              : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
      38              : use crate::tenant::timeline::GetVectoredError;
      39              : use crate::tenant::{PageReconstructError, Timeline};
      40              : use crate::virtual_file::{self, VirtualFile};
      41              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      42              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      43              : use anyhow::{anyhow, bail, ensure, Context, Result};
      44              : use camino::{Utf8Path, Utf8PathBuf};
      45              : use pageserver_api::keyspace::KeySpace;
      46              : use pageserver_api::models::LayerAccessKind;
      47              : use pageserver_api::shard::TenantShardId;
      48              : use rand::{distributions::Alphanumeric, Rng};
      49              : use serde::{Deserialize, Serialize};
      50              : use std::collections::BTreeMap;
      51              : use std::fs::File;
      52              : use std::io::SeekFrom;
      53              : use std::ops::Range;
      54              : use std::os::unix::fs::FileExt;
      55              : use std::sync::Arc;
      56              : use tokio::sync::OnceCell;
      57              : use tracing::*;
      58              : 
      59              : use utils::{
      60              :     bin_ser::BeSer,
      61              :     id::{TenantId, TimelineId},
      62              :     lsn::Lsn,
      63              : };
      64              : 
      65              : use super::{
      66              :     AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation,
      67              :     ValuesReconstructState,
      68              : };
      69              : 
      70              : ///
      71              : /// Header stored in the beginning of the file
      72              : ///
      73              : /// After this comes the 'values' part, starting on block 1. After that,
      74              : /// the 'index' starts at the block indicated by 'index_start_blk'
      75              : ///
      76          480 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      77              : pub struct Summary {
      78              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      79              :     pub magic: u16,
      80              :     pub format_version: u16,
      81              : 
      82              :     pub tenant_id: TenantId,
      83              :     pub timeline_id: TimelineId,
      84              :     pub key_range: Range<Key>,
      85              :     pub lsn_range: Range<Lsn>,
      86              : 
      87              :     /// Block number where the 'index' part of the file begins.
      88              :     pub index_start_blk: u32,
      89              :     /// Block within the 'index', where the B-tree root page is stored
      90              :     pub index_root_blk: u32,
      91              : }
      92              : 
      93              : impl From<&DeltaLayer> for Summary {
      94            0 :     fn from(layer: &DeltaLayer) -> Self {
      95            0 :         Self::expected(
      96            0 :             layer.desc.tenant_shard_id.tenant_id,
      97            0 :             layer.desc.timeline_id,
      98            0 :             layer.desc.key_range.clone(),
      99            0 :             layer.desc.lsn_range.clone(),
     100            0 :         )
     101            0 :     }
     102              : }
     103              : 
     104              : impl Summary {
     105          440 :     pub(super) fn expected(
     106          440 :         tenant_id: TenantId,
     107          440 :         timeline_id: TimelineId,
     108          440 :         keys: Range<Key>,
     109          440 :         lsns: Range<Lsn>,
     110          440 :     ) -> Self {
     111          440 :         Self {
     112          440 :             magic: DELTA_FILE_MAGIC,
     113          440 :             format_version: STORAGE_FORMAT_VERSION,
     114          440 : 
     115          440 :             tenant_id,
     116          440 :             timeline_id,
     117          440 :             key_range: keys,
     118          440 :             lsn_range: lsns,
     119          440 : 
     120          440 :             index_start_blk: 0,
     121          440 :             index_root_blk: 0,
     122          440 :         }
     123          440 :     }
     124              : }
     125              : 
     126              : // Flag indicating that this version initialize the page
     127              : const WILL_INIT: u64 = 1;
     128              : 
     129              : /// Struct representing reference to BLOB in layers. Reference contains BLOB
     130              : /// offset, and for WAL records it also contains `will_init` flag. The flag
     131              : /// helps to determine the range of records that needs to be applied, without
     132              : /// reading/deserializing records themselves.
     133            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     134              : pub struct BlobRef(pub u64);
     135              : 
     136              : impl BlobRef {
     137        72371 :     pub fn will_init(&self) -> bool {
     138        72371 :         (self.0 & WILL_INIT) != 0
     139        72371 :     }
     140              : 
     141      4276387 :     pub fn pos(&self) -> u64 {
     142      4276387 :         self.0 >> 1
     143      4276387 :     }
     144              : 
     145      4311466 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     146      4311466 :         let mut blob_ref = pos << 1;
     147      4311466 :         if will_init {
     148      4311466 :             blob_ref |= WILL_INIT;
     149      4311466 :         }
     150      4311466 :         BlobRef(blob_ref)
     151      4311466 :     }
     152              : }
     153              : 
     154              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     155              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     156              : 
     157              : /// This is the key of the B-tree index stored in the delta layer. It consists
     158              : /// of the serialized representation of a Key and LSN.
     159              : impl DeltaKey {
     160      2102008 :     fn from_slice(buf: &[u8]) -> Self {
     161      2102008 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     162      2102008 :         bytes.copy_from_slice(buf);
     163      2102008 :         DeltaKey(bytes)
     164      2102008 :     }
     165              : 
     166      4434815 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     167      4434815 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     168      4434815 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     169      4434815 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     170      4434815 :         DeltaKey(bytes)
     171      4434815 :     }
     172              : 
     173      2102008 :     fn key(&self) -> Key {
     174      2102008 :         Key::from_slice(&self.0)
     175      2102008 :     }
     176              : 
     177      2102008 :     fn lsn(&self) -> Lsn {
     178      2102008 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     179      2102008 :     }
     180              : 
     181        72381 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     182        72381 :         let mut lsn_buf = [0u8; 8];
     183        72381 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     184        72381 :         Lsn(u64::from_be_bytes(lsn_buf))
     185        72381 :     }
     186              : }
     187              : 
     188              : /// This is used only from `pagectl`. Within pageserver, all layers are
     189              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     190              : pub struct DeltaLayer {
     191              :     path: Utf8PathBuf,
     192              :     pub desc: PersistentLayerDesc,
     193              :     access_stats: LayerAccessStats,
     194              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     195              : }
     196              : 
     197              : impl std::fmt::Debug for DeltaLayer {
     198            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     199            0 :         use super::RangeDisplayDebug;
     200            0 : 
     201            0 :         f.debug_struct("DeltaLayer")
     202            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     203            0 :             .field("lsn_range", &self.desc.lsn_range)
     204            0 :             .field("file_size", &self.desc.file_size)
     205            0 :             .field("inner", &self.inner)
     206            0 :             .finish()
     207            0 :     }
     208              : }
     209              : 
     210              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     211              : /// file.
     212              : pub struct DeltaLayerInner {
     213              :     // values copied from summary
     214              :     index_start_blk: u32,
     215              :     index_root_blk: u32,
     216              : 
     217              :     /// Reader object for reading blocks from the file.
     218              :     file: FileBlockReader,
     219              : }
     220              : 
     221              : impl std::fmt::Debug for DeltaLayerInner {
     222            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     223            0 :         f.debug_struct("DeltaLayerInner")
     224            0 :             .field("index_start_blk", &self.index_start_blk)
     225            0 :             .field("index_root_blk", &self.index_root_blk)
     226            0 :             .finish()
     227            0 :     }
     228              : }
     229              : 
     230              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     231              : impl std::fmt::Display for DeltaLayer {
     232            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     233            0 :         write!(f, "{}", self.layer_desc().short_id())
     234            0 :     }
     235              : }
     236              : 
     237              : impl AsLayerDesc for DeltaLayer {
     238            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     239            0 :         &self.desc
     240            0 :     }
     241              : }
     242              : 
     243              : impl DeltaLayer {
     244            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     245            0 :         self.desc.dump();
     246            0 : 
     247            0 :         if !verbose {
     248            0 :             return Ok(());
     249            0 :         }
     250              : 
     251            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     252              : 
     253            0 :         inner.dump(ctx).await
     254            0 :     }
     255              : 
     256          480 :     fn temp_path_for(
     257          480 :         conf: &PageServerConf,
     258          480 :         tenant_shard_id: &TenantShardId,
     259          480 :         timeline_id: &TimelineId,
     260          480 :         key_start: Key,
     261          480 :         lsn_range: &Range<Lsn>,
     262          480 :     ) -> Utf8PathBuf {
     263          480 :         let rand_string: String = rand::thread_rng()
     264          480 :             .sample_iter(&Alphanumeric)
     265          480 :             .take(8)
     266          480 :             .map(char::from)
     267          480 :             .collect();
     268          480 : 
     269          480 :         conf.timeline_path(tenant_shard_id, timeline_id)
     270          480 :             .join(format!(
     271          480 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     272          480 :                 key_start,
     273          480 :                 u64::from(lsn_range.start),
     274          480 :                 u64::from(lsn_range.end),
     275          480 :                 rand_string,
     276          480 :                 TEMP_FILE_SUFFIX,
     277          480 :             ))
     278          480 :     }
     279              : 
     280              :     ///
     281              :     /// Open the underlying file and read the metadata into memory, if it's
     282              :     /// not loaded already.
     283              :     ///
     284            0 :     async fn load(
     285            0 :         &self,
     286            0 :         access_kind: LayerAccessKind,
     287            0 :         ctx: &RequestContext,
     288            0 :     ) -> Result<&Arc<DeltaLayerInner>> {
     289            0 :         self.access_stats.record_access(access_kind, ctx);
     290            0 :         // Quick exit if already loaded
     291            0 :         self.inner
     292            0 :             .get_or_try_init(|| self.load_inner(ctx))
     293            0 :             .await
     294            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     295            0 :     }
     296              : 
     297            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     298            0 :         let path = self.path();
     299              : 
     300            0 :         let loaded = DeltaLayerInner::load(&path, None, ctx)
     301            0 :             .await
     302            0 :             .and_then(|res| res)?;
     303              : 
     304              :         // not production code
     305            0 :         let actual_filename = path.file_name().unwrap().to_owned();
     306            0 :         let expected_filename = self.layer_desc().filename().file_name();
     307            0 : 
     308            0 :         if actual_filename != expected_filename {
     309            0 :             println!("warning: filename does not match what is expected from in-file summary");
     310            0 :             println!("actual: {:?}", actual_filename);
     311            0 :             println!("expected: {:?}", expected_filename);
     312            0 :         }
     313              : 
     314            0 :         Ok(Arc::new(loaded))
     315            0 :     }
     316              : 
     317              :     /// Create a DeltaLayer struct representing an existing file on disk.
     318              :     ///
     319              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     320            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     321            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     322            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     323            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     324              : 
     325            0 :         let metadata = file
     326            0 :             .metadata()
     327            0 :             .context("get file metadata to determine size")?;
     328              : 
     329              :         // This function is never used for constructing layers in a running pageserver,
     330              :         // so it does not need an accurate TenantShardId.
     331            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     332            0 : 
     333            0 :         Ok(DeltaLayer {
     334            0 :             path: path.to_path_buf(),
     335            0 :             desc: PersistentLayerDesc::new_delta(
     336            0 :                 tenant_shard_id,
     337            0 :                 summary.timeline_id,
     338            0 :                 summary.key_range,
     339            0 :                 summary.lsn_range,
     340            0 :                 metadata.len(),
     341            0 :             ),
     342            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     343            0 :             inner: OnceCell::new(),
     344            0 :         })
     345            0 :     }
     346              : 
     347              :     /// Path to the layer file in pageserver workdir.
     348            0 :     fn path(&self) -> Utf8PathBuf {
     349            0 :         self.path.clone()
     350            0 :     }
     351              : }
     352              : 
     353              : /// A builder object for constructing a new delta layer.
     354              : ///
     355              : /// Usage:
     356              : ///
     357              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     358              : ///
     359              : /// 2. Write the contents by calling `put_value` for every page
     360              : ///    version to store in the layer.
     361              : ///
     362              : /// 3. Call `finish`.
     363              : ///
     364              : struct DeltaLayerWriterInner {
     365              :     conf: &'static PageServerConf,
     366              :     pub path: Utf8PathBuf,
     367              :     timeline_id: TimelineId,
     368              :     tenant_shard_id: TenantShardId,
     369              : 
     370              :     key_start: Key,
     371              :     lsn_range: Range<Lsn>,
     372              : 
     373              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     374              : 
     375              :     blob_writer: BlobWriter<true>,
     376              : }
     377              : 
     378              : impl DeltaLayerWriterInner {
     379              :     ///
     380              :     /// Start building a new delta layer.
     381              :     ///
     382          480 :     async fn new(
     383          480 :         conf: &'static PageServerConf,
     384          480 :         timeline_id: TimelineId,
     385          480 :         tenant_shard_id: TenantShardId,
     386          480 :         key_start: Key,
     387          480 :         lsn_range: Range<Lsn>,
     388          480 :     ) -> anyhow::Result<Self> {
     389          480 :         // Create the file initially with a temporary filename. We don't know
     390          480 :         // the end key yet, so we cannot form the final filename yet. We will
     391          480 :         // rename it when we're done.
     392          480 :         //
     393          480 :         // Note: This overwrites any existing file. There shouldn't be any.
     394          480 :         // FIXME: throw an error instead?
     395          480 :         let path =
     396          480 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     397              : 
     398          480 :         let mut file = VirtualFile::create(&path).await?;
     399              :         // make room for the header block
     400          480 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     401          480 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     402          480 : 
     403          480 :         // Initialize the b-tree index builder
     404          480 :         let block_buf = BlockBuf::new();
     405          480 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     406          480 : 
     407          480 :         Ok(Self {
     408          480 :             conf,
     409          480 :             path,
     410          480 :             timeline_id,
     411          480 :             tenant_shard_id,
     412          480 :             key_start,
     413          480 :             lsn_range,
     414          480 :             tree: tree_builder,
     415          480 :             blob_writer,
     416          480 :         })
     417          480 :     }
     418              : 
     419              :     ///
     420              :     /// Append a key-value pair to the file.
     421              :     ///
     422              :     /// The values must be appended in key, lsn order.
     423              :     ///
     424      2102000 :     async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     425      2102000 :         let (_, res) = self
     426      2102000 :             .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
     427          158 :             .await;
     428      2102000 :         res
     429      2102000 :     }
     430              : 
     431      4311466 :     async fn put_value_bytes(
     432      4311466 :         &mut self,
     433      4311466 :         key: Key,
     434      4311466 :         lsn: Lsn,
     435      4311466 :         val: Vec<u8>,
     436      4311466 :         will_init: bool,
     437      4311466 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     438      4311466 :         assert!(self.lsn_range.start <= lsn);
     439      4311466 :         let (val, res) = self.blob_writer.write_blob(val).await;
     440      4311466 :         let off = match res {
     441      4311466 :             Ok(off) => off,
     442            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     443              :         };
     444              : 
     445      4311466 :         let blob_ref = BlobRef::new(off, will_init);
     446      4311466 : 
     447      4311466 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     448      4311466 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     449      4311466 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     450      4311466 :     }
     451              : 
     452      2009969 :     fn size(&self) -> u64 {
     453      2009969 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     454      2009969 :     }
     455              : 
     456              :     ///
     457              :     /// Finish writing the delta layer.
     458              :     ///
     459          480 :     async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
     460          480 :         let index_start_blk =
     461          480 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     462              : 
     463          480 :         let mut file = self.blob_writer.into_inner().await?;
     464              : 
     465              :         // Write out the index
     466          480 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     467          480 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     468            0 :             .await?;
     469         9114 :         for buf in block_buf.blocks {
     470         8634 :             let (_buf, res) = file.write_all(buf).await;
     471         8634 :             res?;
     472              :         }
     473          480 :         assert!(self.lsn_range.start < self.lsn_range.end);
     474              :         // Fill in the summary on blk 0
     475          480 :         let summary = Summary {
     476          480 :             magic: DELTA_FILE_MAGIC,
     477          480 :             format_version: STORAGE_FORMAT_VERSION,
     478          480 :             tenant_id: self.tenant_shard_id.tenant_id,
     479          480 :             timeline_id: self.timeline_id,
     480          480 :             key_range: self.key_start..key_end,
     481          480 :             lsn_range: self.lsn_range.clone(),
     482          480 :             index_start_blk,
     483          480 :             index_root_blk,
     484          480 :         };
     485          480 : 
     486          480 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     487          480 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     488          480 :         Summary::ser_into(&summary, &mut buf)?;
     489          480 :         file.seek(SeekFrom::Start(0)).await?;
     490          480 :         let (_buf, res) = file.write_all(buf).await;
     491          480 :         res?;
     492              : 
     493          480 :         let metadata = file
     494          480 :             .metadata()
     495            0 :             .await
     496          480 :             .context("get file metadata to determine size")?;
     497              : 
     498              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     499              :         // Make it a little bit below to account for differing GB units
     500              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     501              :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     502          480 :         ensure!(
     503          480 :             metadata.len() <= S3_UPLOAD_LIMIT,
     504            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     505            0 :             file.path,
     506            0 :             metadata.len()
     507              :         );
     508              : 
     509              :         // Note: Because we opened the file in write-only mode, we cannot
     510              :         // reuse the same VirtualFile for reading later. That's why we don't
     511              :         // set inner.file here. The first read will have to re-open it.
     512              : 
     513          480 :         let desc = PersistentLayerDesc::new_delta(
     514          480 :             self.tenant_shard_id,
     515          480 :             self.timeline_id,
     516          480 :             self.key_start..key_end,
     517          480 :             self.lsn_range.clone(),
     518          480 :             metadata.len(),
     519          480 :         );
     520          480 : 
     521          480 :         // fsync the file
     522          480 :         file.sync_all().await?;
     523              : 
     524          480 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     525              : 
     526            0 :         trace!("created delta layer {}", layer.local_path());
     527              : 
     528          480 :         Ok(layer)
     529          480 :     }
     530              : }
     531              : 
     532              : /// A builder object for constructing a new delta layer.
     533              : ///
     534              : /// Usage:
     535              : ///
     536              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     537              : ///
     538              : /// 2. Write the contents by calling `put_value` for every page
     539              : ///    version to store in the layer.
     540              : ///
     541              : /// 3. Call `finish`.
     542              : ///
     543              : /// # Note
     544              : ///
     545              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     546              : /// possible for the writer to drop before `finish` is actually called. So this
     547              : /// could lead to odd temporary files in the directory, exhausting file system.
     548              : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     549              : /// implementation that cleans up the temporary file in failure. It's not
     550              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     551              : /// out some fields, making it impossible to implement `Drop`.
     552              : ///
     553              : #[must_use]
     554              : pub struct DeltaLayerWriter {
     555              :     inner: Option<DeltaLayerWriterInner>,
     556              : }
     557              : 
     558              : impl DeltaLayerWriter {
     559              :     ///
     560              :     /// Start building a new delta layer.
     561              :     ///
     562          480 :     pub async fn new(
     563          480 :         conf: &'static PageServerConf,
     564          480 :         timeline_id: TimelineId,
     565          480 :         tenant_shard_id: TenantShardId,
     566          480 :         key_start: Key,
     567          480 :         lsn_range: Range<Lsn>,
     568          480 :     ) -> anyhow::Result<Self> {
     569          480 :         Ok(Self {
     570          480 :             inner: Some(
     571          480 :                 DeltaLayerWriterInner::new(
     572          480 :                     conf,
     573          480 :                     timeline_id,
     574          480 :                     tenant_shard_id,
     575          480 :                     key_start,
     576          480 :                     lsn_range,
     577          480 :                 )
     578          282 :                 .await?,
     579              :             ),
     580              :         })
     581          480 :     }
     582              : 
     583              :     ///
     584              :     /// Append a key-value pair to the file.
     585              :     ///
     586              :     /// The values must be appended in key, lsn order.
     587              :     ///
     588      2102000 :     pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     589      2102000 :         self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     590      2102000 :     }
     591              : 
     592      2209466 :     pub async fn put_value_bytes(
     593      2209466 :         &mut self,
     594      2209466 :         key: Key,
     595      2209466 :         lsn: Lsn,
     596      2209466 :         val: Vec<u8>,
     597      2209466 :         will_init: bool,
     598      2209466 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     599      2209466 :         self.inner
     600      2209466 :             .as_mut()
     601      2209466 :             .unwrap()
     602      2209466 :             .put_value_bytes(key, lsn, val, will_init)
     603          162 :             .await
     604      2209466 :     }
     605              : 
     606      2009969 :     pub fn size(&self) -> u64 {
     607      2009969 :         self.inner.as_ref().unwrap().size()
     608      2009969 :     }
     609              : 
     610              :     ///
     611              :     /// Finish writing the delta layer.
     612              :     ///
     613          480 :     pub(crate) async fn finish(
     614          480 :         mut self,
     615          480 :         key_end: Key,
     616          480 :         timeline: &Arc<Timeline>,
     617          480 :     ) -> anyhow::Result<ResidentLayer> {
     618          480 :         let inner = self.inner.take().unwrap();
     619          480 :         let temp_path = inner.path.clone();
     620          480 :         let result = inner.finish(key_end, timeline).await;
     621              :         // The delta layer files can sometimes be really large. Clean them up.
     622          480 :         if result.is_err() {
     623            0 :             tracing::warn!(
     624            0 :                 "Cleaning up temporary delta file {temp_path} after error during writing"
     625            0 :             );
     626            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     627            0 :                 tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
     628            0 :             }
     629          480 :         }
     630          480 :         result
     631          480 :     }
     632              : }
     633              : 
     634              : impl Drop for DeltaLayerWriter {
     635          480 :     fn drop(&mut self) {
     636          480 :         if let Some(inner) = self.inner.take() {
     637            0 :             // We want to remove the virtual file here, so it's fine to not
     638            0 :             // having completely flushed unwritten data.
     639            0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     640            0 :             vfile.remove();
     641          480 :         }
     642          480 :     }
     643              : }
     644              : 
     645            0 : #[derive(thiserror::Error, Debug)]
     646              : pub enum RewriteSummaryError {
     647              :     #[error("magic mismatch")]
     648              :     MagicMismatch,
     649              :     #[error(transparent)]
     650              :     Other(#[from] anyhow::Error),
     651              : }
     652              : 
     653              : impl From<std::io::Error> for RewriteSummaryError {
     654            0 :     fn from(e: std::io::Error) -> Self {
     655            0 :         Self::Other(anyhow::anyhow!(e))
     656            0 :     }
     657              : }
     658              : 
     659              : impl DeltaLayer {
     660            0 :     pub async fn rewrite_summary<F>(
     661            0 :         path: &Utf8Path,
     662            0 :         rewrite: F,
     663            0 :         ctx: &RequestContext,
     664            0 :     ) -> Result<(), RewriteSummaryError>
     665            0 :     where
     666            0 :         F: Fn(Summary) -> Summary,
     667            0 :     {
     668            0 :         let file = VirtualFile::open_with_options(
     669            0 :             path,
     670            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     671            0 :         )
     672            0 :         .await
     673            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     674            0 :         let file = FileBlockReader::new(file);
     675            0 :         let summary_blk = file.read_blk(0, ctx).await?;
     676            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     677            0 :         let mut file = file.file;
     678            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     679            0 :             return Err(RewriteSummaryError::MagicMismatch);
     680            0 :         }
     681            0 : 
     682            0 :         let new_summary = rewrite(actual_summary);
     683            0 : 
     684            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     685            0 :         // TODO: could use smallvec here, but it's a pain with Slice<T>
     686            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     687            0 :         file.seek(SeekFrom::Start(0)).await?;
     688            0 :         let (_buf, res) = file.write_all(buf).await;
     689            0 :         res?;
     690            0 :         Ok(())
     691            0 :     }
     692              : }
     693              : 
     694              : impl DeltaLayerInner {
     695              :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     696              :     /// - inner has the success or transient failure
     697              :     /// - outer has the permanent failure
     698          440 :     pub(super) async fn load(
     699          440 :         path: &Utf8Path,
     700          440 :         summary: Option<Summary>,
     701          440 :         ctx: &RequestContext,
     702          440 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     703          440 :         let file = match VirtualFile::open(path).await {
     704          440 :             Ok(file) => file,
     705            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     706              :         };
     707          440 :         let file = FileBlockReader::new(file);
     708              : 
     709          440 :         let summary_blk = match file.read_blk(0, ctx).await {
     710          440 :             Ok(blk) => blk,
     711            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     712              :         };
     713              : 
     714              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     715          440 :         let actual_summary =
     716          440 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     717              : 
     718          440 :         if let Some(mut expected_summary) = summary {
     719              :             // production code path
     720          440 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     721          440 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     722          440 :             if actual_summary != expected_summary {
     723            0 :                 bail!(
     724            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     725            0 :                     actual_summary,
     726            0 :                     expected_summary
     727            0 :                 );
     728          440 :             }
     729            0 :         }
     730              : 
     731          440 :         Ok(Ok(DeltaLayerInner {
     732          440 :             file,
     733          440 :             index_start_blk: actual_summary.index_start_blk,
     734          440 :             index_root_blk: actual_summary.index_root_blk,
     735          440 :         }))
     736          440 :     }
     737              : 
     738       123339 :     pub(super) async fn get_value_reconstruct_data(
     739       123339 :         &self,
     740       123339 :         key: Key,
     741       123339 :         lsn_range: Range<Lsn>,
     742       123339 :         reconstruct_state: &mut ValueReconstructState,
     743       123339 :         ctx: &RequestContext,
     744       123339 :     ) -> anyhow::Result<ValueReconstructResult> {
     745       123339 :         let mut need_image = true;
     746       123339 :         // Scan the page versions backwards, starting from `lsn`.
     747       123339 :         let file = &self.file;
     748       123339 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     749       123339 :             self.index_start_blk,
     750       123339 :             self.index_root_blk,
     751       123339 :             file,
     752       123339 :         );
     753       123339 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     754       123339 : 
     755       123339 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     756       123339 : 
     757       123339 :         tree_reader
     758       123339 :             .visit(
     759       123339 :                 &search_key.0,
     760       123339 :                 VisitDirection::Backwards,
     761       123339 :                 |key, value| {
     762       118175 :                     let blob_ref = BlobRef(value);
     763       118175 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     764        46124 :                         return false;
     765        72051 :                     }
     766        72051 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     767        72051 :                     if entry_lsn < lsn_range.start {
     768            0 :                         return false;
     769        72051 :                     }
     770        72051 :                     offsets.push((entry_lsn, blob_ref.pos()));
     771        72051 : 
     772        72051 :                     !blob_ref.will_init()
     773       123339 :                 },
     774       123339 :                 &RequestContextBuilder::extend(ctx)
     775       123339 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     776       123339 :                     .build(),
     777       123339 :             )
     778        17381 :             .await?;
     779              : 
     780       123339 :         let ctx = &RequestContextBuilder::extend(ctx)
     781       123339 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     782       123339 :             .build();
     783       123339 : 
     784       123339 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     785       123339 :         let cursor = file.block_cursor();
     786       123339 :         let mut buf = Vec::new();
     787       123339 :         for (entry_lsn, pos) in offsets {
     788        72051 :             cursor
     789        72051 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     790         5537 :                 .await
     791        72051 :                 .with_context(|| {
     792            0 :                     format!("Failed to read blob from virtual file {}", file.file.path)
     793        72051 :                 })?;
     794        72051 :             let val = Value::des(&buf).with_context(|| {
     795            0 :                 format!(
     796            0 :                     "Failed to deserialize file blob from virtual file {}",
     797            0 :                     file.file.path
     798            0 :                 )
     799        72051 :             })?;
     800        72051 :             match val {
     801        72051 :                 Value::Image(img) => {
     802        72051 :                     reconstruct_state.img = Some((entry_lsn, img));
     803        72051 :                     need_image = false;
     804        72051 :                     break;
     805              :                 }
     806            0 :                 Value::WalRecord(rec) => {
     807            0 :                     let will_init = rec.will_init();
     808            0 :                     reconstruct_state.records.push((entry_lsn, rec));
     809            0 :                     if will_init {
     810              :                         // This WAL record initializes the page, so no need to go further back
     811            0 :                         need_image = false;
     812            0 :                         break;
     813            0 :                     }
     814              :                 }
     815              :             }
     816              :         }
     817              : 
     818              :         // If an older page image is needed to reconstruct the page, let the
     819              :         // caller know.
     820       123339 :         if need_image {
     821        51288 :             Ok(ValueReconstructResult::Continue)
     822              :         } else {
     823        72051 :             Ok(ValueReconstructResult::Complete)
     824              :         }
     825       123339 :     }
     826              : 
     827              :     // Look up the keys in the provided keyspace and update
     828              :     // the reconstruct state with whatever is found.
     829              :     //
     830              :     // If the key is cached, go no further than the cached Lsn.
     831              :     //
     832              :     // Currently, the index is visited for each range, but this
     833              :     // can be further optimised to visit the index only once.
     834           10 :     pub(super) async fn get_values_reconstruct_data(
     835           10 :         &self,
     836           10 :         keyspace: KeySpace,
     837           10 :         end_lsn: Lsn,
     838           10 :         reconstruct_state: &mut ValuesReconstructState,
     839           10 :         ctx: &RequestContext,
     840           10 :     ) -> Result<(), GetVectoredError> {
     841           10 :         let file = &self.file;
     842           10 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     843           10 :             self.index_start_blk,
     844           10 :             self.index_root_blk,
     845           10 :             file,
     846           10 :         );
     847           10 : 
     848           10 :         let mut offsets: BTreeMap<Key, Vec<(Lsn, u64)>> = BTreeMap::new();
     849              : 
     850           10 :         for range in keyspace.ranges.iter() {
     851           10 :             let mut ignore_key = None;
     852           10 : 
     853           10 :             // Scan the page versions backwards, starting from the last key in the range.
     854           10 :             // to collect all the offsets at which need to be read.
     855           10 :             let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1));
     856           10 :             tree_reader
     857           10 :                 .visit(
     858           10 :                     &end_key.0,
     859           10 :                     VisitDirection::Backwards,
     860          330 :                     |raw_key, value| {
     861          330 :                         let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     862          330 :                         let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key);
     863          330 : 
     864          330 :                         if entry_lsn >= end_lsn {
     865            0 :                             return true;
     866          330 :                         }
     867          330 : 
     868          330 :                         if key < range.start {
     869            0 :                             return false;
     870          330 :                         }
     871          330 : 
     872          330 :                         if key >= range.end {
     873           10 :                             return true;
     874          320 :                         }
     875          320 : 
     876          320 :                         if Some(key) == ignore_key {
     877            0 :                             return true;
     878          320 :                         }
     879              : 
     880          320 :                         if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) {
     881            0 :                             if entry_lsn <= cached_lsn {
     882            0 :                                 return key != range.start;
     883            0 :                             }
     884          320 :                         }
     885              : 
     886          320 :                         let blob_ref = BlobRef(value);
     887          320 :                         let lsns_at = offsets.entry(key).or_default();
     888          320 :                         lsns_at.push((entry_lsn, blob_ref.pos()));
     889          320 : 
     890          320 :                         if blob_ref.will_init() {
     891          320 :                             if key == range.start {
     892           10 :                                 return false;
     893              :                             } else {
     894          310 :                                 ignore_key = Some(key);
     895          310 :                                 return true;
     896              :                             }
     897            0 :                         }
     898            0 : 
     899            0 :                         true
     900          330 :                     },
     901           10 :                     &RequestContextBuilder::extend(ctx)
     902           10 :                         .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     903           10 :                         .build(),
     904           10 :                 )
     905           10 :                 .await
     906           10 :                 .map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
     907              :         }
     908              : 
     909           10 :         let ctx = &RequestContextBuilder::extend(ctx)
     910           10 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     911           10 :             .build();
     912           10 : 
     913           10 :         let cursor = file.block_cursor();
     914           10 :         let mut buf = Vec::new();
     915          330 :         for (key, lsns_at) in offsets {
     916          320 :             for (lsn, block_offset) in lsns_at {
     917          320 :                 let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await;
     918              : 
     919          320 :                 if let Err(e) = res {
     920            0 :                     reconstruct_state.on_key_error(
     921            0 :                         key,
     922            0 :                         PageReconstructError::from(anyhow!(e).context(format!(
     923            0 :                             "Failed to read blob from virtual file {}",
     924            0 :                             file.file.path
     925            0 :                         ))),
     926            0 :                     );
     927            0 : 
     928            0 :                     break;
     929          320 :                 }
     930          320 : 
     931          320 :                 let value = Value::des(&buf);
     932          320 :                 if let Err(e) = value {
     933            0 :                     reconstruct_state.on_key_error(
     934            0 :                         key,
     935            0 :                         PageReconstructError::from(anyhow!(e).context(format!(
     936            0 :                             "Failed to deserialize file blob from virtual file {}",
     937            0 :                             file.file.path
     938            0 :                         ))),
     939            0 :                     );
     940            0 : 
     941            0 :                     break;
     942          320 :                 }
     943          320 : 
     944          320 :                 let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap());
     945          320 :                 if key_situation == ValueReconstructSituation::Complete {
     946          320 :                     break;
     947            0 :                 }
     948              :             }
     949              :         }
     950              : 
     951           10 :         Ok(())
     952           10 :     }
     953              : 
     954          304 :     pub(super) async fn load_keys<'a>(
     955          304 :         &'a self,
     956          304 :         ctx: &RequestContext,
     957          304 :     ) -> Result<Vec<DeltaEntry<'a>>> {
     958          304 :         let file = &self.file;
     959          304 : 
     960          304 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     961          304 :             self.index_start_blk,
     962          304 :             self.index_root_blk,
     963          304 :             file,
     964          304 :         );
     965          304 : 
     966          304 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
     967          304 : 
     968          304 :         tree_reader
     969          304 :             .visit(
     970          304 :                 &[0u8; DELTA_KEY_SIZE],
     971          304 :                 VisitDirection::Forwards,
     972      2102008 :                 |key, value| {
     973      2102008 :                     let delta_key = DeltaKey::from_slice(key);
     974      2102008 :                     let val_ref = ValueRef {
     975      2102008 :                         blob_ref: BlobRef(value),
     976      2102008 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
     977      2102008 :                             Adapter(self),
     978      2102008 :                         )),
     979      2102008 :                     };
     980      2102008 :                     let pos = BlobRef(value).pos();
     981      2102008 :                     if let Some(last) = all_keys.last_mut() {
     982      2101704 :                         // subtract offset of the current and last entries to get the size
     983      2101704 :                         // of the value associated with this (key, lsn) tuple
     984      2101704 :                         let first_pos = last.size;
     985      2101704 :                         last.size = pos - first_pos;
     986      2101704 :                     }
     987      2102008 :                     let entry = DeltaEntry {
     988      2102008 :                         key: delta_key.key(),
     989      2102008 :                         lsn: delta_key.lsn(),
     990      2102008 :                         size: pos,
     991      2102008 :                         val: val_ref,
     992      2102008 :                     };
     993      2102008 :                     all_keys.push(entry);
     994      2102008 :                     true
     995      2102008 :                 },
     996          304 :                 &RequestContextBuilder::extend(ctx)
     997          304 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     998          304 :                     .build(),
     999          304 :             )
    1000         2212 :             .await?;
    1001          304 :         if let Some(last) = all_keys.last_mut() {
    1002          304 :             // Last key occupies all space till end of value storage,
    1003          304 :             // which corresponds to beginning of the index
    1004          304 :             last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
    1005          304 :         }
    1006          304 :         Ok(all_keys)
    1007          304 :     }
    1008              : 
    1009            4 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1010            4 :         println!(
    1011            4 :             "index_start_blk: {}, root {}",
    1012            4 :             self.index_start_blk, self.index_root_blk
    1013            4 :         );
    1014            4 : 
    1015            4 :         let file = &self.file;
    1016            4 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1017            4 :             self.index_start_blk,
    1018            4 :             self.index_root_blk,
    1019            4 :             file,
    1020            4 :         );
    1021            4 : 
    1022            4 :         tree_reader.dump().await?;
    1023              : 
    1024            4 :         let keys = self.load_keys(ctx).await?;
    1025              : 
    1026            8 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1027            8 :             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1028            8 :             let val = Value::des(&buf)?;
    1029            8 :             let desc = match val {
    1030            8 :                 Value::Image(img) => {
    1031            8 :                     format!(" img {} bytes", img.len())
    1032              :                 }
    1033            0 :                 Value::WalRecord(rec) => {
    1034            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
    1035            0 :                     format!(
    1036            0 :                         " rec {} bytes will_init: {} {}",
    1037            0 :                         buf.len(),
    1038            0 :                         rec.will_init(),
    1039            0 :                         wal_desc
    1040            0 :                     )
    1041              :                 }
    1042              :             };
    1043            8 :             Ok(desc)
    1044            8 :         }
    1045              : 
    1046           12 :         for entry in keys {
    1047            8 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1048            8 :             let desc = match dump_blob(&val, ctx).await {
    1049            8 :                 Ok(desc) => desc,
    1050            0 :                 Err(err) => {
    1051            0 :                     format!("ERROR: {err}")
    1052              :                 }
    1053              :             };
    1054            8 :             println!("  key {key} at {lsn}: {desc}");
    1055            8 : 
    1056            8 :             // Print more details about CHECKPOINT records. Would be nice to print details
    1057            8 :             // of many other record types too, but these are particularly interesting, as
    1058            8 :             // have a lot of special processing for them in walingest.rs.
    1059            8 :             use pageserver_api::key::CHECKPOINT_KEY;
    1060            8 :             use postgres_ffi::CheckPoint;
    1061            8 :             if key == CHECKPOINT_KEY {
    1062            0 :                 let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1063            0 :                 let val = Value::des(&buf)?;
    1064            0 :                 match val {
    1065            0 :                     Value::Image(img) => {
    1066            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1067            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
    1068              :                     }
    1069            0 :                     Value::WalRecord(_rec) => {
    1070            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1071            0 :                     }
    1072              :                 }
    1073            8 :             }
    1074              :         }
    1075              : 
    1076            4 :         Ok(())
    1077            4 :     }
    1078              : }
    1079              : 
    1080              : /// A set of data associated with a delta layer key and its value
    1081              : pub struct DeltaEntry<'a> {
    1082              :     pub key: Key,
    1083              :     pub lsn: Lsn,
    1084              :     /// Size of the stored value
    1085              :     pub size: u64,
    1086              :     /// Reference to the on-disk value
    1087              :     pub val: ValueRef<'a>,
    1088              : }
    1089              : 
    1090              : /// Reference to an on-disk value
    1091              : pub struct ValueRef<'a> {
    1092              :     blob_ref: BlobRef,
    1093              :     reader: BlockCursor<'a>,
    1094              : }
    1095              : 
    1096              : impl<'a> ValueRef<'a> {
    1097              :     /// Loads the value from disk
    1098      2102000 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1099              :         // theoretically we *could* record an access time for each, but it does not really matter
    1100      2102000 :         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1101      2102000 :         let val = Value::des(&buf)?;
    1102      2102000 :         Ok(val)
    1103      2102000 :     }
    1104              : }
    1105              : 
    1106              : pub(crate) struct Adapter<T>(T);
    1107              : 
    1108              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1109      2121326 :     pub(crate) async fn read_blk(
    1110      2121326 :         &self,
    1111      2121326 :         blknum: u32,
    1112      2121326 :         ctx: &RequestContext,
    1113      2121326 :     ) -> Result<BlockLease, std::io::Error> {
    1114      2121326 :         self.0.as_ref().file.read_blk(blknum, ctx).await
    1115      2121326 :     }
    1116              : }
    1117              : 
    1118              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1119      2121326 :     fn as_ref(&self) -> &DeltaLayerInner {
    1120      2121326 :         self
    1121      2121326 :     }
    1122              : }
        

Generated by: LCOV version 2.1-beta