LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 70.3 % 762 536
Test Date: 2024-02-29 11:57:12 Functions: 51.4 % 111 57

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation is extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in the `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "index", and
      24              : //! "values". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use crate::config::PageServerConf;
      31              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache::{self, FileId, PAGE_SZ};
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::BlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37              : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
      38              : use crate::tenant::timeline::GetVectoredError;
      39              : use crate::tenant::vectored_blob_io::{
      40              :     BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
      41              : };
      42              : use crate::tenant::{PageReconstructError, Timeline};
      43              : use crate::virtual_file::{self, VirtualFile};
      44              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      45              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      46              : use anyhow::{anyhow, bail, ensure, Context, Result};
      47              : use bytes::BytesMut;
      48              : use camino::{Utf8Path, Utf8PathBuf};
      49              : use pageserver_api::keyspace::KeySpace;
      50              : use pageserver_api::models::LayerAccessKind;
      51              : use pageserver_api::shard::TenantShardId;
      52              : use rand::{distributions::Alphanumeric, Rng};
      53              : use serde::{Deserialize, Serialize};
      54              : use std::fs::File;
      55              : use std::io::SeekFrom;
      56              : use std::ops::Range;
      57              : use std::os::unix::fs::FileExt;
      58              : use std::sync::Arc;
      59              : use tokio::sync::OnceCell;
      60              : use tracing::*;
      61              : 
      62              : use utils::{
      63              :     bin_ser::BeSer,
      64              :     id::{TenantId, TimelineId},
      65              :     lsn::Lsn,
      66              : };
      67              : 
      68              : use super::{
      69              :     AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
      70              : };
      71              : 
      72              : ///
      73              : /// Header stored at the beginning of the file
      74              : ///
      75              : /// After this comes the 'values' part, starting on block 1. After that,
      76              : /// the 'index' starts at the block indicated by 'index_start_blk'
      77              : ///
      78          480 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      79              : pub struct Summary {
      80              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      81              :     pub magic: u16,
      82              :     pub format_version: u16,
      83              : 
      84              :     pub tenant_id: TenantId,
      85              :     pub timeline_id: TimelineId,
      86              :     pub key_range: Range<Key>,
      87              :     pub lsn_range: Range<Lsn>,
      88              : 
      89              :     /// Block number where the 'index' part of the file begins.
      90              :     pub index_start_blk: u32,
      91              :     /// Block within the 'index', where the B-tree root page is stored
      92              :     pub index_root_blk: u32,
      93              : }
      94              : 
      95              : impl From<&DeltaLayer> for Summary {
      96            0 :     fn from(layer: &DeltaLayer) -> Self {
      97            0 :         Self::expected(
      98            0 :             layer.desc.tenant_shard_id.tenant_id,
      99            0 :             layer.desc.timeline_id,
     100            0 :             layer.desc.key_range.clone(),
     101            0 :             layer.desc.lsn_range.clone(),
     102            0 :         )
     103            0 :     }
     104              : }
     105              : 
     106              : impl Summary {
     107          440 :     pub(super) fn expected(
     108          440 :         tenant_id: TenantId,
     109          440 :         timeline_id: TimelineId,
     110          440 :         keys: Range<Key>,
     111          440 :         lsns: Range<Lsn>,
     112          440 :     ) -> Self {
     113          440 :         Self {
     114          440 :             magic: DELTA_FILE_MAGIC,
     115          440 :             format_version: STORAGE_FORMAT_VERSION,
     116          440 : 
     117          440 :             tenant_id,
     118          440 :             timeline_id,
     119          440 :             key_range: keys,
     120          440 :             lsn_range: lsns,
     121          440 : 
     122          440 :             index_start_blk: 0,
     123          440 :             index_root_blk: 0,
     124          440 :         }
     125          440 :     }
     126              : }
     127              : 
     128              : // Flag indicating that this version initializes the page
     129              : const WILL_INIT: u64 = 1;
     130              : 
     131              : /// Struct representing a reference to a BLOB in a layer. The reference contains
     132              : /// the BLOB offset, and for WAL records it also contains the `will_init` flag. The
     133              : /// flag helps to determine the range of records that need to be applied, without
     134              : /// reading/deserializing the records themselves.
     135            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     136              : pub struct BlobRef(pub u64);
     137              : 
     138              : impl BlobRef {
     139        72401 :     pub fn will_init(&self) -> bool {
     140        72401 :         (self.0 & WILL_INIT) != 0
     141        72401 :     }
     142              : 
     143      4276417 :     pub fn pos(&self) -> u64 {
     144      4276417 :         self.0 >> 1
     145      4276417 :     }
     146              : 
     147      4311466 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     148      4311466 :         let mut blob_ref = pos << 1;
     149      4311466 :         if will_init {
     150      4311466 :             blob_ref |= WILL_INIT;
     151      4311466 :         }
     152      4311466 :         BlobRef(blob_ref)
     153      4311466 :     }
     154              : }
     155              : 
     156              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     157              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     158              : 
     159              : /// This is the key of the B-tree index stored in the delta layer. It consists
     160              : /// of the serialized representation of a Key and LSN.
     161              : impl DeltaKey {
     162      2102008 :     fn from_slice(buf: &[u8]) -> Self {
     163      2102008 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     164      2102008 :         bytes.copy_from_slice(buf);
     165      2102008 :         DeltaKey(bytes)
     166      2102008 :     }
     167              : 
     168      4434992 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     169      4434992 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     170      4434992 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     171      4434992 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     172      4434992 :         DeltaKey(bytes)
     173      4434992 :     }
     174              : 
     175      2102008 :     fn key(&self) -> Key {
     176      2102008 :         Key::from_slice(&self.0)
     177      2102008 :     }
     178              : 
     179      2102008 :     fn lsn(&self) -> Lsn {
     180      2102008 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     181      2102008 :     }
     182              : 
     183        72401 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     184        72401 :         let mut lsn_buf = [0u8; 8];
     185        72401 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     186        72401 :         Lsn(u64::from_be_bytes(lsn_buf))
     187        72401 :     }
     188              : }
     189              : 
     190              : /// This is used only from `pagectl`. Within pageserver, all layers are
     191              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     192              : pub struct DeltaLayer {
     193              :     path: Utf8PathBuf,
     194              :     pub desc: PersistentLayerDesc,
     195              :     access_stats: LayerAccessStats,
     196              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     197              : }
     198              : 
     199              : impl std::fmt::Debug for DeltaLayer {
     200            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     201            0 :         use super::RangeDisplayDebug;
     202            0 : 
     203            0 :         f.debug_struct("DeltaLayer")
     204            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     205            0 :             .field("lsn_range", &self.desc.lsn_range)
     206            0 :             .field("file_size", &self.desc.file_size)
     207            0 :             .field("inner", &self.inner)
     208            0 :             .finish()
     209            0 :     }
     210              : }
     211              : 
     212              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     213              : /// file.
     214              : pub struct DeltaLayerInner {
     215              :     // values copied from summary
     216              :     index_start_blk: u32,
     217              :     index_root_blk: u32,
     218              : 
     219              :     file: VirtualFile,
     220              :     file_id: FileId,
     221              : 
     222              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     223              : }
     224              : 
     225              : impl std::fmt::Debug for DeltaLayerInner {
     226            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     227            0 :         f.debug_struct("DeltaLayerInner")
     228            0 :             .field("index_start_blk", &self.index_start_blk)
     229            0 :             .field("index_root_blk", &self.index_root_blk)
     230            0 :             .finish()
     231            0 :     }
     232              : }
     233              : 
     234              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     235              : impl std::fmt::Display for DeltaLayer {
     236            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     237            0 :         write!(f, "{}", self.layer_desc().short_id())
     238            0 :     }
     239              : }
     240              : 
     241              : impl AsLayerDesc for DeltaLayer {
     242            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     243            0 :         &self.desc
     244            0 :     }
     245              : }
     246              : 
     247              : impl DeltaLayer {
     248            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     249            0 :         self.desc.dump();
     250            0 : 
     251            0 :         if !verbose {
     252            0 :             return Ok(());
     253            0 :         }
     254              : 
     255            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     256              : 
     257            0 :         inner.dump(ctx).await
     258            0 :     }
     259              : 
     260          480 :     fn temp_path_for(
     261          480 :         conf: &PageServerConf,
     262          480 :         tenant_shard_id: &TenantShardId,
     263          480 :         timeline_id: &TimelineId,
     264          480 :         key_start: Key,
     265          480 :         lsn_range: &Range<Lsn>,
     266          480 :     ) -> Utf8PathBuf {
     267          480 :         let rand_string: String = rand::thread_rng()
     268          480 :             .sample_iter(&Alphanumeric)
     269          480 :             .take(8)
     270          480 :             .map(char::from)
     271          480 :             .collect();
     272          480 : 
     273          480 :         conf.timeline_path(tenant_shard_id, timeline_id)
     274          480 :             .join(format!(
     275          480 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     276          480 :                 key_start,
     277          480 :                 u64::from(lsn_range.start),
     278          480 :                 u64::from(lsn_range.end),
     279          480 :                 rand_string,
     280          480 :                 TEMP_FILE_SUFFIX,
     281          480 :             ))
     282          480 :     }
     283              : 
     284              :     ///
     285              :     /// Open the underlying file and read the metadata into memory, if it's
     286              :     /// not loaded already.
     287              :     ///
     288            0 :     async fn load(
     289            0 :         &self,
     290            0 :         access_kind: LayerAccessKind,
     291            0 :         ctx: &RequestContext,
     292            0 :     ) -> Result<&Arc<DeltaLayerInner>> {
     293            0 :         self.access_stats.record_access(access_kind, ctx);
     294            0 :         // Quick exit if already loaded
     295            0 :         self.inner
     296            0 :             .get_or_try_init(|| self.load_inner(ctx))
     297            0 :             .await
     298            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     299            0 :     }
     300              : 
     301            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     302            0 :         let path = self.path();
     303              : 
     304            0 :         let loaded = DeltaLayerInner::load(&path, None, None, ctx)
     305            0 :             .await
     306            0 :             .and_then(|res| res)?;
     307              : 
     308              :         // not production code
     309            0 :         let actual_filename = path.file_name().unwrap().to_owned();
     310            0 :         let expected_filename = self.layer_desc().filename().file_name();
     311            0 : 
     312            0 :         if actual_filename != expected_filename {
     313            0 :             println!("warning: filename does not match what is expected from in-file summary");
     314            0 :             println!("actual: {:?}", actual_filename);
     315            0 :             println!("expected: {:?}", expected_filename);
     316            0 :         }
     317              : 
     318            0 :         Ok(Arc::new(loaded))
     319            0 :     }
     320              : 
     321              :     /// Create a DeltaLayer struct representing an existing file on disk.
     322              :     ///
     323              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     324            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     325            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     326            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     327            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     328              : 
     329            0 :         let metadata = file
     330            0 :             .metadata()
     331            0 :             .context("get file metadata to determine size")?;
     332              : 
     333              :         // This function is never used for constructing layers in a running pageserver,
     334              :         // so it does not need an accurate TenantShardId.
     335            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     336            0 : 
     337            0 :         Ok(DeltaLayer {
     338            0 :             path: path.to_path_buf(),
     339            0 :             desc: PersistentLayerDesc::new_delta(
     340            0 :                 tenant_shard_id,
     341            0 :                 summary.timeline_id,
     342            0 :                 summary.key_range,
     343            0 :                 summary.lsn_range,
     344            0 :                 metadata.len(),
     345            0 :             ),
     346            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     347            0 :             inner: OnceCell::new(),
     348            0 :         })
     349            0 :     }
     350              : 
     351              :     /// Path to the layer file in pageserver workdir.
     352            0 :     fn path(&self) -> Utf8PathBuf {
     353            0 :         self.path.clone()
     354            0 :     }
     355              : }
     356              : 
     357              : /// A builder object for constructing a new delta layer.
     358              : ///
     359              : /// Usage:
     360              : ///
     361              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     362              : ///
     363              : /// 2. Write the contents by calling `put_value` for every page
     364              : ///    version to store in the layer.
     365              : ///
     366              : /// 3. Call `finish`.
     367              : ///
     368              : struct DeltaLayerWriterInner {
     369              :     conf: &'static PageServerConf,
     370              :     pub path: Utf8PathBuf,
     371              :     timeline_id: TimelineId,
     372              :     tenant_shard_id: TenantShardId,
     373              : 
     374              :     key_start: Key,
     375              :     lsn_range: Range<Lsn>,
     376              : 
     377              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     378              : 
     379              :     blob_writer: BlobWriter<true>,
     380              : }
     381              : 
     382              : impl DeltaLayerWriterInner {
     383              :     ///
     384              :     /// Start building a new delta layer.
     385              :     ///
     386          480 :     async fn new(
     387          480 :         conf: &'static PageServerConf,
     388          480 :         timeline_id: TimelineId,
     389          480 :         tenant_shard_id: TenantShardId,
     390          480 :         key_start: Key,
     391          480 :         lsn_range: Range<Lsn>,
     392          480 :     ) -> anyhow::Result<Self> {
     393          480 :         // Create the file initially with a temporary filename. We don't know
     394          480 :         // the end key yet, so we cannot form the final filename yet. We will
     395          480 :         // rename it when we're done.
     396          480 :         //
     397          480 :         // Note: This overwrites any existing file. There shouldn't be any.
     398          480 :         // FIXME: throw an error instead?
     399          480 :         let path =
     400          480 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     401              : 
     402          480 :         let mut file = VirtualFile::create(&path).await?;
     403              :         // make room for the header block
     404          480 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     405          480 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     406          480 : 
     407          480 :         // Initialize the b-tree index builder
     408          480 :         let block_buf = BlockBuf::new();
     409          480 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     410          480 : 
     411          480 :         Ok(Self {
     412          480 :             conf,
     413          480 :             path,
     414          480 :             timeline_id,
     415          480 :             tenant_shard_id,
     416          480 :             key_start,
     417          480 :             lsn_range,
     418          480 :             tree: tree_builder,
     419          480 :             blob_writer,
     420          480 :         })
     421          480 :     }
     422              : 
     423              :     ///
     424              :     /// Append a key-value pair to the file.
     425              :     ///
     426              :     /// The values must be appended in key, lsn order.
     427              :     ///
     428      2102000 :     async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     429      2102000 :         let (_, res) = self
     430      2102000 :             .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
     431          153 :             .await;
     432      2102000 :         res
     433      2102000 :     }
     434              : 
     435      4311466 :     async fn put_value_bytes(
     436      4311466 :         &mut self,
     437      4311466 :         key: Key,
     438      4311466 :         lsn: Lsn,
     439      4311466 :         val: Vec<u8>,
     440      4311466 :         will_init: bool,
     441      4311466 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     442      4311466 :         assert!(self.lsn_range.start <= lsn);
     443      4311466 :         let (val, res) = self.blob_writer.write_blob(val).await;
     444      4311466 :         let off = match res {
     445      4311466 :             Ok(off) => off,
     446            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     447              :         };
     448              : 
     449      4311466 :         let blob_ref = BlobRef::new(off, will_init);
     450      4311466 : 
     451      4311466 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     452      4311466 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     453      4311466 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     454      4311466 :     }
     455              : 
     456      2009970 :     fn size(&self) -> u64 {
     457      2009970 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     458      2009970 :     }
     459              : 
     460              :     ///
     461              :     /// Finish writing the delta layer.
     462              :     ///
     463          480 :     async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
     464          480 :         let index_start_blk =
     465          480 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     466              : 
     467          480 :         let mut file = self.blob_writer.into_inner().await?;
     468              : 
     469              :         // Write out the index
     470          480 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     471          480 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     472            0 :             .await?;
     473         9119 :         for buf in block_buf.blocks {
     474         8639 :             let (_buf, res) = file.write_all(buf).await;
     475         8639 :             res?;
     476              :         }
     477          480 :         assert!(self.lsn_range.start < self.lsn_range.end);
     478              :         // Fill in the summary on blk 0
     479          480 :         let summary = Summary {
     480          480 :             magic: DELTA_FILE_MAGIC,
     481          480 :             format_version: STORAGE_FORMAT_VERSION,
     482          480 :             tenant_id: self.tenant_shard_id.tenant_id,
     483          480 :             timeline_id: self.timeline_id,
     484          480 :             key_range: self.key_start..key_end,
     485          480 :             lsn_range: self.lsn_range.clone(),
     486          480 :             index_start_blk,
     487          480 :             index_root_blk,
     488          480 :         };
     489          480 : 
     490          480 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     491          480 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     492          480 :         Summary::ser_into(&summary, &mut buf)?;
     493          480 :         file.seek(SeekFrom::Start(0)).await?;
     494          480 :         let (_buf, res) = file.write_all(buf).await;
     495          480 :         res?;
     496              : 
     497          480 :         let metadata = file
     498          480 :             .metadata()
     499            0 :             .await
     500          480 :             .context("get file metadata to determine size")?;
     501              : 
     502              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     503              :         // Make it a little bit below to account for differing GB units
     504              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     505              :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     506          480 :         ensure!(
     507          480 :             metadata.len() <= S3_UPLOAD_LIMIT,
     508            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     509            0 :             file.path,
     510            0 :             metadata.len()
     511              :         );
     512              : 
     513              :         // Note: Because we opened the file in write-only mode, we cannot
     514              :         // reuse the same VirtualFile for reading later. That's why we don't
     515              :         // set inner.file here. The first read will have to re-open it.
     516              : 
     517          480 :         let desc = PersistentLayerDesc::new_delta(
     518          480 :             self.tenant_shard_id,
     519          480 :             self.timeline_id,
     520          480 :             self.key_start..key_end,
     521          480 :             self.lsn_range.clone(),
     522          480 :             metadata.len(),
     523          480 :         );
     524          480 : 
     525          480 :         // fsync the file
     526          480 :         file.sync_all().await?;
     527              : 
     528          480 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     529              : 
     530            0 :         trace!("created delta layer {}", layer.local_path());
     531              : 
     532          480 :         Ok(layer)
     533          480 :     }
     534              : }
     535              : 
     536              : /// A builder object for constructing a new delta layer.
     537              : ///
     538              : /// Usage:
     539              : ///
     540              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     541              : ///
     542              : /// 2. Write the contents by calling `put_value` for every page
     543              : ///    version to store in the layer.
     544              : ///
     545              : /// 3. Call `finish`.
     546              : ///
     547              : /// # Note
     548              : ///
     549              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     550              : /// possible for the writer to be dropped before `finish` is actually called. This
     551              : /// could leave odd temporary files in the directory, exhausting the file system.
     552              : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
     553              : /// implementation that cleans up the temporary file on failure. It's not
     554              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     555              : /// out some fields, making it impossible to implement `Drop`.
     556              : ///
     557              : #[must_use]
     558              : pub struct DeltaLayerWriter {
     559              :     inner: Option<DeltaLayerWriterInner>,
     560              : }
     561              : 
     562              : impl DeltaLayerWriter {
     563              :     /// Start building a new delta layer.
     564              :     ///
     565              :     /// Delegates to `DeltaLayerWriterInner::new` and wraps the result.
     566          480 :     pub async fn new(
     567          480 :         conf: &'static PageServerConf,
     568          480 :         timeline_id: TimelineId,
     569          480 :         tenant_shard_id: TenantShardId,
     570          480 :         key_start: Key,
     571          480 :         lsn_range: Range<Lsn>,
     572          480 :     ) -> anyhow::Result<Self> {
     573          480 :         Ok(Self {
     574          480 :             inner: Some(
     575          480 :                 DeltaLayerWriterInner::new(
     576          480 :                     conf,
     577          480 :                     timeline_id,
     578          480 :                     tenant_shard_id,
     579          480 :                     key_start,
     580          480 :                     lsn_range,
     581          480 :                 )
     582          282 :                 .await?,
     583              :             ),
     584              :         })
     585          480 :     }
     586              : 
     587              :     /// Append a key-value pair to the file.
     588              :     ///
     589              :     /// The values must be appended in key, lsn order.
     590              :     ///
     591              :     /// `inner` is only taken by `finish` (which consumes `self`) and by `Drop`.
     592      2102000 :     pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     593      2102000 :         self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     594      2102000 :     }
     595              : 
                            /// Forwards to the inner writer's `put_value_bytes` and returns its
                            /// `(Vec<u8>, anyhow::Result<()>)` pair unchanged.
                            ///
                            /// NOTE(review): presumably `val` is an already-serialized value and the
                            /// returned `Vec<u8>` is that buffer handed back for reuse; confirm against
                            /// `DeltaLayerWriterInner::put_value_bytes`.
     596      2209466 :     pub async fn put_value_bytes(
     597      2209466 :         &mut self,
     598      2209466 :         key: Key,
     599      2209466 :         lsn: Lsn,
     600      2209466 :         val: Vec<u8>,
     601      2209466 :         will_init: bool,
     602      2209466 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     603      2209466 :         self.inner
     604      2209466 :             .as_mut()
     605      2209466 :             .unwrap()
     606      2209466 :             .put_value_bytes(key, lsn, val, will_init)
     607          174 :             .await
     608      2209466 :     }
     609              : 
                            /// Returns whatever the inner writer's `size()` reports.
     610      2009970 :     pub fn size(&self) -> u64 {
     611      2009970 :         self.inner.as_ref().unwrap().size()
     612      2009970 :     }
     613              : 
     614              :     /// Finish writing the delta layer.
     615              :     ///
     616              :     /// If the inner `finish` fails, the temporary file is removed (best effort).
     617          480 :     pub(crate) async fn finish(
     618          480 :         mut self,
     619          480 :         key_end: Key,
     620          480 :         timeline: &Arc<Timeline>,
     621          480 :     ) -> anyhow::Result<ResidentLayer> {
                                // Taking `inner` leaves `None` behind, so the `Drop` impl does nothing
                                // when `self` goes out of scope at the end of this function.
     622          480 :         let inner = self.inner.take().unwrap();
                                // Remember the path now: `inner` is moved into `finish` below.
     623          480 :         let temp_path = inner.path.clone();
     624          480 :         let result = inner.finish(key_end, timeline).await;
     625              :         // The delta layer files can sometimes be really large. Clean them up.
     626          480 :         if result.is_err() {
     627            0 :             tracing::warn!(
     628            0 :                 "Cleaning up temporary delta file {temp_path} after error during writing"
     629            0 :             );
                                    // NOTE(review): blocking `std::fs` call inside an async fn; it is only
                                    // reached on the error path. A removal failure is logged, and the
                                    // original error is still what gets returned.
     630            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     631            0 :                 tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
     632            0 :             }
     633          480 :         }
     634          480 :         result
     635          480 :     }
     636              : }
     637              : 
     638              : impl Drop for DeltaLayerWriter {
                            // Removes the file being written if `inner` is still present, i.e. if
                            // `finish` never took it.
     639          480 :     fn drop(&mut self) {
     640          480 :         if let Some(inner) = self.inner.take() {
     641            0 :             // We are about to remove the virtual file, so it's fine that
     642            0 :             // unwritten data has not been completely flushed.
     643            0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     644            0 :             vfile.remove();
     645          480 :         }
     646          480 :     }
     647              : }
     648              : 
     649            0 : #[derive(thiserror::Error, Debug)]
                        /// Error type returned by `DeltaLayer::rewrite_summary`.
     650              : pub enum RewriteSummaryError {
                            /// The summary read from the file does not carry `DELTA_FILE_MAGIC`.
     651              :     #[error("magic mismatch")]
     652              :     MagicMismatch,
                            /// Any other failure; displayed transparently as the wrapped error.
     653              :     #[error(transparent)]
     654              :     Other(#[from] anyhow::Error),
     655              : }
     656              : 
     657              : impl From<std::io::Error> for RewriteSummaryError {
                            // Wraps the I/O error in an `anyhow::Error` and stores it in `Other`,
                            // which lets `?` be used on `std::io::Result` values.
     658            0 :     fn from(e: std::io::Error) -> Self {
     659            0 :         Self::Other(anyhow::anyhow!(e))
     660            0 :     }
     661              : }
     662              : 
     663              : impl DeltaLayer {
                            /// Rewrites, in place, the summary stored in block 0 of the file at `path`.
                            ///
                            /// The file is opened for reading and writing, block 0 is read and
                            /// deserialized as a `Summary`, `rewrite` is applied to it, and the
                            /// serialized result is written back at offset 0.
                            ///
                            /// # Errors
                            ///
                            /// - `RewriteSummaryError::MagicMismatch` if the stored summary's magic is
                            ///   not `DELTA_FILE_MAGIC`; nothing is written in that case.
                            /// - `RewriteSummaryError::Other` for open, read, (de)serialization, seek
                            ///   and write failures.
     664            0 :     pub async fn rewrite_summary<F>(
     665            0 :         path: &Utf8Path,
     666            0 :         rewrite: F,
     667            0 :         ctx: &RequestContext,
     668            0 :     ) -> Result<(), RewriteSummaryError>
     669            0 :     where
     670            0 :         F: Fn(Summary) -> Summary,
     671            0 :     {
     672            0 :         let mut file = VirtualFile::open_with_options(
     673            0 :             path,
     674            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     675            0 :         )
     676            0 :         .await
     677            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     678            0 :         let file_id = page_cache::next_file_id();
     679            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     680            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     681            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     682            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     683            0 :             return Err(RewriteSummaryError::MagicMismatch);
     684            0 :         }
     685            0 : 
     686            0 :         let new_summary = rewrite(actual_summary);
     687            0 : 
     688            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     689            0 :         // TODO: could use smallvec here, but it's a pain with Slice<T>
     690            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
                                // Only the serialized bytes are written; `buf` is not padded to
                                // `PAGE_SZ`, so the rest of block 0 keeps its previous contents.
     691            0 :         file.seek(SeekFrom::Start(0)).await?;
     692            0 :         let (_buf, res) = file.write_all(buf).await;
     693            0 :         res?;
                                // NOTE(review): there is no explicit sync of the file in this function.
     694            0 :         Ok(())
     695            0 :     }
     696              : }
     697              : 
     698              : impl DeltaLayerInner {
     699              :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     700              :     /// - inner has the success or transient failure
     701              :     /// - outer has the permanent failure
                            ///
                            /// Failing to open the file or to read block 0 is reported as the inner
                            /// error. A summary that fails to deserialize, or that differs from the
                            /// expected `summary`, is reported as the outer error.
     702          440 :     pub(super) async fn load(
     703          440 :         path: &Utf8Path,
     704          440 :         summary: Option<Summary>,
     705          440 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     706          440 :         ctx: &RequestContext,
     707          440 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     708          440 :         let file = match VirtualFile::open(path).await {
     709          440 :             Ok(file) => file,
     710            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     711              :         };
     712          440 :         let file_id = page_cache::next_file_id();
     713          440 : 
     714          440 :         let block_reader = FileBlockReader::new(&file, file_id);
     715              : 
     716          440 :         let summary_blk = match block_reader.read_blk(0, ctx).await {
     717          440 :             Ok(blk) => blk,
     718            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     719              :         };
     720              : 
     721              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     722          440 :         let actual_summary =
     723          440 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     724              : 
     725          440 :         if let Some(mut expected_summary) = summary {
     726              :             // production code path
                                    // The two index block numbers are copied from the file before the
                                    // comparison, so they can never be the cause of a mismatch.
     727          440 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     728          440 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     729          440 :             if actual_summary != expected_summary {
     730            0 :                 bail!(
     731            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     732            0 :                     actual_summary,
     733            0 :                     expected_summary
     734            0 :                 );
     735          440 :             }
     736            0 :         }
     737              : 
     738          440 :         Ok(Ok(DeltaLayerInner {
     739          440 :             file,
     740          440 :             file_id,
     741          440 :             index_start_blk: actual_summary.index_start_blk,
     742          440 :             index_root_blk: actual_summary.index_root_blk,
     743          440 :             max_vectored_read_bytes,
     744          440 :         }))
     745          440 :     }
     746              : 
     747       123516 :     pub(super) async fn get_value_reconstruct_data(
     748       123516 :         &self,
     749       123516 :         key: Key,
     750       123516 :         lsn_range: Range<Lsn>,
     751       123516 :         reconstruct_state: &mut ValueReconstructState,
     752       123516 :         ctx: &RequestContext,
     753       123516 :     ) -> anyhow::Result<ValueReconstructResult> {
                                // Collects from this layer what is needed to reconstruct `key`:
                                // first the blob offsets of the matching index entries, then the blobs
                                // themselves. Returns `Complete` once a page image or a `will_init`
                                // WAL record has been found, and `Continue` otherwise.
     754       123516 :         let mut need_image = true;
     755       123516 :         // Scan the page versions backwards, starting from `lsn_range.end - 1`.
     756       123516 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     757       123516 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     758       123516 :             self.index_start_blk,
     759       123516 :             self.index_root_blk,
     760       123516 :             &block_reader,
     761       123516 :         );
                                // NOTE(review): `end.0 - 1` assumes `lsn_range.end` is never `Lsn(0)`;
                                // confirm against callers.
     762       123516 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     763       123516 : 
     764       123516 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     765       123516 : 
     766       123516 :         tree_reader
     767       123516 :             .visit(
     768       123516 :                 &search_key.0,
     769       123516 :                 VisitDirection::Backwards,
     770       123516 :                 |key, value| {
                                            // Presumably returning `false` ends the visit; this happens on
                                            // an entry for a different key, on an LSN below the range start,
                                            // or right after recording an entry flagged `will_init`.
     771       118341 :                     let blob_ref = BlobRef(value);
     772       118341 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     773        46270 :                         return false;
     774        72071 :                     }
     775        72071 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     776        72071 :                     if entry_lsn < lsn_range.start {
     777            0 :                         return false;
     778        72071 :                     }
     779        72071 :                     offsets.push((entry_lsn, blob_ref.pos()));
     780        72071 : 
     781        72071 :                     !blob_ref.will_init()
     782       123516 :                 },
     783       123516 :                 &RequestContextBuilder::extend(ctx)
     784       123516 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     785       123516 :                     .build(),
     786       123516 :             )
     787        17171 :             .await?;
     788              : 
                                // From here on `ctx` is shadowed by a context tagged for value reads.
     789       123516 :         let ctx = &RequestContextBuilder::extend(ctx)
     790       123516 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     791       123516 :             .build();
     792       123516 : 
     793       123516 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     794       123516 :         let cursor = block_reader.block_cursor();
     795       123516 :         let mut buf = Vec::new();
     796       123516 :         for (entry_lsn, pos) in offsets {
     797        72071 :             cursor
     798        72071 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     799         5476 :                 .await
     800        72071 :                 .with_context(|| {
     801            0 :                     format!("Failed to read blob from virtual file {}", self.file.path)
     802        72071 :                 })?;
     803        72071 :             let val = Value::des(&buf).with_context(|| {
     804            0 :                 format!(
     805            0 :                     "Failed to deserialize file blob from virtual file {}",
     806            0 :                     self.file.path
     807            0 :                 )
     808        72071 :             })?;
     809        72071 :             match val {
     810        72071 :                 Value::Image(img) => {
                                            // A full page image: nothing older is needed.
     811        72071 :                     reconstruct_state.img = Some((entry_lsn, img));
     812        72071 :                     need_image = false;
     813        72071 :                     break;
     814              :                 }
     815            0 :                 Value::WalRecord(rec) => {
     816            0 :                     let will_init = rec.will_init();
     817            0 :                     reconstruct_state.records.push((entry_lsn, rec));
     818            0 :                     if will_init {
     819              :                         // This WAL record initializes the page, so no need to go further back
     820            0 :                         need_image = false;
     821            0 :                         break;
     822            0 :                     }
     823              :                 }
     824              :             }
     825              :         }
     826              : 
     827              :         // If an older page image is needed to reconstruct the page, let the
     828              :         // caller know.
     829       123516 :         if need_image {
     830        51445 :             Ok(ValueReconstructResult::Continue)
     831              :         } else {
     832        72071 :             Ok(ValueReconstructResult::Complete)
     833              :         }
     834       123516 :     }
     835              : 
     836              :     // Look up the keys in the provided keyspace and update
     837              :     // the reconstruct state with whatever is found.
     838              :     //
     839              :     // If the key is cached, go no further than the cached Lsn.
     840              :     //
     841              :     // Currently, the index is visited for each range, but this
     842              :     // can be further optimised to visit the index only once.
                            //
                            // Only a failure while planning the reads is returned as an error
                            // (wrapped in `GetVectoredError::Other`); `do_reads_and_update_state`
                            // returns nothing and reports its failures through `reconstruct_state`.
     843           10 :     pub(super) async fn get_values_reconstruct_data(
     844           10 :         &self,
     845           10 :         keyspace: KeySpace,
     846           10 :         lsn_range: Range<Lsn>,
     847           10 :         reconstruct_state: &mut ValuesReconstructState,
     848           10 :         ctx: &RequestContext,
     849           10 :     ) -> Result<(), GetVectoredError> {
     850           10 :         let reads = self
     851           10 :             .plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
     852           10 :             .await
     853           10 :             .map_err(GetVectoredError::Other)?;
     854              : 
     855           10 :         self.do_reads_and_update_state(reads, reconstruct_state)
     856            5 :             .await;
     857              : 
     858           10 :         Ok(())
     859           10 :     }
     860              : 
     861           10 :     async fn plan_reads(
     862           10 :         &self,
     863           10 :         keyspace: KeySpace,
     864           10 :         lsn_range: Range<Lsn>,
     865           10 :         reconstruct_state: &mut ValuesReconstructState,
     866           10 :         ctx: &RequestContext,
     867           10 :     ) -> anyhow::Result<Vec<VectoredRead>> {
                                // For every range in `keyspace`, walks the index forwards from
                                // (range.start, lsn_range.start) and hands each entry to a
                                // `VectoredReadPlanner`; the planner's finished reads are returned.
                                //
                                // Panics if the layer was loaded without `max_vectored_read_bytes`.
     868           10 :         let mut planner = VectoredReadPlanner::new(
     869           10 :             self.max_vectored_read_bytes
     870           10 :                 .expect("Layer is loaded with max vectored bytes config")
     871           10 :                 .0
     872           10 :                 .into(),
     873           10 :         );
     874           10 : 
     875           10 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     876           10 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     877           10 :             self.index_start_blk,
     878           10 :             self.index_root_blk,
     879           10 :             block_reader,
     880           10 :         );
     881              : 
     882           10 :         for range in keyspace.ranges.iter() {
                                    // Set once the visitor has passed the end of `range` and reported
                                    // the position of that first out-of-range blob to the planner.
     883           10 :             let mut range_end_handled = false;
     884           10 : 
     885           10 :             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
     886           10 :             tree_reader
     887           10 :                 .visit(
     888           10 :                     &start_key.0,
     889           10 :                     VisitDirection::Forwards,
     890          330 :                     |raw_key, value| {
     891          330 :                         let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     892          330 :                         let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
     893          330 :                         let blob_ref = BlobRef(value);
     894          330 : 
                                                // Panics on any visited entry that lies below the start key or
                                                // below the start LSN.
     895          330 :                         assert!(key >= range.start && lsn >= lsn_range.start);
     896              : 
     897          330 :                         let cached_lsn = reconstruct_state.get_cached_lsn(&key);
     898          330 :                         let flag = {
                                                    // `None` orders below every `Some`, so a key without a cached
                                                    // LSN never takes the `Ignore` branch.
     899          330 :                             if cached_lsn >= Some(lsn) {
     900            0 :                                 BlobFlag::Ignore
     901          330 :                             } else if blob_ref.will_init() {
     902          330 :                                 BlobFlag::Replaces
     903              :                             } else {
     904            0 :                                 BlobFlag::None
     905              :                             }
     906              :                         };
     907              : 
                                                // NOTE(review): the LSN upper bound is only checked for the last
                                                // key of the range (`key.next() == range.end`); confirm that
                                                // entries of earlier keys can never have `lsn >= lsn_range.end`.
     908          330 :                         if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
     909           10 :                             planner.handle_range_end(blob_ref.pos());
     910           10 :                             range_end_handled = true;
     911           10 :                             false
     912              :                         } else {
     913          320 :                             planner.handle(key, lsn, blob_ref.pos(), flag);
     914          320 :                             true
     915              :                         }
     916          330 :                     },
     917           10 :                     &RequestContextBuilder::extend(ctx)
     918           10 :                         .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     919           10 :                         .build(),
     920           10 :                 )
     921           10 :                 .await
     922           10 :                 .map_err(|err| anyhow!(err))?;
     923              : 
     924           10 :             if !range_end_handled {
                                        // The visit ended without an out-of-range entry, so the byte offset
                                        // at which the index starts is used as the range end instead.
     925            0 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     926            0 :                 tracing::info!("Handling range end fallback at {}", payload_end);
     927            0 :                 planner.handle_range_end(payload_end);
     928           10 :             }
     929              :         }
     930              : 
     931           10 :         Ok(planner.finish())
     932           10 :     }
     933              : 
     934           10 :     async fn do_reads_and_update_state(
     935           10 :         &self,
     936           10 :         reads: Vec<VectoredRead>,
     937           10 :         reconstruct_state: &mut ValuesReconstructState,
     938           10 :     ) {
                                // Executes `reads` in reverse order, deserializes every blob and feeds
                                // it into `reconstruct_state`. Failures are not returned; they are
                                // recorded per key via `reconstruct_state.on_key_error`.
                                //
                                // Panics if the layer was loaded without `max_vectored_read_bytes`.
     939           10 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
                                // Key of the most recent blob that failed to deserialize; later blobs
                                // with the same key are skipped. Only one key is remembered.
                                // NOTE(review): this presumably relies on all blobs of a key being
                                // visited consecutively; confirm against the planner.
     940           10 :         let mut ignore_key_with_err = None;
     941           10 : 
     942           10 :         let max_vectored_read_bytes = self
     943           10 :             .max_vectored_read_bytes
     944           10 :             .expect("Layer is loaded with max vectored bytes config")
     945           10 :             .0
     946           10 :             .into();
                                // One buffer is passed into each read and taken back from its result.
     947           10 :         let mut buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
     948              : 
     949              :         // Note that reads are processed in reverse order (from highest key+lsn).
     950              :         // This is the order that `ValuesReconstructState` requires such that it can
     951              :         // track when a key is done.
     952           10 :         for read in reads.into_iter().rev() {
     953           10 :             let res = vectored_blob_reader
     954           10 :                 .read_blobs(&read, buf.take().expect("Should have a buffer"))
     955            5 :                 .await;
     956              : 
     957           10 :             let blobs_buf = match res {
     958           10 :                 Ok(blobs_buf) => blobs_buf,
     959            0 :                 Err(err) => {
                                            // The whole read failed: report the error for every blob that
                                            // was part of it.
     960            0 :                     let kind = err.kind();
     961            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
     962            0 :                         reconstruct_state.on_key_error(
     963            0 :                             blob_meta.key,
     964            0 :                             PageReconstructError::from(anyhow!(
     965            0 :                                 "Failed to read blobs from virtual file {}: {}",
     966            0 :                                 self.file.path,
     967            0 :                                 kind
     968            0 :                             )),
     969            0 :                         );
     970            0 :                     }
     971              : 
     972              :                     // We have "lost" the buffer since the lower level IO api
     973              :                     // doesn't return the buffer on error. Allocate a new one.
     974            0 :                     buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
     975            0 : 
     976            0 :                     continue;
     977              :                 }
     978              :             };
     979              : 
     980          320 :             for meta in blobs_buf.blobs.iter().rev() {
     981          320 :                 if Some(meta.meta.key) == ignore_key_with_err {
     982            0 :                     continue;
     983          320 :                 }
     984          320 : 
     985          320 :                 let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
     986          320 :                 let value = match value {
     987          320 :                     Ok(v) => v,
     988            0 :                     Err(e) => {
     989            0 :                         reconstruct_state.on_key_error(
     990            0 :                             meta.meta.key,
     991            0 :                             PageReconstructError::from(anyhow!(e).context(format!(
     992            0 :                                 "Failed to deserialize blob from virtual file {}",
     993            0 :                                 self.file.path,
     994            0 :                             ))),
     995            0 :                         );
     996            0 : 
     997            0 :                         ignore_key_with_err = Some(meta.meta.key);
     998            0 :                         continue;
     999              :                     }
    1000              :                 };
    1001              : 
    1002              :                 // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
    1003              :                 // state, no further updates shall be made to it. The call below will
    1004              :                 // panic if the invariant is violated.
    1005          320 :                 reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
    1006              :             }
    1007              : 
                                    // Take the buffer back so that the next read can reuse it.
    1008           10 :             buf = Some(blobs_buf.buf);
    1009              :         }
    1010           10 :     }
    1011              : 
    1012          304 :     pub(super) async fn load_keys<'a>(
    1013          304 :         &'a self,
    1014          304 :         ctx: &RequestContext,
    1015          304 :     ) -> Result<Vec<DeltaEntry<'a>>> {
                                // Visits the whole index forwards and returns one `DeltaEntry` per
                                // index entry. Each entry's `size` is the distance from its blob
                                // position to the next entry's position; for the last entry it is the
                                // distance to the byte offset at which the index starts.
    1016          304 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1017          304 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1018          304 :             self.index_start_blk,
    1019          304 :             self.index_root_blk,
    1020          304 :             block_reader,
    1021          304 :         );
    1022          304 : 
    1023          304 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
    1024          304 : 
    1025          304 :         tree_reader
    1026          304 :             .visit(
    1027          304 :                 &[0u8; DELTA_KEY_SIZE],
    1028          304 :                 VisitDirection::Forwards,
    1029      2102008 :                 |key, value| {
    1030      2102008 :                     let delta_key = DeltaKey::from_slice(key);
    1031      2102008 :                     let val_ref = ValueRef {
    1032      2102008 :                         blob_ref: BlobRef(value),
    1033      2102008 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
    1034      2102008 :                             Adapter(self),
    1035      2102008 :                         )),
    1036      2102008 :                     };
    1037      2102008 :                     let pos = BlobRef(value).pos();
    1038      2102008 :                     if let Some(last) = all_keys.last_mut() {
    1039      2101704 :                         // subtract offset of the current and last entries to get the size
    1040      2101704 :                         // of the value associated with this (key, lsn) tuple
                                                // (until now `last.size` held the previous entry's position)
    1041      2101704 :                         let first_pos = last.size;
    1042      2101704 :                         last.size = pos - first_pos;
    1043      2101704 :                     }
                                            // `size` temporarily stores this entry's position; it is replaced
                                            // by the real size when the next entry, or the end, is reached.
    1044      2102008 :                     let entry = DeltaEntry {
    1045      2102008 :                         key: delta_key.key(),
    1046      2102008 :                         lsn: delta_key.lsn(),
    1047      2102008 :                         size: pos,
    1048      2102008 :                         val: val_ref,
    1049      2102008 :                     };
    1050      2102008 :                     all_keys.push(entry);
    1051      2102008 :                     true
    1052      2102008 :                 },
    1053          304 :                 &RequestContextBuilder::extend(ctx)
    1054          304 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
    1055          304 :                     .build(),
    1056          304 :             )
    1057         2197 :             .await?;
    1058          304 :         if let Some(last) = all_keys.last_mut() {
    1059          304 :             // Last key occupies all space till end of value storage,
    1060          304 :             // which corresponds to beginning of the index
    1061          304 :             last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
    1062          304 :         }
    1063          304 :         Ok(all_keys)
    1064          304 :     }
    1065              : 
    1066            4 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
    1067            4 :         println!(
    1068            4 :             "index_start_blk: {}, root {}",
    1069            4 :             self.index_start_blk, self.index_root_blk
    1070            4 :         );
    1071            4 : 
    1072            4 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
    1073            4 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
    1074            4 :             self.index_start_blk,
    1075            4 :             self.index_root_blk,
    1076            4 :             block_reader,
    1077            4 :         );
    1078            4 : 
    1079            4 :         tree_reader.dump().await?;
    1080              : 
    1081            4 :         let keys = self.load_keys(ctx).await?;
    1082              : 
    1083            8 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
    1084            8 :             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1085            8 :             let val = Value::des(&buf)?;
    1086            8 :             let desc = match val {
    1087            8 :                 Value::Image(img) => {
    1088            8 :                     format!(" img {} bytes", img.len())
    1089              :                 }
    1090            0 :                 Value::WalRecord(rec) => {
    1091            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
    1092            0 :                     format!(
    1093            0 :                         " rec {} bytes will_init: {} {}",
    1094            0 :                         buf.len(),
    1095            0 :                         rec.will_init(),
    1096            0 :                         wal_desc
    1097            0 :                     )
    1098              :                 }
    1099              :             };
    1100            8 :             Ok(desc)
    1101            8 :         }
    1102              : 
    1103           12 :         for entry in keys {
    1104            8 :             let DeltaEntry { key, lsn, val, .. } = entry;
    1105            8 :             let desc = match dump_blob(&val, ctx).await {
    1106            8 :                 Ok(desc) => desc,
    1107            0 :                 Err(err) => {
    1108            0 :                     format!("ERROR: {err}")
    1109              :                 }
    1110              :             };
    1111            8 :             println!("  key {key} at {lsn}: {desc}");
    1112            8 : 
    1113            8 :             // Print more details about CHECKPOINT records. Would be nice to print details
    1114            8 :             // of many other record types too, but these are particularly interesting, as
    1115            8 :             // have a lot of special processing for them in walingest.rs.
    1116            8 :             use pageserver_api::key::CHECKPOINT_KEY;
    1117            8 :             use postgres_ffi::CheckPoint;
    1118            8 :             if key == CHECKPOINT_KEY {
    1119            0 :                 let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
    1120            0 :                 let val = Value::des(&buf)?;
    1121            0 :                 match val {
    1122            0 :                     Value::Image(img) => {
    1123            0 :                         let checkpoint = CheckPoint::decode(&img)?;
    1124            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
    1125              :                     }
    1126            0 :                     Value::WalRecord(_rec) => {
    1127            0 :                         println!("   unexpected walrecord value for checkpoint key");
    1128            0 :                     }
    1129              :                 }
    1130            8 :             }
    1131              :         }
    1132              : 
    1133            4 :         Ok(())
    1134            4 :     }
    1135              : }
    1136              : 
    1137              : /// A set of data associated with a delta layer key and its value
    1138              : pub struct DeltaEntry<'a> {
    1139              :     pub key: Key,
    1140              :     pub lsn: Lsn,
    1141              :     /// Size of the stored value
    1142              :     pub size: u64,
    1143              :     /// Reference to the on-disk value
    1144              :     pub val: ValueRef<'a>,
    1145              : }
    1146              : 
    1147              : /// Reference to an on-disk value
    1148              : pub struct ValueRef<'a> {
    1149              :     blob_ref: BlobRef,
    1150              :     reader: BlockCursor<'a>,
    1151              : }
    1152              : 
    1153              : impl<'a> ValueRef<'a> {
    1154              :     /// Loads the value from disk
    1155      2102000 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
    1156              :         // theoretically we *could* record an access time for each, but it does not really matter
    1157      2102000 :         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
    1158      2102000 :         let val = Value::des(&buf)?;
    1159      2102000 :         Ok(val)
    1160      2102000 :     }
    1161              : }
    1162              : 
    1163              : pub(crate) struct Adapter<T>(T);
    1164              : 
    1165              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1166      2121326 :     pub(crate) async fn read_blk(
    1167      2121326 :         &self,
    1168      2121326 :         blknum: u32,
    1169      2121326 :         ctx: &RequestContext,
    1170      2121326 :     ) -> Result<BlockLease, std::io::Error> {
    1171      2121326 :         let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
    1172      2121326 :         block_reader.read_blk(blknum, ctx).await
    1173      2121326 :     }
    1174              : }
    1175              : 
    1176              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    1177      4242652 :     fn as_ref(&self) -> &DeltaLayerInner {
    1178      4242652 :         self
    1179      4242652 :     }
    1180              : }
    1181              : 
    1182              : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    1183            0 :     fn key(&self) -> Key {
    1184            0 :         self.key
    1185            0 :     }
    1186            0 :     fn lsn(&self) -> Lsn {
    1187            0 :         self.lsn
    1188            0 :     }
    1189            0 :     fn size(&self) -> u64 {
    1190            0 :         self.size
    1191            0 :     }
    1192              : }
        

Generated by: LCOV version 2.1-beta