LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 85.7 % 652 559
Test Date: 2023-09-06 10:18:01 Functions: 75.5 % 106 80

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "index", and
      24              : //! "values". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use crate::config::PageServerConf;
      31              : use crate::context::RequestContext;
      32              : use crate::page_cache::PAGE_SZ;
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::WriteBlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37              : use crate::tenant::storage_layer::{
      38              :     PersistentLayer, ValueReconstructResult, ValueReconstructState,
      39              : };
      40              : use crate::virtual_file::VirtualFile;
      41              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      42              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      43              : use anyhow::{bail, ensure, Context, Result};
      44              : use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
      45              : use rand::{distributions::Alphanumeric, Rng};
      46              : use serde::{Deserialize, Serialize};
      47              : use std::fs::{self, File};
      48              : use std::io::SeekFrom;
      49              : use std::io::{BufWriter, Write};
      50              : use std::ops::Range;
      51              : use std::os::unix::fs::FileExt;
      52              : use std::path::{Path, PathBuf};
      53              : use std::sync::Arc;
      54              : use tokio::sync::OnceCell;
      55              : use tracing::*;
      56              : 
      57              : use utils::{
      58              :     bin_ser::BeSer,
      59              :     id::{TenantId, TimelineId},
      60              :     lsn::Lsn,
      61              : };
      62              : 
      63              : use super::{
      64              :     AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf,
      65              :     PersistentLayerDesc,
      66              : };
      67              : 
      68              : ///
      69              : /// Header stored in the beginning of the file
      70              : ///
      71              : /// After this comes the 'values' part, starting on block 1. After that,
      72              : /// the 'index' starts at the block indicated by 'index_start_blk'
      73              : ///
      74        15508 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      75              : pub struct Summary {
      76              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      77              :     magic: u16,
      78              :     format_version: u16,
      79              : 
      80              :     tenant_id: TenantId,
      81              :     timeline_id: TimelineId,
      82              :     key_range: Range<Key>,
      83              :     lsn_range: Range<Lsn>,
      84              : 
      85              :     /// Block number where the 'index' part of the file begins.
      86              :     pub index_start_blk: u32,
      87              :     /// Block within the 'index', where the B-tree root page is stored
      88              :     pub index_root_blk: u32,
      89              : }
      90              : 
      91              : impl From<&DeltaLayer> for Summary {
      92        10578 :     fn from(layer: &DeltaLayer) -> Self {
      93        10578 :         Self::expected(
      94        10578 :             layer.desc.tenant_id,
      95        10578 :             layer.desc.timeline_id,
      96        10578 :             layer.desc.key_range.clone(),
      97        10578 :             layer.desc.lsn_range.clone(),
      98        10578 :         )
      99        10578 :     }
     100              : }
     101              : 
     102              : impl Summary {
     103        10578 :     pub(super) fn expected(
     104        10578 :         tenant_id: TenantId,
     105        10578 :         timeline_id: TimelineId,
     106        10578 :         keys: Range<Key>,
     107        10578 :         lsns: Range<Lsn>,
     108        10578 :     ) -> Self {
     109        10578 :         Self {
     110        10578 :             magic: DELTA_FILE_MAGIC,
     111        10578 :             format_version: STORAGE_FORMAT_VERSION,
     112        10578 : 
     113        10578 :             tenant_id,
     114        10578 :             timeline_id,
     115        10578 :             key_range: keys,
     116        10578 :             lsn_range: lsns,
     117        10578 : 
     118        10578 :             index_start_blk: 0,
     119        10578 :             index_root_blk: 0,
     120        10578 :         }
     121        10578 :     }
     122              : }
     123              : 
     124              : // Flag indicating that this version initialize the page
     125              : const WILL_INIT: u64 = 1;
     126              : 
     127              : /// Struct representing reference to BLOB in layers. Reference contains BLOB
     128              : /// offset, and for WAL records it also contains `will_init` flag. The flag
     129              : /// helps to determine the range of records that needs to be applied, without
     130              : /// reading/deserializing records themselves.
     131            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     132              : pub struct BlobRef(pub u64);
     133              : 
     134              : impl BlobRef {
     135    177283003 :     pub fn will_init(&self) -> bool {
     136    177283003 :         (self.0 & WILL_INIT) != 0
     137    177283003 :     }
     138              : 
     139    256231626 :     pub fn pos(&self) -> u64 {
     140    256231626 :         self.0 >> 1
     141    256231626 :     }
     142              : 
     143     91515920 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     144     91515920 :         let mut blob_ref = pos << 1;
     145     91515920 :         if will_init {
     146      8798982 :             blob_ref |= WILL_INIT;
     147     82716938 :         }
     148     91515920 :         BlobRef(blob_ref)
     149     91515920 :     }
     150              : }
     151              : 
     152              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     153              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     154              : 
     155              : /// This is the key of the B-tree index stored in the delta layer. It consists
     156              : /// of the serialized representation of a Key and LSN.
     157              : impl DeltaKey {
     158     46455422 :     fn from_slice(buf: &[u8]) -> Self {
     159     46455422 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     160     46455422 :         bytes.copy_from_slice(buf);
     161     46455422 :         DeltaKey(bytes)
     162     46455422 :     }
     163              : 
     164    136552371 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     165    136552371 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     166    136552371 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     167    136552371 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     168    136552371 :         DeltaKey(bytes)
     169    136552371 :     }
     170              : 
     171     46455422 :     fn key(&self) -> Key {
     172     46455422 :         Key::from_slice(&self.0)
     173     46455422 :     }
     174              : 
     175     46455422 :     fn lsn(&self) -> Lsn {
     176     46455422 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     177     46455422 :     }
     178              : 
     179    178427717 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     180    178427717 :         let mut lsn_buf = [0u8; 8];
     181    178427717 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     182    178427717 :         Lsn(u64::from_be_bytes(lsn_buf))
     183    178427717 :     }
     184              : }
     185              : 
     186              : /// DeltaLayer is the in-memory data structure associated with an on-disk delta
     187              : /// file.
     188              : ///
     189              : /// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
     190              : /// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
     191              : /// Otherwise the struct is just a placeholder for a file that exists on disk,
     192              : /// and it needs to be loaded before using it in queries.
     193              : pub struct DeltaLayer {
     194              :     path_or_conf: PathOrConf,
     195              : 
     196              :     pub desc: PersistentLayerDesc,
     197              : 
     198              :     access_stats: LayerAccessStats,
     199              : 
     200              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     201              : }
     202              : 
     203              : impl std::fmt::Debug for DeltaLayer {
     204            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     205            0 :         use super::RangeDisplayDebug;
     206            0 : 
     207            0 :         f.debug_struct("DeltaLayer")
     208            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     209            0 :             .field("lsn_range", &self.desc.lsn_range)
     210            0 :             .field("file_size", &self.desc.file_size)
     211            0 :             .field("inner", &self.inner)
     212            0 :             .finish()
     213            0 :     }
     214              : }
     215              : 
     216              : pub struct DeltaLayerInner {
     217              :     // values copied from summary
     218              :     index_start_blk: u32,
     219              :     index_root_blk: u32,
     220              : 
     221              :     /// Reader object for reading blocks from the file.
     222              :     file: FileBlockReader,
     223              : }
     224              : 
     225              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
     226     33813857 :     fn as_ref(&self) -> &DeltaLayerInner {
     227     33813857 :         self
     228     33813857 :     }
     229              : }
     230              : 
     231              : impl std::fmt::Debug for DeltaLayerInner {
     232            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     233            0 :         f.debug_struct("DeltaLayerInner")
     234            0 :             .field("index_start_blk", &self.index_start_blk)
     235            0 :             .field("index_root_blk", &self.index_root_blk)
     236            0 :             .finish()
     237            0 :     }
     238              : }
     239              : 
     240              : #[async_trait::async_trait]
     241              : impl Layer for DeltaLayer {
     242     45036462 :     async fn get_value_reconstruct_data(
     243     45036462 :         &self,
     244     45036462 :         key: Key,
     245     45036462 :         lsn_range: Range<Lsn>,
     246     45036462 :         reconstruct_state: &mut ValueReconstructState,
     247     45036462 :         ctx: &RequestContext,
     248     45036482 :     ) -> anyhow::Result<ValueReconstructResult> {
     249     45036482 :         self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
     250      1369957 :             .await
     251     90072961 :     }
     252              : }
     253              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     254              : impl std::fmt::Display for DeltaLayer {
     255         5156 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     256         5156 :         write!(f, "{}", self.layer_desc().short_id())
     257         5156 :     }
     258              : }
     259              : 
     260              : impl AsLayerDesc for DeltaLayer {
     261       152073 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     262       152073 :         &self.desc
     263       152073 :     }
     264              : }
     265              : 
     266              : impl PersistentLayer for DeltaLayer {
     267         7794 :     fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
     268         7794 :         Some(self)
     269         7794 :     }
     270              : 
     271          203 :     fn local_path(&self) -> Option<PathBuf> {
     272          203 :         self.local_path()
     273          203 :     }
     274              : 
     275         5257 :     fn delete_resident_layer_file(&self) -> Result<()> {
     276         5257 :         self.delete_resident_layer_file()
     277         5257 :     }
     278              : 
     279         1867 :     fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
     280         1867 :         self.info(reset)
     281         1867 :     }
     282              : 
     283         1440 :     fn access_stats(&self) -> &LayerAccessStats {
     284         1440 :         self.access_stats()
     285         1440 :     }
     286              : }
     287              : 
     288              : impl DeltaLayer {
     289            4 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     290            4 :         println!(
     291            4 :             "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
     292            4 :             self.desc.tenant_id,
     293            4 :             self.desc.timeline_id,
     294            4 :             self.desc.key_range.start,
     295            4 :             self.desc.key_range.end,
     296            4 :             self.desc.lsn_range.start,
     297            4 :             self.desc.lsn_range.end,
     298            4 :             self.desc.file_size,
     299            4 :         );
     300            4 : 
     301            4 :         if !verbose {
     302            2 :             return Ok(());
     303            2 :         }
     304              : 
     305            2 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     306              : 
     307            2 :         println!(
     308            2 :             "index_start_blk: {}, root {}",
     309            2 :             inner.index_start_blk, inner.index_root_blk
     310            2 :         );
     311            2 : 
     312            2 :         let file = &inner.file;
     313            2 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     314            2 :             inner.index_start_blk,
     315            2 :             inner.index_root_blk,
     316            2 :             file,
     317            2 :         );
     318            2 : 
     319            2 :         tree_reader.dump().await?;
     320              : 
     321            2 :         let keys = DeltaLayerInner::load_keys(&inner).await?;
     322              : 
     323              :         // A subroutine to dump a single blob
     324            4 :         async fn dump_blob(val: ValueRef<'_>) -> Result<String> {
     325            4 :             let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
     326            4 :             let val = Value::des(&buf)?;
     327            4 :             let desc = match val {
     328            4 :                 Value::Image(img) => {
     329            4 :                     format!(" img {} bytes", img.len())
     330              :                 }
     331            0 :                 Value::WalRecord(rec) => {
     332            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
     333            0 :                     format!(
     334            0 :                         " rec {} bytes will_init: {} {}",
     335            0 :                         buf.len(),
     336            0 :                         rec.will_init(),
     337            0 :                         wal_desc
     338            0 :                     )
     339              :                 }
     340              :             };
     341            4 :             Ok(desc)
     342            4 :         }
     343              : 
     344            6 :         for entry in keys {
     345            4 :             let DeltaEntry { key, lsn, val, .. } = entry;
     346            4 :             let desc = match dump_blob(val).await {
     347            4 :                 Ok(desc) => desc,
     348            0 :                 Err(err) => {
     349            0 :                     let err: anyhow::Error = err;
     350            0 :                     format!("ERROR: {err}")
     351              :                 }
     352              :             };
     353            4 :             println!("  key {key} at {lsn}: {desc}");
     354              :         }
     355              : 
     356            2 :         Ok(())
     357            4 :     }
     358              : 
     359     45036462 :     pub(crate) async fn get_value_reconstruct_data(
     360     45036462 :         &self,
     361     45036462 :         key: Key,
     362     45036462 :         lsn_range: Range<Lsn>,
     363     45036462 :         reconstruct_state: &mut ValueReconstructState,
     364     45036462 :         ctx: &RequestContext,
     365     45036482 :     ) -> anyhow::Result<ValueReconstructResult> {
     366     45036482 :         ensure!(lsn_range.start >= self.desc.lsn_range.start);
     367              : 
     368     45036482 :         ensure!(self.desc.key_range.contains(&key));
     369              : 
     370     45036482 :         let inner = self
     371     45036482 :             .load(LayerAccessKind::GetValueReconstructData, ctx)
     372          368 :             .await?;
     373     45036471 :         inner
     374     45036471 :             .get_value_reconstruct_data(key, lsn_range, reconstruct_state)
     375      1369589 :             .await
     376     45036478 :     }
     377              : 
     378          203 :     pub(crate) fn local_path(&self) -> Option<PathBuf> {
     379          203 :         Some(self.path())
     380          203 :     }
     381              : 
     382              :     pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
     383              :         // delete underlying file
     384         5257 :         fs::remove_file(self.path())?;
     385         5257 :         Ok(())
     386         5257 :     }
     387              : 
     388         1867 :     pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
     389         1867 :         let layer_file_name = self.layer_desc().filename().file_name();
     390         1867 :         let lsn_range = self.layer_desc().lsn_range.clone();
     391         1867 : 
     392         1867 :         let access_stats = self.access_stats.as_api_model(reset);
     393         1867 : 
     394         1867 :         HistoricLayerInfo::Delta {
     395         1867 :             layer_file_name,
     396         1867 :             layer_file_size: self.desc.file_size,
     397         1867 :             lsn_start: lsn_range.start,
     398         1867 :             lsn_end: lsn_range.end,
     399         1867 :             remote: false,
     400         1867 :             access_stats,
     401         1867 :         }
     402         1867 :     }
     403              : 
     404        18957 :     pub(crate) fn access_stats(&self) -> &LayerAccessStats {
     405        18957 :         &self.access_stats
     406        18957 :     }
     407              : 
     408        57812 :     fn path_for(
     409        57812 :         path_or_conf: &PathOrConf,
     410        57812 :         tenant_id: &TenantId,
     411        57812 :         timeline_id: &TimelineId,
     412        57812 :         fname: &DeltaFileName,
     413        57812 :     ) -> PathBuf {
     414        57812 :         match path_or_conf {
     415            0 :             PathOrConf::Path(path) => path.clone(),
     416        57812 :             PathOrConf::Conf(conf) => conf
     417        57812 :                 .timeline_path(tenant_id, timeline_id)
     418        57812 :                 .join(fname.to_string()),
     419              :         }
     420        57812 :     }
     421              : 
     422        15514 :     fn temp_path_for(
     423        15514 :         conf: &PageServerConf,
     424        15514 :         tenant_id: &TenantId,
     425        15514 :         timeline_id: &TimelineId,
     426        15514 :         key_start: Key,
     427        15514 :         lsn_range: &Range<Lsn>,
     428        15514 :     ) -> PathBuf {
     429        15514 :         let rand_string: String = rand::thread_rng()
     430        15514 :             .sample_iter(&Alphanumeric)
     431        15514 :             .take(8)
     432        15514 :             .map(char::from)
     433        15514 :             .collect();
     434        15514 : 
     435        15514 :         conf.timeline_path(tenant_id, timeline_id).join(format!(
     436        15514 :             "{}-XXX__{:016X}-{:016X}.{}.{}",
     437        15514 :             key_start,
     438        15514 :             u64::from(lsn_range.start),
     439        15514 :             u64::from(lsn_range.end),
     440        15514 :             rand_string,
     441        15514 :             TEMP_FILE_SUFFIX,
     442        15514 :         ))
     443        15514 :     }
     444              : 
     445              :     ///
     446              :     /// Open the underlying file and read the metadata into memory, if it's
     447              :     /// not loaded already.
     448              :     ///
     449     45041417 :     async fn load(
     450     45041417 :         &self,
     451     45041417 :         access_kind: LayerAccessKind,
     452     45041417 :         ctx: &RequestContext,
     453     45041437 :     ) -> Result<&Arc<DeltaLayerInner>> {
     454     45041437 :         self.access_stats.record_access(access_kind, ctx);
     455     45041437 :         // Quick exit if already loaded
     456     45041437 :         self.inner
     457     45041437 :             .get_or_try_init(|| self.load_inner())
     458          368 :             .await
     459     45041437 :             .with_context(|| format!("Failed to load delta layer {}", self.path().display()))
     460     45041437 :     }
     461              : 
     462        10578 :     async fn load_inner(&self) -> Result<Arc<DeltaLayerInner>> {
     463        10578 :         let path = self.path();
     464              : 
     465        10578 :         let summary = match &self.path_or_conf {
     466        10578 :             PathOrConf::Conf(_) => Some(Summary::from(self)),
     467            0 :             PathOrConf::Path(_) => None,
     468              :         };
     469              : 
     470        10578 :         let loaded = DeltaLayerInner::load(&path, summary).await?;
     471              : 
     472        10567 :         if let PathOrConf::Path(ref path) = self.path_or_conf {
     473              :             // not production code
     474              : 
     475            0 :             let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
     476            0 :             let expected_filename = self.filename().file_name();
     477            0 : 
     478            0 :             if actual_filename != expected_filename {
     479            0 :                 println!("warning: filename does not match what is expected from in-file summary");
     480            0 :                 println!("actual: {:?}", actual_filename);
     481            0 :                 println!("expected: {:?}", expected_filename);
     482            0 :             }
     483        10567 :         }
     484              : 
     485        10567 :         Ok(Arc::new(loaded))
     486        10578 :     }
     487              : 
     488              :     /// Create a DeltaLayer struct representing an existing file on disk.
     489         4113 :     pub fn new(
     490         4113 :         conf: &'static PageServerConf,
     491         4113 :         timeline_id: TimelineId,
     492         4113 :         tenant_id: TenantId,
     493         4113 :         filename: &DeltaFileName,
     494         4113 :         file_size: u64,
     495         4113 :         access_stats: LayerAccessStats,
     496         4113 :     ) -> DeltaLayer {
     497         4113 :         DeltaLayer {
     498         4113 :             path_or_conf: PathOrConf::Conf(conf),
     499         4113 :             desc: PersistentLayerDesc::new_delta(
     500         4113 :                 tenant_id,
     501         4113 :                 timeline_id,
     502         4113 :                 filename.key_range.clone(),
     503         4113 :                 filename.lsn_range.clone(),
     504         4113 :                 file_size,
     505         4113 :             ),
     506         4113 :             access_stats,
     507         4113 :             inner: OnceCell::new(),
     508         4113 :         }
     509         4113 :     }
     510              : 
     511              :     /// Create a DeltaLayer struct representing an existing file on disk.
     512              :     ///
     513              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     514            0 :     pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
     515            0 :         let mut summary_buf = Vec::new();
     516            0 :         summary_buf.resize(PAGE_SZ, 0);
     517            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     518            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     519              : 
     520            0 :         let metadata = file
     521            0 :             .metadata()
     522            0 :             .context("get file metadata to determine size")?;
     523              : 
     524            0 :         Ok(DeltaLayer {
     525            0 :             path_or_conf: PathOrConf::Path(path.to_path_buf()),
     526            0 :             desc: PersistentLayerDesc::new_delta(
     527            0 :                 summary.tenant_id,
     528            0 :                 summary.timeline_id,
     529            0 :                 summary.key_range,
     530            0 :                 summary.lsn_range,
     531            0 :                 metadata.len(),
     532            0 :             ),
     533            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     534            0 :             inner: OnceCell::new(),
     535            0 :         })
     536            0 :     }
     537              : 
     538        42304 :     fn layer_name(&self) -> DeltaFileName {
     539        42304 :         self.desc.delta_file_name()
     540        42304 :     }
     541              :     /// Path to the layer file in pageserver workdir.
     542        42304 :     pub fn path(&self) -> PathBuf {
     543        42304 :         Self::path_for(
     544        42304 :             &self.path_or_conf,
     545        42304 :             &self.desc.tenant_id,
     546        42304 :             &self.desc.timeline_id,
     547        42304 :             &self.layer_name(),
     548        42304 :         )
     549        42304 :     }
     550              :     /// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
     551              :     ///
     552              :     /// The value can be obtained via the [`ValueRef::load`] function.
     553         4953 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
     554         4953 :         let inner = self
     555         4953 :             .load(LayerAccessKind::KeyIter, ctx)
     556            0 :             .await
     557         4953 :             .context("load delta layer keys")?;
     558         4953 :         DeltaLayerInner::load_keys(inner)
     559         1285 :             .await
     560         4953 :             .context("Layer index is corrupted")
     561         4953 :     }
     562              : }
     563              : 
     564              : /// A builder object for constructing a new delta layer.
     565              : ///
     566              : /// Usage:
     567              : ///
     568              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     569              : ///
     570              : /// 2. Write the contents by calling `put_value` for every page
     571              : ///    version to store in the layer.
     572              : ///
     573              : /// 3. Call `finish`.
     574              : ///
     575              : struct DeltaLayerWriterInner {
     576              :     conf: &'static PageServerConf,
     577              :     pub path: PathBuf,
     578              :     timeline_id: TimelineId,
     579              :     tenant_id: TenantId,
     580              : 
     581              :     key_start: Key,
     582              :     lsn_range: Range<Lsn>,
     583              : 
     584              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     585              : 
     586              :     blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
     587              : }
     588              : 
     589              : impl DeltaLayerWriterInner {
     590              :     ///
     591              :     /// Start building a new delta layer.
     592              :     ///
     593        15514 :     async fn new(
     594        15514 :         conf: &'static PageServerConf,
     595        15514 :         timeline_id: TimelineId,
     596        15514 :         tenant_id: TenantId,
     597        15514 :         key_start: Key,
     598        15514 :         lsn_range: Range<Lsn>,
     599        15514 :     ) -> anyhow::Result<Self> {
     600        15514 :         // Create the file initially with a temporary filename. We don't know
     601        15514 :         // the end key yet, so we cannot form the final filename yet. We will
     602        15514 :         // rename it when we're done.
     603        15514 :         //
     604        15514 :         // Note: This overwrites any existing file. There shouldn't be any.
     605        15514 :         // FIXME: throw an error instead?
     606        15514 :         let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);
     607              : 
     608        15514 :         let mut file = VirtualFile::create(&path)?;
     609              :         // make room for the header block
     610        15514 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     611        15514 :         let buf_writer = BufWriter::new(file);
     612        15514 :         let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
     613        15514 : 
     614        15514 :         // Initialize the b-tree index builder
     615        15514 :         let block_buf = BlockBuf::new();
     616        15514 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     617        15514 : 
     618        15514 :         Ok(Self {
     619        15514 :             conf,
     620        15514 :             path,
     621        15514 :             timeline_id,
     622        15514 :             tenant_id,
     623        15514 :             key_start,
     624        15514 :             lsn_range,
     625        15514 :             tree: tree_builder,
     626        15514 :             blob_writer,
     627        15514 :         })
     628        15514 :     }
     629              : 
     630              :     ///
     631              :     /// Append a key-value pair to the file.
     632              :     ///
     633              :     /// The values must be appended in key, lsn order.
     634              :     ///
     635     32493194 :     async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     636     32493264 :         self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
     637            0 :             .await
     638     32493262 :     }
     639              : 
     640     91515921 :     async fn put_value_bytes(
     641     91515921 :         &mut self,
     642     91515921 :         key: Key,
     643     91515921 :         lsn: Lsn,
     644     91515921 :         val: &[u8],
     645     91515921 :         will_init: bool,
     646     91516026 :     ) -> anyhow::Result<()> {
     647     91516026 :         assert!(self.lsn_range.start <= lsn);
     648              : 
     649     91516027 :         let off = self.blob_writer.write_blob(val).await?;
     650              : 
     651     91516027 :         let blob_ref = BlobRef::new(off, will_init);
     652     91516027 : 
     653     91516027 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     654     91516027 :         self.tree.append(&delta_key.0, blob_ref.0)?;
     655              : 
     656     91516027 :         Ok(())
     657     91516027 :     }
     658              : 
     659      1751918 :     fn size(&self) -> u64 {
     660      1751918 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     661      1751918 :     }
     662              : 
     663              :     ///
     664              :     /// Finish writing the delta layer.
     665              :     ///
     666        15508 :     async fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
     667        15508 :         let index_start_blk =
     668        15508 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     669        15508 : 
     670        15508 :         let buf_writer = self.blob_writer.into_inner();
     671        15508 :         let mut file = buf_writer.into_inner()?;
     672              : 
     673              :         // Write out the index
     674        15508 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     675        15508 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     676            0 :             .await?;
     677       207575 :         for buf in block_buf.blocks {
     678       192067 :             file.write_all(buf.as_ref())?;
     679              :         }
     680        15508 :         assert!(self.lsn_range.start < self.lsn_range.end);
     681              :         // Fill in the summary on blk 0
     682        15508 :         let summary = Summary {
     683        15508 :             magic: DELTA_FILE_MAGIC,
     684        15508 :             format_version: STORAGE_FORMAT_VERSION,
     685        15508 :             tenant_id: self.tenant_id,
     686        15508 :             timeline_id: self.timeline_id,
     687        15508 :             key_range: self.key_start..key_end,
     688        15508 :             lsn_range: self.lsn_range.clone(),
     689        15508 :             index_start_blk,
     690        15508 :             index_root_blk,
     691        15508 :         };
     692        15508 : 
     693        15508 :         let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
     694        15508 :         Summary::ser_into(&summary, &mut buf)?;
     695        15508 :         if buf.spilled() {
     696              :             // This is bad as we only have one free block for the summary
     697            0 :             warn!(
     698            0 :                 "Used more than one page size for summary buffer: {}",
     699            0 :                 buf.len()
     700            0 :             );
     701        15508 :         }
     702        15508 :         file.seek(SeekFrom::Start(0)).await?;
     703        15508 :         file.write_all(&buf)?;
     704              : 
     705        15508 :         let metadata = file
     706        15508 :             .metadata()
     707            0 :             .await
     708        15508 :             .context("get file metadata to determine size")?;
     709              : 
     710              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     711              :         // Make it a little bit below to account for differing GB units
     712              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     713              :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     714        15508 :         ensure!(
     715        15508 :             metadata.len() <= S3_UPLOAD_LIMIT,
     716            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     717            0 :             file.path.display(),
     718            0 :             metadata.len()
     719              :         );
     720              : 
     721              :         // Note: Because we opened the file in write-only mode, we cannot
     722              :         // reuse the same VirtualFile for reading later. That's why we don't
     723              :         // set inner.file here. The first read will have to re-open it.
     724        15508 :         let layer = DeltaLayer {
     725        15508 :             path_or_conf: PathOrConf::Conf(self.conf),
     726        15508 :             desc: PersistentLayerDesc::new_delta(
     727        15508 :                 self.tenant_id,
     728        15508 :                 self.timeline_id,
     729        15508 :                 self.key_start..key_end,
     730        15508 :                 self.lsn_range.clone(),
     731        15508 :                 metadata.len(),
     732        15508 :             ),
     733        15508 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     734        15508 :             inner: OnceCell::new(),
     735        15508 :         };
     736        15508 : 
     737        15508 :         // fsync the file
     738        15508 :         file.sync_all()?;
     739              :         // Rename the file to its final name
     740              :         //
     741              :         // Note: This overwrites any existing file. There shouldn't be any.
     742              :         // FIXME: throw an error instead?
     743        15508 :         let final_path = DeltaLayer::path_for(
     744        15508 :             &PathOrConf::Conf(self.conf),
     745        15508 :             &self.tenant_id,
     746        15508 :             &self.timeline_id,
     747        15508 :             &DeltaFileName {
     748        15508 :                 key_range: self.key_start..key_end,
     749        15508 :                 lsn_range: self.lsn_range,
     750        15508 :             },
     751        15508 :         );
     752        15508 :         std::fs::rename(self.path, &final_path)?;
     753              : 
     754            0 :         trace!("created delta layer {}", final_path.display());
     755              : 
     756        15508 :         Ok(layer)
     757        15508 :     }
     758              : }
     759              : 
     760              : /// A builder object for constructing a new delta layer.
     761              : ///
     762              : /// Usage:
     763              : ///
     764              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     765              : ///
     766              : /// 2. Write the contents by calling `put_value` for every page
     767              : ///    version to store in the layer.
     768              : ///
     769              : /// 3. Call `finish`.
     770              : ///
     771              : /// # Note
     772              : ///
     773              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     774              : /// possible for the writer to drop before `finish` is actually called. So this
     775              : /// could lead to odd temporary files in the directory, exhausting file system.
     776              : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     777              : /// implementation that cleans up the temporary file in failure. It's not
     778              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     779              : /// out some fields, making it impossible to implement `Drop`.
     780              : ///
     781              : #[must_use]
     782              : pub struct DeltaLayerWriter {
     783              :     inner: Option<DeltaLayerWriterInner>,
     784              : }
     785              : 
     786              : impl DeltaLayerWriter {
     787              :     ///
     788              :     /// Start building a new delta layer.
     789              :     ///
     790        15514 :     pub async fn new(
     791        15514 :         conf: &'static PageServerConf,
     792        15514 :         timeline_id: TimelineId,
     793        15514 :         tenant_id: TenantId,
     794        15514 :         key_start: Key,
     795        15514 :         lsn_range: Range<Lsn>,
     796        15514 :     ) -> anyhow::Result<Self> {
     797              :         Ok(Self {
     798              :             inner: Some(
     799        15514 :                 DeltaLayerWriterInner::new(conf, timeline_id, tenant_id, key_start, lsn_range)
     800            0 :                     .await?,
     801              :             ),
     802              :         })
     803        15514 :     }
     804              : 
     805              :     ///
     806              :     /// Append a key-value pair to the file.
     807              :     ///
     808              :     /// The values must be appended in key, lsn order.
     809              :     ///
     810     32493194 :     pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     811     32493266 :         self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     812     32493264 :     }
     813              : 
     814     59022728 :     pub async fn put_value_bytes(
     815     59022728 :         &mut self,
     816     59022728 :         key: Key,
     817     59022728 :         lsn: Lsn,
     818     59022728 :         val: &[u8],
     819     59022728 :         will_init: bool,
     820     59022728 :     ) -> anyhow::Result<()> {
     821     59022764 :         self.inner
     822     59022764 :             .as_mut()
     823     59022764 :             .unwrap()
     824     59022764 :             .put_value_bytes(key, lsn, val, will_init)
     825            0 :             .await
     826     59022763 :     }
     827              : 
     828      1751918 :     pub fn size(&self) -> u64 {
     829      1751918 :         self.inner.as_ref().unwrap().size()
     830      1751918 :     }
     831              : 
     832              :     ///
     833              :     /// Finish writing the delta layer.
     834              :     ///
     835        15508 :     pub async fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
     836        15508 :         self.inner.take().unwrap().finish(key_end).await
     837        15508 :     }
     838              : }
     839              : 
     840              : impl Drop for DeltaLayerWriter {
     841              :     fn drop(&mut self) {
     842        15508 :         if let Some(inner) = self.inner.take() {
     843            0 :             match inner.blob_writer.into_inner().into_inner() {
     844            0 :                 Ok(vfile) => vfile.remove(),
     845            0 :                 Err(err) => warn!(
     846            0 :                     "error while flushing buffer of image layer temporary file: {}",
     847            0 :                     err
     848            0 :                 ),
     849              :             }
     850        15508 :         }
     851        15508 :     }
     852              : }
     853              : 
     854              : impl DeltaLayerInner {
     855        10578 :     pub(super) async fn load(
     856        10578 :         path: &std::path::Path,
     857        10578 :         summary: Option<Summary>,
     858        10578 :     ) -> anyhow::Result<Self> {
     859        10578 :         let file = VirtualFile::open(path)
     860        10578 :             .with_context(|| format!("Failed to open file '{}'", path.display()))?;
     861        10578 :         let file = FileBlockReader::new(file);
     862              : 
     863        10578 :         let summary_blk = file.read_blk(0).await?;
     864        10567 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
     865              : 
     866        10567 :         if let Some(mut expected_summary) = summary {
     867              :             // production code path
     868        10567 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     869        10567 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     870        10567 :             if actual_summary != expected_summary {
     871            0 :                 bail!(
     872            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     873            0 :                     actual_summary,
     874            0 :                     expected_summary
     875            0 :                 );
     876        10567 :             }
     877            0 :         }
     878              : 
     879        10567 :         Ok(DeltaLayerInner {
     880        10567 :             file,
     881        10567 :             index_start_blk: actual_summary.index_start_blk,
     882        10567 :             index_root_blk: actual_summary.index_root_blk,
     883        10567 :         })
     884        10578 :     }
     885              : 
     886     45036451 :     pub(super) async fn get_value_reconstruct_data(
     887     45036451 :         &self,
     888     45036451 :         key: Key,
     889     45036451 :         lsn_range: Range<Lsn>,
     890     45036451 :         reconstruct_state: &mut ValueReconstructState,
     891     45036471 :     ) -> anyhow::Result<ValueReconstructResult> {
     892     45036471 :         let mut need_image = true;
     893     45036471 :         // Scan the page versions backwards, starting from `lsn`.
     894     45036471 :         let file = &self.file;
     895     45036471 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     896     45036471 :             self.index_start_blk,
     897     45036471 :             self.index_root_blk,
     898     45036471 :             file,
     899     45036471 :         );
     900     45036471 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     901     45036471 : 
     902     45036471 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     903     45036471 : 
     904     45036471 :         tree_reader
     905    203946202 :             .visit(&search_key.0, VisitDirection::Backwards, |key, value| {
     906    203946202 :                 let blob_ref = BlobRef(value);
     907    203946202 :                 if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     908     25518103 :                     return false;
     909    178428099 :                 }
     910    178428099 :                 let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     911    178428099 :                 if entry_lsn < lsn_range.start {
     912      1144715 :                     return false;
     913    177283384 :                 }
     914    177283384 :                 offsets.push((entry_lsn, blob_ref.pos()));
     915    177283384 : 
     916    177283384 :                 !blob_ref.will_init()
     917    203946202 :             })
     918       311457 :             .await?;
     919              : 
     920              :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     921     45036470 :         let cursor = file.block_cursor();
     922     45036470 :         let mut buf = Vec::new();
     923    219437037 :         for (entry_lsn, pos) in offsets {
     924    177283037 :             cursor
     925    177283037 :                 .read_blob_into_buf(pos, &mut buf)
     926      1058132 :                 .await
     927    177283035 :                 .with_context(|| {
     928            0 :                     format!(
     929            0 :                         "Failed to read blob from virtual file {}",
     930            0 :                         file.file.path.display()
     931            0 :                     )
     932    177283035 :                 })?;
     933    177283035 :             let val = Value::des(&buf).with_context(|| {
     934            0 :                 format!(
     935            0 :                     "Failed to deserialize file blob from virtual file {}",
     936            0 :                     file.file.path.display()
     937            0 :                 )
     938    177283035 :             })?;
     939    177283035 :             match val {
     940      1455205 :                 Value::Image(img) => {
     941      1455205 :                     reconstruct_state.img = Some((entry_lsn, img));
     942      1455205 :                     need_image = false;
     943      1455205 :                     break;
     944              :                 }
     945    175827830 :                 Value::WalRecord(rec) => {
     946    175827830 :                     let will_init = rec.will_init();
     947    175827830 :                     reconstruct_state.records.push((entry_lsn, rec));
     948    175827830 :                     if will_init {
     949              :                         // This WAL record initializes the page, so no need to go further back
     950      1427263 :                         need_image = false;
     951      1427263 :                         break;
     952    174400567 :                     }
     953              :                 }
     954              :             }
     955              :         }
     956              : 
     957              :         // If an older page image is needed to reconstruct the page, let the
     958              :         // caller know.
     959     45036468 :         if need_image {
     960     42154000 :             Ok(ValueReconstructResult::Continue)
     961              :         } else {
     962      2882468 :             Ok(ValueReconstructResult::Complete)
     963              :         }
     964     45036468 :     }
     965              : 
     966         4955 :     pub(super) async fn load_keys<T: AsRef<DeltaLayerInner> + Clone>(
     967         4955 :         this: &T,
     968         4955 :     ) -> Result<Vec<DeltaEntry<'_>>> {
     969         4955 :         let dl = this.as_ref();
     970         4955 :         let file = &dl.file;
     971         4955 : 
     972         4955 :         let tree_reader =
     973         4955 :             DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
     974         4955 : 
     975         4955 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
     976         4955 : 
     977         4955 :         tree_reader
     978         4955 :             .visit(
     979         4955 :                 &[0u8; DELTA_KEY_SIZE],
     980         4955 :                 VisitDirection::Forwards,
     981     46455422 :                 |key, value| {
     982     46455422 :                     let delta_key = DeltaKey::from_slice(key);
     983     46455422 :                     let val_ref = ValueRef {
     984     46455422 :                         blob_ref: BlobRef(value),
     985     46455422 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
     986     46455422 :                             Adapter(dl),
     987     46455422 :                         )),
     988     46455422 :                     };
     989     46455422 :                     let pos = BlobRef(value).pos();
     990     46455422 :                     if let Some(last) = all_keys.last_mut() {
     991     46450467 :                         // subtract offset of the current and last entries to get the size
     992     46450467 :                         // of the value associated with this (key, lsn) tuple
     993     46450467 :                         let first_pos = last.size;
     994     46450467 :                         last.size = pos - first_pos;
     995     46450467 :                     }
     996     46455422 :                     let entry = DeltaEntry {
     997     46455422 :                         key: delta_key.key(),
     998     46455422 :                         lsn: delta_key.lsn(),
     999     46455422 :                         size: pos,
    1000     46455422 :                         val: val_ref,
    1001     46455422 :                     };
    1002     46455422 :                     all_keys.push(entry);
    1003     46455422 :                     true
    1004     46455422 :                 },
    1005         4955 :             )
    1006         1285 :             .await?;
    1007         4955 :         if let Some(last) = all_keys.last_mut() {
    1008         4955 :             // Last key occupies all space till end of value storage,
    1009         4955 :             // which corresponds to beginning of the index
    1010         4955 :             last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
    1011         4955 :         }
    1012         4955 :         Ok(all_keys)
    1013         4955 :     }
    1014              : }
    1015              : 
    1016              : /// A set of data associated with a delta layer key and its value
    1017              : pub struct DeltaEntry<'a> {
    1018              :     pub key: Key,
    1019              :     pub lsn: Lsn,
    1020              :     /// Size of the stored value
    1021              :     pub size: u64,
    1022              :     /// Reference to the on-disk value
    1023              :     pub val: ValueRef<'a>,
    1024              : }
    1025              : 
    1026              : /// Reference to an on-disk value
    1027              : pub struct ValueRef<'a> {
    1028              :     blob_ref: BlobRef,
    1029              :     reader: BlockCursor<'a>,
    1030              : }
    1031              : 
    1032              : impl<'a> ValueRef<'a> {
    1033              :     /// Loads the value from disk
    1034     32493197 :     pub async fn load(&self) -> Result<Value> {
    1035              :         // theoretically we *could* record an access time for each, but it does not really matter
    1036     32493269 :         let buf = self.reader.read_blob(self.blob_ref.pos()).await?;
    1037     32493267 :         let val = Value::des(&buf)?;
    1038     32493266 :         Ok(val)
    1039     32493266 :     }
    1040              : }
    1041              : 
    1042              : pub(crate) struct Adapter<T>(T);
    1043              : 
    1044              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
    1045     33813933 :     pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
    1046     33813933 :         self.0.as_ref().file.read_blk(blknum).await
    1047     33813931 :     }
    1048              : }
        

Generated by: LCOV version 2.1-beta