LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 56.1 % 1516 850
Test Date: 2024-09-24 13:57:57 Functions: 40.2 % 204 82

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      13              : use crate::walrecord::NeonWalRecord;
      14              : use crate::{aux_file, repository::*};
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use enum_map::Enum;
      18              : use pageserver_api::key::{
      19              :     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
      20              :     relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
      21              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      22              :     CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      23              : };
      24              : use pageserver_api::keyspace::SparseKeySpace;
      25              : use pageserver_api::models::AuxFilePolicy;
      26              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      27              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      28              : use postgres_ffi::BLCKSZ;
      29              : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
      30              : use serde::{Deserialize, Serialize};
      31              : use std::collections::{hash_map, HashMap, HashSet};
      32              : use std::ops::ControlFlow;
      33              : use std::ops::Range;
      34              : use strum::IntoEnumIterator;
      35              : use tokio_util::sync::CancellationToken;
      36              : use tracing::{debug, info, trace, warn};
      37              : use utils::bin_ser::DeserializeError;
      38              : use utils::pausable_failpoint;
      39              : use utils::{bin_ser::BeSer, lsn::Lsn};
      40              : 
      41              : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
      42              : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
      43              : 
      44              : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
      45              : pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
      46              : 
      47              : #[derive(Debug)]
      48              : pub enum LsnForTimestamp {
      49              :     /// Found commits both before and after the given timestamp
      50              :     Present(Lsn),
      51              : 
      52              :     /// Found no commits after the given timestamp, this means
      53              :     /// that the newest data in the branch is older than the given
      54              :     /// timestamp.
      55              :     ///
      56              :     /// All commits <= LSN happened before the given timestamp
      57              :     Future(Lsn),
      58              : 
      59              :     /// The queried timestamp is older than the horizon we can look back to (PITR)
      60              :     ///
      61              :     /// All commits > LSN happened after the given timestamp,
      62              :     /// but any commits < LSN might have happened before or after
      63              :     /// the given timestamp. We don't know because no data before
      64              :     /// the given lsn is available.
      65              :     Past(Lsn),
      66              : 
      67              :     /// We have found no commit with a timestamp,
      68              :     /// so we can't return anything meaningful.
      69              :     ///
      70              :     /// The associated LSN is the lower bound value we can safely
      71              :     /// create branches on, but no statement is made if it is
      72              :     /// older or newer than the timestamp.
      73              :     ///
      74              :     /// This variant can e.g. be returned right after a
      75              :     /// cluster import.
      76              :     NoData(Lsn),
      77              : }
      78              : 
      79            0 : #[derive(Debug, thiserror::Error)]
      80              : pub(crate) enum CalculateLogicalSizeError {
      81              :     #[error("cancelled")]
      82              :     Cancelled,
      83              : 
      84              :     /// Something went wrong while reading the metadata we use to calculate logical size
      85              :     /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
      86              :     /// in the `From` implementation for this variant.
      87              :     #[error(transparent)]
      88              :     PageRead(PageReconstructError),
      89              : 
      90              :     /// Something went wrong deserializing metadata that we read to calculate logical size
      91              :     #[error("decode error: {0}")]
      92              :     Decode(#[from] DeserializeError),
      93              : }
      94              : 
      95            0 : #[derive(Debug, thiserror::Error)]
      96              : pub(crate) enum CollectKeySpaceError {
      97              :     #[error(transparent)]
      98              :     Decode(#[from] DeserializeError),
      99              :     #[error(transparent)]
     100              :     PageRead(PageReconstructError),
     101              :     #[error("cancelled")]
     102              :     Cancelled,
     103              : }
     104              : 
     105              : impl From<PageReconstructError> for CollectKeySpaceError {
     106            0 :     fn from(err: PageReconstructError) -> Self {
     107            0 :         match err {
     108            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     109            0 :             err => Self::PageRead(err),
     110              :         }
     111            0 :     }
     112              : }
     113              : 
     114              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     115            0 :     fn from(pre: PageReconstructError) -> Self {
     116            0 :         match pre {
     117            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     118            0 :             _ => Self::PageRead(pre),
     119              :         }
     120            0 :     }
     121              : }
     122              : 
     123            0 : #[derive(Debug, thiserror::Error)]
     124              : pub enum RelationError {
     125              :     #[error("Relation Already Exists")]
     126              :     AlreadyExists,
     127              :     #[error("invalid relnode")]
     128              :     InvalidRelnode,
     129              :     #[error(transparent)]
     130              :     Other(#[from] anyhow::Error),
     131              : }
     132              : 
     133              : ///
     134              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     135              : /// and other special kinds of files, in a versioned key-value store. The
     136              : /// Timeline struct provides the key-value store.
     137              : ///
     138              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
     139              : /// implementation, and might be moved into a separate struct later.
     140              : impl Timeline {
     141              :     /// Start ingesting a WAL record, or other atomic modification of
     142              :     /// the timeline.
     143              :     ///
     144              :     /// This provides a transaction-like interface to perform a bunch
     145              :     /// of modifications atomically.
     146              :     ///
     147              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     148              :     /// DatadirModification object. Use the functions in the object to
     149              :     /// modify the repository state, updating all the pages and metadata
     150              :     /// that the WAL record affects. When you're done, call commit() to
     151              :     /// commit the changes.
     152              :     ///
     153              :     /// Lsn stored in modification is advanced by `ingest_record` and
     154              :     /// is used by `commit()` to update `last_record_lsn`.
     155              :     ///
     156              :     /// Calling commit() will flush all the changes and reset the state,
     157              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     158              :     ///
     159              :     /// Note that any pending modifications you make through the
     160              :     /// modification object won't be visible to calls to the 'get' and list
     161              :     /// functions of the timeline until you finish! And if you update the
     162              :     /// same page twice, the last update wins.
     163              :     ///
     164       805188 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     165       805188 :     where
     166       805188 :         Self: Sized,
     167       805188 :     {
     168       805188 :         DatadirModification {
     169       805188 :             tline: self,
     170       805188 :             pending_lsns: Vec::new(),
     171       805188 :             pending_metadata_pages: HashMap::new(),
     172       805188 :             pending_data_pages: Vec::new(),
     173       805188 :             pending_zero_data_pages: Default::default(),
     174       805188 :             pending_deletions: Vec::new(),
     175       805188 :             pending_nblocks: 0,
     176       805188 :             pending_directory_entries: Vec::new(),
     177       805188 :             pending_bytes: 0,
     178       805188 :             lsn,
     179       805188 :         }
     180       805188 :     }
     181              : 
     182              :     //------------------------------------------------------------------------------
     183              :     // Public GET functions
     184              :     //------------------------------------------------------------------------------
     185              : 
     186              :     /// Look up given page version.
     187        55152 :     pub(crate) async fn get_rel_page_at_lsn(
     188        55152 :         &self,
     189        55152 :         tag: RelTag,
     190        55152 :         blknum: BlockNumber,
     191        55152 :         version: Version<'_>,
     192        55152 :         ctx: &RequestContext,
     193        55152 :     ) -> Result<Bytes, PageReconstructError> {
     194        55152 :         if tag.relnode == 0 {
     195            0 :             return Err(PageReconstructError::Other(
     196            0 :                 RelationError::InvalidRelnode.into(),
     197            0 :             ));
     198        55152 :         }
     199              : 
     200        55152 :         let nblocks = self.get_rel_size(tag, version, ctx).await?;
     201        55152 :         if blknum >= nblocks {
     202            0 :             debug!(
     203            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     204            0 :                 tag,
     205            0 :                 blknum,
     206            0 :                 version.get_lsn(),
     207              :                 nblocks
     208              :             );
     209            0 :             return Ok(ZERO_PAGE.clone());
     210        55152 :         }
     211        55152 : 
     212        55152 :         let key = rel_block_to_key(tag, blknum);
     213        55152 :         version.get(self, key, ctx).await
     214        55152 :     }
     215              : 
     216              :     // Get size of a database in blocks
     217            0 :     pub(crate) async fn get_db_size(
     218            0 :         &self,
     219            0 :         spcnode: Oid,
     220            0 :         dbnode: Oid,
     221            0 :         version: Version<'_>,
     222            0 :         ctx: &RequestContext,
     223            0 :     ) -> Result<usize, PageReconstructError> {
     224            0 :         let mut total_blocks = 0;
     225              : 
     226            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     227              : 
     228            0 :         for rel in rels {
     229            0 :             let n_blocks = self.get_rel_size(rel, version, ctx).await?;
     230            0 :             total_blocks += n_blocks as usize;
     231              :         }
     232            0 :         Ok(total_blocks)
     233            0 :     }
     234              : 
     235              :     /// Get size of a relation file
     236        73302 :     pub(crate) async fn get_rel_size(
     237        73302 :         &self,
     238        73302 :         tag: RelTag,
     239        73302 :         version: Version<'_>,
     240        73302 :         ctx: &RequestContext,
     241        73302 :     ) -> Result<BlockNumber, PageReconstructError> {
     242        73302 :         if tag.relnode == 0 {
     243            0 :             return Err(PageReconstructError::Other(
     244            0 :                 RelationError::InvalidRelnode.into(),
     245            0 :             ));
     246        73302 :         }
     247              : 
     248        73302 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     249        57882 :             return Ok(nblocks);
     250        15420 :         }
     251        15420 : 
     252        15420 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     253            0 :             && !self.get_rel_exists(tag, version, ctx).await?
     254              :         {
     255              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     256              :             // FSM, and smgrnblocks() on it immediately afterwards,
     257              :             // without extending it.  Tolerate that by claiming that
     258              :             // any non-existent FSM fork has size 0.
     259            0 :             return Ok(0);
     260        15420 :         }
     261        15420 : 
     262        15420 :         let key = rel_size_to_key(tag);
     263        15420 :         let mut buf = version.get(self, key, ctx).await?;
     264        15408 :         let nblocks = buf.get_u32_le();
     265        15408 : 
     266        15408 :         self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     267        15408 : 
     268        15408 :         Ok(nblocks)
     269        73302 :     }
     270              : 
     271              :     /// Does relation exist?
     272        18150 :     pub(crate) async fn get_rel_exists(
     273        18150 :         &self,
     274        18150 :         tag: RelTag,
     275        18150 :         version: Version<'_>,
     276        18150 :         ctx: &RequestContext,
     277        18150 :     ) -> Result<bool, PageReconstructError> {
     278        18150 :         if tag.relnode == 0 {
     279            0 :             return Err(PageReconstructError::Other(
     280            0 :                 RelationError::InvalidRelnode.into(),
     281            0 :             ));
     282        18150 :         }
     283              : 
     284              :         // first try to lookup relation in cache
     285        18150 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     286        18096 :             return Ok(true);
     287           54 :         }
     288              :         // then check if the database was already initialized.
     289              :         // get_rel_exists can be called before dbdir is created.
     290           54 :         let buf = version.get(self, DBDIR_KEY, ctx).await?;
     291           54 :         let dbdirs = DbDirectory::des(&buf)?.dbdirs;
     292           54 :         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
     293            0 :             return Ok(false);
     294           54 :         }
     295           54 :         // fetch directory listing
     296           54 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     297           54 :         let buf = version.get(self, key, ctx).await?;
     298              : 
     299           54 :         let dir = RelDirectory::des(&buf)?;
     300           54 :         Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
     301        18150 :     }
     302              : 
     303              :     /// Get a list of all existing relations in given tablespace and database.
     304              :     ///
     305              :     /// # Cancel-Safety
     306              :     ///
     307              :     /// This method is cancellation-safe.
     308            0 :     pub(crate) async fn list_rels(
     309            0 :         &self,
     310            0 :         spcnode: Oid,
     311            0 :         dbnode: Oid,
     312            0 :         version: Version<'_>,
     313            0 :         ctx: &RequestContext,
     314            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     315            0 :         // fetch directory listing
     316            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     317            0 :         let buf = version.get(self, key, ctx).await?;
     318              : 
     319            0 :         let dir = RelDirectory::des(&buf)?;
     320            0 :         let rels: HashSet<RelTag> =
     321            0 :             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     322            0 :                 spcnode,
     323            0 :                 dbnode,
     324            0 :                 relnode: *relnode,
     325            0 :                 forknum: *forknum,
     326            0 :             }));
     327            0 : 
     328            0 :         Ok(rels)
     329            0 :     }
     330              : 
     331              :     /// Get the whole SLRU segment
     332            0 :     pub(crate) async fn get_slru_segment(
     333            0 :         &self,
     334            0 :         kind: SlruKind,
     335            0 :         segno: u32,
     336            0 :         lsn: Lsn,
     337            0 :         ctx: &RequestContext,
     338            0 :     ) -> Result<Bytes, PageReconstructError> {
     339            0 :         let n_blocks = self
     340            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     341            0 :             .await?;
     342            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     343            0 :         for blkno in 0..n_blocks {
     344            0 :             let block = self
     345            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     346            0 :                 .await?;
     347            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     348              :         }
     349            0 :         Ok(segment.freeze())
     350            0 :     }
     351              : 
     352              :     /// Look up given SLRU page version.
     353            0 :     pub(crate) async fn get_slru_page_at_lsn(
     354            0 :         &self,
     355            0 :         kind: SlruKind,
     356            0 :         segno: u32,
     357            0 :         blknum: BlockNumber,
     358            0 :         lsn: Lsn,
     359            0 :         ctx: &RequestContext,
     360            0 :     ) -> Result<Bytes, PageReconstructError> {
     361            0 :         let key = slru_block_to_key(kind, segno, blknum);
     362            0 :         self.get(key, lsn, ctx).await
     363            0 :     }
     364              : 
     365              :     /// Get size of an SLRU segment
     366            0 :     pub(crate) async fn get_slru_segment_size(
     367            0 :         &self,
     368            0 :         kind: SlruKind,
     369            0 :         segno: u32,
     370            0 :         version: Version<'_>,
     371            0 :         ctx: &RequestContext,
     372            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     373            0 :         let key = slru_segment_size_to_key(kind, segno);
     374            0 :         let mut buf = version.get(self, key, ctx).await?;
     375            0 :         Ok(buf.get_u32_le())
     376            0 :     }
     377              : 
     378              :     /// Does the SLRU segment exist?
     379            0 :     pub(crate) async fn get_slru_segment_exists(
     380            0 :         &self,
     381            0 :         kind: SlruKind,
     382            0 :         segno: u32,
     383            0 :         version: Version<'_>,
     384            0 :         ctx: &RequestContext,
     385            0 :     ) -> Result<bool, PageReconstructError> {
     386            0 :         // fetch directory listing
     387            0 :         let key = slru_dir_to_key(kind);
     388            0 :         let buf = version.get(self, key, ctx).await?;
     389              : 
     390            0 :         let dir = SlruSegmentDirectory::des(&buf)?;
     391            0 :         Ok(dir.segments.contains(&segno))
     392            0 :     }
     393              : 
     394              :     /// Locate LSN, such that all transactions that committed before
     395              :     /// 'search_timestamp' are visible, but nothing newer is.
     396              :     ///
     397              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     398              :     /// so it's not well defined which LSN you get if there were multiple commits
     399              :     /// "in flight" at that point in time.
     400              :     ///
     401            0 :     pub(crate) async fn find_lsn_for_timestamp(
     402            0 :         &self,
     403            0 :         search_timestamp: TimestampTz,
     404            0 :         cancel: &CancellationToken,
     405            0 :         ctx: &RequestContext,
     406            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     407            0 :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     408              : 
     409            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     410            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     411            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     412            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     413            0 :         // on the safe side.
     414            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     415            0 :         let max_lsn = self.get_last_record_lsn();
     416            0 : 
     417            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     418            0 :         // LSN divided by 8.
     419            0 :         let mut low = min_lsn.0 / 8;
     420            0 :         let mut high = max_lsn.0 / 8 + 1;
     421            0 : 
     422            0 :         let mut found_smaller = false;
     423            0 :         let mut found_larger = false;
     424              : 
     425            0 :         while low < high {
     426            0 :             if cancel.is_cancelled() {
     427            0 :                 return Err(PageReconstructError::Cancelled);
     428            0 :             }
     429            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     430            0 :             let mid = (high + low) / 2;
     431              : 
     432            0 :             let cmp = self
     433            0 :                 .is_latest_commit_timestamp_ge_than(
     434            0 :                     search_timestamp,
     435            0 :                     Lsn(mid * 8),
     436            0 :                     &mut found_smaller,
     437            0 :                     &mut found_larger,
     438            0 :                     ctx,
     439            0 :                 )
     440            0 :                 .await?;
     441              : 
     442            0 :             if cmp {
     443            0 :                 high = mid;
     444            0 :             } else {
     445            0 :                 low = mid + 1;
     446            0 :             }
     447              :         }
     448              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     449              :         // so the LSN of the last commit record before or at `search_timestamp`.
     450              :         // Remove one from `low` to get `t`.
     451              :         //
     452              :         // FIXME: it would be better to get the LSN of the previous commit.
     453              :         // Otherwise, if you restore to the returned LSN, the database will
     454              :         // include physical changes from later commits that will be marked
     455              :         // as aborted, and will need to be vacuumed away.
     456            0 :         let commit_lsn = Lsn((low - 1) * 8);
     457            0 :         match (found_smaller, found_larger) {
     458              :             (false, false) => {
     459              :                 // This can happen if no commit records have been processed yet, e.g.
     460              :                 // just after importing a cluster.
     461            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     462              :             }
     463              :             (false, true) => {
     464              :                 // Didn't find any commit timestamps smaller than the request
     465            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     466              :             }
     467            0 :             (true, _) if commit_lsn < min_lsn => {
     468            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     469            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     470            0 :                 // below the min_lsn. We should never do that.
     471            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     472              :             }
     473              :             (true, false) => {
     474              :                 // Only found commits with timestamps smaller than the request.
     475              :                 // It's still a valid case for branch creation, return it.
     476              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     477              :                 // case, anyway.
     478            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     479              :             }
     480            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     481              :         }
     482            0 :     }
     483              : 
     484              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if, as of LSN
     485              :     /// 'probe_lsn', any commit has a timestamp at or after 'search_timestamp'.
     486              :     ///
     487              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
     488              :     /// with a smaller/larger timestamp.
     489              :     ///
     490            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     491            0 :         &self,
     492            0 :         search_timestamp: TimestampTz,
     493            0 :         probe_lsn: Lsn,
     494            0 :         found_smaller: &mut bool,
     495            0 :         found_larger: &mut bool,
     496            0 :         ctx: &RequestContext,
     497            0 :     ) -> Result<bool, PageReconstructError> {
     498            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     499            0 :             if timestamp >= search_timestamp {
     500            0 :                 *found_larger = true;
     501            0 :                 return ControlFlow::Break(true);
     502            0 :             } else {
     503            0 :                 *found_smaller = true;
     504            0 :             }
     505            0 :             ControlFlow::Continue(())
     506            0 :         })
     507            0 :         .await
     508            0 :     }
     509              : 
     510              :     /// Obtain the largest timestamp recorded for the given lsn.
     511              :     ///
     512              :     /// If the lsn has no timestamps, returns `None`; otherwise returns the maximum timestamp found.
     513            0 :     pub(crate) async fn get_timestamp_for_lsn(
     514            0 :         &self,
     515            0 :         probe_lsn: Lsn,
     516            0 :         ctx: &RequestContext,
     517            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     518            0 :         let mut max: Option<TimestampTz> = None;
     519            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
     520            0 :             if let Some(max_prev) = max {
     521            0 :                 max = Some(max_prev.max(timestamp));
     522            0 :             } else {
     523            0 :                 max = Some(timestamp);
     524            0 :             }
     525            0 :             ControlFlow::Continue(())
     526            0 :         })
     527            0 :         .await?;
     528              : 
     529            0 :         Ok(max)
     530            0 :     }
     531              : 
     532              :     /// Runs the given function on all the timestamps for a given lsn
     533              :     ///
     534              :     /// The return value is either given by the closure, or set to the `Default`
     535              :     /// impl's output.
     536            0 :     async fn map_all_timestamps<T: Default>(
     537            0 :         &self,
     538            0 :         probe_lsn: Lsn,
     539            0 :         ctx: &RequestContext,
     540            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     541            0 :     ) -> Result<T, PageReconstructError> {
     542            0 :         for segno in self
     543            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     544            0 :             .await?
     545              :         {
     546            0 :             let nblocks = self
     547            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     548            0 :                 .await?;
     549            0 :             for blknum in (0..nblocks).rev() {
     550            0 :                 let clog_page = self
     551            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     552            0 :                     .await?;
     553              : 
     554            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     555            0 :                     let mut timestamp_bytes = [0u8; 8];
     556            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     557            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     558            0 : 
     559            0 :                     match f(timestamp) {
     560            0 :                         ControlFlow::Break(b) => return Ok(b),
     561            0 :                         ControlFlow::Continue(()) => (),
     562              :                     }
     563            0 :                 }
     564              :             }
     565              :         }
     566            0 :         Ok(Default::default())
     567            0 :     }
     568              : 
     569            0 :     pub(crate) async fn get_slru_keyspace(
     570            0 :         &self,
     571            0 :         version: Version<'_>,
     572            0 :         ctx: &RequestContext,
     573            0 :     ) -> Result<KeySpace, PageReconstructError> {
     574            0 :         let mut accum = KeySpaceAccum::new();
     575              : 
     576            0 :         for kind in SlruKind::iter() {
     577            0 :             let mut segments: Vec<u32> = self
     578            0 :                 .list_slru_segments(kind, version, ctx)
     579            0 :                 .await?
     580            0 :                 .into_iter()
     581            0 :                 .collect();
     582            0 :             segments.sort_unstable();
     583              : 
     584            0 :             for seg in segments {
     585            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     586              : 
     587            0 :                 accum.add_range(
     588            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     589            0 :                 );
     590              :             }
     591              :         }
     592              : 
     593            0 :         Ok(accum.to_keyspace())
     594            0 :     }
     595              : 
     596              :     /// Get a list of SLRU segments
     597            0 :     pub(crate) async fn list_slru_segments(
     598            0 :         &self,
     599            0 :         kind: SlruKind,
     600            0 :         version: Version<'_>,
     601            0 :         ctx: &RequestContext,
     602            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     603            0 :         // fetch directory entry
     604            0 :         let key = slru_dir_to_key(kind);
     605              : 
     606            0 :         let buf = version.get(self, key, ctx).await?;
     607            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
     608            0 :     }
     609              : 
     610            0 :     pub(crate) async fn get_relmap_file(
     611            0 :         &self,
     612            0 :         spcnode: Oid,
     613            0 :         dbnode: Oid,
     614            0 :         version: Version<'_>,
     615            0 :         ctx: &RequestContext,
     616            0 :     ) -> Result<Bytes, PageReconstructError> {
     617            0 :         let key = relmap_file_key(spcnode, dbnode);
     618              : 
     619            0 :         let buf = version.get(self, key, ctx).await?;
     620            0 :         Ok(buf)
     621            0 :     }
     622              : 
     623          876 :     pub(crate) async fn list_dbdirs(
     624          876 :         &self,
     625          876 :         lsn: Lsn,
     626          876 :         ctx: &RequestContext,
     627          876 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     628              :         // fetch directory entry
     629         9307 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     630              : 
     631          876 :         Ok(DbDirectory::des(&buf)?.dbdirs)
     632          876 :     }
     633              : 
     634            0 :     pub(crate) async fn get_twophase_file(
     635            0 :         &self,
     636            0 :         xid: u64,
     637            0 :         lsn: Lsn,
     638            0 :         ctx: &RequestContext,
     639            0 :     ) -> Result<Bytes, PageReconstructError> {
     640            0 :         let key = twophase_file_key(xid);
     641            0 :         let buf = self.get(key, lsn, ctx).await?;
     642            0 :         Ok(buf)
     643            0 :     }
     644              : 
     645          882 :     pub(crate) async fn list_twophase_files(
     646          882 :         &self,
     647          882 :         lsn: Lsn,
     648          882 :         ctx: &RequestContext,
     649          882 :     ) -> Result<HashSet<u64>, PageReconstructError> {
     650              :         // fetch directory entry
     651         9450 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     652              : 
     653          882 :         if self.pg_version >= 17 {
     654            0 :             Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
     655              :         } else {
     656          882 :             Ok(TwoPhaseDirectory::des(&buf)?
     657              :                 .xids
     658          882 :                 .iter()
     659          882 :                 .map(|x| u64::from(*x))
     660          882 :                 .collect())
     661              :         }
     662          882 :     }
     663              : 
     664            0 :     pub(crate) async fn get_control_file(
     665            0 :         &self,
     666            0 :         lsn: Lsn,
     667            0 :         ctx: &RequestContext,
     668            0 :     ) -> Result<Bytes, PageReconstructError> {
     669            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     670            0 :     }
     671              : 
     672           36 :     pub(crate) async fn get_checkpoint(
     673           36 :         &self,
     674           36 :         lsn: Lsn,
     675           36 :         ctx: &RequestContext,
     676           36 :     ) -> Result<Bytes, PageReconstructError> {
     677           36 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     678           36 :     }
     679              : 
     680           48 :     async fn list_aux_files_v1(
     681           48 :         &self,
     682           48 :         lsn: Lsn,
     683           48 :         ctx: &RequestContext,
     684           48 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     685           48 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     686           30 :             Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
     687           18 :             Err(e) => {
     688           18 :                 // This is expected: historical databases do not have the key.
     689           18 :                 debug!("Failed to get info about AUX files: {}", e);
     690           18 :                 Ok(HashMap::new())
     691              :             }
     692              :         }
     693           48 :     }
     694              : 
     695           72 :     async fn list_aux_files_v2(
     696           72 :         &self,
     697           72 :         lsn: Lsn,
     698           72 :         ctx: &RequestContext,
     699           72 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     700           72 :         let kv = self
     701           72 :             .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
     702            0 :             .await?;
     703           72 :         let mut result = HashMap::new();
     704           72 :         let mut sz = 0;
     705          180 :         for (_, v) in kv {
     706          108 :             let v = v?;
     707          108 :             let v = aux_file::decode_file_value_bytes(&v)
     708          108 :                 .context("value decode")
     709          108 :                 .map_err(PageReconstructError::Other)?;
     710          210 :             for (fname, content) in v {
     711          102 :                 sz += fname.len();
     712          102 :                 sz += content.len();
     713          102 :                 result.insert(fname, content);
     714          102 :             }
     715              :         }
     716           72 :         self.aux_file_size_estimator.on_initial(sz);
     717           72 :         Ok(result)
     718           72 :     }
     719              : 
     720            0 :     pub(crate) async fn trigger_aux_file_size_computation(
     721            0 :         &self,
     722            0 :         lsn: Lsn,
     723            0 :         ctx: &RequestContext,
     724            0 :     ) -> Result<(), PageReconstructError> {
     725            0 :         let current_policy = self.last_aux_file_policy.load();
     726            0 :         if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
     727            0 :             self.list_aux_files_v2(lsn, ctx).await?;
     728            0 :         }
     729            0 :         Ok(())
     730            0 :     }
     731              : 
     732           78 :     pub(crate) async fn list_aux_files(
     733           78 :         &self,
     734           78 :         lsn: Lsn,
     735           78 :         ctx: &RequestContext,
     736           78 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     737           78 :         let current_policy = self.last_aux_file_policy.load();
     738           78 :         match current_policy {
     739              :             Some(AuxFilePolicy::V1) => {
     740            6 :                 let res = self.list_aux_files_v1(lsn, ctx).await?;
     741            6 :                 let empty_str = if res.is_empty() { ", empty" } else { "" };
     742            6 :                 warn!(
     743            0 :                     "this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
     744              :                 );
     745            6 :                 Ok(res)
     746              :             }
     747              :             None => {
     748            0 :                 let res = self.list_aux_files_v1(lsn, ctx).await?;
     749            0 :                 if !res.is_empty() {
     750            0 :                     warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
     751            0 :                 }
     752            0 :                 Ok(res)
     753              :             }
     754           66 :             Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
     755              :             Some(AuxFilePolicy::CrossValidation) => {
     756            6 :                 let v1_result = self.list_aux_files_v1(lsn, ctx).await;
     757            6 :                 let v2_result = self.list_aux_files_v2(lsn, ctx).await;
     758            6 :                 match (v1_result, v2_result) {
     759            6 :                     (Ok(v1), Ok(v2)) => {
     760            6 :                         if v1 != v2 {
     761            0 :                             tracing::error!(
     762            0 :                                 "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
     763              :                             );
     764            0 :                             return Err(PageReconstructError::Other(anyhow::anyhow!(
     765            0 :                                 "unmatched aux file v1 v2 result"
     766            0 :                             )));
     767            6 :                         }
     768            6 :                         Ok(v1)
     769              :                     }
     770            0 :                     (Ok(_), Err(v2)) => {
     771            0 :                         tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
     772            0 :                         Err(v2)
     773              :                     }
     774            0 :                     (Err(v1), Ok(_)) => {
     775            0 :                         tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
     776            0 :                         Err(v1)
     777              :                     }
     778            0 :                     (Err(_), Err(v2)) => Err(v2),
     779              :                 }
     780              :             }
     781              :         }
     782           78 :     }
     783              : 
     784            0 :     pub(crate) async fn get_replorigins(
     785            0 :         &self,
     786            0 :         lsn: Lsn,
     787            0 :         ctx: &RequestContext,
     788            0 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
     789            0 :         let kv = self
     790            0 :             .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
     791            0 :             .await?;
     792            0 :         let mut result = HashMap::new();
     793            0 :         for (k, v) in kv {
     794            0 :             let v = v?;
     795            0 :             let origin_id = k.field6 as RepOriginId;
     796            0 :             let origin_lsn = Lsn::des(&v).unwrap();
     797            0 :             if origin_lsn != Lsn::INVALID {
     798            0 :                 result.insert(origin_id, origin_lsn);
     799            0 :             }
     800              :         }
     801            0 :         Ok(result)
     802            0 :     }
     803              : 
     804              :     /// Does the same as get_current_logical_size but counted on demand.
     805              :     /// Used to initialize the logical size tracking on startup.
     806              :     ///
     807              :     /// Only relation blocks are counted currently. That excludes metadata,
     808              :     /// SLRUs, twophase files etc.
     809              :     ///
     810              :     /// # Cancel-Safety
     811              :     ///
     812              :     /// This method is cancellation-safe.
     813            0 :     pub(crate) async fn get_current_logical_size_non_incremental(
     814            0 :         &self,
     815            0 :         lsn: Lsn,
     816            0 :         ctx: &RequestContext,
     817            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     818            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     819              : 
     820              :         // Fetch list of database dirs and iterate them
     821            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     822            0 :         let dbdir = DbDirectory::des(&buf)?;
     823              : 
     824            0 :         let mut total_size: u64 = 0;
     825            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     826            0 :             for rel in self
     827            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     828            0 :                 .await?
     829              :             {
     830            0 :                 if self.cancel.is_cancelled() {
     831            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     832            0 :                 }
     833            0 :                 let relsize_key = rel_size_to_key(rel);
     834            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     835            0 :                 let relsize = buf.get_u32_le();
     836            0 : 
     837            0 :                 total_size += relsize as u64;
     838              :             }
     839              :         }
     840            0 :         Ok(total_size * BLCKSZ as u64)
     841            0 :     }
     842              : 
     843              :     /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
     844              :     /// for gc-compaction.
     845              :     ///
     846              :     /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
     847              :     /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
     848              :     /// be kept only for a specific range of LSN.
     849              :     ///
     850              :     /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
     851              :     /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
     852              :     /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
     853              :     /// determine which keys to retain/drop for gc-compaction.
     854              :     ///
     855              :     /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
     856              :     /// to be retained for each of the branch LSNs.
     857              :     ///
     858              :     /// The return value is (dense keyspace, sparse keyspace).
     859           78 :     pub(crate) async fn collect_gc_compaction_keyspace(
     860           78 :         &self,
     861           78 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     862           78 :         let metadata_key_begin = Key::metadata_key_range().start;
     863           78 :         let aux_v1_key = AUX_FILES_KEY;
     864           78 :         let dense_keyspace = KeySpace {
     865           78 :             ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
     866           78 :         };
     867           78 :         Ok((
     868           78 :             dense_keyspace,
     869           78 :             SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
     870           78 :         ))
     871           78 :     }
     872              : 
     873              :     ///
     874              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     875              :     /// Anything that's not listed may be removed from the underlying storage (from
     876              :     /// that LSN forwards).
     877              :     ///
     878              :     /// The return value is (dense keyspace, sparse keyspace).
     879          876 :     pub(crate) async fn collect_keyspace(
     880          876 :         &self,
     881          876 :         lsn: Lsn,
     882          876 :         ctx: &RequestContext,
     883          876 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     884          876 :         // Iterate through key ranges, greedily packing them into partitions
     885          876 :         let mut result = KeySpaceAccum::new();
     886          876 : 
     887          876 :         // The dbdir metadata always exists
     888          876 :         result.add_key(DBDIR_KEY);
     889              : 
     890              :         // Fetch list of database dirs and iterate them
     891         9307 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
     892          876 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
     893          876 : 
     894          876 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
     895          876 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
     896            0 :             if has_relmap_file {
     897            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
     898            0 :             }
     899            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     900              : 
     901            0 :             let mut rels: Vec<RelTag> = self
     902            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     903            0 :                 .await?
     904            0 :                 .into_iter()
     905            0 :                 .collect();
     906            0 :             rels.sort_unstable();
     907            0 :             for rel in rels {
     908            0 :                 let relsize_key = rel_size_to_key(rel);
     909            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     910            0 :                 let relsize = buf.get_u32_le();
     911            0 : 
     912            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     913            0 :                 result.add_key(relsize_key);
     914              :             }
     915              :         }
     916              : 
     917              :         // Iterate SLRUs next
     918         2628 :         for kind in [
     919          876 :             SlruKind::Clog,
     920          876 :             SlruKind::MultiXactMembers,
     921          876 :             SlruKind::MultiXactOffsets,
     922              :         ] {
     923         2628 :             let slrudir_key = slru_dir_to_key(kind);
     924         2628 :             result.add_key(slrudir_key);
     925        28775 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     926         2628 :             let dir = SlruSegmentDirectory::des(&buf)?;
     927         2628 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     928         2628 :             segments.sort_unstable();
     929         2628 :             for segno in segments {
     930            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     931            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     932            0 :                 let segsize = buf.get_u32_le();
     933            0 : 
     934            0 :                 result.add_range(
     935            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     936            0 :                 );
     937            0 :                 result.add_key(segsize_key);
     938              :             }
     939              :         }
     940              : 
     941              :         // Then pg_twophase
     942          876 :         result.add_key(TWOPHASEDIR_KEY);
     943              : 
     944          876 :         let mut xids: Vec<u64> = self
     945          876 :             .list_twophase_files(lsn, ctx)
     946         9448 :             .await?
     947          876 :             .iter()
     948          876 :             .cloned()
     949          876 :             .collect();
     950          876 :         xids.sort_unstable();
     951          876 :         for xid in xids {
     952            0 :             result.add_key(twophase_file_key(xid));
     953            0 :         }
     954              : 
     955          876 :         result.add_key(CONTROLFILE_KEY);
     956          876 :         result.add_key(CHECKPOINT_KEY);
     957          876 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     958           12 :             result.add_key(AUX_FILES_KEY);
     959          864 :         }
     960              : 
     961              :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
     962              :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
     963              :         // and the keys will not be garbage-collected.
     964              :         #[cfg(test)]
     965              :         {
     966          876 :             let guard = self.extra_test_dense_keyspace.load();
     967          876 :             for kr in &guard.ranges {
     968            0 :                 result.add_range(kr.clone());
     969            0 :             }
     970              :         }
     971              : 
     972          876 :         let dense_keyspace = result.to_keyspace();
     973          876 :         let sparse_keyspace = SparseKeySpace(KeySpace {
     974          876 :             ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
     975          876 :         });
     976          876 : 
     977          876 :         if cfg!(debug_assertions) {
     978              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
     979              : 
     980              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
     981              :             // category of sparse keys is split into its own image/delta files. If there
     982              :             // are overlapping keyspaces, they will be automatically merged by KeySpaceAccum,
     983              :             // and we want the developer to keep the keyspaces separated.
     984              : 
     985          876 :             let ranges = &sparse_keyspace.0.ranges;
     986              : 
     987              :             // TODO: use a single overlaps_with across the codebase
     988          876 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
     989          876 :                 !(a.end <= b.start || b.end <= a.start)
     990          876 :             }
     991         1752 :             for i in 0..ranges.len() {
     992         1752 :                 for j in 0..i {
     993          876 :                     if overlaps_with(&ranges[i], &ranges[j]) {
     994            0 :                         panic!(
     995            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
     996            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
     997            0 :                         );
     998          876 :                     }
     999              :                 }
    1000              :             }
    1001          876 :             for i in 1..ranges.len() {
    1002          876 :                 assert!(
    1003          876 :                     ranges[i - 1].end <= ranges[i].start,
    1004            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
    1005            0 :                     ranges[i - 1].start,
    1006            0 :                     ranges[i - 1].end,
    1007            0 :                     ranges[i].start,
    1008            0 :                     ranges[i].end
    1009              :                 );
    1010              :             }
    1011            0 :         }
    1012              : 
    1013          876 :         Ok((dense_keyspace, sparse_keyspace))
    1014          876 :     }
    1015              : 
    1016              :     /// Get the cached size of a relation if it was not updated after the specified LSN
    1017      1345620 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
    1018      1345620 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
    1019      1345620 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
    1020      1345554 :             if lsn >= *cached_lsn {
    1021      1330116 :                 return Some(*nblocks);
    1022        15438 :             }
    1023           66 :         }
    1024        15504 :         None
    1025      1345620 :     }
    1026              : 
    1027              :     /// Update cached relation size if there is no more recent update
    1028        15408 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1029        15408 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1030        15408 : 
    1031        15408 :         if lsn < rel_size_cache.complete_as_of {
    1032              :             // Do not cache old values. It's safe to cache the size on read, as long as
    1033              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
    1034              :             // never evict values from the cache, so if the relation size changed after
    1035              :             // 'lsn', the new value is already in the cache.
    1036            0 :             return;
    1037        15408 :         }
    1038        15408 : 
    1039        15408 :         match rel_size_cache.map.entry(tag) {
    1040        15408 :             hash_map::Entry::Occupied(mut entry) => {
    1041        15408 :                 let cached_lsn = entry.get_mut();
    1042        15408 :                 if lsn >= cached_lsn.0 {
    1043            0 :                     *cached_lsn = (lsn, nblocks);
    1044        15408 :                 }
    1045              :             }
    1046            0 :             hash_map::Entry::Vacant(entry) => {
    1047            0 :                 entry.insert((lsn, nblocks));
    1048            0 :             }
    1049              :         }
    1050        15408 :     }
    1051              : 
    1052              :     /// Store cached relation size
    1053       866196 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1054       866196 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1055       866196 :         rel_size_cache.map.insert(tag, (lsn, nblocks));
    1056       866196 :     }
    1057              : 
    1058              :     /// Remove cached relation size
    1059            6 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1060            6 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1061            6 :         rel_size_cache.map.remove(tag);
    1062            6 :     }
    1063              : }
    1064              : 
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository.
///
/// It is created by the 'begin_record' function. It is called for each WAL
/// record, so that all the modifications by one WAL record appear atomic.
pub struct DatadirModification<'a> {
    /// The timeline this modification applies to. You can access this to
    /// read the state, but note that any pending updates are *not* reflected
    /// in the state in 'tline' yet.
    pub tline: &'a Timeline,

    /// Current LSN of the modification. It only moves forward: `set_lsn`
    /// rejects an LSN older than the current one.
    lsn: Lsn,

    // The modifications are not applied directly to the underlying key-value store.
    // The put-functions add the modifications here, and they are flushed to the
    // underlying key-value store by the 'finish' function.
    //
    // `pending_lsns` receives the previous LSN each time `set_lsn` advances the LSN.
    // `pending_nblocks` is the net change in block count accumulated by the
    // put-functions: creation/extension add to it, truncation/drop subtract from it.
    pending_lsns: Vec<Lsn>,
    pending_deletions: Vec<(Range<Key>, Lsn)>,
    pending_nblocks: i64,

    /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,

    /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    /// which keys are stored here.
    pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,

    // Sometimes during ingest, for example when extending a relation, we would like to write a zero page.  However,
    // if we encounter a write from postgres in the same wal record, we will drop this entry.
    //
    // Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
    // at the end of each wal record (see `on_record_end`), and all these writes implicitly are at lsn Self::lsn
    pending_zero_data_pages: HashSet<CompactKey>,

    /// For special "directory" keys that store key-value maps, track the size of the map
    /// if it was updated in this modification.
    pending_directory_entries: Vec<(DirectoryKind, usize)>,

    /// An **approximation** of how large our EphemeralFile write will be when committed.
    pending_bytes: usize,
}
    1108              : 
    1109              : impl<'a> DatadirModification<'a> {
    /// Upper bound (8 MiB) on the payload a DatadirModification may accumulate before it should be committed.
    ///
    /// When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    /// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    /// additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
    1114              : 
    1115              :     /// Get the current lsn
    1116      1254168 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1117      1254168 :         self.lsn
    1118      1254168 :     }
    1119              : 
    1120            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1121            0 :         self.pending_bytes
    1122            0 :     }
    1123              : 
    1124            0 :     pub(crate) fn has_dirty_data_pages(&self) -> bool {
    1125            0 :         (!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
    1126            0 :     }
    1127              : 
    1128              :     /// Set the current lsn
    1129       437574 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
    1130       437574 :         ensure!(
    1131       437574 :             lsn >= self.lsn,
    1132            0 :             "setting an older lsn {} than {} is not allowed",
    1133              :             lsn,
    1134              :             self.lsn
    1135              :         );
    1136              : 
    1137              :         // If we are advancing LSN, then state from previous wal record should have been flushed.
    1138       437574 :         assert!(self.pending_zero_data_pages.is_empty());
    1139              : 
    1140       437574 :         if lsn > self.lsn {
    1141       437574 :             self.pending_lsns.push(self.lsn);
    1142       437574 :             self.lsn = lsn;
    1143       437574 :         }
    1144       437574 :         Ok(())
    1145       437574 :     }
    1146              : 
    1147              :     /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
    1148              :     /// keys that represent literal blocks that postgres can read.  So data includes relation blocks and
    1149              :     /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
    1150              :     ///
    1151              :     /// The distinction is important because data keys are handled on a fast path where dirty writes are
    1152              :     /// not readable until this modification is committed, whereas metadata keys are visible for read
    1153              :     /// via [`Self::get`] as soon as their record has been ingested.
    1154      2988336 :     fn is_data_key(key: &Key) -> bool {
    1155      2988336 :         key.is_rel_block_key() || key.is_slru_block_key()
    1156      2988336 :     }
    1157              : 
    1158              :     /// Initialize a completely new repository.
    1159              :     ///
    1160              :     /// This inserts the directory metadata entries that are assumed to
    1161              :     /// always exist.
    1162          534 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1163          534 :         let buf = DbDirectory::ser(&DbDirectory {
    1164          534 :             dbdirs: HashMap::new(),
    1165          534 :         })?;
    1166          534 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
    1167          534 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1168          534 : 
    1169          534 :         // Create AuxFilesDirectory
    1170          534 :         self.init_aux_dir()?;
    1171              : 
    1172          534 :         let buf = if self.tline.pg_version >= 17 {
    1173            0 :             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
    1174            0 :                 xids: HashSet::new(),
    1175            0 :             })
    1176              :         } else {
    1177          534 :             TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1178          534 :                 xids: HashSet::new(),
    1179          534 :             })
    1180            0 :         }?;
    1181          534 :         self.pending_directory_entries
    1182          534 :             .push((DirectoryKind::TwoPhase, 0));
    1183          534 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1184              : 
    1185          534 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1186          534 :         let empty_dir = Value::Image(buf);
    1187          534 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1188          534 :         self.pending_directory_entries
    1189          534 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1190          534 :         self.put(
    1191          534 :             slru_dir_to_key(SlruKind::MultiXactMembers),
    1192          534 :             empty_dir.clone(),
    1193          534 :         );
    1194          534 :         self.pending_directory_entries
    1195          534 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1196          534 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1197          534 :         self.pending_directory_entries
    1198          534 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
    1199          534 : 
    1200          534 :         Ok(())
    1201          534 :     }
    1202              : 
    /// Test-only: initialize an empty repository and add placeholder control file
    /// and checkpoint images.
    #[cfg(test)]
    pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
        self.init_empty()?;

        let control_file = bytes::Bytes::from_static(b"control_file contents do not matter");
        self.put_control_file(control_file)
            .context("put_control_file")?;

        let checkpoint = bytes::Bytes::from_static(b"checkpoint_file contents do not matter");
        self.put_checkpoint(checkpoint)
            .context("put_checkpoint_file")?;

        Ok(())
    }
    1216              : 
    1217              :     /// Put a new page version that can be constructed from a WAL record
    1218              :     ///
    1219              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1220              :     /// current end-of-file. It's up to the caller to check that the relation size
    1221              :     /// matches the blocks inserted!
    1222       436890 :     pub fn put_rel_wal_record(
    1223       436890 :         &mut self,
    1224       436890 :         rel: RelTag,
    1225       436890 :         blknum: BlockNumber,
    1226       436890 :         rec: NeonWalRecord,
    1227       436890 :     ) -> anyhow::Result<()> {
    1228       436890 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1229       436890 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1230       436890 :         Ok(())
    1231       436890 :     }
    1232              : 
    1233              :     // Same, but for an SLRU.
    1234           24 :     pub fn put_slru_wal_record(
    1235           24 :         &mut self,
    1236           24 :         kind: SlruKind,
    1237           24 :         segno: u32,
    1238           24 :         blknum: BlockNumber,
    1239           24 :         rec: NeonWalRecord,
    1240           24 :     ) -> anyhow::Result<()> {
    1241           24 :         self.put(
    1242           24 :             slru_block_to_key(kind, segno, blknum),
    1243           24 :             Value::WalRecord(rec),
    1244           24 :         );
    1245           24 :         Ok(())
    1246           24 :     }
    1247              : 
    1248              :     /// Like put_wal_record, but with ready-made image of the page.
    1249       833598 :     pub fn put_rel_page_image(
    1250       833598 :         &mut self,
    1251       833598 :         rel: RelTag,
    1252       833598 :         blknum: BlockNumber,
    1253       833598 :         img: Bytes,
    1254       833598 :     ) -> anyhow::Result<()> {
    1255       833598 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1256       833598 :         let key = rel_block_to_key(rel, blknum);
    1257       833598 :         if !key.is_valid_key_on_write_path() {
    1258            0 :             anyhow::bail!(
    1259            0 :                 "the request contains data not supported by pageserver at {}",
    1260            0 :                 key
    1261            0 :             );
    1262       833598 :         }
    1263       833598 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1264       833598 :         Ok(())
    1265       833598 :     }
    1266              : 
    1267           18 :     pub fn put_slru_page_image(
    1268           18 :         &mut self,
    1269           18 :         kind: SlruKind,
    1270           18 :         segno: u32,
    1271           18 :         blknum: BlockNumber,
    1272           18 :         img: Bytes,
    1273           18 :     ) -> anyhow::Result<()> {
    1274           18 :         let key = slru_block_to_key(kind, segno, blknum);
    1275           18 :         if !key.is_valid_key_on_write_path() {
    1276            0 :             anyhow::bail!(
    1277            0 :                 "the request contains data not supported by pageserver at {}",
    1278            0 :                 key
    1279            0 :             );
    1280           18 :         }
    1281           18 :         self.put(key, Value::Image(img));
    1282           18 :         Ok(())
    1283           18 :     }
    1284              : 
    1285         8994 :     pub(crate) fn put_rel_page_image_zero(
    1286         8994 :         &mut self,
    1287         8994 :         rel: RelTag,
    1288         8994 :         blknum: BlockNumber,
    1289         8994 :     ) -> anyhow::Result<()> {
    1290         8994 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1291         8994 :         let key = rel_block_to_key(rel, blknum);
    1292         8994 :         if !key.is_valid_key_on_write_path() {
    1293            0 :             anyhow::bail!(
    1294            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1295            0 :                 key,
    1296            0 :                 self.lsn
    1297            0 :             );
    1298         8994 :         }
    1299         8994 :         self.pending_zero_data_pages.insert(key.to_compact());
    1300         8994 :         self.pending_bytes += ZERO_PAGE.len();
    1301         8994 :         Ok(())
    1302         8994 :     }
    1303              : 
    1304            0 :     pub(crate) fn put_slru_page_image_zero(
    1305            0 :         &mut self,
    1306            0 :         kind: SlruKind,
    1307            0 :         segno: u32,
    1308            0 :         blknum: BlockNumber,
    1309            0 :     ) -> anyhow::Result<()> {
    1310            0 :         let key = slru_block_to_key(kind, segno, blknum);
    1311            0 :         if !key.is_valid_key_on_write_path() {
    1312            0 :             anyhow::bail!(
    1313            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1314            0 :                 key,
    1315            0 :                 self.lsn
    1316            0 :             );
    1317            0 :         }
    1318            0 :         self.pending_zero_data_pages.insert(key.to_compact());
    1319            0 :         self.pending_bytes += ZERO_PAGE.len();
    1320            0 :         Ok(())
    1321            0 :     }
    1322              : 
    1323              :     /// Call this at the end of each WAL record.
    1324       437592 :     pub(crate) fn on_record_end(&mut self) {
    1325       437592 :         let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
    1326       446586 :         for key in pending_zero_data_pages {
    1327         8994 :             self.put_data(key, Value::Image(ZERO_PAGE.clone()));
    1328         8994 :         }
    1329       437592 :     }
    1330              : 
    1331              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1332           48 :     pub async fn put_relmap_file(
    1333           48 :         &mut self,
    1334           48 :         spcnode: Oid,
    1335           48 :         dbnode: Oid,
    1336           48 :         img: Bytes,
    1337           48 :         ctx: &RequestContext,
    1338           48 :     ) -> anyhow::Result<()> {
    1339              :         // Add it to the directory (if it doesn't exist already)
    1340           48 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1341           48 :         let mut dbdir = DbDirectory::des(&buf)?;
    1342              : 
    1343           48 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1344           48 :         if r.is_none() || r == Some(false) {
    1345              :             // The dbdir entry didn't exist, or it contained a
    1346              :             // 'false'. The 'insert' call already updated it with
    1347              :             // 'true', now write the updated 'dbdirs' map back.
    1348           48 :             let buf = DbDirectory::ser(&dbdir)?;
    1349           48 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1350           48 : 
    1351           48 :             // Create AuxFilesDirectory as well
    1352           48 :             self.init_aux_dir()?;
    1353            0 :         }
    1354           48 :         if r.is_none() {
    1355              :             // Create RelDirectory
    1356           24 :             let buf = RelDirectory::ser(&RelDirectory {
    1357           24 :                 rels: HashSet::new(),
    1358           24 :             })?;
    1359           24 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1360           24 :             self.put(
    1361           24 :                 rel_dir_to_key(spcnode, dbnode),
    1362           24 :                 Value::Image(Bytes::from(buf)),
    1363           24 :             );
    1364           24 :         }
    1365              : 
    1366           48 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1367           48 :         Ok(())
    1368           48 :     }
    1369              : 
    1370            0 :     pub async fn put_twophase_file(
    1371            0 :         &mut self,
    1372            0 :         xid: u64,
    1373            0 :         img: Bytes,
    1374            0 :         ctx: &RequestContext,
    1375            0 :     ) -> anyhow::Result<()> {
    1376              :         // Add it to the directory entry
    1377            0 :         let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1378            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1379            0 :             let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
    1380            0 :             if !dir.xids.insert(xid) {
    1381            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1382            0 :             }
    1383            0 :             self.pending_directory_entries
    1384            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1385            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1386              :         } else {
    1387            0 :             let xid = xid as u32;
    1388            0 :             let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
    1389            0 :             if !dir.xids.insert(xid) {
    1390            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1391            0 :             }
    1392            0 :             self.pending_directory_entries
    1393            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1394            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1395              :         };
    1396            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1397            0 : 
    1398            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1399            0 :         Ok(())
    1400            0 :     }
    1401              : 
    1402            0 :     pub async fn set_replorigin(
    1403            0 :         &mut self,
    1404            0 :         origin_id: RepOriginId,
    1405            0 :         origin_lsn: Lsn,
    1406            0 :     ) -> anyhow::Result<()> {
    1407            0 :         let key = repl_origin_key(origin_id);
    1408            0 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    1409            0 :         Ok(())
    1410            0 :     }
    1411              : 
    1412            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
    1413            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    1414            0 :     }
    1415              : 
    1416          534 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1417          534 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1418          534 :         Ok(())
    1419          534 :     }
    1420              : 
    1421          576 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1422          576 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1423          576 :         Ok(())
    1424          576 :     }
    1425              : 
    1426            0 :     pub async fn drop_dbdir(
    1427            0 :         &mut self,
    1428            0 :         spcnode: Oid,
    1429            0 :         dbnode: Oid,
    1430            0 :         ctx: &RequestContext,
    1431            0 :     ) -> anyhow::Result<()> {
    1432            0 :         let total_blocks = self
    1433            0 :             .tline
    1434            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1435            0 :             .await?;
    1436              : 
    1437              :         // Remove entry from dbdir
    1438            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1439            0 :         let mut dir = DbDirectory::des(&buf)?;
    1440            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1441            0 :             let buf = DbDirectory::ser(&dir)?;
    1442            0 :             self.pending_directory_entries
    1443            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1444            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1445              :         } else {
    1446            0 :             warn!(
    1447            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1448              :                 spcnode, dbnode
    1449              :             );
    1450              :         }
    1451              : 
    1452              :         // Update logical database size.
    1453            0 :         self.pending_nblocks -= total_blocks as i64;
    1454            0 : 
    1455            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1456            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1457            0 :         Ok(())
    1458            0 :     }
    1459              : 
    1460              :     /// Create a relation fork.
    1461              :     ///
    1462              :     /// 'nblocks' is the initial size.
    1463         5760 :     pub async fn put_rel_creation(
    1464         5760 :         &mut self,
    1465         5760 :         rel: RelTag,
    1466         5760 :         nblocks: BlockNumber,
    1467         5760 :         ctx: &RequestContext,
    1468         5760 :     ) -> Result<(), RelationError> {
    1469         5760 :         if rel.relnode == 0 {
    1470            0 :             return Err(RelationError::InvalidRelnode);
    1471         5760 :         }
    1472              :         // It's possible that this is the first rel for this db in this
    1473              :         // tablespace.  Create the reldir entry for it if so.
    1474         5760 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1475         5760 :             .context("deserialize db")?;
    1476         5760 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1477         5760 :         let mut rel_dir =
    1478         5760 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1479              :                 // Didn't exist. Update dbdir
    1480           24 :                 e.insert(false);
    1481           24 :                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1482           24 :                 self.pending_directory_entries
    1483           24 :                     .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1484           24 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1485           24 : 
    1486           24 :                 // and create the RelDirectory
    1487           24 :                 RelDirectory::default()
    1488              :             } else {
    1489              :                 // reldir already exists, fetch it
    1490         5736 :                 RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1491         5736 :                     .context("deserialize db")?
    1492              :             };
    1493              : 
    1494              :         // Add the new relation to the rel directory entry, and write it back
    1495         5760 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1496            0 :             return Err(RelationError::AlreadyExists);
    1497         5760 :         }
    1498         5760 : 
    1499         5760 :         self.pending_directory_entries
    1500         5760 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1501         5760 : 
    1502         5760 :         self.put(
    1503         5760 :             rel_dir_key,
    1504         5760 :             Value::Image(Bytes::from(
    1505         5760 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1506              :             )),
    1507              :         );
    1508              : 
    1509              :         // Put size
    1510         5760 :         let size_key = rel_size_to_key(rel);
    1511         5760 :         let buf = nblocks.to_le_bytes();
    1512         5760 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1513         5760 : 
    1514         5760 :         self.pending_nblocks += nblocks as i64;
    1515         5760 : 
    1516         5760 :         // Update relation size cache
    1517         5760 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1518         5760 : 
    1519         5760 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1520         5760 :         // caller.
    1521         5760 :         Ok(())
    1522         5760 :     }
    1523              : 
    1524              :     /// Truncate relation
    1525        18036 :     pub async fn put_rel_truncation(
    1526        18036 :         &mut self,
    1527        18036 :         rel: RelTag,
    1528        18036 :         nblocks: BlockNumber,
    1529        18036 :         ctx: &RequestContext,
    1530        18036 :     ) -> anyhow::Result<()> {
    1531        18036 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1532        18036 :         if self
    1533        18036 :             .tline
    1534        18036 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1535            0 :             .await?
    1536              :         {
    1537        18036 :             let size_key = rel_size_to_key(rel);
    1538              :             // Fetch the old size first
    1539        18036 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1540        18036 : 
    1541        18036 :             // Update the entry with the new size.
    1542        18036 :             let buf = nblocks.to_le_bytes();
    1543        18036 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1544        18036 : 
    1545        18036 :             // Update relation size cache
    1546        18036 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1547        18036 : 
    1548        18036 :             // Update relation size cache
    1549        18036 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1550        18036 : 
    1551        18036 :             // Update logical database size.
    1552        18036 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1553            0 :         }
    1554        18036 :         Ok(())
    1555        18036 :     }
    1556              : 
    1557              :     /// Extend relation
    1558              :     /// If new size is smaller, do nothing.
    1559       830040 :     pub async fn put_rel_extend(
    1560       830040 :         &mut self,
    1561       830040 :         rel: RelTag,
    1562       830040 :         nblocks: BlockNumber,
    1563       830040 :         ctx: &RequestContext,
    1564       830040 :     ) -> anyhow::Result<()> {
    1565       830040 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1566              : 
    1567              :         // Put size
    1568       830040 :         let size_key = rel_size_to_key(rel);
    1569       830040 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1570       830040 : 
    1571       830040 :         // only extend relation here. never decrease the size
    1572       830040 :         if nblocks > old_size {
    1573       824364 :             let buf = nblocks.to_le_bytes();
    1574       824364 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1575       824364 : 
    1576       824364 :             // Update relation size cache
    1577       824364 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1578       824364 : 
    1579       824364 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1580       824364 :         }
    1581       830040 :         Ok(())
    1582       830040 :     }
    1583              : 
    1584              :     /// Drop a relation.
    1585            6 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1586            6 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1587              : 
    1588              :         // Remove it from the directory entry
    1589            6 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1590            6 :         let buf = self.get(dir_key, ctx).await?;
    1591            6 :         let mut dir = RelDirectory::des(&buf)?;
    1592              : 
    1593            6 :         self.pending_directory_entries
    1594            6 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1595            6 : 
    1596            6 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1597            6 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1598              :         } else {
    1599            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1600              :         }
    1601              : 
    1602              :         // update logical size
    1603            6 :         let size_key = rel_size_to_key(rel);
    1604            6 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1605            6 :         self.pending_nblocks -= old_size as i64;
    1606            6 : 
    1607            6 :         // Remove entry from relation size cache
    1608            6 :         self.tline.remove_cached_rel_size(&rel);
    1609            6 : 
    1610            6 :         // Delete size entry, as well as all blocks
    1611            6 :         self.delete(rel_key_range(rel));
    1612            6 : 
    1613            6 :         Ok(())
    1614            6 :     }
    1615              : 
    1616           18 :     pub async fn put_slru_segment_creation(
    1617           18 :         &mut self,
    1618           18 :         kind: SlruKind,
    1619           18 :         segno: u32,
    1620           18 :         nblocks: BlockNumber,
    1621           18 :         ctx: &RequestContext,
    1622           18 :     ) -> anyhow::Result<()> {
    1623           18 :         // Add it to the directory entry
    1624           18 :         let dir_key = slru_dir_to_key(kind);
    1625           18 :         let buf = self.get(dir_key, ctx).await?;
    1626           18 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1627              : 
    1628           18 :         if !dir.segments.insert(segno) {
    1629            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1630           18 :         }
    1631           18 :         self.pending_directory_entries
    1632           18 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1633           18 :         self.put(
    1634           18 :             dir_key,
    1635           18 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1636              :         );
    1637              : 
    1638              :         // Put size
    1639           18 :         let size_key = slru_segment_size_to_key(kind, segno);
    1640           18 :         let buf = nblocks.to_le_bytes();
    1641           18 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1642           18 : 
    1643           18 :         // even if nblocks > 0, we don't insert any actual blocks here
    1644           18 : 
    1645           18 :         Ok(())
    1646           18 :     }
    1647              : 
    1648              :     /// Extend SLRU segment
    1649            0 :     pub fn put_slru_extend(
    1650            0 :         &mut self,
    1651            0 :         kind: SlruKind,
    1652            0 :         segno: u32,
    1653            0 :         nblocks: BlockNumber,
    1654            0 :     ) -> anyhow::Result<()> {
    1655            0 :         // Put size
    1656            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1657            0 :         let buf = nblocks.to_le_bytes();
    1658            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1659            0 :         Ok(())
    1660            0 :     }
    1661              : 
    1662              :     /// This method is used for marking truncated SLRU files
    1663            0 :     pub async fn drop_slru_segment(
    1664            0 :         &mut self,
    1665            0 :         kind: SlruKind,
    1666            0 :         segno: u32,
    1667            0 :         ctx: &RequestContext,
    1668            0 :     ) -> anyhow::Result<()> {
    1669            0 :         // Remove it from the directory entry
    1670            0 :         let dir_key = slru_dir_to_key(kind);
    1671            0 :         let buf = self.get(dir_key, ctx).await?;
    1672            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1673              : 
    1674            0 :         if !dir.segments.remove(&segno) {
    1675            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1676            0 :         }
    1677            0 :         self.pending_directory_entries
    1678            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1679            0 :         self.put(
    1680            0 :             dir_key,
    1681            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1682              :         );
    1683              : 
    1684              :         // Delete size entry, as well as all blocks
    1685            0 :         self.delete(slru_segment_key_range(kind, segno));
    1686            0 : 
    1687            0 :         Ok(())
    1688            0 :     }
    1689              : 
    1690              :     /// Drop a relmapper file (pg_filenode.map)
    1691            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1692            0 :         // TODO
    1693            0 :         Ok(())
    1694            0 :     }
    1695              : 
    1696              :     /// Drop a twophase file: remove `xid` from the twophase directory and delete its key range
    1697            0 :     pub async fn drop_twophase_file(
    1698            0 :         &mut self,
    1699            0 :         xid: u64,
    1700            0 :         ctx: &RequestContext,
    1701            0 :     ) -> anyhow::Result<()> {
    1702              :         // Remove it from the directory entry
    1703            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1704            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1705            0 :             let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
    1706              : 
    1707            0 :             if !dir.xids.remove(&xid) {
    1708            0 :                 warn!("twophase file for xid {} does not exist", xid);
    1709            0 :             }
    1710            0 :             self.pending_directory_entries
    1711            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1712            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1713              :         } else {
    1714            0 :             let xid: u32 = u32::try_from(xid)?;
    1715            0 :             let mut dir = TwoPhaseDirectory::des(&buf)?;
    1716              : 
    1717            0 :             if !dir.xids.remove(&xid) {
    1718            0 :                 warn!("twophase file for xid {} does not exist", xid);
    1719            0 :             }
    1720            0 :             self.pending_directory_entries
    1721            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1722            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1723              :         };
    1724            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1725            0 : 
    1726            0 :         // Delete it
    1727            0 :         self.delete(twophase_key_range(xid));
    1728            0 : 
    1729            0 :         Ok(())
    1730            0 :     }
    1731              : 
    1732          582 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1733          582 :         if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
    1734          570 :             return Ok(());
    1735           12 :         }
    1736           12 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1737           12 :             files: HashMap::new(),
    1738           12 :         })?;
    1739           12 :         self.pending_directory_entries
    1740           12 :             .push((DirectoryKind::AuxFiles, 0));
    1741           12 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1742           12 :         Ok(())
    1743          582 :     }
    1744              : 
    1745           90 :     pub async fn put_file(
    1746           90 :         &mut self,
    1747           90 :         path: &str,
    1748           90 :         content: &[u8],
    1749           90 :         ctx: &RequestContext,
    1750           90 :     ) -> anyhow::Result<()> {
    1751           90 :         let switch_policy = self.tline.get_switch_aux_file_policy();
    1752              : 
    1753           90 :         let policy = {
    1754           90 :             let current_policy = self.tline.last_aux_file_policy.load();
    1755              :             // Allowed switch path:
    1756              :             // * no aux files -> v1/v2/cross-validation
    1757              :             // * cross-validation->v2
    1758              : 
    1759           90 :             let current_policy = if current_policy.is_none() {
    1760              :                 // This path will only be hit once per tenant: we will decide the final policy in this code block.
    1761              :                 // The next call to `put_file` will always have `last_aux_file_policy != None`.
    1762           36 :                 let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1763           36 :                 let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
    1764           36 :                 if aux_files_key_v1.is_empty() {
    1765           30 :                     None
    1766              :                 } else {
    1767            6 :                     warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
    1768            6 :                     self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
    1769            6 :                     Some(AuxFilePolicy::V1)
    1770              :                 }
    1771              :             } else {
    1772           54 :                 current_policy
    1773              :             };
    1774              : 
    1775           90 :             if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
    1776           36 :                 self.tline.do_switch_aux_policy(switch_policy)?;
    1777           36 :                 info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
    1778           36 :                 switch_policy
    1779              :             } else {
    1780              :                 // This branch handles invalid migration paths, and the case where switch_policy == current_policy.
    1781              :                 // In practice, because the migration path always allows unspecified -> *, this unwrap_or will never be hit.
    1782           54 :                 current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
    1783              :             }
    1784              :         };
    1785              : 
    1786           90 :         if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
    1787           78 :             let key = aux_file::encode_aux_file_key(path);
    1788              :             // retrieve the key from the engine
    1789           78 :             let old_val = match self.get(key, ctx).await {
    1790           18 :                 Ok(val) => Some(val),
    1791           60 :                 Err(PageReconstructError::MissingKey(_)) => None,
    1792            0 :                 Err(e) => return Err(e.into()),
    1793              :             };
    1794           78 :             let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    1795           18 :                 aux_file::decode_file_value(old_val)?
    1796              :             } else {
    1797           60 :                 Vec::new()
    1798              :             };
    1799           78 :             let mut other_files = Vec::with_capacity(files.len());
    1800           78 :             let mut modifying_file = None;
    1801           96 :             for file @ (p, content) in files {
    1802           18 :                 if path == p {
    1803           18 :                     assert!(
    1804           18 :                         modifying_file.is_none(),
    1805            0 :                         "duplicated entries found for {}",
    1806              :                         path
    1807              :                     );
    1808           18 :                     modifying_file = Some(content);
    1809            0 :                 } else {
    1810            0 :                     other_files.push(file);
    1811            0 :                 }
    1812              :             }
    1813           78 :             let mut new_files = other_files;
    1814           78 :             match (modifying_file, content.is_empty()) {
    1815           12 :                 (Some(old_content), false) => {
    1816           12 :                     self.tline
    1817           12 :                         .aux_file_size_estimator
    1818           12 :                         .on_update(old_content.len(), content.len());
    1819           12 :                     new_files.push((path, content));
    1820           12 :                 }
    1821            6 :                 (Some(old_content), true) => {
    1822            6 :                     self.tline
    1823            6 :                         .aux_file_size_estimator
    1824            6 :                         .on_remove(old_content.len());
    1825            6 :                     // not adding the file key to the final `new_files` vec.
    1826            6 :                 }
    1827           60 :                 (None, false) => {
    1828           60 :                     self.tline.aux_file_size_estimator.on_add(content.len());
    1829           60 :                     new_files.push((path, content));
    1830           60 :                 }
    1831            0 :                 (None, true) => warn!("removing non-existing aux file: {}", path),
    1832              :             }
    1833           78 :             let new_val = aux_file::encode_file_value(&new_files)?;
    1834           78 :             self.put(key, Value::Image(new_val.into()));
    1835           12 :         }
    1836              : 
    1837           90 :         if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
    1838           18 :             let file_path = path.to_string();
    1839           18 :             let content = if content.is_empty() {
    1840            0 :                 None
    1841              :             } else {
    1842           18 :                 Some(Bytes::copy_from_slice(content))
    1843              :             };
    1844              : 
    1845              :             let n_files;
    1846           18 :             let mut aux_files = self.tline.aux_files.lock().await;
    1847           18 :             if let Some(mut dir) = aux_files.dir.take() {
    1848              :                 // We already updated aux files in `self`: emit a delta and update our latest value.
    1849            0 :                 dir.upsert(file_path.clone(), content.clone());
    1850            0 :                 n_files = dir.files.len();
    1851            0 :                 if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
    1852            0 :                     self.put(
    1853            0 :                         AUX_FILES_KEY,
    1854            0 :                         Value::Image(Bytes::from(
    1855            0 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1856              :                         )),
    1857              :                     );
    1858            0 :                     aux_files.n_deltas = 0;
    1859            0 :                 } else {
    1860            0 :                     self.put(
    1861            0 :                         AUX_FILES_KEY,
    1862            0 :                         Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
    1863            0 :                     );
    1864            0 :                     aux_files.n_deltas += 1;
    1865            0 :                 }
    1866            0 :                 aux_files.dir = Some(dir);
    1867              :             } else {
    1868              :                 // Check if the AUX_FILES_KEY is initialized
    1869           18 :                 match self.get(AUX_FILES_KEY, ctx).await {
    1870           18 :                     Ok(dir_bytes) => {
    1871           18 :                         let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
    1872              :                         // Key is already set, we may append a delta
    1873           18 :                         self.put(
    1874           18 :                             AUX_FILES_KEY,
    1875           18 :                             Value::WalRecord(NeonWalRecord::AuxFile {
    1876           18 :                                 file_path: file_path.clone(),
    1877           18 :                                 content: content.clone(),
    1878           18 :                             }),
    1879           18 :                         );
    1880           18 :                         dir.upsert(file_path, content);
    1881           18 :                         n_files = dir.files.len();
    1882           18 :                         aux_files.dir = Some(dir);
    1883              :                     }
    1884              :                     Err(
    1885            0 :                         e @ (PageReconstructError::Cancelled
    1886            0 :                         | PageReconstructError::AncestorLsnTimeout(_)),
    1887            0 :                     ) => {
    1888            0 :                         // Important that we do not interpret a shutdown error as "not found" and thereby
    1889            0 :                         // reset the map.
    1890            0 :                         return Err(e.into());
    1891              :                     }
    1892              :                     // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
    1893              :                     // the original code assumes all other errors are missing keys. Therefore, we keep the code path
    1894              :                     // the same for now, though in theory, we should only match the `MissingKey` variant.
    1895              :                     Err(
    1896            0 :                         e @ (PageReconstructError::Other(_)
    1897              :                         | PageReconstructError::WalRedo(_)
    1898              :                         | PageReconstructError::MissingKey(_)),
    1899              :                     ) => {
    1900              :                         // Key is missing, we must insert an image as the basis for subsequent deltas.
    1901              : 
    1902            0 :                         if !matches!(e, PageReconstructError::MissingKey(_)) {
    1903            0 :                             let e = utils::error::report_compact_sources(&e);
    1904            0 :                             tracing::warn!("treating error as if it was a missing key: {}", e);
    1905            0 :                         }
    1906              : 
    1907            0 :                         let mut dir = AuxFilesDirectory {
    1908            0 :                             files: HashMap::new(),
    1909            0 :                         };
    1910            0 :                         dir.upsert(file_path, content);
    1911            0 :                         self.put(
    1912            0 :                             AUX_FILES_KEY,
    1913            0 :                             Value::Image(Bytes::from(
    1914            0 :                                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1915              :                             )),
    1916              :                         );
    1917            0 :                         n_files = 1;
    1918            0 :                         aux_files.dir = Some(dir);
    1919              :                     }
    1920              :                 }
    1921              :             }
    1922              : 
    1923           18 :             self.pending_directory_entries
    1924           18 :                 .push((DirectoryKind::AuxFiles, n_files));
    1925           72 :         }
    1926              : 
    1927           90 :         Ok(())
    1928           90 :     }
    1929              : 
    1930              :     ///
    1931              :     /// Flush changes accumulated so far to the underlying repository.
    1932              :     ///
    1933              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1934              :     /// you to flush them to the underlying repository before the final `commit`.
    1935              :     /// That frees up the memory used to hold the pending changes.
    1936              :     ///
    1937              :     /// Currently only used during bulk import of a data directory. In that
    1938              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1939              :     /// whole import fails and the timeline will be deleted anyway.
    1940              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1941              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1942              :     ///
    1943              :     /// Note: A consequence of flushing the pending operations is that they
    1944              :     /// won't be visible to subsequent operations until `commit`. The function
    1945              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1946              :     /// for bulk import, where you are just loading data pages and won't try to
    1947              :     /// modify the same pages twice.
    1948         5790 :     pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1949         5790 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1950         5790 :         // to scan through the pending_updates list.
    1951         5790 :         let pending_nblocks = self.pending_nblocks;
    1952         5790 :         if pending_nblocks < 10000 {
    1953         5790 :             return Ok(());
    1954            0 :         }
    1955              : 
    1956            0 :         let mut writer = self.tline.writer().await;
    1957              : 
    1958              :         // Flush relation and SLRU data blocks, keep metadata.
    1959            0 :         let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
    1960            0 : 
    1961            0 :         // This bails out on first error without modifying pending_updates.
    1962            0 :         // That's Ok, cf this function's doc comment.
    1963            0 :         writer.put_batch(pending_data_pages, ctx).await?;
    1964            0 :         self.pending_bytes = 0;
    1965            0 : 
    1966            0 :         if pending_nblocks != 0 {
    1967            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1968            0 :             self.pending_nblocks = 0;
    1969            0 :         }
    1970              : 
    1971            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1972            0 :             writer.update_directory_entries_count(kind, count as u64);
    1973            0 :         }
    1974              : 
    1975            0 :         Ok(())
    1976         5790 :     }
    1977              : 
    1978              :     ///
    1979              :     /// Finish this atomic update, writing all the updated keys to the
    1980              :     /// underlying timeline.
    1981              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1982              :     ///
    1983      2229240 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1984      2229240 :         // Commit should never be called mid-wal-record
    1985      2229240 :         assert!(self.pending_zero_data_pages.is_empty());
    1986              : 
    1987      2229240 :         let mut writer = self.tline.writer().await;
    1988              : 
    1989      2229240 :         let pending_nblocks = self.pending_nblocks;
    1990      2229240 :         self.pending_nblocks = 0;
    1991      2229240 : 
    1992      2229240 :         // Ordering: the items in this batch do not need to be in any global order, but values for
    1993      2229240 :         // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    1994      2229240 :         // this to do efficient updates to its index.
    1995      2229240 :         let mut write_batch = std::mem::take(&mut self.pending_data_pages);
    1996      2229240 : 
    1997      2229240 :         write_batch.extend(
    1998      2229240 :             self.pending_metadata_pages
    1999      2229240 :                 .drain()
    2000      2229240 :                 .flat_map(|(key, values)| {
    2001       821544 :                     values
    2002       821544 :                         .into_iter()
    2003       821544 :                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
    2004      2229240 :                 }),
    2005      2229240 :         );
    2006      2229240 : 
    2007      2229240 :         if !write_batch.is_empty() {
    2008      1242192 :             writer.put_batch(write_batch, ctx).await?;
    2009       987048 :         }
    2010              : 
    2011      2229240 :         if !self.pending_deletions.is_empty() {
    2012            6 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    2013            6 :             self.pending_deletions.clear();
    2014      2229234 :         }
    2015              : 
    2016      2229240 :         self.pending_lsns.push(self.lsn);
    2017      2666814 :         for pending_lsn in self.pending_lsns.drain(..) {
    2018      2666814 :             // Ideally, we should be able to call writer.finish_write() only once
    2019      2666814 :             // with the highest LSN. However, the last_record_lsn variable in the
    2020      2666814 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    2021      2666814 :             // so we need to record every LSN to not leave a gap between them.
    2022      2666814 :             writer.finish_write(pending_lsn);
    2023      2666814 :         }
    2024              : 
    2025      2229240 :         if pending_nblocks != 0 {
    2026       811710 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2027      1417530 :         }
    2028              : 
    2029      2229240 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2030         8532 :             writer.update_directory_entries_count(kind, count as u64);
    2031         8532 :         }
    2032              : 
    2033      2229240 :         self.pending_bytes = 0;
    2034      2229240 : 
    2035      2229240 :         Ok(())
    2036      2229240 :     }
    2037              : 
    2038       875112 :     pub(crate) fn len(&self) -> usize {
    2039       875112 :         self.pending_metadata_pages.len()
    2040       875112 :             + self.pending_data_pages.len()
    2041       875112 :             + self.pending_deletions.len()
    2042       875112 :     }
    2043              : 
    2044              :     /// Read a page from the Timeline we are writing to.  For metadata pages, this passes through
    2045              :     /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
    2046              :     /// in the modification.
    2047              :     ///
    2048              :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
    2049              :     /// page must ensure that the pages they read are already committed in Timeline, for example
    2050              :     /// DB create operations are always preceded by a call to commit().  This is special cased because
    2051              :     /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
    2052              :     /// and not data pages.
    2053       859806 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    2054       859806 :         if !Self::is_data_key(&key) {
    2055              :             // Have we already updated the same key? Read the latest pending updated
    2056              :             // version in that case.
    2057              :             //
    2058              :             // Note: we don't check pending_deletions. It is an error to request a
    2059              :             // value that has been removed, deletion only avoids leaking storage.
    2060       859806 :             if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
    2061        47784 :                 if let Some((_, _, value)) = values.last() {
    2062        47784 :                     return if let Value::Image(img) = value {
    2063        47784 :                         Ok(img.clone())
    2064              :                     } else {
    2065              :                         // Currently, we never need to read back a WAL record that we
    2066              :                         // inserted in the same "transaction". All the metadata updates
    2067              :                         // work directly with Images, and we never need to read actual
    2068              :                         // data pages. We could handle this if we had to, by calling
    2069              :                         // the walredo manager, but let's keep it simple for now.
    2070            0 :                         Err(PageReconstructError::Other(anyhow::anyhow!(
    2071            0 :                             "unexpected pending WAL record"
    2072            0 :                         )))
    2073              :                     };
    2074            0 :                 }
    2075       812022 :             }
    2076              :         } else {
    2077              :             // This is an expensive check, so we only do it in debug mode. If reading a data key,
    2078              :             // this key should never be present in pending_data_pages. We ensure this by committing
    2079              :             // modifications before ingesting DB create operations, which are the only kind that reads
    2080              :             // data pages during ingest.
    2081            0 :             if cfg!(debug_assertions) {
    2082            0 :                 for (dirty_key, _, _, _) in &self.pending_data_pages {
    2083            0 :                     debug_assert!(&key.to_compact() != dirty_key);
    2084              :                 }
    2085              : 
    2086            0 :                 debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
    2087            0 :             }
    2088              :         }
    2089              : 
    2090              :         // Metadata page cache miss, or we're reading a data page.
    2091       812022 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    2092       812022 :         self.tline.get(key, lsn, ctx).await
    2093       859806 :     }
    2094              : 
    2095              :     /// Only used during unit tests, force putting a key into the modification.
    2096              :     #[cfg(test)]
    2097            6 :     pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
    2098            6 :         self.put(key, val);
    2099            6 :     }
    2100              : 
    2101      2128530 :     fn put(&mut self, key: Key, val: Value) {
    2102      2128530 :         if Self::is_data_key(&key) {
    2103      1270530 :             self.put_data(key.to_compact(), val)
    2104              :         } else {
    2105       858000 :             self.put_metadata(key.to_compact(), val)
    2106              :         }
    2107      2128530 :     }
    2108              : 
    2109      1279524 :     fn put_data(&mut self, key: CompactKey, val: Value) {
    2110      1279524 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    2111      1279524 : 
    2112      1279524 :         // If this page was previously zero'd in the same WalRecord, then drop the previous zero page write.  This
    2113      1279524 :         // is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
    2114      1279524 :         // and the subsequent postgres-originating write
    2115      1279524 :         if self.pending_zero_data_pages.remove(&key) {
    2116            0 :             self.pending_bytes -= ZERO_PAGE.len();
    2117      1279524 :         }
    2118              : 
    2119      1279524 :         self.pending_bytes += val_serialized_size;
    2120      1279524 :         self.pending_data_pages
    2121      1279524 :             .push((key, self.lsn, val_serialized_size, val))
    2122      1279524 :     }
    2123              : 
    2124       858000 :     fn put_metadata(&mut self, key: CompactKey, val: Value) {
    2125       858000 :         let values = self.pending_metadata_pages.entry(key).or_default();
    2126              :         // Replace the previous value if it exists at the same lsn
    2127       858000 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    2128        36456 :             if *last_lsn == self.lsn {
    2129              :                 // Update the pending_bytes contribution from this entry, and update the serialized size in place
    2130        36456 :                 self.pending_bytes -= *last_value_ser_size;
    2131        36456 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    2132        36456 :                 self.pending_bytes += *last_value_ser_size;
    2133        36456 : 
    2134        36456 :                 // Use the latest value, this replaces any earlier write to the same (key,lsn), such as might
    2135        36456 :                 // have been generated by synthesized zero page writes prior to the first real write to a page.
    2136        36456 :                 *last_value = val;
    2137        36456 :                 return;
    2138            0 :             }
    2139       821544 :         }
    2140              : 
    2141       821544 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    2142       821544 :         self.pending_bytes += val_serialized_size;
    2143       821544 :         values.push((self.lsn, val_serialized_size, val));
    2144       858000 :     }
    2145              : 
    2146            6 :     fn delete(&mut self, key_range: Range<Key>) {
    2147            6 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    2148            6 :         self.pending_deletions.push((key_range, self.lsn));
    2149            6 :     }
    2150              : }
    2151              : 
    2152              : /// This struct facilitates accessing either a committed key from the timeline at a
    2153              : /// specific LSN, or the latest uncommitted key from a pending modification.
    2154              : ///
    2155              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    2156              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    2157              : /// need to look up the keys in the modification first before looking them up in the
    2158              : /// timeline to not miss the latest updates.
    2159              : #[derive(Clone, Copy)]
    2160              : pub enum Version<'a> {
    2161              :     Lsn(Lsn),
    2162              :     Modified(&'a DatadirModification<'a>),
    2163              : }
    2164              : 
    2165              : impl<'a> Version<'a> {
    2166        70680 :     async fn get(
    2167        70680 :         &self,
    2168        70680 :         timeline: &Timeline,
    2169        70680 :         key: Key,
    2170        70680 :         ctx: &RequestContext,
    2171        70680 :     ) -> Result<Bytes, PageReconstructError> {
    2172        70680 :         match self {
    2173        70620 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    2174           60 :             Version::Modified(modification) => modification.get(key, ctx).await,
    2175              :         }
    2176        70680 :     }
    2177              : 
    2178       106860 :     fn get_lsn(&self) -> Lsn {
    2179       106860 :         match self {
    2180        88722 :             Version::Lsn(lsn) => *lsn,
    2181        18138 :             Version::Modified(modification) => modification.lsn,
    2182              :         }
    2183       106860 :     }
    2184              : }
    2185              : 
    2186              : //--- Metadata structs stored in key-value pairs in the repository.
    2187              : 
    2188         6738 : #[derive(Debug, Serialize, Deserialize)]
    2189              : struct DbDirectory {
    2190              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    2191              :     dbdirs: HashMap<(Oid, Oid), bool>,
    2192              : }
    2193              : 
    2194              : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
    2195              : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs.  Previously, the files
    2196              : // were named like "pg_twophase/000002E5", now they're like
    2197              : // "pg_twophase/0000000A000002E4".
    2198              : 
    2199          882 : #[derive(Debug, Serialize, Deserialize)]
    2200              : struct TwoPhaseDirectory {
    2201              :     xids: HashSet<TransactionId>,
    2202              : }
    2203              : 
    2204            0 : #[derive(Debug, Serialize, Deserialize)]
    2205              : struct TwoPhaseDirectoryV17 {
    2206              :     xids: HashSet<u64>,
    2207              : }
    2208              : 
    2209         5796 : #[derive(Debug, Serialize, Deserialize, Default)]
    2210              : struct RelDirectory {
    2211              :     // Set of relations that exist. (relfilenode, forknum)
    2212              :     //
    2213              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    2214              :     // key-value pairs, if you have a lot of relations
    2215              :     rels: HashSet<(Oid, u8)>,
    2216              : }
    2217              : 
    2218           84 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
    2219              : pub(crate) struct AuxFilesDirectory {
    2220              :     pub(crate) files: HashMap<String, Bytes>,
    2221              : }
    2222              : 
    2223              : impl AuxFilesDirectory {
    2224           48 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    2225           48 :         if let Some(value) = value {
    2226           42 :             self.files.insert(key, value);
    2227           42 :         } else {
    2228            6 :             self.files.remove(&key);
    2229            6 :         }
    2230           48 :     }
    2231              : }
    2232              : 
    2233            0 : #[derive(Debug, Serialize, Deserialize)]
    2234              : struct RelSizeEntry {
    2235              :     nblocks: u32,
    2236              : }
    2237              : 
    2238         2646 : #[derive(Debug, Serialize, Deserialize, Default)]
    2239              : struct SlruSegmentDirectory {
    2240              :     // Set of SLRU segments that exist.
    2241              :     segments: HashSet<u32>,
    2242              : }
    2243              : 
    2244              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2245              : #[repr(u8)]
    2246              : pub(crate) enum DirectoryKind {
    2247              :     Db,
    2248              :     TwoPhase,
    2249              :     Rel,
    2250              :     AuxFiles,
    2251              :     SlruSegment(SlruKind),
    2252              : }
    2253              : 
    2254              : impl DirectoryKind {
    2255              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2256        17064 :     pub(crate) fn offset(&self) -> usize {
    2257        17064 :         self.into_usize()
    2258        17064 :     }
    2259              : }
    2260              : 
    2261              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2262              : 
    2263              : #[allow(clippy::bool_assert_comparison)]
    2264              : #[cfg(test)]
    2265              : mod tests {
    2266              :     use hex_literal::hex;
    2267              :     use utils::id::TimelineId;
    2268              : 
    2269              :     use super::*;
    2270              : 
    2271              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    2272              : 
    2273              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2274              :     #[tokio::test]
    2275            6 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2276            6 :         let name = "aux_files_round_trip";
    2277            6 :         let harness = TenantHarness::create(name).await?;
    2278            6 : 
    2279            6 :         pub const TIMELINE_ID: TimelineId =
    2280            6 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2281            6 : 
    2282           24 :         let (tenant, ctx) = harness.load().await;
    2283            6 :         let tline = tenant
    2284            6 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2285            6 :             .await?;
    2286            6 :         let tline = tline.raw_timeline().unwrap();
    2287            6 : 
    2288            6 :         // First modification: insert two keys
    2289            6 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2290            6 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2291            6 :         modification.set_lsn(Lsn(0x1008))?;
    2292            6 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2293            6 :         modification.commit(&ctx).await?;
    2294            6 :         let expect_1008 = HashMap::from([
    2295            6 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2296            6 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2297            6 :         ]);
    2298            6 : 
    2299            6 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2300            6 :         assert_eq!(readback, expect_1008);
    2301            6 : 
    2302            6 :         // Second modification: update one key, remove the other
    2303            6 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2304            6 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2305            6 :         modification.set_lsn(Lsn(0x2008))?;
    2306            6 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2307            6 :         modification.commit(&ctx).await?;
    2308            6 :         let expect_2008 =
    2309            6 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2310            6 : 
    2311            6 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    2312            6 :         assert_eq!(readback, expect_2008);
    2313            6 : 
    2314            6 :         // Reading back in time works
    2315            6 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2316            6 :         assert_eq!(readback, expect_1008);
    2317            6 : 
    2318            6 :         Ok(())
    2319            6 :     }
    2320              : 
    2321              :     /*
    2322              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    2323              :             let incremental = timeline.get_current_logical_size();
    2324              :             let non_incremental = timeline
    2325              :                 .get_current_logical_size_non_incremental(lsn)
    2326              :                 .unwrap();
    2327              :             assert_eq!(incremental, non_incremental);
    2328              :         }
    2329              :     */
    2330              : 
    2331              :     /*
    2332              :     ///
    2333              :     /// Test list_rels() function, with branches and dropped relations
    2334              :     ///
    2335              :     #[test]
    2336              :     fn test_list_rels_drop() -> Result<()> {
    2337              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    2338              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    2339              :         const TESTDB: u32 = 111;
    2340              : 
    2341              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    2342              :         // after branching fails below
    2343              :         let mut writer = tline.begin_record(Lsn(0x10));
    2344              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    2345              :         writer.finish()?;
    2346              : 
    2347              :         // Create a relation on the timeline
    2348              :         let mut writer = tline.begin_record(Lsn(0x20));
    2349              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    2350              :         writer.finish()?;
    2351              : 
    2352              :         let writer = tline.begin_record(Lsn(0x00));
    2353              :         writer.finish()?;
    2354              : 
    2355              :         // Check that list_rels() lists it after LSN 2, but not before it
    2356              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    2357              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    2358              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    2359              : 
    2360              :         // Create a branch, check that the relation is visible there
    2361              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    2362              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    2363              :             Some(timeline) => timeline,
    2364              :             None => panic!("Should have a local timeline"),
    2365              :         };
    2366              :         let newtline = DatadirTimelineImpl::new(newtline);
    2367              :         assert!(newtline
    2368              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2369              :             .contains(&TESTREL_A));
    2370              : 
    2371              :         // Drop it on the branch
    2372              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    2373              :         new_writer.drop_relation(TESTREL_A)?;
    2374              :         new_writer.finish()?;
    2375              : 
    2376              :         // Check that it's no longer listed on the branch after the point where it was dropped
    2377              :         assert!(newtline
    2378              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2379              :             .contains(&TESTREL_A));
    2380              :         assert!(!newtline
    2381              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2382              :             .contains(&TESTREL_A));
    2383              : 
    2384              :         // Run checkpoint and garbage collection and check that it's still not visible
    2385              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    2386              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    2387              : 
    2388              :         assert!(!newtline
    2389              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2390              :             .contains(&TESTREL_A));
    2391              : 
    2392              :         Ok(())
    2393              :     }
    2394              :      */
    2395              : 
    2396              :     /*
    2397              :     #[test]
    2398              :     fn test_read_beyond_eof() -> Result<()> {
    2399              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    2400              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    2401              : 
    2402              :         make_some_layers(&tline, Lsn(0x20))?;
    2403              :         let mut writer = tline.begin_record(Lsn(0x60));
    2404              :         walingest.put_rel_page_image(
    2405              :             &mut writer,
    2406              :             TESTREL_A,
    2407              :             0,
    2408              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    2409              :         )?;
    2410              :         writer.finish()?;
    2411              : 
    2412              :         // Test read before rel creation. Should error out.
    2413              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2414              : 
    2415              :         // Read block beyond end of relation at different points in time.
    2416              :         // These reads should fall into different delta, image, and in-memory layers.
    2417              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2418              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2419              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2420              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2421              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2422              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2423              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2424              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2425              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2426              : 
    2427              :         // Test on an in-memory layer with no preceding layer
    2428              :         let mut writer = tline.begin_record(Lsn(0x70));
    2429              :         walingest.put_rel_page_image(
    2430              :             &mut writer,
    2431              :             TESTREL_B,
    2432              :             0,
    2433              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2434              :         )?;
    2435              :         writer.finish()?;
    2436              : 
    2437              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    2438              : 
    2439              :         Ok(())
    2440              :     }
    2441              :      */
    2442              : }
        

Generated by: LCOV version 2.1-beta