LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 71.6 % 588 421 167 421
Current Date: 2024-01-09 02:06:09 Functions: 52.6 % 97 51 46 51
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2                 : //! LSNs, and in a range of Keys. It is stored in a file on disk.
       3                 : //!
       4                 : //! Usually a delta layer only contains differences, in the form of WAL records
       5                 : //! against a base LSN. However, if a relation is extended or a whole new relation
       6                 : //! is created, there would be no base for the new pages. The entries for them
       7                 : //! must be page images or WAL records with the 'will_init' flag set, so that
       8                 : //! they can be replayed without referring to an older page version.
       9                 : //!
      10                 : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11                 : //! there are no subdirectories, and each delta file is named like this:
      12                 : //!
      13                 : //! ```text
      14                 : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15                 : //! ```
      16                 : //!
      17                 : //! For example:
      18                 : //!
      19                 : //! ```text
      20                 : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21                 : //! ```
      22                 : //!
      23                 : //! Every delta file consists of three parts: "summary", "index", and
      24                 : //! "values". The summary is a fixed size header at the beginning of the file,
      25                 : //! and it contains basic information about the layer, and offsets to the other
      26                 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27                 : //! "values" part.  The actual page images and WAL records are stored in the
      28                 : //! "values" part.
      29                 : //!
      30                 : use crate::config::PageServerConf;
      31                 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32                 : use crate::page_cache::PAGE_SZ;
      33                 : use crate::repository::{Key, Value, KEY_SIZE};
      34                 : use crate::tenant::blob_io::BlobWriter;
      35                 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36                 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37                 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
      38                 : use crate::tenant::Timeline;
      39                 : use crate::virtual_file::VirtualFile;
      40                 : use crate::{walrecord, TEMP_FILE_SUFFIX};
      41                 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      42                 : use anyhow::{bail, ensure, Context, Result};
      43                 : use camino::{Utf8Path, Utf8PathBuf};
      44                 : use pageserver_api::models::LayerAccessKind;
      45                 : use pageserver_api::shard::TenantShardId;
      46                 : use rand::{distributions::Alphanumeric, Rng};
      47                 : use serde::{Deserialize, Serialize};
      48                 : use std::fs::File;
      49                 : use std::io::SeekFrom;
      50                 : use std::ops::Range;
      51                 : use std::os::unix::fs::FileExt;
      52                 : use std::sync::Arc;
      53                 : use tokio::sync::OnceCell;
      54                 : use tracing::*;
      55                 : 
      56                 : use utils::{
      57                 :     bin_ser::BeSer,
      58                 :     id::{TenantId, TimelineId},
      59                 :     lsn::Lsn,
      60                 : };
      61                 : 
      62                 : use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
      63                 : 
      64                 : ///
      65                 : /// Header stored in the beginning of the file
      66                 : ///
      67                 : /// After this comes the 'values' part, starting on block 1. After that,
      68                 : /// the 'index' starts at the block indicated by 'index_start_blk'
      69                 : ///
      70 CBC       14809 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      71                 : pub struct Summary {
      72                 :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      73                 :     pub magic: u16,
      74                 :     pub format_version: u16,
      75                 : 
      76                 :     pub tenant_id: TenantId,
      77                 :     pub timeline_id: TimelineId,
      78                 :     pub key_range: Range<Key>,
      79                 :     pub lsn_range: Range<Lsn>,
      80                 : 
      81                 :     /// Block number where the 'index' part of the file begins.
      82                 :     pub index_start_blk: u32,
      83                 :     /// Block within the 'index', where the B-tree root page is stored
      84                 :     pub index_root_blk: u32,
      85                 : }
      86                 : 
      87                 : impl From<&DeltaLayer> for Summary {
      88 UBC           0 :     fn from(layer: &DeltaLayer) -> Self {
      89               0 :         Self::expected(
      90               0 :             layer.desc.tenant_shard_id.tenant_id,
      91               0 :             layer.desc.timeline_id,
      92               0 :             layer.desc.key_range.clone(),
      93               0 :             layer.desc.lsn_range.clone(),
      94               0 :         )
      95               0 :     }
      96                 : }
      97                 : 
      98                 : impl Summary {
      99 CBC       11055 :     pub(super) fn expected(
     100           11055 :         tenant_id: TenantId,
     101           11055 :         timeline_id: TimelineId,
     102           11055 :         keys: Range<Key>,
     103           11055 :         lsns: Range<Lsn>,
     104           11055 :     ) -> Self {
     105           11055 :         Self {
     106           11055 :             magic: DELTA_FILE_MAGIC,
     107           11055 :             format_version: STORAGE_FORMAT_VERSION,
     108           11055 : 
     109           11055 :             tenant_id,
     110           11055 :             timeline_id,
     111           11055 :             key_range: keys,
     112           11055 :             lsn_range: lsns,
     113           11055 : 
     114           11055 :             index_start_blk: 0,
     115           11055 :             index_root_blk: 0,
     116           11055 :         }
     117           11055 :     }
     118                 : }
     119                 : 
     120                 : // Flag indicating that this version initializes the page
     121                 : const WILL_INIT: u64 = 1;
     122                 : 
     123                 : /// Struct representing a reference to a BLOB in a layer. The reference contains
     124                 : /// the BLOB offset, and for WAL records it also contains the `will_init` flag.
     125                 : /// The flag helps to determine the range of records that need to be applied,
     126                 : /// without reading/deserializing the records themselves.
     127 UBC           0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     128                 : pub struct BlobRef(pub u64);
     129                 : 
     130                 : impl BlobRef {
     131 CBC    46208731 :     pub fn will_init(&self) -> bool {
     132        46208731 :         (self.0 & WILL_INIT) != 0
     133        46208731 :     }
     134                 : 
     135        79394338 :     pub fn pos(&self) -> u64 {
     136        79394338 :         self.0 >> 1
     137        79394338 :     }
     138                 : 
     139        46500457 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     140        46500457 :         let mut blob_ref = pos << 1;
     141        46500457 :         if will_init {
     142         7470345 :             blob_ref |= WILL_INIT;
     143        39030112 :         }
     144        46500457 :         BlobRef(blob_ref)
     145        46500457 :     }
     146                 : }
     147                 : 
     148                 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     149                 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     150                 : 
     151                 : /// This is the key of the B-tree index stored in the delta layer. It consists
     152                 : /// of the serialized representation of a Key and LSN.
     153                 : impl DeltaKey {
     154        16792834 :     fn from_slice(buf: &[u8]) -> Self {
     155        16792834 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     156        16792834 :         bytes.copy_from_slice(buf);
     157        16792834 :         DeltaKey(bytes)
     158        16792834 :     }
     159                 : 
     160        61503217 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     161        61503217 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     162        61503217 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     163        61503217 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     164        61503217 :         DeltaKey(bytes)
     165        61503217 :     }
     166                 : 
     167        16792834 :     fn key(&self) -> Key {
     168        16792834 :         Key::from_slice(&self.0)
     169        16792834 :     }
     170                 : 
     171        16792834 :     fn lsn(&self) -> Lsn {
     172        16792834 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     173        16792834 :     }
     174                 : 
     175        46608683 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     176        46608683 :         let mut lsn_buf = [0u8; 8];
     177        46608683 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     178        46608683 :         Lsn(u64::from_be_bytes(lsn_buf))
     179        46608683 :     }
     180                 : }
     181                 : 
     182                 : /// This is used only from `pagectl`. Within pageserver, all layers are
     183                 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     184                 : pub struct DeltaLayer {
     185                 :     path: Utf8PathBuf,
     186                 :     pub desc: PersistentLayerDesc,
     187                 :     access_stats: LayerAccessStats,
     188                 :     inner: OnceCell<Arc<DeltaLayerInner>>,
     189                 : }
     190                 : 
     191                 : impl std::fmt::Debug for DeltaLayer {
     192 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     193               0 :         use super::RangeDisplayDebug;
     194               0 : 
     195               0 :         f.debug_struct("DeltaLayer")
     196               0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     197               0 :             .field("lsn_range", &self.desc.lsn_range)
     198               0 :             .field("file_size", &self.desc.file_size)
     199               0 :             .field("inner", &self.inner)
     200               0 :             .finish()
     201               0 :     }
     202                 : }
     203                 : 
     204                 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     205                 : /// file.
     206                 : pub struct DeltaLayerInner {
     207                 :     // values copied from summary
     208                 :     index_start_blk: u32,
     209                 :     index_root_blk: u32,
     210                 : 
     211                 :     /// Reader object for reading blocks from the file.
     212                 :     file: FileBlockReader,
     213                 : }
     214                 : 
     215                 : impl std::fmt::Debug for DeltaLayerInner {
     216               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     217               0 :         f.debug_struct("DeltaLayerInner")
     218               0 :             .field("index_start_blk", &self.index_start_blk)
     219               0 :             .field("index_root_blk", &self.index_root_blk)
     220               0 :             .finish()
     221               0 :     }
     222                 : }
     223                 : 
     224                 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     225                 : impl std::fmt::Display for DeltaLayer {
     226               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     227               0 :         write!(f, "{}", self.layer_desc().short_id())
     228               0 :     }
     229                 : }
     230                 : 
     231                 : impl AsLayerDesc for DeltaLayer {
     232               0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     233               0 :         &self.desc
     234               0 :     }
     235                 : }
     236                 : 
     237                 : impl DeltaLayer {
     238               0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     239               0 :         self.desc.dump();
     240               0 : 
     241               0 :         if !verbose {
     242               0 :             return Ok(());
     243               0 :         }
     244                 : 
     245               0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     246                 : 
     247               0 :         inner.dump(ctx).await
     248               0 :     }
     249                 : 
     250 CBC       14815 :     fn temp_path_for(
     251           14815 :         conf: &PageServerConf,
     252           14815 :         tenant_shard_id: &TenantShardId,
     253           14815 :         timeline_id: &TimelineId,
     254           14815 :         key_start: Key,
     255           14815 :         lsn_range: &Range<Lsn>,
     256           14815 :     ) -> Utf8PathBuf {
     257           14815 :         let rand_string: String = rand::thread_rng()
     258           14815 :             .sample_iter(&Alphanumeric)
     259           14815 :             .take(8)
     260           14815 :             .map(char::from)
     261           14815 :             .collect();
     262           14815 : 
     263           14815 :         conf.timeline_path(tenant_shard_id, timeline_id)
     264           14815 :             .join(format!(
     265           14815 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     266           14815 :                 key_start,
     267           14815 :                 u64::from(lsn_range.start),
     268           14815 :                 u64::from(lsn_range.end),
     269           14815 :                 rand_string,
     270           14815 :                 TEMP_FILE_SUFFIX,
     271           14815 :             ))
     272           14815 :     }
     273                 : 
     274                 :     ///
     275                 :     /// Open the underlying file and read the metadata into memory, if it's
     276                 :     /// not loaded already.
     277                 :     ///
     278 UBC           0 :     async fn load(
     279               0 :         &self,
     280               0 :         access_kind: LayerAccessKind,
     281               0 :         ctx: &RequestContext,
     282               0 :     ) -> Result<&Arc<DeltaLayerInner>> {
     283               0 :         self.access_stats.record_access(access_kind, ctx);
     284               0 :         // Quick exit if already loaded
     285               0 :         self.inner
     286               0 :             .get_or_try_init(|| self.load_inner(ctx))
     287               0 :             .await
     288               0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     289               0 :     }
     290                 : 
     291               0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     292               0 :         let path = self.path();
     293                 : 
     294               0 :         let loaded = DeltaLayerInner::load(&path, None, ctx)
     295               0 :             .await
     296               0 :             .and_then(|res| res)?;
     297                 : 
     298                 :         // not production code
     299               0 :         let actual_filename = path.file_name().unwrap().to_owned();
     300               0 :         let expected_filename = self.layer_desc().filename().file_name();
     301               0 : 
     302               0 :         if actual_filename != expected_filename {
     303               0 :             println!("warning: filename does not match what is expected from in-file summary");
     304               0 :             println!("actual: {:?}", actual_filename);
     305               0 :             println!("expected: {:?}", expected_filename);
     306               0 :         }
     307                 : 
     308               0 :         Ok(Arc::new(loaded))
     309               0 :     }
     310                 : 
     311                 :     /// Create a DeltaLayer struct representing an existing file on disk.
     312                 :     ///
     313                 :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     314               0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     315               0 :         let mut summary_buf = vec![0; PAGE_SZ];
     316               0 :         file.read_exact_at(&mut summary_buf, 0)?;
     317               0 :         let summary = Summary::des_prefix(&summary_buf)?;
     318                 : 
     319               0 :         let metadata = file
     320               0 :             .metadata()
     321               0 :             .context("get file metadata to determine size")?;
     322                 : 
     323                 :         // This function is never used for constructing layers in a running pageserver,
     324                 :         // so it does not need an accurate TenantShardId.
     325               0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     326               0 : 
     327               0 :         Ok(DeltaLayer {
     328               0 :             path: path.to_path_buf(),
     329               0 :             desc: PersistentLayerDesc::new_delta(
     330               0 :                 tenant_shard_id,
     331               0 :                 summary.timeline_id,
     332               0 :                 summary.key_range,
     333               0 :                 summary.lsn_range,
     334               0 :                 metadata.len(),
     335               0 :             ),
     336               0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     337               0 :             inner: OnceCell::new(),
     338               0 :         })
     339               0 :     }
     340                 : 
     341                 :     /// Path to the layer file in pageserver workdir.
     342               0 :     fn path(&self) -> Utf8PathBuf {
     343               0 :         self.path.clone()
     344               0 :     }
     345                 : }
     346                 : 
     347                 : /// A builder object for constructing a new delta layer.
     348                 : ///
     349                 : /// Usage:
     350                 : ///
     351                 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     352                 : ///
     353                 : /// 2. Write the contents by calling `put_value` for every page
     354                 : ///    version to store in the layer.
     355                 : ///
     356                 : /// 3. Call `finish`.
     357                 : ///
     358                 : struct DeltaLayerWriterInner {
     359                 :     conf: &'static PageServerConf,
     360                 :     pub path: Utf8PathBuf,
     361                 :     timeline_id: TimelineId,
     362                 :     tenant_shard_id: TenantShardId,
     363                 : 
     364                 :     key_start: Key,
     365                 :     lsn_range: Range<Lsn>,
     366                 : 
     367                 :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     368                 : 
     369                 :     blob_writer: BlobWriter<true>,
     370                 : }
     371                 : 
     372                 : impl DeltaLayerWriterInner {
     373                 :     ///
     374                 :     /// Start building a new delta layer.
     375                 :     ///
     376 CBC       14815 :     async fn new(
     377           14815 :         conf: &'static PageServerConf,
     378           14815 :         timeline_id: TimelineId,
     379           14815 :         tenant_shard_id: TenantShardId,
     380           14815 :         key_start: Key,
     381           14815 :         lsn_range: Range<Lsn>,
     382           14815 :     ) -> anyhow::Result<Self> {
     383           14815 :         // Create the file initially with a temporary filename. We don't know
     384           14815 :         // the end key yet, so we cannot form the final filename yet. We will
     385           14815 :         // rename it when we're done.
     386           14815 :         //
     387           14815 :         // Note: This overwrites any existing file. There shouldn't be any.
     388           14815 :         // FIXME: throw an error instead?
     389           14815 :         let path =
     390           14815 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     391                 : 
     392           14815 :         let mut file = VirtualFile::create(&path).await?;
     393                 :         // make room for the header block
     394           14815 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     395           14815 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     396           14815 : 
     397           14815 :         // Initialize the b-tree index builder
     398           14815 :         let block_buf = BlockBuf::new();
     399           14815 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     400           14815 : 
     401           14815 :         Ok(Self {
     402           14815 :             conf,
     403           14815 :             path,
     404           14815 :             timeline_id,
     405           14815 :             tenant_shard_id,
     406           14815 :             key_start,
     407           14815 :             lsn_range,
     408           14815 :             tree: tree_builder,
     409           14815 :             blob_writer,
     410           14815 :         })
     411           14815 :     }
     412                 : 
     413                 :     ///
     414                 :     /// Append a key-value pair to the file.
     415                 :     ///
     416                 :     /// The values must be appended in key, lsn order.
     417                 :     ///
     418        16392780 :     async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     419        16392780 :         self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
     420 UBC           0 :             .await
     421 CBC    16392780 :     }
     422                 : 
     423        46500457 :     async fn put_value_bytes(
     424        46500457 :         &mut self,
     425        46500457 :         key: Key,
     426        46500457 :         lsn: Lsn,
     427        46500457 :         val: &[u8],
     428        46500457 :         will_init: bool,
     429        46500485 :     ) -> anyhow::Result<()> {
     430        46500485 :         assert!(self.lsn_range.start <= lsn);
     431                 : 
     432        46500485 :         let off = self.blob_writer.write_blob(val).await?;
     433                 : 
     434        46500485 :         let blob_ref = BlobRef::new(off, will_init);
     435        46500485 : 
     436        46500485 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     437        46500485 :         self.tree.append(&delta_key.0, blob_ref.0)?;
     438                 : 
     439        46500485 :         Ok(())
     440        46500485 :     }
     441                 : 
     442         1757221 :     fn size(&self) -> u64 {
     443         1757221 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     444         1757221 :     }
     445                 : 
     446                 :     ///
     447                 :     /// Finish writing the delta layer.
     448                 :     ///
     449           14809 :     async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
     450           14809 :         let index_start_blk =
     451           14809 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     452                 : 
     453           14809 :         let mut file = self.blob_writer.into_inner().await?;
     454                 : 
     455                 :         // Write out the index
     456           14809 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     457           14809 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     458 UBC           0 :             .await?;
     459 CBC      113612 :         for buf in block_buf.blocks {
     460           98803 :             file.write_all(buf.as_ref()).await?;
     461                 :         }
     462           14809 :         assert!(self.lsn_range.start < self.lsn_range.end);
     463                 :         // Fill in the summary on blk 0
     464           14809 :         let summary = Summary {
     465           14809 :             magic: DELTA_FILE_MAGIC,
     466           14809 :             format_version: STORAGE_FORMAT_VERSION,
     467           14809 :             tenant_id: self.tenant_shard_id.tenant_id,
     468           14809 :             timeline_id: self.timeline_id,
     469           14809 :             key_range: self.key_start..key_end,
     470           14809 :             lsn_range: self.lsn_range.clone(),
     471           14809 :             index_start_blk,
     472           14809 :             index_root_blk,
     473           14809 :         };
     474           14809 : 
     475           14809 :         let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
     476           14809 :         Summary::ser_into(&summary, &mut buf)?;
     477           14809 :         if buf.spilled() {
     478                 :             // This is bad as we only have one free block for the summary
     479 UBC           0 :             warn!(
     480               0 :                 "Used more than one page size for summary buffer: {}",
     481               0 :                 buf.len()
     482               0 :             );
     483 CBC       14809 :         }
     484           14809 :         file.seek(SeekFrom::Start(0)).await?;
     485           14809 :         file.write_all(&buf).await?;
     486                 : 
     487           14809 :         let metadata = file
     488           14809 :             .metadata()
     489 UBC           0 :             .await
     490 CBC       14809 :             .context("get file metadata to determine size")?;
     491                 : 
     492                 :         // 5GB limit for objects without multipart upload (which we don't want to use)
     493                 :         // Make it a little bit below to account for differing GB units
     494                 :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     495                 :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     496                 :         ensure!(
     497           14809 :             metadata.len() <= S3_UPLOAD_LIMIT,
     498 UBC           0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     499               0 :             file.path,
     500               0 :             metadata.len()
     501                 :         );
     502                 : 
     503                 :         // Note: Because we opened the file in write-only mode, we cannot
     504                 :         // reuse the same VirtualFile for reading later. That's why we don't
     505                 :         // set inner.file here. The first read will have to re-open it.
     506                 : 
     507 CBC       14809 :         let desc = PersistentLayerDesc::new_delta(
     508           14809 :             self.tenant_shard_id,
     509           14809 :             self.timeline_id,
     510           14809 :             self.key_start..key_end,
     511           14809 :             self.lsn_range.clone(),
     512           14809 :             metadata.len(),
     513           14809 :         );
     514           14809 : 
     515           14809 :         // fsync the file
     516           14809 :         file.sync_all().await?;
     517                 : 
     518           14809 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     519                 : 
     520 UBC           0 :         trace!("created delta layer {}", layer.local_path());
     521                 : 
     522 CBC       14809 :         Ok(layer)
     523           14809 :     }
     524                 : }
     525                 : 
     526                 : /// A builder object for constructing a new delta layer.
     527                 : ///
     528                 : /// Usage:
     529                 : ///
     530                 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     531                 : ///
     532                 : /// 2. Write the contents by calling `put_value` for every page
     533                 : ///    version to store in the layer.
     534                 : ///
     535                 : /// 3. Call `finish`.
     536                 : ///
     537                 : /// # Note
     538                 : ///
     539                 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     540                 : /// possible for the writer to drop before `finish` is actually called. So this
     541                 : /// could lead to odd temporary files in the directory, exhausting file system.
     542                 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     543                 : /// implementation that cleans up the temporary file in failure. It's not
     544                 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     545                 : /// out some fields, making it impossible to implement `Drop`.
     546                 : ///
     547                 : #[must_use]
     548                 : pub struct DeltaLayerWriter {
     549                 :     inner: Option<DeltaLayerWriterInner>,
     550                 : }
     551                 : 
     552                 : impl DeltaLayerWriter {
     553                 :     ///
     554                 :     /// Start building a new delta layer.
     555                 :     ///
     556           14815 :     pub async fn new(
     557           14815 :         conf: &'static PageServerConf,
     558           14815 :         timeline_id: TimelineId,
     559           14815 :         tenant_shard_id: TenantShardId,
     560           14815 :         key_start: Key,
     561           14815 :         lsn_range: Range<Lsn>,
     562           14815 :     ) -> anyhow::Result<Self> {
     563           14815 :         Ok(Self {
     564           14815 :             inner: Some(
     565           14815 :                 DeltaLayerWriterInner::new(
     566           14815 :                     conf,
     567           14815 :                     timeline_id,
     568           14815 :                     tenant_shard_id,
     569           14815 :                     key_start,
     570           14815 :                     lsn_range,
     571           14815 :                 )
     572 UBC           0 :                 .await?,
     573                 :             ),
     574                 :         })
     575 CBC       14815 :     }
     576                 : 
     577                 :     ///
     578                 :     /// Append a key-value pair to the file.
     579                 :     ///
     580                 :     /// The values must be appended in key, lsn order.
     581                 :     ///
     582        16392780 :     pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     583        16392780 :         self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     584        16392780 :     }
     585                 : 
     586        30107691 :     pub async fn put_value_bytes(
     587        30107691 :         &mut self,
     588        30107691 :         key: Key,
     589        30107691 :         lsn: Lsn,
     590        30107691 :         val: &[u8],
     591        30107691 :         will_init: bool,
     592        30107705 :     ) -> anyhow::Result<()> {
     593        30107705 :         self.inner
     594        30107705 :             .as_mut()
     595        30107705 :             .unwrap()
     596        30107705 :             .put_value_bytes(key, lsn, val, will_init)
     597 UBC           0 :             .await
     598 CBC    30107705 :     }
     599                 : 
     600         1757221 :     pub fn size(&self) -> u64 {
     601         1757221 :         self.inner.as_ref().unwrap().size()
     602         1757221 :     }
     603                 : 
     604                 :     ///
     605                 :     /// Finish writing the delta layer.
     606                 :     ///
     607           14809 :     pub(crate) async fn finish(
     608           14809 :         mut self,
     609           14809 :         key_end: Key,
     610           14809 :         timeline: &Arc<Timeline>,
     611           14809 :     ) -> anyhow::Result<ResidentLayer> {
     612           14809 :         self.inner.take().unwrap().finish(key_end, timeline).await
     613           14809 :     }
     614                 : }
     615                 : 
     616                 : impl Drop for DeltaLayerWriter {
     617           14809 :     fn drop(&mut self) {
     618           14809 :         if let Some(inner) = self.inner.take() {
     619 UBC           0 :             // We want to remove the virtual file here, so it's fine to not
     620               0 :             // having completely flushed unwritten data.
     621               0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     622               0 :             vfile.remove();
     623 CBC       14809 :         }
     624           14809 :     }
     625                 : }
     626                 : 
     627 UBC           0 : #[derive(thiserror::Error, Debug)]
     628                 : pub enum RewriteSummaryError {
     629                 :     #[error("magic mismatch")]
     630                 :     MagicMismatch,
     631                 :     #[error(transparent)]
     632                 :     Other(#[from] anyhow::Error),
     633                 : }
     634                 : 
     635                 : impl From<std::io::Error> for RewriteSummaryError {
     636               0 :     fn from(e: std::io::Error) -> Self {
     637               0 :         Self::Other(anyhow::anyhow!(e))
     638               0 :     }
     639                 : }
     640                 : 
     641                 : impl DeltaLayer {
     642               0 :     pub async fn rewrite_summary<F>(
     643               0 :         path: &Utf8Path,
     644               0 :         rewrite: F,
     645               0 :         ctx: &RequestContext,
     646               0 :     ) -> Result<(), RewriteSummaryError>
     647               0 :     where
     648               0 :         F: Fn(Summary) -> Summary,
     649               0 :     {
     650               0 :         let file = VirtualFile::open_with_options(
     651               0 :             path,
     652               0 :             &*std::fs::OpenOptions::new().read(true).write(true),
     653               0 :         )
     654               0 :         .await
     655               0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     656               0 :         let file = FileBlockReader::new(file);
     657               0 :         let summary_blk = file.read_blk(0, ctx).await?;
     658               0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     659               0 :         let mut file = file.file;
     660               0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     661               0 :             return Err(RewriteSummaryError::MagicMismatch);
     662               0 :         }
     663               0 : 
     664               0 :         let new_summary = rewrite(actual_summary);
     665               0 : 
     666               0 :         let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
     667               0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     668               0 :         if buf.spilled() {
     669                 :             // The code in DeltaLayerWriterInner just warn!()s for this.
     670                 :             // It should probably error out as well.
     671               0 :             return Err(RewriteSummaryError::Other(anyhow::anyhow!(
     672               0 :                 "Used more than one page size for summary buffer: {}",
     673               0 :                 buf.len()
     674               0 :             )));
     675               0 :         }
     676               0 :         file.seek(SeekFrom::Start(0)).await?;
     677               0 :         file.write_all(&buf).await?;
     678               0 :         Ok(())
     679               0 :     }
     680                 : }
     681                 : 
     682                 : impl DeltaLayerInner {
     683                 :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     684                 :     /// - inner has the success or transient failure
     685                 :     /// - outer has the permanent failure
     686 CBC       11055 :     pub(super) async fn load(
     687           11055 :         path: &Utf8Path,
     688           11055 :         summary: Option<Summary>,
     689           11055 :         ctx: &RequestContext,
     690           11055 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     691           11055 :         let file = match VirtualFile::open(path).await {
     692           11055 :             Ok(file) => file,
     693 UBC           0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     694                 :         };
     695 CBC       11055 :         let file = FileBlockReader::new(file);
     696                 : 
     697           11055 :         let summary_blk = match file.read_blk(0, ctx).await {
     698           11055 :             Ok(blk) => blk,
     699 UBC           0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     700                 :         };
     701                 : 
     702                 :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     703 CBC       11055 :         let actual_summary =
     704           11055 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     705                 : 
     706           11055 :         if let Some(mut expected_summary) = summary {
     707                 :             // production code path
     708           11055 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     709           11055 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     710           11055 :             if actual_summary != expected_summary {
     711               1 :                 bail!(
     712               1 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     713               1 :                     actual_summary,
     714               1 :                     expected_summary
     715               1 :                 );
     716           11054 :             }
     717 UBC           0 :         }
     718                 : 
     719 CBC       11054 :         Ok(Ok(DeltaLayerInner {
     720           11054 :             file,
     721           11054 :             index_start_blk: actual_summary.index_start_blk,
     722           11054 :             index_root_blk: actual_summary.index_root_blk,
     723           11054 :         }))
     724           11055 :     }
     725                 : 
     726        15002760 :     pub(super) async fn get_value_reconstruct_data(
     727        15002760 :         &self,
     728        15002760 :         key: Key,
     729        15002760 :         lsn_range: Range<Lsn>,
     730        15002760 :         reconstruct_state: &mut ValueReconstructState,
     731        15002760 :         ctx: &RequestContext,
     732        15002767 :     ) -> anyhow::Result<ValueReconstructResult> {
     733        15002767 :         let mut need_image = true;
     734        15002767 :         // Scan the page versions backwards, starting from `lsn`.
     735        15002767 :         let file = &self.file;
     736        15002767 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     737        15002767 :             self.index_start_blk,
     738        15002767 :             self.index_root_blk,
     739        15002767 :             file,
     740        15002767 :         );
     741        15002767 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     742        15002767 : 
     743        15002767 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     744        15002767 : 
     745        15002767 :         tree_reader
     746        15002767 :             .visit(
     747        15002767 :                 &search_key.0,
     748        15002767 :                 VisitDirection::Backwards,
     749        49738948 :                 |key, value| {
     750        49738948 :                     let blob_ref = BlobRef(value);
     751        49738948 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     752         3130263 :                         return false;
     753        46608685 :                     }
     754        46608685 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     755        46608685 :                     if entry_lsn < lsn_range.start {
     756          399952 :                         return false;
     757        46208733 :                     }
     758        46208733 :                     offsets.push((entry_lsn, blob_ref.pos()));
     759        46208733 : 
     760        46208733 :                     !blob_ref.will_init()
     761        49738948 :                 },
     762        15002767 :                 &RequestContextBuilder::extend(ctx)
     763        15002767 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     764        15002767 :                     .build(),
     765        15002767 :             )
     766          183313 :             .await?;
     767                 : 
     768        15002764 :         let ctx = &RequestContextBuilder::extend(ctx)
     769        15002764 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     770        15002764 :             .build();
     771        15002764 : 
     772        15002764 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     773        15002764 :         let cursor = file.block_cursor();
     774        15002764 :         let mut buf = Vec::new();
     775        58518894 :         for (entry_lsn, pos) in offsets {
     776        46207958 :             cursor
     777        46207958 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     778          667285 :                 .await
     779        46207957 :                 .with_context(|| {
     780 UBC           0 :                     format!("Failed to read blob from virtual file {}", file.file.path)
     781 CBC    46207957 :                 })?;
     782        46207957 :             let val = Value::des(&buf).with_context(|| {
     783 UBC           0 :                 format!(
     784               0 :                     "Failed to deserialize file blob from virtual file {}",
     785               0 :                     file.file.path
     786               0 :                 )
     787 CBC    46207957 :             })?;
     788        46207957 :             match val {
     789         1493998 :                 Value::Image(img) => {
     790         1493998 :                     reconstruct_state.img = Some((entry_lsn, img));
     791         1493998 :                     need_image = false;
     792         1493998 :                     break;
     793                 :                 }
     794        44713959 :                 Value::WalRecord(rec) => {
     795        44713959 :                     let will_init = rec.will_init();
     796        44713959 :                     reconstruct_state.records.push((entry_lsn, rec));
     797        44713959 :                     if will_init {
     798                 :                         // This WAL record initializes the page, so no need to go further back
     799         1197829 :                         need_image = false;
     800         1197829 :                         break;
     801        43516130 :                     }
     802                 :                 }
     803                 :             }
     804                 :         }
     805                 : 
     806                 :         // If an older page image is needed to reconstruct the page, let the
     807                 :         // caller know.
     808        15002763 :         if need_image {
     809        12310936 :             Ok(ValueReconstructResult::Continue)
     810                 :         } else {
     811         2691827 :             Ok(ValueReconstructResult::Complete)
     812                 :         }
     813        15002763 :     }
     814                 : 
     815            3672 :     pub(super) async fn load_keys<'a>(
     816            3672 :         &'a self,
     817            3672 :         ctx: &RequestContext,
     818            3672 :     ) -> Result<Vec<DeltaEntry<'a>>> {
     819            3672 :         let file = &self.file;
     820            3672 : 
     821            3672 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     822            3672 :             self.index_start_blk,
     823            3672 :             self.index_root_blk,
     824            3672 :             file,
     825            3672 :         );
     826            3672 : 
     827            3672 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
     828            3672 : 
     829            3672 :         tree_reader
     830            3672 :             .visit(
     831            3672 :                 &[0u8; DELTA_KEY_SIZE],
     832            3672 :                 VisitDirection::Forwards,
     833        16792834 :                 |key, value| {
     834        16792834 :                     let delta_key = DeltaKey::from_slice(key);
     835        16792834 :                     let val_ref = ValueRef {
     836        16792834 :                         blob_ref: BlobRef(value),
     837        16792834 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
     838        16792834 :                             Adapter(self),
     839        16792834 :                         )),
     840        16792834 :                     };
     841        16792834 :                     let pos = BlobRef(value).pos();
     842        16792834 :                     if let Some(last) = all_keys.last_mut() {
     843        16789162 :                         // subtract offset of the current and last entries to get the size
     844        16789162 :                         // of the value associated with this (key, lsn) tuple
     845        16789162 :                         let first_pos = last.size;
     846        16789162 :                         last.size = pos - first_pos;
     847        16789162 :                     }
     848        16792834 :                     let entry = DeltaEntry {
     849        16792834 :                         key: delta_key.key(),
     850        16792834 :                         lsn: delta_key.lsn(),
     851        16792834 :                         size: pos,
     852        16792834 :                         val: val_ref,
     853        16792834 :                     };
     854        16792834 :                     all_keys.push(entry);
     855        16792834 :                     true
     856        16792834 :                 },
     857            3672 :                 &RequestContextBuilder::extend(ctx)
     858            3672 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     859            3672 :                     .build(),
     860            3672 :             )
     861            1000 :             .await?;
     862            3672 :         if let Some(last) = all_keys.last_mut() {
     863            3672 :             // Last key occupies all space till end of value storage,
     864            3672 :             // which corresponds to beginning of the index
     865            3672 :             last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
     866            3672 :         }
     867            3672 :         Ok(all_keys)
     868            3672 :     }
     869                 : 
     870               2 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     871               2 :         println!(
     872               2 :             "index_start_blk: {}, root {}",
     873               2 :             self.index_start_blk, self.index_root_blk
     874               2 :         );
     875               2 : 
     876               2 :         let file = &self.file;
     877               2 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     878               2 :             self.index_start_blk,
     879               2 :             self.index_root_blk,
     880               2 :             file,
     881               2 :         );
     882               2 : 
     883               2 :         tree_reader.dump().await?;
     884                 : 
     885               2 :         let keys = self.load_keys(ctx).await?;
     886                 : 
     887               4 :         async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
     888               4 :             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
     889               4 :             let val = Value::des(&buf)?;
     890               4 :             let desc = match val {
     891               4 :                 Value::Image(img) => {
     892               4 :                     format!(" img {} bytes", img.len())
     893                 :                 }
     894 UBC           0 :                 Value::WalRecord(rec) => {
     895               0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
     896               0 :                     format!(
     897               0 :                         " rec {} bytes will_init: {} {}",
     898               0 :                         buf.len(),
     899               0 :                         rec.will_init(),
     900               0 :                         wal_desc
     901               0 :                     )
     902                 :                 }
     903                 :             };
     904 CBC           4 :             Ok(desc)
     905               4 :         }
     906                 : 
     907               6 :         for entry in keys {
     908               4 :             let DeltaEntry { key, lsn, val, .. } = entry;
     909               4 :             let desc = match dump_blob(val, ctx).await {
     910               4 :                 Ok(desc) => desc,
     911 UBC           0 :                 Err(err) => {
     912               0 :                     format!("ERROR: {err}")
     913                 :                 }
     914                 :             };
     915 CBC           4 :             println!("  key {key} at {lsn}: {desc}");
     916                 :         }
     917                 : 
     918               2 :         Ok(())
     919               2 :     }
     920                 : }
     921                 : 
     922                 : /// A set of data associated with a delta layer key and its value
     923                 : pub struct DeltaEntry<'a> {
     924                 :     pub key: Key,
     925                 :     pub lsn: Lsn,
     926                 :     /// Size of the stored value
     927                 :     pub size: u64,
     928                 :     /// Reference to the on-disk value
     929                 :     pub val: ValueRef<'a>,
     930                 : }
     931                 : 
     932                 : /// Reference to an on-disk value
     933                 : pub struct ValueRef<'a> {
     934                 :     blob_ref: BlobRef,
     935                 :     reader: BlockCursor<'a>,
     936                 : }
     937                 : 
     938                 : impl<'a> ValueRef<'a> {
     939                 :     /// Loads the value from disk
     940        16392783 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
     941                 :         // theoretically we *could* record an access time for each, but it does not really matter
     942        16392783 :         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
     943        16392780 :         let val = Value::des(&buf)?;
     944        16392780 :         Ok(val)
     945        16392780 :     }
     946                 : }
     947                 : 
     948                 : pub(crate) struct Adapter<T>(T);
     949                 : 
     950                 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
     951        17643804 :     pub(crate) async fn read_blk(
     952        17643804 :         &self,
     953        17643804 :         blknum: u32,
     954        17643804 :         ctx: &RequestContext,
     955        17643804 :     ) -> Result<BlockLease, std::io::Error> {
     956        17643804 :         self.0.as_ref().file.read_blk(blknum, ctx).await
     957        17643802 :     }
     958                 : }
     959                 : 
     960                 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
     961        17643788 :     fn as_ref(&self) -> &DeltaLayerInner {
     962        17643788 :         self
     963        17643788 :     }
     964                 : }
        

Generated by: LCOV version 2.1-beta