LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info
Test Date: 2024-05-10 13:18:37
Coverage:   Lines: 52.2 % (649 of 1244 hit)   Functions: 38.4 % (68 of 177 hit)

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::metrics::WAL_INGEST;
      13              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use crate::{aux_file, repository::*};
      16              : use anyhow::{ensure, Context};
      17              : use bytes::{Buf, Bytes, BytesMut};
      18              : use enum_map::Enum;
      19              : use itertools::Itertools;
      20              : use pageserver_api::key::{
      21              :     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
      22              :     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      23              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      24              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      25              : };
      26              : use pageserver_api::keyspace::SparseKeySpace;
      27              : use pageserver_api::models::AuxFilePolicy;
      28              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      29              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      30              : use postgres_ffi::BLCKSZ;
      31              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      32              : use serde::{Deserialize, Serialize};
      33              : use std::collections::{hash_map, HashMap, HashSet};
      34              : use std::ops::ControlFlow;
      35              : use std::ops::Range;
      36              : use strum::IntoEnumIterator;
      37              : use tokio_util::sync::CancellationToken;
      38              : use tracing::{debug, trace, warn};
      39              : use utils::bin_ser::DeserializeError;
      40              : use utils::vec_map::{VecMap, VecMapOrdering};
      41              : use utils::{bin_ser::BeSer, lsn::Lsn};
      42              : 
      43              : const MAX_AUX_FILE_DELTAS: usize = 1024;
      44              : 
      45              : #[derive(Debug)]
      46              : pub enum LsnForTimestamp {
      47              :     /// Found commits both before and after the given timestamp
      48              :     Present(Lsn),
      49              : 
      50              :     /// Found no commits after the given timestamp, this means
      51              :     /// that the newest data in the branch is older than the given
      52              :     /// timestamp.
      53              :     ///
      54              :     /// All commits <= LSN happened before the given timestamp
      55              :     Future(Lsn),
      56              : 
      57              :     /// The queried timestamp is past the horizon we can look back to (PITR)
      58              :     ///
      59              :     /// All commits > LSN happened after the given timestamp,
      60              :     /// but any commits < LSN might have happened before or after
      61              :     /// the given timestamp. We don't know because no data before
      62              :     /// the given lsn is available.
      63              :     Past(Lsn),
      64              : 
      65              :     /// We have found no commit with a timestamp,
      66              :     /// so we can't return anything meaningful.
      67              :     ///
      68              :     /// The associated LSN is the lower bound value we can safely
      69              :     /// create branches on, but nothing is known about whether it is
      70              :     /// older or newer than the timestamp.
      71              :     ///
      72              :     /// This variant can e.g. be returned right after a
      73              :     /// cluster import.
      74              :     NoData(Lsn),
      75              : }
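                      : 
                      : // Editorial example (not part of the measured source): a minimal sketch of how
                      : // a caller might interpret the variants above when choosing a branch point.
                      : // `timeline`, `ts`, `cancel`, and `ctx` are hypothetical bindings.
                      : //
                      : //     let lsn = match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
                      : //         LsnForTimestamp::Present(lsn) => lsn, // commits on both sides; use it
                      : //         LsnForTimestamp::Future(lsn) => lsn,  // branch at the newest data
                      : //         LsnForTimestamp::Past(lsn) => lsn,    // clamped to the PITR horizon
                      : //         LsnForTimestamp::NoData(lsn) => lsn,  // lower bound only, e.g. after import
                      : //     };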
      76              : 
      77            0 : #[derive(Debug, thiserror::Error)]
      78              : pub enum CalculateLogicalSizeError {
      79              :     #[error("cancelled")]
      80              :     Cancelled,
      81              :     #[error(transparent)]
      82              :     Other(#[from] anyhow::Error),
      83              : }
      84              : 
      85            0 : #[derive(Debug, thiserror::Error)]
      86              : pub(crate) enum CollectKeySpaceError {
      87              :     #[error(transparent)]
      88              :     Decode(#[from] DeserializeError),
      89              :     #[error(transparent)]
      90              :     PageRead(PageReconstructError),
      91              :     #[error("cancelled")]
      92              :     Cancelled,
      93              : }
      94              : 
      95              : impl From<PageReconstructError> for CollectKeySpaceError {
      96            0 :     fn from(err: PageReconstructError) -> Self {
      97            0 :         match err {
      98            0 :             PageReconstructError::Cancelled => Self::Cancelled,
      99            0 :             err => Self::PageRead(err),
     100              :         }
     101            0 :     }
     102              : }
     103              : 
     104              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     105            0 :     fn from(pre: PageReconstructError) -> Self {
     106            0 :         match pre {
     107              :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
     108            0 :                 Self::Cancelled
     109              :             }
     110            0 :             _ => Self::Other(pre.into()),
     111              :         }
     112            0 :     }
     113              : }
     114              : 
     115            0 : #[derive(Debug, thiserror::Error)]
     116              : pub enum RelationError {
     117              :     #[error("Relation Already Exists")]
     118              :     AlreadyExists,
     119              :     #[error("invalid relnode")]
     120              :     InvalidRelnode,
     121              :     #[error(transparent)]
     122              :     Other(#[from] anyhow::Error),
     123              : }
     124              : 
     125              : ///
     126              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     127              : /// and other special kinds of files, in a versioned key-value store. The
     128              : /// Timeline struct provides the key-value store.
     129              : ///
     130              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
     131              : /// implementation; it might be moved into a separate struct later.
     132              : impl Timeline {
     133              :     /// Start ingesting a WAL record, or other atomic modification of
     134              :     /// the timeline.
     135              :     ///
     136              :     /// This provides a transaction-like interface to perform a bunch
     137              :     /// of modifications atomically.
     138              :     ///
     139              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     140              :     /// DatadirModification object. Use the functions in the object to
     141              :     /// modify the repository state, updating all the pages and metadata
     142              :     /// that the WAL record affects. When you're done, call commit() to
     143              :     /// commit the changes.
     144              :     ///
     145              :     /// The LSN stored in the modification is advanced by `ingest_record` and
     146              :     /// is used by `commit()` to update `last_record_lsn`.
     147              :     ///
     148              :     /// Calling commit() will flush all the changes and reset the state,
     149              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     150              :     ///
     151              :     /// Note that any pending modifications you make through the
     152              :     /// modification object won't be visible to calls to the 'get' and list
     153              :     /// functions of the timeline until you finish! And if you update the
     154              :     /// same page twice, the last update wins.
     155              :     ///
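                      :     /// A minimal usage sketch (editorial; `put_rel_page_image` and `commit` are
                      :     /// defined elsewhere in this module, and error handling is elided):
                      :     ///
                      :     /// ```ignore
                      :     /// let mut modification = timeline.begin_modification(lsn);
                      :     /// modification.put_rel_page_image(rel, blknum, img)?;
                      :     /// modification.commit(ctx).await?;
                      :     /// ```
                      :     ///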
     156       268302 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     157       268302 :     where
     158       268302 :         Self: Sized,
     159       268302 :     {
     160       268302 :         DatadirModification {
     161       268302 :             tline: self,
     162       268302 :             pending_lsns: Vec::new(),
     163       268302 :             pending_updates: HashMap::new(),
     164       268302 :             pending_deletions: Vec::new(),
     165       268302 :             pending_nblocks: 0,
     166       268302 :             pending_directory_entries: Vec::new(),
     167       268302 :             lsn,
     168       268302 :         }
     169       268302 :     }
     170              : 
     171              :     //------------------------------------------------------------------------------
     172              :     // Public GET functions
     173              :     //------------------------------------------------------------------------------
     174              : 
     175              :     /// Look up given page version.
     176        18384 :     pub(crate) async fn get_rel_page_at_lsn(
     177        18384 :         &self,
     178        18384 :         tag: RelTag,
     179        18384 :         blknum: BlockNumber,
     180        18384 :         version: Version<'_>,
     181        18384 :         ctx: &RequestContext,
     182        18384 :     ) -> Result<Bytes, PageReconstructError> {
     183        18384 :         if tag.relnode == 0 {
     184            0 :             return Err(PageReconstructError::Other(
     185            0 :                 RelationError::InvalidRelnode.into(),
     186            0 :             ));
     187        18384 :         }
     188              : 
     189        18384 :         let nblocks = self.get_rel_size(tag, version, ctx).await?;
     190        18384 :         if blknum >= nblocks {
     191            0 :             debug!(
     192            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     193            0 :                 tag,
     194            0 :                 blknum,
     195            0 :                 version.get_lsn(),
     196              :                 nblocks
     197              :             );
     198            0 :             return Ok(ZERO_PAGE.clone());
     199        18384 :         }
     200        18384 : 
     201        18384 :         let key = rel_block_to_key(tag, blknum);
     202        18384 :         version.get(self, key, ctx).await
     203        18384 :     }
     204              : 
     205              :     // Get size of a database in blocks
     206            0 :     pub(crate) async fn get_db_size(
     207            0 :         &self,
     208            0 :         spcnode: Oid,
     209            0 :         dbnode: Oid,
     210            0 :         version: Version<'_>,
     211            0 :         ctx: &RequestContext,
     212            0 :     ) -> Result<usize, PageReconstructError> {
     213            0 :         let mut total_blocks = 0;
     214              : 
     215            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     216              : 
     217            0 :         for rel in rels {
     218            0 :             let n_blocks = self.get_rel_size(rel, version, ctx).await?;
     219            0 :             total_blocks += n_blocks as usize;
     220              :         }
     221            0 :         Ok(total_blocks)
     222            0 :     }
     223              : 
     224              :     /// Get size of a relation file
     225        24434 :     pub(crate) async fn get_rel_size(
     226        24434 :         &self,
     227        24434 :         tag: RelTag,
     228        24434 :         version: Version<'_>,
     229        24434 :         ctx: &RequestContext,
     230        24434 :     ) -> Result<BlockNumber, PageReconstructError> {
     231        24434 :         if tag.relnode == 0 {
     232            0 :             return Err(PageReconstructError::Other(
     233            0 :                 RelationError::InvalidRelnode.into(),
     234            0 :             ));
     235        24434 :         }
     236              : 
     237        24434 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     238        19294 :             return Ok(nblocks);
     239         5140 :         }
     240         5140 : 
     241         5140 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     242            0 :             && !self.get_rel_exists(tag, version, ctx).await?
     243              :         {
     244              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     245              :             // FSM, and smgrnblocks() on it immediately afterwards,
     246              :             // without extending it.  Tolerate that by claiming that
     247              :             // any non-existent FSM fork has size 0.
     248            0 :             return Ok(0);
     249         5140 :         }
     250         5140 : 
     251         5140 :         let key = rel_size_to_key(tag);
     252         5140 :         let mut buf = version.get(self, key, ctx).await?;
     253         5136 :         let nblocks = buf.get_u32_le();
     254         5136 : 
     255         5136 :         self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     256         5136 : 
     257         5136 :         Ok(nblocks)
     258        24434 :     }
     259              : 
     260              :     /// Does relation exist?
     261         6050 :     pub(crate) async fn get_rel_exists(
     262         6050 :         &self,
     263         6050 :         tag: RelTag,
     264         6050 :         version: Version<'_>,
     265         6050 :         ctx: &RequestContext,
     266         6050 :     ) -> Result<bool, PageReconstructError> {
     267         6050 :         if tag.relnode == 0 {
     268            0 :             return Err(PageReconstructError::Other(
     269            0 :                 RelationError::InvalidRelnode.into(),
     270            0 :             ));
     271         6050 :         }
     272              : 
     273              :         // first try to lookup relation in cache
     274         6050 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     275         6032 :             return Ok(true);
     276           18 :         }
     277           18 :         // fetch directory listing
     278           18 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     279           18 :         let buf = version.get(self, key, ctx).await?;
     280              : 
     281           18 :         match RelDirectory::des(&buf).context("deserialization failure") {
     282           18 :             Ok(dir) => {
     283           18 :                 let exists = dir.rels.contains(&(tag.relnode, tag.forknum));
     284           18 :                 Ok(exists)
     285              :             }
     286            0 :             Err(e) => Err(PageReconstructError::from(e)),
     287              :         }
     288         6050 :     }
     289              : 
     290              :     /// Get a list of all existing relations in given tablespace and database.
     291              :     ///
     292              :     /// # Cancel-Safety
     293              :     ///
     294              :     /// This method is cancellation-safe.
     295            0 :     pub(crate) async fn list_rels(
     296            0 :         &self,
     297            0 :         spcnode: Oid,
     298            0 :         dbnode: Oid,
     299            0 :         version: Version<'_>,
     300            0 :         ctx: &RequestContext,
     301            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     302            0 :         // fetch directory listing
     303            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     304            0 :         let buf = version.get(self, key, ctx).await?;
     305              : 
     306            0 :         match RelDirectory::des(&buf).context("deserialization failure") {
     307            0 :             Ok(dir) => {
     308            0 :                 let rels: HashSet<RelTag> =
     309            0 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     310            0 :                         spcnode,
     311            0 :                         dbnode,
     312            0 :                         relnode: *relnode,
     313            0 :                         forknum: *forknum,
     314            0 :                     }));
     315            0 : 
     316            0 :                 Ok(rels)
     317              :             }
     318            0 :             Err(e) => Err(PageReconstructError::from(e)),
     319              :         }
     320            0 :     }
     321              : 
     322              :     /// Get the whole SLRU segment
     323            0 :     pub(crate) async fn get_slru_segment(
     324            0 :         &self,
     325            0 :         kind: SlruKind,
     326            0 :         segno: u32,
     327            0 :         lsn: Lsn,
     328            0 :         ctx: &RequestContext,
     329            0 :     ) -> Result<Bytes, PageReconstructError> {
     330            0 :         let n_blocks = self
     331            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     332            0 :             .await?;
     333            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     334            0 :         for blkno in 0..n_blocks {
     335            0 :             let block = self
     336            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     337            0 :                 .await?;
     338            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     339              :         }
     340            0 :         Ok(segment.freeze())
     341            0 :     }
     342              : 
     343              :     /// Look up given SLRU page version.
     344            0 :     pub(crate) async fn get_slru_page_at_lsn(
     345            0 :         &self,
     346            0 :         kind: SlruKind,
     347            0 :         segno: u32,
     348            0 :         blknum: BlockNumber,
     349            0 :         lsn: Lsn,
     350            0 :         ctx: &RequestContext,
     351            0 :     ) -> Result<Bytes, PageReconstructError> {
     352            0 :         let key = slru_block_to_key(kind, segno, blknum);
     353            0 :         self.get(key, lsn, ctx).await
     354            0 :     }
     355              : 
     356              :     /// Get size of an SLRU segment
     357            0 :     pub(crate) async fn get_slru_segment_size(
     358            0 :         &self,
     359            0 :         kind: SlruKind,
     360            0 :         segno: u32,
     361            0 :         version: Version<'_>,
     362            0 :         ctx: &RequestContext,
     363            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     364            0 :         let key = slru_segment_size_to_key(kind, segno);
     365            0 :         let mut buf = version.get(self, key, ctx).await?;
     366            0 :         Ok(buf.get_u32_le())
     367            0 :     }
     368              : 
     369              :     /// Does the SLRU segment exist?
     370            0 :     pub(crate) async fn get_slru_segment_exists(
     371            0 :         &self,
     372            0 :         kind: SlruKind,
     373            0 :         segno: u32,
     374            0 :         version: Version<'_>,
     375            0 :         ctx: &RequestContext,
     376            0 :     ) -> Result<bool, PageReconstructError> {
     377            0 :         // fetch directory listing
     378            0 :         let key = slru_dir_to_key(kind);
     379            0 :         let buf = version.get(self, key, ctx).await?;
     380              : 
     381            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     382            0 :             Ok(dir) => {
     383            0 :                 let exists = dir.segments.contains(&segno);
     384            0 :                 Ok(exists)
     385              :             }
     386            0 :             Err(e) => Err(PageReconstructError::from(e)),
     387              :         }
     388            0 :     }
     389              : 
     390              :     /// Locate LSN, such that all transactions that committed before
     391              :     /// 'search_timestamp' are visible, but nothing newer is.
     392              :     ///
     393              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     394              :     /// so it's not well defined which LSN you get if there were multiple commits
     395              :     /// "in flight" at that point in time.
     396              :     ///
     397            0 :     pub(crate) async fn find_lsn_for_timestamp(
     398            0 :         &self,
     399            0 :         search_timestamp: TimestampTz,
     400            0 :         cancel: &CancellationToken,
     401            0 :         ctx: &RequestContext,
     402            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     403            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     404            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     405            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     406            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     407            0 :         // on the safe side.
     408            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     409            0 :         let max_lsn = self.get_last_record_lsn();
     410            0 : 
     411            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     412            0 :         // LSN divided by 8.
     413            0 :         let mut low = min_lsn.0 / 8;
     414            0 :         let mut high = max_lsn.0 / 8 + 1;
     415            0 : 
     416            0 :         let mut found_smaller = false;
     417            0 :         let mut found_larger = false;
     418            0 :         while low < high {
     419            0 :             if cancel.is_cancelled() {
     420            0 :                 return Err(PageReconstructError::Cancelled);
     421            0 :             }
     422            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     423            0 :             let mid = (high + low) / 2;
     424              : 
     425            0 :             let cmp = self
     426            0 :                 .is_latest_commit_timestamp_ge_than(
     427            0 :                     search_timestamp,
     428            0 :                     Lsn(mid * 8),
     429            0 :                     &mut found_smaller,
     430            0 :                     &mut found_larger,
     431            0 :                     ctx,
     432            0 :                 )
     433            0 :                 .await?;
     434              : 
     435            0 :             if cmp {
     436            0 :                 high = mid;
     437            0 :             } else {
     438            0 :                 low = mid + 1;
     439            0 :             }
     440              :         }
     441              :         // If `found_smaller == true`, `low = t + 1`, where `t` (in LSN/8 units)
     442              :         // is the position of the last commit record before or at `search_timestamp`.
     443              :         // Subtract one from `low` to get `t`.
     444              :         //
     445              :         // FIXME: it would be better to get the LSN of the previous commit.
     446              :         // Otherwise, if you restore to the returned LSN, the database will
     447              :         // include physical changes from later commits that will be marked
     448              :         // as aborted, and will need to be vacuumed away.
     449            0 :         let commit_lsn = Lsn((low - 1) * 8);
     450            0 :         match (found_smaller, found_larger) {
     451              :             (false, false) => {
     452              :                 // This can happen if no commit records have been processed yet, e.g.
     453              :                 // just after importing a cluster.
     454            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     455              :             }
     456              :             (false, true) => {
     457              :                 // Didn't find any commit timestamps smaller than the request
     458            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     459              :             }
     460            0 :             (true, _) if commit_lsn < min_lsn => {
     461            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     462            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     463            0 :                 // below the min_lsn. We should never do that.
     464            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     465              :             }
     466              :             (true, false) => {
     467              :                 // Only found commits with timestamps smaller than the request.
     468              :                 // It's still a valid case for branch creation, return it.
     469              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     470              :                 // case, anyway.
     471            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     472              :             }
     473            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     474              :         }
     475            0 :     }
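                      : 
                      :     // Editorial worked example for the search above (not part of the measured
                      :     // source): with commits at Lsn(8) (timestamp 100) and Lsn(24) (timestamp 200)
                      :     // and search_timestamp = 150, the predicate "some visible commit has
                      :     // timestamp >= 150" first holds at mid = 3 (Lsn(24)), so the loop exits with
                      :     // low = 3 and commit_lsn = Lsn((3 - 1) * 8) = Lsn(16). At Lsn(16) the commit
                      :     // at Lsn(8) is visible and the one at Lsn(24) is not; both flags are set,
                      :     // yielding LsnForTimestamp::Present(Lsn(16)).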
     476              : 
     477              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if any commit
     478              :     /// visible at LSN 'probe_lsn' has a timestamp at or after 'search_timestamp'.
     479              :     ///
     480              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any
     481              :     /// commits with a smaller/larger timestamp.
     482              :     ///
     483            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     484            0 :         &self,
     485            0 :         search_timestamp: TimestampTz,
     486            0 :         probe_lsn: Lsn,
     487            0 :         found_smaller: &mut bool,
     488            0 :         found_larger: &mut bool,
     489            0 :         ctx: &RequestContext,
     490            0 :     ) -> Result<bool, PageReconstructError> {
     491            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     492            0 :             if timestamp >= search_timestamp {
     493            0 :                 *found_larger = true;
     494            0 :                 return ControlFlow::Break(true);
     495            0 :             } else {
     496            0 :                 *found_smaller = true;
     497            0 :             }
     498            0 :             ControlFlow::Continue(())
     499            0 :         })
     500            0 :         .await
     501            0 :     }
     502              : 
     503              :     /// Obtain the latest commit timestamp at the given LSN.
     504              :     ///
     505              :     /// If the LSN has no timestamps, returns None; otherwise returns the largest timestamp found.
     506            0 :     pub(crate) async fn get_timestamp_for_lsn(
     507            0 :         &self,
     508            0 :         probe_lsn: Lsn,
     509            0 :         ctx: &RequestContext,
     510            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     511            0 :         let mut max: Option<TimestampTz> = None;
     512            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     513            0 :             if let Some(max_prev) = max {
     514            0 :                 max = Some(max_prev.max(timestamp));
     515            0 :             } else {
     516            0 :                 max = Some(timestamp);
     517            0 :             }
     518            0 :             ControlFlow::Continue(())
     519            0 :         })
     520            0 :         .await?;
     521              : 
     522            0 :         Ok(max)
     523            0 :     }
     524              : 
     525              :     /// Runs the given function on all the timestamps for a given lsn
     526              :     ///
     527              :     /// The return value is either given by the closure, or set to the `Default`
     528              :     /// impl's output.
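                      :     ///
                      :     /// An editorial sketch of the closure contract (hypothetical `t0`; not part
                      :     /// of the coverage run):
                      :     ///
                      :     /// ```ignore
                      :     /// // Was any commit stamped exactly `t0`? Break short-circuits the scan.
                      :     /// let found: bool = self
                      :     ///     .map_all_timestamps(probe_lsn, ctx, |ts| {
                      :     ///         if ts == t0 {
                      :     ///             ControlFlow::Break(true)
                      :     ///         } else {
                      :     ///             ControlFlow::Continue(())
                      :     ///         }
                      :     ///     })
                      :     ///     .await?; // yields bool::default() == false if never broken
                      :     /// ```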
     529            0 :     async fn map_all_timestamps<T: Default>(
     530            0 :         &self,
     531            0 :         probe_lsn: Lsn,
     532            0 :         ctx: &RequestContext,
     533            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     534            0 :     ) -> Result<T, PageReconstructError> {
     535            0 :         for segno in self
     536            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     537            0 :             .await?
     538              :         {
     539            0 :             let nblocks = self
     540            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     541            0 :                 .await?;
     542            0 :             for blknum in (0..nblocks).rev() {
     543            0 :                 let clog_page = self
     544            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     545            0 :                     .await?;
     546              : 
     547            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     548            0 :                     let mut timestamp_bytes = [0u8; 8];
     549            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     550            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     551            0 : 
     552            0 :                     match f(timestamp) {
     553            0 :                         ControlFlow::Break(b) => return Ok(b),
     554            0 :                         ControlFlow::Continue(()) => (),
     555              :                     }
     556            0 :                 }
     557              :             }
     558              :         }
     559            0 :         Ok(Default::default())
     560            0 :     }
     561              : 
     562            0 :     pub(crate) async fn get_slru_keyspace(
     563            0 :         &self,
     564            0 :         version: Version<'_>,
     565            0 :         ctx: &RequestContext,
     566            0 :     ) -> Result<KeySpace, PageReconstructError> {
     567            0 :         let mut accum = KeySpaceAccum::new();
     568              : 
     569            0 :         for kind in SlruKind::iter() {
     570            0 :             let mut segments: Vec<u32> = self
     571            0 :                 .list_slru_segments(kind, version, ctx)
     572            0 :                 .await?
     573            0 :                 .into_iter()
     574            0 :                 .collect();
     575            0 :             segments.sort_unstable();
     576              : 
     577            0 :             for seg in segments {
     578            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     579              : 
     580            0 :                 accum.add_range(
     581            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     582            0 :                 );
     583              :             }
     584              :         }
     585              : 
     586            0 :         Ok(accum.to_keyspace())
     587            0 :     }
     588              : 
     589              :     /// Get a list of SLRU segments
     590            0 :     pub(crate) async fn list_slru_segments(
     591            0 :         &self,
     592            0 :         kind: SlruKind,
     593            0 :         version: Version<'_>,
     594            0 :         ctx: &RequestContext,
     595            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     596            0 :         // fetch directory entry
     597            0 :         let key = slru_dir_to_key(kind);
     598              : 
     599            0 :         let buf = version.get(self, key, ctx).await?;
     600            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     601            0 :             Ok(dir) => Ok(dir.segments),
     602            0 :             Err(e) => Err(PageReconstructError::from(e)),
     603              :         }
     604            0 :     }
     605              : 
     606            0 :     pub(crate) async fn get_relmap_file(
     607            0 :         &self,
     608            0 :         spcnode: Oid,
     609            0 :         dbnode: Oid,
     610            0 :         version: Version<'_>,
     611            0 :         ctx: &RequestContext,
     612            0 :     ) -> Result<Bytes, PageReconstructError> {
     613            0 :         let key = relmap_file_key(spcnode, dbnode);
     614              : 
     615            0 :         let buf = version.get(self, key, ctx).await?;
     616            0 :         Ok(buf)
     617            0 :     }
     618              : 
     619            0 :     pub(crate) async fn list_dbdirs(
     620            0 :         &self,
     621            0 :         lsn: Lsn,
     622            0 :         ctx: &RequestContext,
     623            0 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     624              :         // fetch directory entry
     625            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     626              : 
     627            0 :         match DbDirectory::des(&buf).context("deserialization failure") {
     628            0 :             Ok(dir) => Ok(dir.dbdirs),
     629            0 :             Err(e) => Err(PageReconstructError::from(e)),
     630              :         }
     631            0 :     }
     632              : 
     633            0 :     pub(crate) async fn get_twophase_file(
     634            0 :         &self,
     635            0 :         xid: TransactionId,
     636            0 :         lsn: Lsn,
     637            0 :         ctx: &RequestContext,
     638            0 :     ) -> Result<Bytes, PageReconstructError> {
     639            0 :         let key = twophase_file_key(xid);
     640            0 :         let buf = self.get(key, lsn, ctx).await?;
     641            0 :         Ok(buf)
     642            0 :     }
     643              : 
     644            0 :     pub(crate) async fn list_twophase_files(
     645            0 :         &self,
     646            0 :         lsn: Lsn,
     647            0 :         ctx: &RequestContext,
     648            0 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     649              :         // fetch directory entry
     650            0 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     651              : 
     652            0 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     653            0 :             Ok(dir) => Ok(dir.xids),
     654            0 :             Err(e) => Err(PageReconstructError::from(e)),
     655              :         }
     656            0 :     }
     657              : 
     658            0 :     pub(crate) async fn get_control_file(
     659            0 :         &self,
     660            0 :         lsn: Lsn,
     661            0 :         ctx: &RequestContext,
     662            0 :     ) -> Result<Bytes, PageReconstructError> {
     663            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     664            0 :     }
     665              : 
     666           12 :     pub(crate) async fn get_checkpoint(
     667           12 :         &self,
     668           12 :         lsn: Lsn,
     669           12 :         ctx: &RequestContext,
     670           12 :     ) -> Result<Bytes, PageReconstructError> {
     671           12 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     672           12 :     }
     673              : 
     674            6 :     async fn list_aux_files_v1(
     675            6 :         &self,
     676            6 :         lsn: Lsn,
     677            6 :         ctx: &RequestContext,
     678            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     679            6 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     680            6 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     681            6 :                 Ok(dir) => Ok(dir.files),
     682            0 :                 Err(e) => Err(PageReconstructError::from(e)),
     683              :             },
     684            0 :             Err(e) => {
     685            0 :                 // This is expected: historical databases do not have the key.
     686            0 :                 debug!("Failed to get info about AUX files: {}", e);
     687            0 :                 Ok(HashMap::new())
     688              :             }
     689              :         }
     690            6 :     }
     691              : 
     692            0 :     async fn list_aux_files_v2(
     693            0 :         &self,
     694            0 :         lsn: Lsn,
     695            0 :         ctx: &RequestContext,
     696            0 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     697            0 :         let kv = self
     698            0 :             .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
     699            0 :             .await
     700            0 :             .context("scan")?;
     701            0 :         let mut result = HashMap::new();
     702            0 :         for (_, v) in kv {
     703            0 :             let v = v.context("get value")?;
     704            0 :             let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
     705            0 :             for (fname, content) in v {
     706            0 :                 result.insert(fname, content);
     707            0 :             }
     708              :         }
     709            0 :         Ok(result)
     710            0 :     }
     711              : 
     712            6 :     pub(crate) async fn list_aux_files(
     713            6 :         &self,
     714            6 :         lsn: Lsn,
     715            6 :         ctx: &RequestContext,
     716            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     717            6 :         match self.get_switch_aux_file_policy() {
     718            6 :             AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
     719            0 :             AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
     720              :             AuxFilePolicy::CrossValidation => {
     721            0 :                 let v1_result = self.list_aux_files_v1(lsn, ctx).await;
     722            0 :                 let v2_result = self.list_aux_files_v2(lsn, ctx).await;
     723            0 :                 match (v1_result, v2_result) {
     724            0 :                     (Ok(v1), Ok(v2)) => {
     725            0 :                         if v1 != v2 {
     726            0 :                             tracing::error!(
     727            0 :                                 "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
     728              :                             );
     729            0 :                             return Err(PageReconstructError::Other(anyhow::anyhow!(
     730            0 :                                 "unmatched aux file v1 v2 result"
     731            0 :                             )));
     732            0 :                         }
     733            0 :                         Ok(v1)
     734              :                     }
     735            0 :                     (Ok(_), Err(v2)) => {
     736            0 :                         tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
     737            0 :                         Err(v2)
     738              :                     }
     739            0 :                     (Err(v1), Ok(_)) => {
     740            0 :                         tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
     741            0 :                         Err(v1)
     742              :                     }
     743            0 :                     (Err(_), Err(v2)) => Err(v2),
     744              :                 }
     745              :             }
     746              :         }
     747            6 :     }
     748              : 
     749              :     /// Does the same as get_current_logical_size, but computes the size on demand.
     750              :     /// Used to initialize the logical size tracking on startup.
     751              :     ///
     752              :     /// Only relation blocks are counted currently. That excludes metadata,
     753              :     /// SLRUs, twophase files etc.
     754              :     ///
     755              :     /// # Cancel-Safety
     756              :     ///
     757              :     /// This method is cancellation-safe.
     758            0 :     pub async fn get_current_logical_size_non_incremental(
     759            0 :         &self,
     760            0 :         lsn: Lsn,
     761            0 :         ctx: &RequestContext,
     762            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     763            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     764              : 
     765              :         // Fetch list of database dirs and iterate them
     766            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     767            0 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     768              : 
     769            0 :         let mut total_size: u64 = 0;
     770            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     771            0 :             for rel in self
     772            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     773            0 :                 .await?
     774              :             {
     775            0 :                 if self.cancel.is_cancelled() {
     776            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     777            0 :                 }
     778            0 :                 let relsize_key = rel_size_to_key(rel);
     779            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     780            0 :                 let relsize = buf.get_u32_le();
     781            0 : 
     782            0 :                 total_size += relsize as u64;
     783              :             }
     784              :         }
     785            0 :         Ok(total_size * BLCKSZ as u64)
     786            0 :     }
     787              : 
     788              :     ///
     789              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     790              :     /// Anything that's not listed may be removed from the underlying storage (from
     791              :     /// that LSN forwards).
     792              :     ///
     793              :     /// The return value is (dense keyspace, sparse keyspace).
     794          204 :     pub(crate) async fn collect_keyspace(
     795          204 :         &self,
     796          204 :         lsn: Lsn,
     797          204 :         ctx: &RequestContext,
     798          204 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     799          204 :         // Iterate through key ranges, greedily packing them into partitions
     800          204 :         let mut result = KeySpaceAccum::new();
     801          204 : 
     802          204 :         // The dbdir metadata always exists
     803          204 :         result.add_key(DBDIR_KEY);
     804              : 
     805              :         // Fetch list of database dirs and iterate them
     806         2292 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     807          204 :         let dbdir = DbDirectory::des(&buf)?;
     808              : 
     809          204 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     810          204 :         dbs.sort_unstable();
     811          204 :         for (spcnode, dbnode) in dbs {
     812            0 :             result.add_key(relmap_file_key(spcnode, dbnode));
     813            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     814              : 
     815            0 :             let mut rels: Vec<RelTag> = self
     816            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     817            0 :                 .await?
     818            0 :                 .into_iter()
     819            0 :                 .collect();
     820            0 :             rels.sort_unstable();
     821            0 :             for rel in rels {
     822            0 :                 let relsize_key = rel_size_to_key(rel);
     823            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     824            0 :                 let relsize = buf.get_u32_le();
     825            0 : 
     826            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     827            0 :                 result.add_key(relsize_key);
     828              :             }
     829              :         }
     830              : 
     831              :         // Iterate SLRUs next
     832          612 :         for kind in [
     833          204 :             SlruKind::Clog,
     834          204 :             SlruKind::MultiXactMembers,
     835          204 :             SlruKind::MultiXactOffsets,
     836              :         ] {
     837          612 :             let slrudir_key = slru_dir_to_key(kind);
     838          612 :             result.add_key(slrudir_key);
     839         7882 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     840          612 :             let dir = SlruSegmentDirectory::des(&buf)?;
     841          612 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     842          612 :             segments.sort_unstable();
     843          612 :             for segno in segments {
     844            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     845            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     846            0 :                 let segsize = buf.get_u32_le();
     847            0 : 
     848            0 :                 result.add_range(
     849            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     850            0 :                 );
     851            0 :                 result.add_key(segsize_key);
     852              :             }
     853              :         }
     854              : 
     855              :         // Then pg_twophase
     856          204 :         result.add_key(TWOPHASEDIR_KEY);
     857         3019 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     858          204 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     859          204 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     860          204 :         xids.sort_unstable();
     861          204 :         for xid in xids {
     862            0 :             result.add_key(twophase_file_key(xid));
     863            0 :         }
     864              : 
     865          204 :         result.add_key(CONTROLFILE_KEY);
     866          204 :         result.add_key(CHECKPOINT_KEY);
     867          204 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     868          102 :             result.add_key(AUX_FILES_KEY);
     869          102 :         }
     870              : 
     871          204 :         Ok((
     872          204 :             result.to_keyspace(),
     873          204 :             /* AUX sparse key space */
     874          204 :             SparseKeySpace(KeySpace::single(Key::metadata_aux_key_range())),
     875          204 :         ))
     876          204 :     }
     877              : 
     878              :     /// Get the cached size of a relation, if it was not updated after the specified LSN
     879       448540 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     880       448540 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     881       448540 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
     882       448518 :             if lsn >= *cached_lsn {
     883       443372 :                 return Some(*nblocks);
     884         5146 :             }
     885           22 :         }
     886         5168 :         None
     887       448540 :     }
     888              : 
     889              :     /// Update cached relation size if there is no more recent update
     890         5136 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     891         5136 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     892         5136 : 
     893         5136 :         if lsn < rel_size_cache.complete_as_of {
     894              :             // Do not cache old values. It's safe to cache the size on read, as long as
     895              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
     896              :             // never evict values from the cache, so if the relation size changed after
     897              :             // 'lsn', the new value is already in the cache.
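                      :             // (Editorial example: with complete_as_of = Lsn(0x100), a read at
                      :             // Lsn(0x80) must not be cached: the relation could have grown at
                      :             // Lsn(0x90), before ingestion started tracking, and that growth
                      :             // would never overwrite the stale cache entry.)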
     898            0 :             return;
     899         5136 :         }
     900         5136 : 
     901         5136 :         match rel_size_cache.map.entry(tag) {
     902         5136 :             hash_map::Entry::Occupied(mut entry) => {
     903         5136 :                 let cached_lsn = entry.get_mut();
     904         5136 :                 if lsn >= cached_lsn.0 {
     905            0 :                     *cached_lsn = (lsn, nblocks);
     906         5136 :                 }
     907              :             }
     908            0 :             hash_map::Entry::Vacant(entry) => {
     909            0 :                 entry.insert((lsn, nblocks));
     910            0 :             }
     911              :         }
     912         5136 :     }
     913              : 
     914              :     /// Store cached relation size
     915       288732 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     916       288732 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     917       288732 :         rel_size_cache.map.insert(tag, (lsn, nblocks));
     918       288732 :     }
     919              : 
     920              :     /// Remove cached relation size
     921            2 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     922            2 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     923            2 :         rel_size_cache.map.remove(tag);
     924            2 :     }
     925              : }
     926              : 
     927              : /// DatadirModification represents an operation to ingest an atomic set of
     928              : /// updates to the repository. It is created by the 'begin_modification'
     929              : /// function. It is used for each WAL record, so that all the modifications
     930              : /// made by one WAL record appear atomic.
     931              : pub struct DatadirModification<'a> {
     932              :     /// The timeline this modification applies to. You can access this to
     933              :     /// read the state, but note that any pending updates are *not* reflected
     934              :     /// in the state in 'tline' yet.
     935              :     pub tline: &'a Timeline,
     936              : 
     937              :     /// Current LSN of the modification
     938              :     lsn: Lsn,
     939              : 
     940              :     // The modifications are not applied directly to the underlying key-value store.
     941              :     // The put-functions add the modifications here, and they are flushed to the
     942              :     // underlying key-value store by the 'finish' function.
     943              :     pending_lsns: Vec<Lsn>,
     944              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     945              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     946              :     pending_nblocks: i64,
     947              : 
     948              :     /// For special "directory" keys that store key-value maps, track the size of the map
     949              :     /// if it was updated in this modification.
     950              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
     951              : }
     952              : 
     953              : impl<'a> DatadirModification<'a> {
     954              :     /// Get the current lsn
     955       418056 :     pub(crate) fn get_lsn(&self) -> Lsn {
     956       418056 :         self.lsn
     957       418056 :     }
     958              : 
     959              :     /// Set the current lsn
     960       145858 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     961       145858 :         ensure!(
     962       145858 :             lsn >= self.lsn,
     963            0 :             "setting an older lsn {} than {} is not allowed",
     964              :             lsn,
     965              :             self.lsn
     966              :         );
     967       145858 :         if lsn > self.lsn {
     968       145858 :             self.pending_lsns.push(self.lsn);
     969       145858 :             self.lsn = lsn;
     970       145858 :         }
     971       145858 :         Ok(())
     972       145858 :     }
     973              : 
     974              :     /// Initialize a completely new repository.
     975              :     ///
     976              :     /// This inserts the directory metadata entries that are assumed to
     977              :     /// always exist.
     978          104 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     979          104 :         let buf = DbDirectory::ser(&DbDirectory {
     980          104 :             dbdirs: HashMap::new(),
     981          104 :         })?;
     982          104 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
     983          104 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     984          104 : 
     985          104 :         // Create AuxFilesDirectory
     986          104 :         self.init_aux_dir()?;
     987              : 
     988          104 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     989          104 :             xids: HashSet::new(),
     990          104 :         })?;
     991          104 :         self.pending_directory_entries
     992          104 :             .push((DirectoryKind::TwoPhase, 0));
     993          104 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     994              : 
     995          104 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     996          104 :         let empty_dir = Value::Image(buf);
     997          104 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     998          104 :         self.pending_directory_entries
     999          104 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1000          104 :         self.put(
    1001          104 :             slru_dir_to_key(SlruKind::MultiXactMembers),
    1002          104 :             empty_dir.clone(),
    1003          104 :         );
    1004          104 :         self.pending_directory_entries
     1005          104 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactMembers), 0));
    1006          104 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1007          104 :         self.pending_directory_entries
    1008          104 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
    1009          104 : 
    1010          104 :         Ok(())
    1011          104 :     }
    1012              : 
    1013              :     #[cfg(test)]
    1014          102 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1015          102 :         self.init_empty()?;
    1016          102 :         self.put_control_file(bytes::Bytes::from_static(
    1017          102 :             b"control_file contents do not matter",
    1018          102 :         ))
    1019          102 :         .context("put_control_file")?;
    1020          102 :         self.put_checkpoint(bytes::Bytes::from_static(
    1021          102 :             b"checkpoint_file contents do not matter",
    1022          102 :         ))
    1023          102 :         .context("put_checkpoint_file")?;
    1024          102 :         Ok(())
    1025          102 :     }
    1026              : 
    1027              :     /// Put a new page version that can be constructed from a WAL record
    1028              :     ///
    1029              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1030              :     /// current end-of-file. It's up to the caller to check that the relation size
    1031              :     /// matches the blocks inserted!
    1032       145630 :     pub fn put_rel_wal_record(
    1033       145630 :         &mut self,
    1034       145630 :         rel: RelTag,
    1035       145630 :         blknum: BlockNumber,
    1036       145630 :         rec: NeonWalRecord,
    1037       145630 :     ) -> anyhow::Result<()> {
    1038       145630 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1039       145630 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1040       145630 :         Ok(())
    1041       145630 :     }
    1042              : 
    1043              :     // Same, but for an SLRU.
    1044            8 :     pub fn put_slru_wal_record(
    1045            8 :         &mut self,
    1046            8 :         kind: SlruKind,
    1047            8 :         segno: u32,
    1048            8 :         blknum: BlockNumber,
    1049            8 :         rec: NeonWalRecord,
    1050            8 :     ) -> anyhow::Result<()> {
    1051            8 :         self.put(
    1052            8 :             slru_block_to_key(kind, segno, blknum),
    1053            8 :             Value::WalRecord(rec),
    1054            8 :         );
    1055            8 :         Ok(())
    1056            8 :     }
    1057              : 
     1058              :     /// Like put_rel_wal_record, but with a ready-made image of the page.
    1059       280864 :     pub fn put_rel_page_image(
    1060       280864 :         &mut self,
    1061       280864 :         rel: RelTag,
    1062       280864 :         blknum: BlockNumber,
    1063       280864 :         img: Bytes,
    1064       280864 :     ) -> anyhow::Result<()> {
    1065       280864 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1066       280864 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1067       280864 :         Ok(())
    1068       280864 :     }
    1069              : 
    1070            6 :     pub fn put_slru_page_image(
    1071            6 :         &mut self,
    1072            6 :         kind: SlruKind,
    1073            6 :         segno: u32,
    1074            6 :         blknum: BlockNumber,
    1075            6 :         img: Bytes,
    1076            6 :     ) -> anyhow::Result<()> {
    1077            6 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
    1078            6 :         Ok(())
    1079            6 :     }
    1080              : 
    1081              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1082           16 :     pub async fn put_relmap_file(
    1083           16 :         &mut self,
    1084           16 :         spcnode: Oid,
    1085           16 :         dbnode: Oid,
    1086           16 :         img: Bytes,
    1087           16 :         ctx: &RequestContext,
    1088           16 :     ) -> anyhow::Result<()> {
    1089              :         // Add it to the directory (if it doesn't exist already)
    1090           16 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1091           16 :         let mut dbdir = DbDirectory::des(&buf)?;
    1092              : 
    1093           16 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1094           16 :         if r.is_none() || r == Some(false) {
    1095              :             // The dbdir entry didn't exist, or it contained a
    1096              :             // 'false'. The 'insert' call already updated it with
    1097              :             // 'true', now write the updated 'dbdirs' map back.
    1098           16 :             let buf = DbDirectory::ser(&dbdir)?;
    1099           16 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1100           16 : 
    1101           16 :             // Create AuxFilesDirectory as well
    1102           16 :             self.init_aux_dir()?;
    1103            0 :         }
    1104           16 :         if r.is_none() {
    1105            8 :             // Create RelDirectory
    1106            8 :             let buf = RelDirectory::ser(&RelDirectory {
    1107            8 :                 rels: HashSet::new(),
    1108            8 :             })?;
    1109            8 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1110            8 :             self.put(
    1111            8 :                 rel_dir_to_key(spcnode, dbnode),
    1112            8 :                 Value::Image(Bytes::from(buf)),
    1113            8 :             );
    1114            8 :         }
    1115              : 
    1116           16 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1117           16 :         Ok(())
    1118           16 :     }
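
The dbdir update above leans on a standard-library detail: `HashMap::insert` returns the previous value for the key. A small sketch of the same decision logic (the key and value here are illustrative):

    use std::collections::HashMap;

    fn main() {
        let mut dbdirs: HashMap<(u32, u32), bool> = HashMap::new();
        let r = dbdirs.insert((1663, 16384), true);
        if r.is_none() || r == Some(false) {
            // Entry was new, or existed with 'false': the serialized
            // directory must be written back.
            println!("dbdir updated, write it back");
        }
        if r.is_none() {
            // Brand-new database: also create an empty RelDirectory.
            println!("create RelDirectory");
        }
    }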
    1119              : 
    1120            0 :     pub async fn put_twophase_file(
    1121            0 :         &mut self,
    1122            0 :         xid: TransactionId,
    1123            0 :         img: Bytes,
    1124            0 :         ctx: &RequestContext,
    1125            0 :     ) -> anyhow::Result<()> {
    1126              :         // Add it to the directory entry
    1127            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1128            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1129            0 :         if !dir.xids.insert(xid) {
    1130            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1131            0 :         }
    1132            0 :         self.pending_directory_entries
    1133            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1134            0 :         self.put(
    1135            0 :             TWOPHASEDIR_KEY,
    1136            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1137              :         );
    1138              : 
    1139            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1140            0 :         Ok(())
    1141            0 :     }
    1142              : 
    1143          104 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1144          104 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1145          104 :         Ok(())
    1146          104 :     }
    1147              : 
    1148          118 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1149          118 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1150          118 :         Ok(())
    1151          118 :     }
    1152              : 
    1153            0 :     pub async fn drop_dbdir(
    1154            0 :         &mut self,
    1155            0 :         spcnode: Oid,
    1156            0 :         dbnode: Oid,
    1157            0 :         ctx: &RequestContext,
    1158            0 :     ) -> anyhow::Result<()> {
    1159            0 :         let total_blocks = self
    1160            0 :             .tline
    1161            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1162            0 :             .await?;
    1163              : 
    1164              :         // Remove entry from dbdir
    1165            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1166            0 :         let mut dir = DbDirectory::des(&buf)?;
    1167            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1168            0 :             let buf = DbDirectory::ser(&dir)?;
    1169            0 :             self.pending_directory_entries
    1170            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1171            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1172              :         } else {
    1173            0 :             warn!(
    1174            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1175              :                 spcnode, dbnode
    1176              :             );
    1177              :         }
    1178              : 
    1179              :         // Update logical database size.
    1180            0 :         self.pending_nblocks -= total_blocks as i64;
    1181            0 : 
    1182            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1183            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1184            0 :         Ok(())
    1185            0 :     }
    1186              : 
    1187              :     /// Create a relation fork.
    1188              :     ///
    1189              :     /// 'nblocks' is the initial size.
    1190         1920 :     pub async fn put_rel_creation(
    1191         1920 :         &mut self,
    1192         1920 :         rel: RelTag,
    1193         1920 :         nblocks: BlockNumber,
    1194         1920 :         ctx: &RequestContext,
    1195         1920 :     ) -> Result<(), RelationError> {
    1196         1920 :         if rel.relnode == 0 {
    1197            0 :             return Err(RelationError::InvalidRelnode);
    1198         1920 :         }
    1199              :         // It's possible that this is the first rel for this db in this
    1200              :         // tablespace.  Create the reldir entry for it if so.
    1201         1920 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1202         1920 :             .context("deserialize db")?;
    1203         1920 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1204         1920 :         let mut rel_dir =
    1205         1920 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1206              :                 // Didn't exist. Update dbdir
    1207            8 :                 e.insert(false);
    1208            8 :                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1209            8 :                 self.pending_directory_entries
    1210            8 :                     .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1211            8 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1212            8 : 
    1213            8 :                 // and create the RelDirectory
    1214            8 :                 RelDirectory::default()
    1215              :             } else {
    1216              :                 // reldir already exists, fetch it
    1217         1912 :                 RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1218         1912 :                     .context("deserialize db")?
    1219              :             };
    1220              : 
    1221              :         // Add the new relation to the rel directory entry, and write it back
    1222         1920 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1223            0 :             return Err(RelationError::AlreadyExists);
    1224         1920 :         }
    1225         1920 : 
    1226         1920 :         self.pending_directory_entries
    1227         1920 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1228         1920 : 
    1229         1920 :         self.put(
    1230         1920 :             rel_dir_key,
    1231         1920 :             Value::Image(Bytes::from(
    1232         1920 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1233              :             )),
    1234              :         );
    1235              : 
    1236              :         // Put size
    1237         1920 :         let size_key = rel_size_to_key(rel);
    1238         1920 :         let buf = nblocks.to_le_bytes();
    1239         1920 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1240         1920 : 
    1241         1920 :         self.pending_nblocks += nblocks as i64;
    1242         1920 : 
    1243         1920 :         // Update relation size cache
    1244         1920 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1245         1920 : 
    1246         1920 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1247         1920 :         // caller.
    1248         1920 :         Ok(())
    1249         1920 :     }
    1250              : 
    1251              :     /// Truncate relation
    1252         6012 :     pub async fn put_rel_truncation(
    1253         6012 :         &mut self,
    1254         6012 :         rel: RelTag,
    1255         6012 :         nblocks: BlockNumber,
    1256         6012 :         ctx: &RequestContext,
    1257         6012 :     ) -> anyhow::Result<()> {
    1258         6012 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1259         6012 :         if self
    1260         6012 :             .tline
    1261         6012 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1262            0 :             .await?
    1263              :         {
    1264         6012 :             let size_key = rel_size_to_key(rel);
    1265              :             // Fetch the old size first
    1266         6012 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1267         6012 : 
    1268         6012 :             // Update the entry with the new size.
    1269         6012 :             let buf = nblocks.to_le_bytes();
    1270         6012 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1271         6012 : 
    1272         6012 :             // Update relation size cache
    1273         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1274         6012 : 
    1278         6012 :             // Update logical database size.
    1279         6012 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1280            0 :         }
    1281         6012 :         Ok(())
    1282         6012 :     }
    1283              : 
    1284              :     /// Extend relation
    1285              :     /// If new size is smaller, do nothing.
    1286       276680 :     pub async fn put_rel_extend(
    1287       276680 :         &mut self,
    1288       276680 :         rel: RelTag,
    1289       276680 :         nblocks: BlockNumber,
    1290       276680 :         ctx: &RequestContext,
    1291       276680 :     ) -> anyhow::Result<()> {
    1292       276680 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1293              : 
    1294              :         // Put size
    1295       276680 :         let size_key = rel_size_to_key(rel);
    1296       276680 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1297       276680 : 
    1298       276680 :         // only extend relation here. never decrease the size
    1299       276680 :         if nblocks > old_size {
    1300       274788 :             let buf = nblocks.to_le_bytes();
    1301       274788 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1302       274788 : 
    1303       274788 :             // Update relation size cache
    1304       274788 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1305       274788 : 
    1306       274788 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1307       274788 :         }
    1308       276680 :         Ok(())
    1309       276680 :     }
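
The relation size entry is a 4-byte little-endian image: `put_rel_creation`, `put_rel_truncation`, and `put_rel_extend` write it with `to_le_bytes` and read it back with `Buf::get_u32_le`. A round-trip sketch (uses the `bytes` crate, as this file itself does):

    use bytes::{Buf, Bytes};

    fn main() {
        let nblocks: u32 = 42;
        // Encode as the 4-byte little-endian image stored under size_key.
        let img = Bytes::from(nblocks.to_le_bytes().to_vec());
        // Decode it the way the getters do.
        let mut buf = img.clone();
        assert_eq!(buf.get_u32_le(), nblocks);
    }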
    1310              : 
    1311              :     /// Drop a relation.
    1312            2 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1313            2 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1314              : 
    1315              :         // Remove it from the directory entry
    1316            2 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1317            2 :         let buf = self.get(dir_key, ctx).await?;
    1318            2 :         let mut dir = RelDirectory::des(&buf)?;
    1319              : 
    1320            2 :         self.pending_directory_entries
    1321            2 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1322            2 : 
    1323            2 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1324            2 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1325              :         } else {
    1326            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1327              :         }
    1328              : 
    1329              :         // update logical size
    1330            2 :         let size_key = rel_size_to_key(rel);
    1331            2 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1332            2 :         self.pending_nblocks -= old_size as i64;
    1333            2 : 
    1334            2 :         // Remove enty from relation size cache
    1335            2 :         self.tline.remove_cached_rel_size(&rel);
    1336            2 : 
    1337            2 :         // Delete size entry, as well as all blocks
    1338            2 :         self.delete(rel_key_range(rel));
    1339            2 : 
    1340            2 :         Ok(())
    1341            2 :     }
    1342              : 
    1343            6 :     pub async fn put_slru_segment_creation(
    1344            6 :         &mut self,
    1345            6 :         kind: SlruKind,
    1346            6 :         segno: u32,
    1347            6 :         nblocks: BlockNumber,
    1348            6 :         ctx: &RequestContext,
    1349            6 :     ) -> anyhow::Result<()> {
    1350            6 :         // Add it to the directory entry
    1351            6 :         let dir_key = slru_dir_to_key(kind);
    1352            6 :         let buf = self.get(dir_key, ctx).await?;
    1353            6 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1354              : 
    1355            6 :         if !dir.segments.insert(segno) {
    1356            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1357            6 :         }
    1358            6 :         self.pending_directory_entries
    1359            6 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1360            6 :         self.put(
    1361            6 :             dir_key,
    1362            6 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1363              :         );
    1364              : 
    1365              :         // Put size
    1366            6 :         let size_key = slru_segment_size_to_key(kind, segno);
    1367            6 :         let buf = nblocks.to_le_bytes();
    1368            6 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1369            6 : 
    1370            6 :         // even if nblocks > 0, we don't insert any actual blocks here
    1371            6 : 
    1372            6 :         Ok(())
    1373            6 :     }
    1374              : 
    1375              :     /// Extend SLRU segment
    1376            0 :     pub fn put_slru_extend(
    1377            0 :         &mut self,
    1378            0 :         kind: SlruKind,
    1379            0 :         segno: u32,
    1380            0 :         nblocks: BlockNumber,
    1381            0 :     ) -> anyhow::Result<()> {
    1382            0 :         // Put size
    1383            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1384            0 :         let buf = nblocks.to_le_bytes();
    1385            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1386            0 :         Ok(())
    1387            0 :     }
    1388              : 
    1389              :     /// This method is used for marking truncated SLRU files
    1390            0 :     pub async fn drop_slru_segment(
    1391            0 :         &mut self,
    1392            0 :         kind: SlruKind,
    1393            0 :         segno: u32,
    1394            0 :         ctx: &RequestContext,
    1395            0 :     ) -> anyhow::Result<()> {
    1396            0 :         // Remove it from the directory entry
    1397            0 :         let dir_key = slru_dir_to_key(kind);
    1398            0 :         let buf = self.get(dir_key, ctx).await?;
    1399            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1400              : 
    1401            0 :         if !dir.segments.remove(&segno) {
    1402            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1403            0 :         }
    1404            0 :         self.pending_directory_entries
    1405            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1406            0 :         self.put(
    1407            0 :             dir_key,
    1408            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1409              :         );
    1410              : 
    1411              :         // Delete size entry, as well as all blocks
    1412            0 :         self.delete(slru_segment_key_range(kind, segno));
    1413            0 : 
    1414            0 :         Ok(())
    1415            0 :     }
    1416              : 
    1417              :     /// Drop a relmapper file (pg_filenode.map)
    1418            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1419            0 :         // TODO
    1420            0 :         Ok(())
    1421            0 :     }
    1422              : 
     1423              :     /// Drop a two-phase state file for the given transaction
    1424            0 :     pub async fn drop_twophase_file(
    1425            0 :         &mut self,
    1426            0 :         xid: TransactionId,
    1427            0 :         ctx: &RequestContext,
    1428            0 :     ) -> anyhow::Result<()> {
    1429              :         // Remove it from the directory entry
    1430            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1431            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1432              : 
    1433            0 :         if !dir.xids.remove(&xid) {
    1434            0 :             warn!("twophase file for xid {} does not exist", xid);
    1435            0 :         }
    1436            0 :         self.pending_directory_entries
    1437            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1438            0 :         self.put(
    1439            0 :             TWOPHASEDIR_KEY,
    1440            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1441              :         );
    1442              : 
    1443              :         // Delete it
    1444            0 :         self.delete(twophase_key_range(xid));
    1445            0 : 
    1446            0 :         Ok(())
    1447            0 :     }
    1448              : 
    1449          120 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1450          120 :         if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
    1451            0 :             return Ok(());
    1452          120 :         }
    1453          120 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1454          120 :             files: HashMap::new(),
    1455          120 :         })?;
    1456          120 :         self.pending_directory_entries
    1457          120 :             .push((DirectoryKind::AuxFiles, 0));
    1458          120 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1459          120 :         Ok(())
    1460          120 :     }
    1461              : 
    1462           12 :     pub async fn put_file(
    1463           12 :         &mut self,
    1464           12 :         path: &str,
    1465           12 :         content: &[u8],
    1466           12 :         ctx: &RequestContext,
    1467           12 :     ) -> anyhow::Result<()> {
    1468           12 :         let policy = self.tline.get_switch_aux_file_policy();
    1469           12 :         if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
    1470            0 :             let key = aux_file::encode_aux_file_key(path);
    1471              :             // retrieve the key from the engine
    1472            0 :             let old_val = match self.get(key, ctx).await {
    1473            0 :                 Ok(val) => Some(val),
    1474            0 :                 Err(PageReconstructError::MissingKey(_)) => None,
    1475            0 :                 Err(e) => return Err(e.into()),
    1476              :             };
    1477            0 :             let files = if let Some(ref old_val) = old_val {
    1478            0 :                 aux_file::decode_file_value(old_val)?
    1479              :             } else {
    1480            0 :                 Vec::new()
    1481              :             };
    1482            0 :             let new_files = if content.is_empty() {
    1483            0 :                 files
    1484            0 :                     .into_iter()
    1485            0 :                     .filter(|(p, _)| &path != p)
    1486            0 :                     .collect::<Vec<_>>()
    1487              :             } else {
    1488            0 :                 files
    1489            0 :                     .into_iter()
    1490            0 :                     .filter(|(p, _)| &path != p)
    1491            0 :                     .chain(std::iter::once((path, content)))
    1492            0 :                     .collect::<Vec<_>>()
    1493              :             };
    1494            0 :             let new_val = aux_file::encode_file_value(&new_files)?;
    1495            0 :             self.put(key, Value::Image(new_val.into()));
    1496           12 :         }
    1497              : 
    1498           12 :         if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
    1499           12 :             let file_path = path.to_string();
    1500           12 :             let content = if content.is_empty() {
    1501            2 :                 None
    1502              :             } else {
    1503           10 :                 Some(Bytes::copy_from_slice(content))
    1504              :             };
    1505              : 
    1506              :             let n_files;
    1507           12 :             let mut aux_files = self.tline.aux_files.lock().await;
    1508           12 :             if let Some(mut dir) = aux_files.dir.take() {
    1509              :                 // We already updated aux files in `self`: emit a delta and update our latest value.
    1510            8 :                 dir.upsert(file_path.clone(), content.clone());
    1511            8 :                 n_files = dir.files.len();
    1512            8 :                 if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
    1513            0 :                     self.put(
    1514            0 :                         AUX_FILES_KEY,
    1515            0 :                         Value::Image(Bytes::from(
    1516            0 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1517              :                         )),
    1518              :                     );
    1519            0 :                     aux_files.n_deltas = 0;
    1520            8 :                 } else {
    1521            8 :                     self.put(
    1522            8 :                         AUX_FILES_KEY,
    1523            8 :                         Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
    1524            8 :                     );
    1525            8 :                     aux_files.n_deltas += 1;
    1526            8 :                 }
    1527            8 :                 aux_files.dir = Some(dir);
    1528              :             } else {
    1529              :                 // Check if the AUX_FILES_KEY is initialized
    1530            4 :                 match self.get(AUX_FILES_KEY, ctx).await {
    1531            0 :                     Ok(dir_bytes) => {
    1532            0 :                         let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
    1533              :                         // Key is already set, we may append a delta
    1534            0 :                         self.put(
    1535            0 :                             AUX_FILES_KEY,
    1536            0 :                             Value::WalRecord(NeonWalRecord::AuxFile {
    1537            0 :                                 file_path: file_path.clone(),
    1538            0 :                                 content: content.clone(),
    1539            0 :                             }),
    1540            0 :                         );
    1541            0 :                         dir.upsert(file_path, content);
    1542            0 :                         n_files = dir.files.len();
    1543            0 :                         aux_files.dir = Some(dir);
    1544              :                     }
    1545              :                     Err(
    1546            0 :                         e @ (PageReconstructError::AncestorStopping(_)
    1547              :                         | PageReconstructError::Cancelled
    1548              :                         | PageReconstructError::AncestorLsnTimeout(_)),
    1549              :                     ) => {
    1550              :                         // Important that we do not interpret a shutdown error as "not found" and thereby
    1551              :                         // reset the map.
    1552            0 :                         return Err(e.into());
    1553              :                     }
    1554              :                     // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
    1555              :                     // the original code assumes all other errors are missing keys. Therefore, we keep the code path
    1556              :                     // the same for now, though in theory, we should only match the `MissingKey` variant.
    1557              :                     Err(
    1558              :                         PageReconstructError::Other(_)
    1559              :                         | PageReconstructError::WalRedo(_)
    1560              :                         | PageReconstructError::MissingKey { .. },
    1561              :                     ) => {
    1562              :                         // Key is missing, we must insert an image as the basis for subsequent deltas.
    1563              : 
    1564            4 :                         let mut dir = AuxFilesDirectory {
    1565            4 :                             files: HashMap::new(),
    1566            4 :                         };
    1567            4 :                         dir.upsert(file_path, content);
    1568            4 :                         self.put(
    1569            4 :                             AUX_FILES_KEY,
    1570            4 :                             Value::Image(Bytes::from(
    1571            4 :                                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1572              :                             )),
    1573              :                         );
    1574            4 :                         n_files = 1;
    1575            4 :                         aux_files.dir = Some(dir);
    1576              :                     }
    1577              :                 }
    1578              :             }
    1579              : 
    1580           12 :             self.pending_directory_entries
    1581           12 :                 .push((DirectoryKind::AuxFiles, n_files));
    1582            0 :         }
    1583              : 
    1584           12 :         Ok(())
    1585           12 :     }
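
The V1 aux-file path above amortizes replay cost: each update is normally stored as a cheap delta record, but once MAX_AUX_FILE_DELTAS deltas have accumulated, a full image of the directory is written and the counter resets. A schematic sketch of just that decision (record types simplified):

    const MAX_AUX_FILE_DELTAS: usize = 1024;

    enum Record {
        Image(Vec<u8>), // full serialized directory
        Delta(Vec<u8>), // single-file change
    }

    fn next_record(n_deltas: &mut usize, full: Vec<u8>, delta: Vec<u8>) -> Record {
        if *n_deltas == MAX_AUX_FILE_DELTAS {
            // Too many deltas to replay cheaply: write a full image.
            *n_deltas = 0;
            Record::Image(full)
        } else {
            *n_deltas += 1;
            Record::Delta(delta)
        }
    }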
    1586              : 
    1587              :     ///
    1588              :     /// Flush changes accumulated so far to the underlying repository.
    1589              :     ///
    1590              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1591              :     /// you to flush them to the underlying repository before the final `commit`.
     1592              :     /// That frees up the memory used to hold the pending changes.
    1593              :     ///
    1594              :     /// Currently only used during bulk import of a data directory. In that
    1595              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1596              :     /// whole import fails and the timeline will be deleted anyway.
    1597              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1598              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1599              :     ///
    1600              :     /// Note: A consequence of flushing the pending operations is that they
    1601              :     /// won't be visible to subsequent operations until `commit`. The function
    1602              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1603              :     /// for bulk import, where you are just loading data pages and won't try to
    1604              :     /// modify the same pages twice.
    1605         1930 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1606         1930 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1607         1930 :         // to scan through the pending_updates list.
    1608         1930 :         let pending_nblocks = self.pending_nblocks;
    1609         1930 :         if pending_nblocks < 10000 {
    1610         1930 :             return Ok(());
    1611            0 :         }
    1612              : 
    1613            0 :         let mut writer = self.tline.writer().await;
    1614              : 
     1615              :         // Flush relation and SLRU data blocks, keep metadata.
    1616            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1617            0 :         for (key, values) in self.pending_updates.drain() {
    1618            0 :             for (lsn, value) in values {
    1619            0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1620              :                     // This bails out on first error without modifying pending_updates.
    1621              :                     // That's Ok, cf this function's doc comment.
    1622            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1623            0 :                 } else {
    1624            0 :                     retained_pending_updates
    1625            0 :                         .entry(key)
    1626            0 :                         .or_default()
    1627            0 :                         .push((lsn, value));
    1628            0 :                 }
    1629              :             }
    1630              :         }
    1631              : 
    1632            0 :         self.pending_updates = retained_pending_updates;
    1633            0 : 
    1634            0 :         if pending_nblocks != 0 {
    1635            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1636            0 :             self.pending_nblocks = 0;
    1637            0 :         }
    1638              : 
    1639            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1640            0 :             writer.update_directory_entries_count(kind, count as u64);
    1641            0 :         }
    1642              : 
    1643            0 :         Ok(())
    1644         1930 :     }
    1645              : 
    1646              :     ///
    1647              :     /// Finish this atomic update, writing all the updated keys to the
    1648              :     /// underlying timeline.
    1649              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1650              :     ///
    1651       742986 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1652       742986 :         let mut writer = self.tline.writer().await;
    1653              : 
    1654       742986 :         let timer = WAL_INGEST.time_spent_on_ingest.start_timer();
    1655       742986 : 
    1656       742986 :         let pending_nblocks = self.pending_nblocks;
    1657       742986 :         self.pending_nblocks = 0;
    1658       742986 : 
    1659       742986 :         if !self.pending_updates.is_empty() {
    1660              :             // The put_batch call below expects expects the inputs to be sorted by Lsn,
    1661              :             // so we do that first.
    1662       413970 :             let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
    1663       413970 :                 self.pending_updates
    1664       413970 :                     .drain()
    1665       699924 :                     .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
    1666       413970 :                     .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
    1667       413970 :                 VecMapOrdering::GreaterOrEqual,
    1668       413970 :             );
    1669       413970 : 
    1670       413970 :             writer.put_batch(lsn_ordered_batch, ctx).await?;
    1671       329016 :         }
    1672              : 
    1673       742986 :         if !self.pending_deletions.is_empty() {
    1674            2 :             writer.delete_batch(&self.pending_deletions).await?;
    1675            2 :             self.pending_deletions.clear();
    1676       742984 :         }
    1677              : 
    1678       742986 :         self.pending_lsns.push(self.lsn);
    1679       888844 :         for pending_lsn in self.pending_lsns.drain(..) {
    1680       888844 :             // Ideally, we should be able to call writer.finish_write() only once
    1681       888844 :             // with the highest LSN. However, the last_record_lsn variable in the
    1682       888844 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1683       888844 :             // so we need to record every LSN to not leave a gap between them.
    1684       888844 :             writer.finish_write(pending_lsn);
    1685       888844 :         }
    1686              : 
    1687       742986 :         if pending_nblocks != 0 {
    1688       270570 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1689       472416 :         }
    1690              : 
    1691       742986 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1692         2596 :             writer.update_directory_entries_count(kind, count as u64);
    1693         2596 :         }
    1694              : 
    1695       742986 :         timer.observe_duration();
    1696       742986 : 
    1697       742986 :         Ok(())
    1698       742986 :     }
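
`commit` relies on each per-key vector in `pending_updates` already being LSN-sorted (see `put` below), so a k-way merge is enough to produce one globally LSN-ordered batch without a full sort. A self-contained sketch of that step (payloads are illustrative strings):

    use itertools::Itertools;

    fn main() {
        // Two per-key runs, each individually sorted by LSN.
        let per_key = vec![
            vec![(10u64, "key_a@10"), (30, "key_a@30")],
            vec![(20u64, "key_b@20"), (40, "key_b@40")],
        ];
        let merged: Vec<(u64, &str)> = per_key
            .into_iter()
            .kmerge_by(|lhs, rhs| lhs.0 < rhs.0)
            .collect();
        assert_eq!(merged.iter().map(|m| m.0).collect::<Vec<_>>(), [10, 20, 30, 40]);
    }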
    1699              : 
    1700       291704 :     pub(crate) fn len(&self) -> usize {
    1701       291704 :         self.pending_updates.len() + self.pending_deletions.len()
    1702       291704 :     }
    1703              : 
    1704              :     // Internal helper functions to batch the modifications
    1705              : 
    1706       286564 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
     1707              :         // Have we already updated the same key? If so, read the latest
     1708              :         // pending version of it.
    1709              :         //
    1710              :         // Note: we don't check pending_deletions. It is an error to request a
    1711              :         // value that has been removed, deletion only avoids leaking storage.
    1712       286564 :         if let Some(values) = self.pending_updates.get(&key) {
    1713        15928 :             if let Some((_, value)) = values.last() {
    1714        15928 :                 return if let Value::Image(img) = value {
    1715        15928 :                     Ok(img.clone())
    1716              :                 } else {
    1717              :                     // Currently, we never need to read back a WAL record that we
    1718              :                     // inserted in the same "transaction". All the metadata updates
    1719              :                     // work directly with Images, and we never need to read actual
    1720              :                     // data pages. We could handle this if we had to, by calling
    1721              :                     // the walredo manager, but let's keep it simple for now.
    1722            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1723            0 :                         "unexpected pending WAL record"
    1724            0 :                     )))
    1725              :                 };
    1726            0 :             }
    1727       270636 :         }
    1728       270636 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1729       270636 :         self.tline.get(key, lsn, ctx).await
    1730       286564 :     }
    1731              : 
    1732       712084 :     fn put(&mut self, key: Key, val: Value) {
    1733       712084 :         let values = self.pending_updates.entry(key).or_default();
    1734              :         // Replace the previous value if it exists at the same lsn
    1735       712084 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1736        12166 :             if *last_lsn == self.lsn {
    1737        12160 :                 *last_value = val;
    1738        12160 :                 return;
    1739            6 :             }
    1740       699918 :         }
    1741       699924 :         values.push((self.lsn, val));
    1742       712084 :     }
    1743              : 
    1744            2 :     fn delete(&mut self, key_range: Range<Key>) {
    1745            2 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1746            2 :         self.pending_deletions.push((key_range, self.lsn));
    1747            2 :     }
    1748              : }
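
A distilled version of `put`'s replacement rule, with `Value` simplified to a `String` (hypothetical types, same control flow): a second write to the same key at the same LSN overwrites the pending value rather than appending a duplicate.

    fn put(values: &mut Vec<(u64, String)>, lsn: u64, val: String) {
        if let Some((last_lsn, last_value)) = values.last_mut() {
            if *last_lsn == lsn {
                // Same key, same LSN: replace in place.
                *last_value = val;
                return;
            }
        }
        // Newer LSN: append, keeping the vector LSN-sorted.
        values.push((lsn, val));
    }

This is also what establishes the per-key LSN ordering that `commit`'s k-way merge depends on.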
    1749              : 
    1750              : /// This struct facilitates accessing either a committed key from the timeline at a
    1751              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1752              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1753              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1754              : /// need to look up the keys in the modification first before looking them up in the
    1755              : /// timeline to not miss the latest updates.
    1756              : #[derive(Clone, Copy)]
    1757              : pub enum Version<'a> {
    1758              :     Lsn(Lsn),
    1759              :     Modified(&'a DatadirModification<'a>),
    1760              : }
    1761              : 
    1762              : impl<'a> Version<'a> {
    1763        23542 :     async fn get(
    1764        23542 :         &self,
    1765        23542 :         timeline: &Timeline,
    1766        23542 :         key: Key,
    1767        23542 :         ctx: &RequestContext,
    1768        23542 :     ) -> Result<Bytes, PageReconstructError> {
    1769        23542 :         match self {
    1770        23532 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1771           10 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1772              :         }
    1773        23542 :     }
    1774              : 
    1775        35620 :     fn get_lsn(&self) -> Lsn {
    1776        35620 :         match self {
    1777        29574 :             Version::Lsn(lsn) => *lsn,
    1778         6046 :             Version::Modified(modification) => modification.lsn,
    1779              :         }
    1780        35620 :     }
    1781              : }
    1782              : 
    1783              : //--- Metadata structs stored in key-value pairs in the repository.
    1784              : 
    1785         2140 : #[derive(Debug, Serialize, Deserialize)]
    1786              : struct DbDirectory {
    1787              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1788              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1789              : }
    1790              : 
    1791          204 : #[derive(Debug, Serialize, Deserialize)]
    1792              : struct TwoPhaseDirectory {
    1793              :     xids: HashSet<TransactionId>,
    1794              : }
    1795              : 
    1796         1932 : #[derive(Debug, Serialize, Deserialize, Default)]
    1797              : struct RelDirectory {
    1798              :     // Set of relations that exist. (relfilenode, forknum)
    1799              :     //
    1800              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1801              :     // key-value pairs, if you have a lot of relations
    1802              :     rels: HashSet<(Oid, u8)>,
    1803              : }
    1804              : 
    1805           24 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
    1806              : pub(crate) struct AuxFilesDirectory {
    1807              :     pub(crate) files: HashMap<String, Bytes>,
    1808              : }
    1809              : 
    1810              : impl AuxFilesDirectory {
    1811           28 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    1812           28 :         if let Some(value) = value {
    1813           22 :             self.files.insert(key, value);
    1814           22 :         } else {
    1815            6 :             self.files.remove(&key);
    1816            6 :         }
    1817           28 :     }
    1818              : }
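
A usage sketch for `upsert`, assuming the struct above is in scope: `Some(bytes)` inserts or overwrites the file, `None` removes it, which is how `put_file` maps empty content to a deletion.

    use bytes::Bytes;

    fn main() {
        let mut dir = AuxFilesDirectory { files: Default::default() };
        dir.upsert("pg_logical/f".to_string(), Some(Bytes::from_static(b"v1")));
        dir.upsert("pg_logical/f".to_string(), None);
        // Starting from an empty directory, the two calls cancel out.
        assert!(dir.files.is_empty());
    }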
    1819              : 
    1820            0 : #[derive(Debug, Serialize, Deserialize)]
    1821              : struct RelSizeEntry {
    1822              :     nblocks: u32,
    1823              : }
    1824              : 
    1825          618 : #[derive(Debug, Serialize, Deserialize, Default)]
    1826              : struct SlruSegmentDirectory {
    1827              :     // Set of SLRU segments that exist.
    1828              :     segments: HashSet<u32>,
    1829              : }
    1830              : 
    1831              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    1832              : #[repr(u8)]
    1833              : pub(crate) enum DirectoryKind {
    1834              :     Db,
    1835              :     TwoPhase,
    1836              :     Rel,
    1837              :     AuxFiles,
    1838              :     SlruSegment(SlruKind),
    1839              : }
    1840              : 
    1841              : impl DirectoryKind {
    1842              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    1843         5192 :     pub(crate) fn offset(&self) -> usize {
    1844         5192 :         self.into_usize()
    1845         5192 :     }
    1846              : }
    1847              : 
    1848              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1849              : 
    1850              : #[allow(clippy::bool_assert_comparison)]
    1851              : #[cfg(test)]
    1852              : mod tests {
    1853              :     use hex_literal::hex;
    1854              :     use utils::id::TimelineId;
    1855              : 
    1856              :     use super::*;
    1857              : 
    1858              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    1859              : 
    1860              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    1861              :     #[tokio::test]
    1862            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    1863            2 :         let name = "aux_files_round_trip";
    1864            2 :         let harness = TenantHarness::create(name)?;
    1865            2 : 
    1866            2 :         pub const TIMELINE_ID: TimelineId =
    1867            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    1868            2 : 
    1869            8 :         let (tenant, ctx) = harness.load().await;
    1870            2 :         let tline = tenant
    1871            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    1872            2 :             .await?;
    1873            2 :         let tline = tline.raw_timeline().unwrap();
    1874            2 : 
    1875            2 :         // First modification: insert two keys
    1876            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    1877            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    1878            2 :         modification.set_lsn(Lsn(0x1008))?;
    1879            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    1880            2 :         modification.commit(&ctx).await?;
    1881            2 :         let expect_1008 = HashMap::from([
    1882            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    1883            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    1884            2 :         ]);
    1885            2 : 
    1886            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1887            2 :         assert_eq!(readback, expect_1008);
    1888            2 : 
    1889            2 :         // Second modification: update one key, remove the other
    1890            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    1891            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    1892            2 :         modification.set_lsn(Lsn(0x2008))?;
    1893            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    1894            2 :         modification.commit(&ctx).await?;
    1895            2 :         let expect_2008 =
    1896            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    1897            2 : 
    1898            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    1899            2 :         assert_eq!(readback, expect_2008);
    1900            2 : 
    1901            2 :         // Reading back in time works
    1902            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1903            2 :         assert_eq!(readback, expect_1008);
    1904            2 : 
    1905            2 :         Ok(())
    1906            2 :     }
    1907              : 
    1908              :     /*
    1909              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1910              :             let incremental = timeline.get_current_logical_size();
    1911              :             let non_incremental = timeline
    1912              :                 .get_current_logical_size_non_incremental(lsn)
    1913              :                 .unwrap();
    1914              :             assert_eq!(incremental, non_incremental);
    1915              :         }
    1916              :     */
    1917              : 
    1918              :     /*
    1919              :     ///
    1920              :     /// Test list_rels() function, with branches and dropped relations
    1921              :     ///
    1922              :     #[test]
    1923              :     fn test_list_rels_drop() -> Result<()> {
    1924              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1925              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1926              :         const TESTDB: u32 = 111;
    1927              : 
    1928              :         // Import an initial dummy checkpoint record; otherwise the get_timeline()
    1929              :         // call after branching (below) fails
    1930              :         let mut writer = tline.begin_record(Lsn(0x10));
    1931              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1932              :         writer.finish()?;
    1933              : 
    1934              :         // Create a relation on the timeline
    1935              :         let mut writer = tline.begin_record(Lsn(0x20));
    1936              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1937              :         writer.finish()?;
    1938              : 
    1939              :         let writer = tline.begin_record(Lsn(0x00));
    1940              :         writer.finish()?;
    1941              : 
    1942              :         // Check that list_rels() lists it at and after Lsn(0x20), but not before it
    1943              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1944              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1945              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1946              : 
    1947              :         // Create a branch, check that the relation is visible there
    1948              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1949              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1950              :             Some(timeline) => timeline,
    1951              :             None => panic!("Should have a local timeline"),
    1952              :         };
    1953              :         let newtline = DatadirTimelineImpl::new(newtline);
    1954              :         assert!(newtline
    1955              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1956              :             .contains(&TESTREL_A));
    1957              : 
    1958              :         // Drop it on the branch
    1959              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1960              :         new_writer.drop_relation(TESTREL_A)?;
    1961              :         new_writer.finish()?;
    1962              : 
    1963              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1964              :         assert!(newtline
    1965              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1966              :             .contains(&TESTREL_A));
    1967              :         assert!(!newtline
    1968              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1969              :             .contains(&TESTREL_A));
    1970              : 
    1971              :         // Run checkpoint and garbage collection and check that it's still not visible
    1972              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1973              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1974              : 
    1975              :         assert!(!newtline
    1976              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1977              :             .contains(&TESTREL_A));
    1978              : 
    1979              :         Ok(())
    1980              :     }
    1981              :      */
    1982              : 
    1983              :     /*
    1984              :     #[test]
    1985              :     fn test_read_beyond_eof() -> Result<()> {
    1986              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1987              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1988              : 
    1989              :         make_some_layers(&tline, Lsn(0x20))?;
    1990              :         let mut writer = tline.begin_record(Lsn(0x60));
    1991              :         walingest.put_rel_page_image(
    1992              :             &mut writer,
    1993              :             TESTREL_A,
    1994              :             0,
    1995              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1996              :         )?;
    1997              :         writer.finish()?;
    1998              : 
    1999              :         // Test read before rel creation. Should error out.
    2000              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2001              : 
    2002              :         // Read block beyond end of relation at different points in time.
    2003              :         // These reads should fall into different delta, image, and in-memory layers.
    2004              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2005              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2006              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2007              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2008              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2009              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2010              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2011              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2012              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2013              : 
    2014              :         // Test on an in-memory layer with no preceding layer
    2015              :         let mut writer = tline.begin_record(Lsn(0x70));
    2016              :         walingest.put_rel_page_image(
    2017              :             &mut writer,
    2018              :             TESTREL_B,
    2019              :             0,
    2020              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2021              :         )?;
    2022              :         writer.finish()?;
    2023              : 
    2024              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    2025              : 
    2026              :         Ok(())
    2027              :     }
    2028              :      */
    2029              : }
        

Generated by: LCOV version 2.1-beta