LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - delta_layer.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 71.2 % 622 443
Test Date: 2024-02-12 20:26:03 Functions: 51.0 % 100 51

            Line data    Source code
       1              : //! A DeltaLayer represents a collection of WAL records or page images in a range of
       2              : //! LSNs, and in a range of Keys. It is stored on a file on disk.
       3              : //!
       4              : //! Usually a delta layer only contains differences, in the form of WAL records
       5              : //! against a base LSN. However, if a relation extended or a whole new relation
       6              : //! is created, there would be no base for the new pages. The entries for them
       7              : //! must be page images or WAL records with the 'will_init' flag set, so that
       8              : //! they can be replayed without referring to an older page version.
       9              : //!
      10              : //! The delta files are stored in `timelines/<timeline_id>` directory.  Currently,
      11              : //! there are no subdirectories, and each delta file is named like this:
      12              : //!
      13              : //! ```text
      14              : //!    <key start>-<key end>__<start LSN>-<end LSN>
      15              : //! ```
      16              : //!
      17              : //! For example:
      18              : //!
      19              : //! ```text
      20              : //!    000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
      21              : //! ```
      22              : //!
      23              : //! Every delta file consists of three parts: "summary", "index", and
      24              : //! "values". The summary is a fixed size header at the beginning of the file,
      25              : //! and it contains basic information about the layer, and offsets to the other
      26              : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
      27              : //! "values" part.  The actual page images and WAL records are stored in the
      28              : //! "values" part.
      29              : //!
      30              : use crate::config::PageServerConf;
      31              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache::PAGE_SZ;
      33              : use crate::repository::{Key, Value, KEY_SIZE};
      34              : use crate::tenant::blob_io::BlobWriter;
      35              : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
      36              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      37              : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
      38              : use crate::tenant::Timeline;
      39              : use crate::virtual_file::{self, VirtualFile};
      40              : use crate::{walrecord, TEMP_FILE_SUFFIX};
      41              : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
      42              : use anyhow::{bail, ensure, Context, Result};
      43              : use camino::{Utf8Path, Utf8PathBuf};
      44              : use pageserver_api::models::LayerAccessKind;
      45              : use pageserver_api::shard::TenantShardId;
      46              : use rand::{distributions::Alphanumeric, Rng};
      47              : use serde::{Deserialize, Serialize};
      48              : use std::fs::File;
      49              : use std::io::SeekFrom;
      50              : use std::ops::Range;
      51              : use std::os::unix::fs::FileExt;
      52              : use std::sync::Arc;
      53              : use tokio::sync::OnceCell;
      54              : use tracing::*;
      55              : 
      56              : use utils::{
      57              :     bin_ser::BeSer,
      58              :     id::{TenantId, TimelineId},
      59              :     lsn::Lsn,
      60              : };
      61              : 
      62              : use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
      63              : 
      64              : ///
      65              : /// Header stored in the beginning of the file
      66              : ///
      67              : /// After this comes the 'values' part, starting on block 1. After that,
      68              : /// the 'index' starts at the block indicated by 'index_start_blk'
      69              : ///
      70        15893 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      71              : pub struct Summary {
      72              :     /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
      73              :     pub magic: u16,
      74              :     pub format_version: u16,
      75              : 
      76              :     pub tenant_id: TenantId,
      77              :     pub timeline_id: TimelineId,
      78              :     pub key_range: Range<Key>,
      79              :     pub lsn_range: Range<Lsn>,
      80              : 
      81              :     /// Block number where the 'index' part of the file begins.
      82              :     pub index_start_blk: u32,
      83              :     /// Block within the 'index', where the B-tree root page is stored
      84              :     pub index_root_blk: u32,
      85              : }
      86              : 
      87              : impl From<&DeltaLayer> for Summary {
      88            0 :     fn from(layer: &DeltaLayer) -> Self {
      89            0 :         Self::expected(
      90            0 :             layer.desc.tenant_shard_id.tenant_id,
      91            0 :             layer.desc.timeline_id,
      92            0 :             layer.desc.key_range.clone(),
      93            0 :             layer.desc.lsn_range.clone(),
      94            0 :         )
      95            0 :     }
      96              : }
      97              : 
      98              : impl Summary {
      99        12147 :     pub(super) fn expected(
     100        12147 :         tenant_id: TenantId,
     101        12147 :         timeline_id: TimelineId,
     102        12147 :         keys: Range<Key>,
     103        12147 :         lsns: Range<Lsn>,
     104        12147 :     ) -> Self {
     105        12147 :         Self {
     106        12147 :             magic: DELTA_FILE_MAGIC,
     107        12147 :             format_version: STORAGE_FORMAT_VERSION,
     108        12147 : 
     109        12147 :             tenant_id,
     110        12147 :             timeline_id,
     111        12147 :             key_range: keys,
     112        12147 :             lsn_range: lsns,
     113        12147 : 
     114        12147 :             index_start_blk: 0,
     115        12147 :             index_root_blk: 0,
     116        12147 :         }
     117        12147 :     }
     118              : }
     119              : 
     120              : // Flag indicating that this version initialize the page
     121              : const WILL_INIT: u64 = 1;
     122              : 
     123              : /// Struct representing reference to BLOB in layers. Reference contains BLOB
     124              : /// offset, and for WAL records it also contains `will_init` flag. The flag
     125              : /// helps to determine the range of records that needs to be applied, without
     126              : /// reading/deserializing records themselves.
     127            0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
     128              : pub struct BlobRef(pub u64);
     129              : 
     130              : impl BlobRef {
     131     49126552 :     pub fn will_init(&self) -> bool {
     132     49126552 :         (self.0 & WILL_INIT) != 0
     133     49126552 :     }
     134              : 
     135     83723293 :     pub fn pos(&self) -> u64 {
     136     83723293 :         self.0 >> 1
     137     83723293 :     }
     138              : 
     139     52098927 :     pub fn new(pos: u64, will_init: bool) -> BlobRef {
     140     52098927 :         let mut blob_ref = pos << 1;
     141     52098927 :         if will_init {
     142      9289693 :             blob_ref |= WILL_INIT;
     143     42809234 :         }
     144     52098927 :         BlobRef(blob_ref)
     145     52098927 :     }
     146              : }
     147              : 
     148              : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
     149              : struct DeltaKey([u8; DELTA_KEY_SIZE]);
     150              : 
     151              : /// This is the key of the B-tree index stored in the delta layer. It consists
     152              : /// of the serialized representation of a Key and LSN.
     153              : impl DeltaKey {
     154     17304147 :     fn from_slice(buf: &[u8]) -> Self {
     155     17304147 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     156     17304147 :         bytes.copy_from_slice(buf);
     157     17304147 :         DeltaKey(bytes)
     158     17304147 :     }
     159              : 
     160     68493871 :     fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
     161     68493871 :         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
     162     68493871 :         key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
     163     68493871 :         bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
     164     68493871 :         DeltaKey(bytes)
     165     68493871 :     }
     166              : 
     167     17304147 :     fn key(&self) -> Key {
     168     17304147 :         Key::from_slice(&self.0)
     169     17304147 :     }
     170              : 
     171     17304147 :     fn lsn(&self) -> Lsn {
     172     17304147 :         Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
     173     17304147 :     }
     174              : 
     175     49585494 :     fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
     176     49585494 :         let mut lsn_buf = [0u8; 8];
     177     49585494 :         lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
     178     49585494 :         Lsn(u64::from_be_bytes(lsn_buf))
     179     49585494 :     }
     180              : }
     181              : 
     182              : /// This is used only from `pagectl`. Within pageserver, all layers are
     183              : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
     184              : pub struct DeltaLayer {
     185              :     path: Utf8PathBuf,
     186              :     pub desc: PersistentLayerDesc,
     187              :     access_stats: LayerAccessStats,
     188              :     inner: OnceCell<Arc<DeltaLayerInner>>,
     189              : }
     190              : 
     191              : impl std::fmt::Debug for DeltaLayer {
     192            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     193            0 :         use super::RangeDisplayDebug;
     194            0 : 
     195            0 :         f.debug_struct("DeltaLayer")
     196            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     197            0 :             .field("lsn_range", &self.desc.lsn_range)
     198            0 :             .field("file_size", &self.desc.file_size)
     199            0 :             .field("inner", &self.inner)
     200            0 :             .finish()
     201            0 :     }
     202              : }
     203              : 
     204              : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
     205              : /// file.
     206              : pub struct DeltaLayerInner {
     207              :     // values copied from summary
     208              :     index_start_blk: u32,
     209              :     index_root_blk: u32,
     210              : 
     211              :     /// Reader object for reading blocks from the file.
     212              :     file: FileBlockReader,
     213              : }
     214              : 
     215              : impl std::fmt::Debug for DeltaLayerInner {
     216            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     217            0 :         f.debug_struct("DeltaLayerInner")
     218            0 :             .field("index_start_blk", &self.index_start_blk)
     219            0 :             .field("index_root_blk", &self.index_root_blk)
     220            0 :             .finish()
     221            0 :     }
     222              : }
     223              : 
     224              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     225              : impl std::fmt::Display for DeltaLayer {
     226            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     227            0 :         write!(f, "{}", self.layer_desc().short_id())
     228            0 :     }
     229              : }
     230              : 
     231              : impl AsLayerDesc for DeltaLayer {
     232            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     233            0 :         &self.desc
     234            0 :     }
     235              : }
     236              : 
     237              : impl DeltaLayer {
     238            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     239            0 :         self.desc.dump();
     240            0 : 
     241            0 :         if !verbose {
     242            0 :             return Ok(());
     243            0 :         }
     244              : 
     245            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     246              : 
     247            0 :         inner.dump(ctx).await
     248            0 :     }
     249              : 
     250        15895 :     fn temp_path_for(
     251        15895 :         conf: &PageServerConf,
     252        15895 :         tenant_shard_id: &TenantShardId,
     253        15895 :         timeline_id: &TimelineId,
     254        15895 :         key_start: Key,
     255        15895 :         lsn_range: &Range<Lsn>,
     256        15895 :     ) -> Utf8PathBuf {
     257        15895 :         let rand_string: String = rand::thread_rng()
     258        15895 :             .sample_iter(&Alphanumeric)
     259        15895 :             .take(8)
     260        15895 :             .map(char::from)
     261        15895 :             .collect();
     262        15895 : 
     263        15895 :         conf.timeline_path(tenant_shard_id, timeline_id)
     264        15895 :             .join(format!(
     265        15895 :                 "{}-XXX__{:016X}-{:016X}.{}.{}",
     266        15895 :                 key_start,
     267        15895 :                 u64::from(lsn_range.start),
     268        15895 :                 u64::from(lsn_range.end),
     269        15895 :                 rand_string,
     270        15895 :                 TEMP_FILE_SUFFIX,
     271        15895 :             ))
     272        15895 :     }
     273              : 
     274              :     ///
     275              :     /// Open the underlying file and read the metadata into memory, if it's
     276              :     /// not loaded already.
     277              :     ///
     278            0 :     async fn load(
     279            0 :         &self,
     280            0 :         access_kind: LayerAccessKind,
     281            0 :         ctx: &RequestContext,
     282            0 :     ) -> Result<&Arc<DeltaLayerInner>> {
     283            0 :         self.access_stats.record_access(access_kind, ctx);
     284            0 :         // Quick exit if already loaded
     285            0 :         self.inner
     286            0 :             .get_or_try_init(|| self.load_inner(ctx))
     287            0 :             .await
     288            0 :             .with_context(|| format!("Failed to load delta layer {}", self.path()))
     289            0 :     }
     290              : 
     291            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
     292            0 :         let path = self.path();
     293              : 
     294            0 :         let loaded = DeltaLayerInner::load(&path, None, ctx)
     295            0 :             .await
     296            0 :             .and_then(|res| res)?;
     297              : 
     298              :         // not production code
     299            0 :         let actual_filename = path.file_name().unwrap().to_owned();
     300            0 :         let expected_filename = self.layer_desc().filename().file_name();
     301            0 : 
     302            0 :         if actual_filename != expected_filename {
     303            0 :             println!("warning: filename does not match what is expected from in-file summary");
     304            0 :             println!("actual: {:?}", actual_filename);
     305            0 :             println!("expected: {:?}", expected_filename);
     306            0 :         }
     307              : 
     308            0 :         Ok(Arc::new(loaded))
     309            0 :     }
     310              : 
     311              :     /// Create a DeltaLayer struct representing an existing file on disk.
     312              :     ///
     313              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     314            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
     315            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     316            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     317            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     318              : 
     319            0 :         let metadata = file
     320            0 :             .metadata()
     321            0 :             .context("get file metadata to determine size")?;
     322              : 
     323              :         // This function is never used for constructing layers in a running pageserver,
     324              :         // so it does not need an accurate TenantShardId.
     325            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     326            0 : 
     327            0 :         Ok(DeltaLayer {
     328            0 :             path: path.to_path_buf(),
     329            0 :             desc: PersistentLayerDesc::new_delta(
     330            0 :                 tenant_shard_id,
     331            0 :                 summary.timeline_id,
     332            0 :                 summary.key_range,
     333            0 :                 summary.lsn_range,
     334            0 :                 metadata.len(),
     335            0 :             ),
     336            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     337            0 :             inner: OnceCell::new(),
     338            0 :         })
     339            0 :     }
     340              : 
     341              :     /// Path to the layer file in pageserver workdir.
     342            0 :     fn path(&self) -> Utf8PathBuf {
     343            0 :         self.path.clone()
     344            0 :     }
     345              : }
     346              : 
     347              : /// A builder object for constructing a new delta layer.
     348              : ///
     349              : /// Usage:
     350              : ///
     351              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     352              : ///
     353              : /// 2. Write the contents by calling `put_value` for every page
     354              : ///    version to store in the layer.
     355              : ///
     356              : /// 3. Call `finish`.
     357              : ///
     358              : struct DeltaLayerWriterInner {
     359              :     conf: &'static PageServerConf,
     360              :     pub path: Utf8PathBuf,
     361              :     timeline_id: TimelineId,
     362              :     tenant_shard_id: TenantShardId,
     363              : 
     364              :     key_start: Key,
     365              :     lsn_range: Range<Lsn>,
     366              : 
     367              :     tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
     368              : 
     369              :     blob_writer: BlobWriter<true>,
     370              : }
     371              : 
     372              : impl DeltaLayerWriterInner {
     373              :     ///
     374              :     /// Start building a new delta layer.
     375              :     ///
     376        15895 :     async fn new(
     377        15895 :         conf: &'static PageServerConf,
     378        15895 :         timeline_id: TimelineId,
     379        15895 :         tenant_shard_id: TenantShardId,
     380        15895 :         key_start: Key,
     381        15895 :         lsn_range: Range<Lsn>,
     382        15895 :     ) -> anyhow::Result<Self> {
     383        15895 :         // Create the file initially with a temporary filename. We don't know
     384        15895 :         // the end key yet, so we cannot form the final filename yet. We will
     385        15895 :         // rename it when we're done.
     386        15895 :         //
     387        15895 :         // Note: This overwrites any existing file. There shouldn't be any.
     388        15895 :         // FIXME: throw an error instead?
     389        15895 :         let path =
     390        15895 :             DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
     391              : 
     392        15895 :         let mut file = VirtualFile::create(&path).await?;
     393              :         // make room for the header block
     394        15895 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     395        15895 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     396        15895 : 
     397        15895 :         // Initialize the b-tree index builder
     398        15895 :         let block_buf = BlockBuf::new();
     399        15895 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     400        15895 : 
     401        15895 :         Ok(Self {
     402        15895 :             conf,
     403        15895 :             path,
     404        15895 :             timeline_id,
     405        15895 :             tenant_shard_id,
     406        15895 :             key_start,
     407        15895 :             lsn_range,
     408        15895 :             tree: tree_builder,
     409        15895 :             blob_writer,
     410        15895 :         })
     411        15895 :     }
     412              : 
     413              :     ///
     414              :     /// Append a key-value pair to the file.
     415              :     ///
     416              :     /// The values must be appended in key, lsn order.
     417              :     ///
     418     17292586 :     async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     419     17292586 :         let (_, res) = self
     420     17292586 :             .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
     421        20553 :             .await;
     422     17292586 :         res
     423     17292586 :     }
     424              : 
     425     52098927 :     async fn put_value_bytes(
     426     52098927 :         &mut self,
     427     52098927 :         key: Key,
     428     52098927 :         lsn: Lsn,
     429     52098927 :         val: Vec<u8>,
     430     52098927 :         will_init: bool,
     431     52098927 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     432     52098906 :         assert!(self.lsn_range.start <= lsn);
     433     52098906 :         let (val, res) = self.blob_writer.write_blob(val).await;
     434     52098905 :         let off = match res {
     435     52098905 :             Ok(off) => off,
     436            0 :             Err(e) => return (val, Err(anyhow::anyhow!(e))),
     437              :         };
     438              : 
     439     52098905 :         let blob_ref = BlobRef::new(off, will_init);
     440     52098905 : 
     441     52098905 :         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
     442     52098905 :         let res = self.tree.append(&delta_key.0, blob_ref.0);
     443     52098905 :         (val, res.map_err(|e| anyhow::anyhow!(e)))
     444     52098905 :     }
     445              : 
     446      2280229 :     fn size(&self) -> u64 {
     447      2280229 :         self.blob_writer.size() + self.tree.borrow_writer().size()
     448      2280229 :     }
     449              : 
     450              :     ///
     451              :     /// Finish writing the delta layer.
     452              :     ///
     453        15893 :     async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
     454        15893 :         let index_start_blk =
     455        15893 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     456              : 
     457        15893 :         let mut file = self.blob_writer.into_inner().await?;
     458              : 
     459              :         // Write out the index
     460        15893 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     461        15893 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     462            0 :             .await?;
     463       130570 :         for buf in block_buf.blocks {
     464       114677 :             file.write_all(buf.as_ref()).await?;
     465              :         }
     466        15893 :         assert!(self.lsn_range.start < self.lsn_range.end);
     467              :         // Fill in the summary on blk 0
     468        15893 :         let summary = Summary {
     469        15893 :             magic: DELTA_FILE_MAGIC,
     470        15893 :             format_version: STORAGE_FORMAT_VERSION,
     471        15893 :             tenant_id: self.tenant_shard_id.tenant_id,
     472        15893 :             timeline_id: self.timeline_id,
     473        15893 :             key_range: self.key_start..key_end,
     474        15893 :             lsn_range: self.lsn_range.clone(),
     475        15893 :             index_start_blk,
     476        15893 :             index_root_blk,
     477        15893 :         };
     478        15893 : 
     479        15893 :         let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
     480        15893 :         Summary::ser_into(&summary, &mut buf)?;
     481        15893 :         if buf.spilled() {
     482              :             // This is bad as we only have one free block for the summary
     483            0 :             warn!(
     484            0 :                 "Used more than one page size for summary buffer: {}",
     485            0 :                 buf.len()
     486            0 :             );
     487        15893 :         }
     488        15893 :         file.seek(SeekFrom::Start(0)).await?;
     489        15893 :         file.write_all(&buf).await?;
     490              : 
     491        15893 :         let metadata = file
     492        15893 :             .metadata()
     493          225 :             .await
     494        15893 :             .context("get file metadata to determine size")?;
     495              : 
     496              :         // 5GB limit for objects without multipart upload (which we don't want to use)
     497              :         // Make it a little bit below to account for differing GB units
     498              :         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
     499              :         const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
     500        15893 :         ensure!(
     501        15893 :             metadata.len() <= S3_UPLOAD_LIMIT,
     502            0 :             "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
     503            0 :             file.path,
     504            0 :             metadata.len()
     505              :         );
     506              : 
     507              :         // Note: Because we opened the file in write-only mode, we cannot
     508              :         // reuse the same VirtualFile for reading later. That's why we don't
     509              :         // set inner.file here. The first read will have to re-open it.
     510              : 
     511        15893 :         let desc = PersistentLayerDesc::new_delta(
     512        15893 :             self.tenant_shard_id,
     513        15893 :             self.timeline_id,
     514        15893 :             self.key_start..key_end,
     515        15893 :             self.lsn_range.clone(),
     516        15893 :             metadata.len(),
     517        15893 :         );
     518        15893 : 
     519        15893 :         // fsync the file
     520        15893 :         file.sync_all().await?;
     521              : 
     522        15893 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     523              : 
     524            0 :         trace!("created delta layer {}", layer.local_path());
     525              : 
     526        15893 :         Ok(layer)
     527        15893 :     }
     528              : }
     529              : 
     530              : /// A builder object for constructing a new delta layer.
     531              : ///
     532              : /// Usage:
     533              : ///
     534              : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
     535              : ///
     536              : /// 2. Write the contents by calling `put_value` for every page
     537              : ///    version to store in the layer.
     538              : ///
     539              : /// 3. Call `finish`.
     540              : ///
     541              : /// # Note
     542              : ///
     543              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     544              : /// possible for the writer to drop before `finish` is actually called. So this
     545              : /// could lead to odd temporary files in the directory, exhausting file system.
     546              : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
     547              : /// implementation that cleans up the temporary file in failure. It's not
     548              : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
     549              : /// out some fields, making it impossible to implement `Drop`.
     550              : ///
     551              : #[must_use]
     552              : pub struct DeltaLayerWriter {
     553              :     inner: Option<DeltaLayerWriterInner>,
     554              : }
     555              : 
     556              : impl DeltaLayerWriter {
     557              :     ///
     558              :     /// Start building a new delta layer.
     559              :     ///
     560        15895 :     pub async fn new(
     561        15895 :         conf: &'static PageServerConf,
     562        15895 :         timeline_id: TimelineId,
     563        15895 :         tenant_shard_id: TenantShardId,
     564        15895 :         key_start: Key,
     565        15895 :         lsn_range: Range<Lsn>,
     566        15895 :     ) -> anyhow::Result<Self> {
     567        15895 :         Ok(Self {
     568        15895 :             inner: Some(
     569        15895 :                 DeltaLayerWriterInner::new(
     570        15895 :                     conf,
     571        15895 :                     timeline_id,
     572        15895 :                     tenant_shard_id,
     573        15895 :                     key_start,
     574        15895 :                     lsn_range,
     575        15895 :                 )
     576          217 :                 .await?,
     577              :             ),
     578              :         })
     579        15895 :     }
     580              : 
     581              :     ///
     582              :     /// Append a key-value pair to the file.
     583              :     ///
     584              :     /// The values must be appended in key, lsn order.
     585              :     ///
     586     17292586 :     pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
     587     17292586 :         self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     588     17292586 :     }
     589              : 
     590     34806341 :     pub async fn put_value_bytes(
     591     34806341 :         &mut self,
     592     34806341 :         key: Key,
     593     34806341 :         lsn: Lsn,
     594     34806341 :         val: Vec<u8>,
     595     34806341 :         will_init: bool,
     596     34806341 :     ) -> (Vec<u8>, anyhow::Result<()>) {
     597     34806320 :         self.inner
     598     34806320 :             .as_mut()
     599     34806320 :             .unwrap()
     600     34806320 :             .put_value_bytes(key, lsn, val, will_init)
     601        48280 :             .await
     602     34806320 :     }
     603              : 
     604      2280229 :     pub fn size(&self) -> u64 {
     605      2280229 :         self.inner.as_ref().unwrap().size()
     606      2280229 :     }
     607              : 
     608              :     ///
     609              :     /// Finish writing the delta layer.
     610              :     ///
     611        15893 :     pub(crate) async fn finish(
     612        15893 :         mut self,
     613        15893 :         key_end: Key,
     614        15893 :         timeline: &Arc<Timeline>,
     615        15893 :     ) -> anyhow::Result<ResidentLayer> {
     616        15893 :         let inner = self.inner.take().unwrap();
     617        15893 :         let temp_path = inner.path.clone();
     618        15893 :         let result = inner.finish(key_end, timeline).await;
     619              :         // The delta layer files can sometimes be really large. Clean them up.
     620        15893 :         if result.is_err() {
     621            0 :             tracing::warn!(
     622            0 :                 "Cleaning up temporary delta file {temp_path} after error during writing"
     623            0 :             );
     624            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     625            0 :                 tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
     626            0 :             }
     627        15893 :         }
     628        15893 :         result
     629        15893 :     }
     630              : }
     631              : 
     632              : impl Drop for DeltaLayerWriter {
     633        15893 :     fn drop(&mut self) {
     634        15893 :         if let Some(inner) = self.inner.take() {
     635            0 :             // We want to remove the virtual file here, so it's fine that
     636            0 :             // unwritten data has not been completely flushed.
     637            0 :             let vfile = inner.blob_writer.into_inner_no_flush();
     638            0 :             vfile.remove();
     639        15893 :         }
     640        15893 :     }
     641              : }
     642              : 
     643            0 : #[derive(thiserror::Error, Debug)]
     644              : pub enum RewriteSummaryError {
     645              :     #[error("magic mismatch")]
     646              :     MagicMismatch,
     647              :     #[error(transparent)]
     648              :     Other(#[from] anyhow::Error),
     649              : }
     650              : 
     651              : impl From<std::io::Error> for RewriteSummaryError {
     652            0 :     fn from(e: std::io::Error) -> Self {
     653            0 :         Self::Other(anyhow::anyhow!(e))
     654            0 :     }
     655              : }
     656              : 
     657              : impl DeltaLayer {
     658            0 :     pub async fn rewrite_summary<F>(
     659            0 :         path: &Utf8Path,
     660            0 :         rewrite: F,
     661            0 :         ctx: &RequestContext,
     662            0 :     ) -> Result<(), RewriteSummaryError>
     663            0 :     where
     664            0 :         F: Fn(Summary) -> Summary,
     665            0 :     {
     666            0 :         let file = VirtualFile::open_with_options(
     667            0 :             path,
     668            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     669            0 :         )
     670            0 :         .await
     671            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     672            0 :         let file = FileBlockReader::new(file);
     673            0 :         let summary_blk = file.read_blk(0, ctx).await?;
     674            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     675            0 :         let mut file = file.file;
     676            0 :         if actual_summary.magic != DELTA_FILE_MAGIC {
     677            0 :             return Err(RewriteSummaryError::MagicMismatch);
     678            0 :         }
     679            0 : 
     680            0 :         let new_summary = rewrite(actual_summary);
     681            0 : 
     682            0 :         let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
     683            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     684            0 :         if buf.spilled() {
     685              :             // The code in DeltaLayerWriterInner just warn!()s for this.
     686              :             // It should probably error out as well.
     687            0 :             return Err(RewriteSummaryError::Other(anyhow::anyhow!(
     688            0 :                 "Used more than one page size for summary buffer: {}",
     689            0 :                 buf.len()
     690            0 :             )));
     691            0 :         }
     692            0 :         file.seek(SeekFrom::Start(0)).await?;
     693            0 :         file.write_all(&buf).await?;
     694            0 :         Ok(())
     695            0 :     }
     696              : }
     697              : 
     698              : impl DeltaLayerInner {
     699              :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     700              :     /// - inner has the success or transient failure
     701              :     /// - outer has the permanent failure
     702        12147 :     pub(super) async fn load(
     703        12147 :         path: &Utf8Path,
     704        12147 :         summary: Option<Summary>,
     705        12147 :         ctx: &RequestContext,
     706        12147 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     707        12147 :         let file = match VirtualFile::open(path).await {
     708        12147 :             Ok(file) => file,
     709            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     710              :         };
     711        12147 :         let file = FileBlockReader::new(file);
     712              : 
     713        12147 :         let summary_blk = match file.read_blk(0, ctx).await {
     714        12147 :             Ok(blk) => blk,
     715            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     716              :         };
     717              : 
     718              :         // TODO: this should be an assertion instead; see ImageLayerInner::load
     719        12147 :         let actual_summary =
     720        12147 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     721              : 
     722        12147 :         if let Some(mut expected_summary) = summary {
     723              :             // production code path
     724        12147 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     725        12147 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     726        12147 :             if actual_summary != expected_summary {
     727            1 :                 bail!(
     728            1 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     729            1 :                     actual_summary,
     730            1 :                     expected_summary
     731            1 :                 );
     732        12146 :             }
     733            0 :         }
     734              : 
     735        12146 :         Ok(Ok(DeltaLayerInner {
     736        12146 :             file,
     737        12146 :             index_start_blk: actual_summary.index_start_blk,
     738        12146 :             index_root_blk: actual_summary.index_root_blk,
     739        12146 :         }))
     740        12147 :     }
     741              : 
     742     16394944 :     pub(super) async fn get_value_reconstruct_data(
     743     16394944 :         &self,
     744     16394944 :         key: Key,
     745     16394944 :         lsn_range: Range<Lsn>,
     746     16394944 :         reconstruct_state: &mut ValueReconstructState,
     747     16394944 :         ctx: &RequestContext,
     748     16394944 :     ) -> anyhow::Result<ValueReconstructResult> {
     749     16394915 :         let mut need_image = true;
     750     16394915 :         // Scan the page versions backwards, starting from `lsn_range.end - 1`.
     751     16394915 :         let file = &self.file;
     752     16394915 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     753     16394915 :             self.index_start_blk,
     754     16394915 :             self.index_root_blk,
     755     16394915 :             file,
     756     16394915 :         );
     757     16394915 :         let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
     758     16394915 : 
     759     16394915 :         let mut offsets: Vec<(Lsn, u64)> = Vec::new();
     760     16394915 : 
     761     16394915 :         tree_reader
     762     16394915 :             .visit(
     763     16394915 :                 &search_key.0,
     764     16394915 :                 VisitDirection::Backwards,
     765     52241249 :                 |key, value| {
     766     52241249 :                     let blob_ref = BlobRef(value);
     767     52241249 :                     if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
     768      2656889 :                         return false;
     769     49584360 :                     }
     770     49584360 :                     let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
     771     49584360 :                     if entry_lsn < lsn_range.start {
     772       458942 :                         return false;
     773     49125417 :                     }
     774     49125417 :                     offsets.push((entry_lsn, blob_ref.pos()));
     775     49125417 : 
     776     49125417 :                     !blob_ref.will_init()
     777     52241248 :                 },
     778     16394915 :                 &RequestContextBuilder::extend(ctx)
     779     16394915 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     780     16394915 :                     .build(),
     781     16394915 :             )
     782       211694 :             .await?;
     783              : 
     784     16394914 :         let ctx = &RequestContextBuilder::extend(ctx)
     785     16394914 :             .page_content_kind(PageContentKind::DeltaLayerValue)
     786     16394914 :             .build();
     787     16394914 : 
     788     16394914 :         // Ok, 'offsets' now contains the offsets of all the entries we need to read
     789     16394914 :         let cursor = file.block_cursor();
     790     16394914 :         let mut buf = Vec::new();
     791     62485233 :         for (entry_lsn, pos) in offsets {
     792     49121438 :             cursor
     793     49121438 :                 .read_blob_into_buf(pos, &mut buf, ctx)
     794       746117 :                 .await
     795     49121436 :                 .with_context(|| {
     796            0 :                     format!("Failed to read blob from virtual file {}", file.file.path)
     797     49121436 :                 })?;
     798     49121436 :             let val = Value::des(&buf).with_context(|| {
     799            0 :                 format!(
     800            0 :                     "Failed to deserialize file blob from virtual file {}",
     801            0 :                     file.file.path
     802            0 :                 )
     803     49121436 :             })?;
     804     49121436 :             match val {
     805      1740816 :                 Value::Image(img) => {
     806      1740816 :                     reconstruct_state.img = Some((entry_lsn, img));
     807      1740816 :                     need_image = false;
     808      1740816 :                     break;
     809              :                 }
     810     47380620 :                 Value::WalRecord(rec) => {
     811     47380620 :                     let will_init = rec.will_init();
     812     47380620 :                     reconstruct_state.records.push((entry_lsn, rec));
     813     47380620 :                     if will_init {
     814              :                         // This WAL record initializes the page, so no need to go further back
     815      1290301 :                         need_image = false;
     816      1290301 :                         break;
     817     46090319 :                     }
     818              :                 }
     819              :             }
     820              :         }
     821              : 
     822              :         // If an older page image is needed to reconstruct the page, let the
     823              :         // caller know.
     824     16394912 :         if need_image {
     825     13363795 :             Ok(ValueReconstructResult::Continue)
     826              :         } else {
     827      3031116 :             Ok(ValueReconstructResult::Complete)
     828              :         }
     829     16394911 :     }
     830              : 
     831         4152 :     pub(super) async fn load_keys<'a>(
     832         4152 :         &'a self,
     833         4152 :         ctx: &RequestContext,
     834         4152 :     ) -> Result<Vec<DeltaEntry<'a>>> {
     835         4152 :         let file = &self.file;
     836         4152 : 
     837         4152 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     838         4152 :             self.index_start_blk,
     839         4152 :             self.index_root_blk,
     840         4152 :             file,
     841         4152 :         );
     842         4152 : 
     843         4152 :         let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
     844         4152 : 
     845         4152 :         tree_reader
     846         4152 :             .visit(
     847         4152 :                 &[0u8; DELTA_KEY_SIZE],
     848         4152 :                 VisitDirection::Forwards,
     849     17304147 :                 |key, value| {
     850     17304147 :                     let delta_key = DeltaKey::from_slice(key);
     851     17304147 :                     let val_ref = ValueRef {
     852     17304147 :                         blob_ref: BlobRef(value),
     853     17304147 :                         reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
     854     17304147 :                             Adapter(self),
     855     17304147 :                         )),
     856     17304147 :                     };
     857     17304147 :                     let pos = BlobRef(value).pos();
     858     17304147 :                     if let Some(last) = all_keys.last_mut() {
     859     17299995 :                         // `last.size` temporarily holds the previous entry's offset; subtract it
     860     17299995 :                         // from the current offset to get the size of the previous entry's value
     861     17299995 :                         let first_pos = last.size;
     862     17299995 :                         last.size = pos - first_pos;
     863     17299995 :                     }
     864     17304147 :                     let entry = DeltaEntry {
     865     17304147 :                         key: delta_key.key(),
     866     17304147 :                         lsn: delta_key.lsn(),
     867     17304147 :                         size: pos,
     868     17304147 :                         val: val_ref,
     869     17304147 :                     };
     870     17304147 :                     all_keys.push(entry);
     871     17304147 :                     true
     872     17304147 :                 },
     873         4152 :                 &RequestContextBuilder::extend(ctx)
     874         4152 :                     .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
     875         4152 :                     .build(),
     876         4152 :             )
     877         2304 :             .await?;
     878         4152 :         if let Some(last) = all_keys.last_mut() {
     879         4152 :             // Last key occupies all space till end of value storage,
     880         4152 :             // which corresponds to beginning of the index
     881         4152 :             last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
     882         4152 :         }
     883         4152 :         Ok(all_keys)
     884         4152 :     }
     885              : 
     886            4 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     887            4 :         println!(
     888            4 :             "index_start_blk: {}, root {}",
     889            4 :             self.index_start_blk, self.index_root_blk
     890            4 :         );
     891            4 : 
     892            4 :         let file = &self.file;
     893            4 :         let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
     894            4 :             self.index_start_blk,
     895            4 :             self.index_root_blk,
     896            4 :             file,
     897            4 :         );
     898            4 : 
     899            4 :         tree_reader.dump().await?;
     900              : 
     901            4 :         let keys = self.load_keys(ctx).await?;
     902              : 
     903            8 :         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
     904            8 :             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
     905            8 :             let val = Value::des(&buf)?;
     906            8 :             let desc = match val {
     907            8 :                 Value::Image(img) => {
     908            8 :                     format!(" img {} bytes", img.len())
     909              :                 }
     910            0 :                 Value::WalRecord(rec) => {
     911            0 :                     let wal_desc = walrecord::describe_wal_record(&rec)?;
     912            0 :                     format!(
     913            0 :                         " rec {} bytes will_init: {} {}",
     914            0 :                         buf.len(),
     915            0 :                         rec.will_init(),
     916            0 :                         wal_desc
     917            0 :                     )
     918              :                 }
     919              :             };
     920            8 :             Ok(desc)
     921            8 :         }
     922              : 
     923           12 :         for entry in keys {
     924            8 :             let DeltaEntry { key, lsn, val, .. } = entry;
     925            8 :             let desc = match dump_blob(&val, ctx).await {
     926            8 :                 Ok(desc) => desc,
     927            0 :                 Err(err) => {
     928            0 :                     format!("ERROR: {err}")
     929              :                 }
     930              :             };
     931            8 :             println!("  key {key} at {lsn}: {desc}");
     932            8 : 
     933            8 :             // Print more details about CHECKPOINT records. It would be nice to print details
     934            8 :             // of many other record types too, but these are particularly interesting, as
     935            8 :             // we have a lot of special processing for them in walingest.rs.
     936            8 :             use pageserver_api::key::CHECKPOINT_KEY;
     937            8 :             use postgres_ffi::CheckPoint;
     938            8 :             if key == CHECKPOINT_KEY {
     939            0 :                 let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
     940            0 :                 let val = Value::des(&buf)?;
     941            0 :                 match val {
     942            0 :                     Value::Image(img) => {
     943            0 :                         let checkpoint = CheckPoint::decode(&img)?;
     944            0 :                         println!("   CHECKPOINT: {:?}", checkpoint);
     945              :                     }
     946            0 :                     Value::WalRecord(_rec) => {
     947            0 :                         println!("   unexpected walrecord value for checkpoint key");
     948            0 :                     }
     949              :                 }
     950            8 :             }
     951              :         }
     952              : 
     953            4 :         Ok(())
     954            4 :     }
     955              : }
     956              : 
     957              : /// A set of data associated with a delta layer key and its value
     958              : pub struct DeltaEntry<'a> {
     959              :     pub key: Key,
     960              :     pub lsn: Lsn,
     961              :     /// Size of the stored value
     962              :     pub size: u64,
     963              :     /// Reference to the on-disk value
     964              :     pub val: ValueRef<'a>,
     965              : }
     966              : 
     967              : /// Reference to an on-disk value
     968              : pub struct ValueRef<'a> {
     969              :     blob_ref: BlobRef,
     970              :     reader: BlockCursor<'a>,
     971              : }
     972              : 
     973              : impl<'a> ValueRef<'a> {
     974              :     /// Loads the value from disk
     975     17292586 :     pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
     976              :         // theoretically we *could* record an access time for each, but it does not really matter
     977     17292586 :         let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
     978     17292586 :         let val = Value::des(&buf)?;
     979     17292586 :         Ok(val)
     980     17292586 :     }
     981              : }
     982              : 
     983              : pub(crate) struct Adapter<T>(T);
     984              : 
     985              : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
     986     18616583 :     pub(crate) async fn read_blk(
     987     18616583 :         &self,
     988     18616583 :         blknum: u32,
     989     18616583 :         ctx: &RequestContext,
     990     18616583 :     ) -> Result<BlockLease, std::io::Error> {
     991     18616583 :         self.0.as_ref().file.read_blk(blknum, ctx).await
     992     18616583 :     }
     993              : }
     994              : 
     995              : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
     996     18616583 :     fn as_ref(&self) -> &DeltaLayerInner {
     997     18616583 :         self
     998     18616583 :     }
     999              : }
        

Generated by: LCOV version 2.1-beta