LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test:      2a9d99866121f170b43760bd62e1e2431e597707.info
Test Date: 2024-09-02 14:10:37
Coverage:  Lines: 55.4 % (766 of 1383)    Functions: 40.3 % (75 of 186)

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      13              : use crate::walrecord::NeonWalRecord;
      14              : use crate::{aux_file, repository::*};
      15              : use anyhow::{bail, ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use enum_map::Enum;
      18              : use pageserver_api::key::{
      19              :     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
      20              :     relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
      21              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      22              :     CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      23              : };
      24              : use pageserver_api::keyspace::SparseKeySpace;
      25              : use pageserver_api::models::AuxFilePolicy;
      26              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      27              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      28              : use postgres_ffi::BLCKSZ;
      29              : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
      30              : use serde::{Deserialize, Serialize};
      31              : use std::collections::{hash_map, HashMap, HashSet};
      32              : use std::ops::ControlFlow;
      33              : use std::ops::Range;
      34              : use strum::IntoEnumIterator;
      35              : use tokio_util::sync::CancellationToken;
      36              : use tracing::{debug, info, trace, warn};
      37              : use utils::bin_ser::DeserializeError;
      38              : use utils::pausable_failpoint;
      39              : use utils::{bin_ser::BeSer, lsn::Lsn};
      40              : 
      41              : /// Max number of delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
      42              : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
      43              : 
      44              : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
      45              : pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
      46              : 
      47              : #[derive(Debug)]
      48              : pub enum LsnForTimestamp {
      49              :     /// Found commits both before and after the given timestamp
      50              :     Present(Lsn),
      51              : 
      52              :     /// Found no commits after the given timestamp; this means
      53              :     /// that the newest data in the branch is older than the given
      54              :     /// timestamp.
      55              :     ///
      56              :     /// All commits <= LSN happened before the given timestamp
      57              :     Future(Lsn),
      58              : 
      59              :     /// The queried timestamp is older than the horizon we can look back to (PITR)
      60              :     ///
      61              :     /// All commits > LSN happened after the given timestamp,
      62              :     /// but any commits < LSN might have happened before or after
      63              :     /// the given timestamp. We don't know because no data before
      64              :     /// the given lsn is available.
      65              :     Past(Lsn),
      66              : 
      67              :     /// We have found no commit with a timestamp,
      68              :     /// so we can't return anything meaningful.
      69              :     ///
      70              :     /// The associated LSN is the lower bound value we can safely
      71              :     /// create branches on, but no statement is made about whether it is
      72              :     /// older or newer than the timestamp.
      73              :     ///
      74              :     /// This variant can e.g. be returned right after a
      75              :     /// cluster import.
      76              :     NoData(Lsn),
      77              : }
      78              : 
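To make the variants concrete, here is a hypothetical caller-side sketch (not from this file) of dispatching on the result of `find_lsn_for_timestamp`, defined further below; `timeline`, `ts`, `cancel`, `ctx`, `create_branch_at`, and `reject_or_clamp` are all assumed, illustrative names:

```rust
// Hypothetical sketch: acting on LsnForTimestamp.
match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
    // Commits exist on both sides of `ts`: `lsn` is a precise branch point.
    LsnForTimestamp::Present(lsn) => create_branch_at(lsn),
    // Everything in the branch is older than `ts`; all commits <= `lsn`.
    LsnForTimestamp::Future(lsn) => create_branch_at(lsn),
    // `ts` is behind the PITR horizon; the best safe point is `min_lsn`.
    LsnForTimestamp::Past(min_lsn) => reject_or_clamp(min_lsn),
    // No commit timestamps at all, e.g. right after a cluster import.
    LsnForTimestamp::NoData(min_lsn) => reject_or_clamp(min_lsn),
}
```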
      79            0 : #[derive(Debug, thiserror::Error)]
      80              : pub(crate) enum CalculateLogicalSizeError {
      81              :     #[error("cancelled")]
      82              :     Cancelled,
      83              : 
      84              :     /// Something went wrong while reading the metadata we use to calculate logical size
      85              :     /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
      86              :     /// in the `From` implementation for this variant.
      87              :     #[error(transparent)]
      88              :     PageRead(PageReconstructError),
      89              : 
      90              :     /// Something went wrong deserializing metadata that we read to calculate logical size
      91              :     #[error("decode error: {0}")]
      92              :     Decode(#[from] DeserializeError),
      93              : }
      94              : 
      95            0 : #[derive(Debug, thiserror::Error)]
      96              : pub(crate) enum CollectKeySpaceError {
      97              :     #[error(transparent)]
      98              :     Decode(#[from] DeserializeError),
      99              :     #[error(transparent)]
     100              :     PageRead(PageReconstructError),
     101              :     #[error("cancelled")]
     102              :     Cancelled,
     103              : }
     104              : 
     105              : impl From<PageReconstructError> for CollectKeySpaceError {
     106            0 :     fn from(err: PageReconstructError) -> Self {
     107            0 :         match err {
     108            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     109            0 :             err => Self::PageRead(err),
     110              :         }
     111            0 :     }
     112              : }
     113              : 
     114              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     115            0 :     fn from(pre: PageReconstructError) -> Self {
     116            0 :         match pre {
     117            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     118            0 :             _ => Self::PageRead(pre),
     119              :         }
     120            0 :     }
     121              : }
     122              : 
     123            0 : #[derive(Debug, thiserror::Error)]
     124              : pub enum RelationError {
     125              :     #[error("Relation Already Exists")]
     126              :     AlreadyExists,
     127              :     #[error("invalid relnode")]
     128              :     InvalidRelnode,
     129              :     #[error(transparent)]
     130              :     Other(#[from] anyhow::Error),
     131              : }
     132              : 
     133              : ///
     134              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     135              : /// and other special kinds of files, in a versioned key-value store. The
     136              : /// Timeline struct provides the key-value store.
     137              : ///
     138              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
      139              : /// implementation; it might be moved into a separate struct later.
     140              : impl Timeline {
     141              :     /// Start ingesting a WAL record, or other atomic modification of
     142              :     /// the timeline.
     143              :     ///
     144              :     /// This provides a transaction-like interface to perform a bunch
     145              :     /// of modifications atomically.
     146              :     ///
     147              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     148              :     /// DatadirModification object. Use the functions in the object to
     149              :     /// modify the repository state, updating all the pages and metadata
     150              :     /// that the WAL record affects. When you're done, call commit() to
     151              :     /// commit the changes.
     152              :     ///
      153              :     /// The LSN stored in the modification is advanced by `ingest_record` and
     154              :     /// is used by `commit()` to update `last_record_lsn`.
     155              :     ///
     156              :     /// Calling commit() will flush all the changes and reset the state,
     157              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     158              :     ///
     159              :     /// Note that any pending modifications you make through the
     160              :     /// modification object won't be visible to calls to the 'get' and list
     161              :     /// functions of the timeline until you finish! And if you update the
     162              :     /// same page twice, the last update wins.
     163              :     ///
     164       805176 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     165       805176 :     where
     166       805176 :         Self: Sized,
     167       805176 :     {
     168       805176 :         DatadirModification {
     169       805176 :             tline: self,
     170       805176 :             pending_lsns: Vec::new(),
     171       805176 :             pending_updates: HashMap::new(),
     172       805176 :             pending_deletions: Vec::new(),
     173       805176 :             pending_nblocks: 0,
     174       805176 :             pending_directory_entries: Vec::new(),
     175       805176 :             pending_bytes: 0,
     176       805176 :             lsn,
     177       805176 :         }
     178       805176 :     }
     179              : 
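A minimal sketch (not from this file) of the transaction-like calling pattern described above; `timeline`, `lsn`, and `ctx` are assumed bindings, the `put_*` call is illustrative, and `commit(ctx)` is assumed to take the request context:

```rust
// Sketch only: one atomic ingest step through DatadirModification.
let mut modification = timeline.begin_modification(lsn);
// Apply all effects of one WAL record through the modification object...
// modification.put_rel_page_image(rel, blknum, img)?; // illustrative call
// ...then flush them atomically. Until commit, `get` does not see them,
// and the same struct can be reused for the next record afterwards.
modification.commit(ctx).await?;
```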
     180              :     //------------------------------------------------------------------------------
     181              :     // Public GET functions
     182              :     //------------------------------------------------------------------------------
     183              : 
     184              :     /// Look up given page version.
     185        55152 :     pub(crate) async fn get_rel_page_at_lsn(
     186        55152 :         &self,
     187        55152 :         tag: RelTag,
     188        55152 :         blknum: BlockNumber,
     189        55152 :         version: Version<'_>,
     190        55152 :         ctx: &RequestContext,
     191        55152 :     ) -> Result<Bytes, PageReconstructError> {
     192        55152 :         if tag.relnode == 0 {
     193            0 :             return Err(PageReconstructError::Other(
     194            0 :                 RelationError::InvalidRelnode.into(),
     195            0 :             ));
     196        55152 :         }
     197              : 
     198        55152 :         let nblocks = self.get_rel_size(tag, version, ctx).await?;
     199        55152 :         if blknum >= nblocks {
     200            0 :             debug!(
     201            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     202            0 :                 tag,
     203            0 :                 blknum,
     204            0 :                 version.get_lsn(),
     205              :                 nblocks
     206              :             );
     207            0 :             return Ok(ZERO_PAGE.clone());
     208        55152 :         }
     209        55152 : 
     210        55152 :         let key = rel_block_to_key(tag, blknum);
     211        55152 :         version.get(self, key, ctx).await
     212        55152 :     }
     213              : 
     214              :     // Get size of a database in blocks
     215            0 :     pub(crate) async fn get_db_size(
     216            0 :         &self,
     217            0 :         spcnode: Oid,
     218            0 :         dbnode: Oid,
     219            0 :         version: Version<'_>,
     220            0 :         ctx: &RequestContext,
     221            0 :     ) -> Result<usize, PageReconstructError> {
     222            0 :         let mut total_blocks = 0;
     223              : 
     224            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     225              : 
     226            0 :         for rel in rels {
     227            0 :             let n_blocks = self.get_rel_size(rel, version, ctx).await?;
     228            0 :             total_blocks += n_blocks as usize;
     229              :         }
     230            0 :         Ok(total_blocks)
     231            0 :     }
     232              : 
     233              :     /// Get size of a relation file
     234        73302 :     pub(crate) async fn get_rel_size(
     235        73302 :         &self,
     236        73302 :         tag: RelTag,
     237        73302 :         version: Version<'_>,
     238        73302 :         ctx: &RequestContext,
     239        73302 :     ) -> Result<BlockNumber, PageReconstructError> {
     240        73302 :         if tag.relnode == 0 {
     241            0 :             return Err(PageReconstructError::Other(
     242            0 :                 RelationError::InvalidRelnode.into(),
     243            0 :             ));
     244        73302 :         }
     245              : 
     246        73302 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     247        57882 :             return Ok(nblocks);
     248        15420 :         }
     249        15420 : 
     250        15420 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     251            0 :             && !self.get_rel_exists(tag, version, ctx).await?
     252              :         {
     253              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     254              :             // FSM, and smgrnblocks() on it immediately afterwards,
     255              :             // without extending it.  Tolerate that by claiming that
     256              :             // any non-existent FSM fork has size 0.
     257            0 :             return Ok(0);
     258        15420 :         }
     259        15420 : 
     260        15420 :         let key = rel_size_to_key(tag);
     261        15420 :         let mut buf = version.get(self, key, ctx).await?;
     262        15408 :         let nblocks = buf.get_u32_le();
     263        15408 : 
     264        15408 :         self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     265        15408 : 
     266        15408 :         Ok(nblocks)
     267        73302 :     }
     268              : 
     269              :     /// Does relation exist?
     270        18150 :     pub(crate) async fn get_rel_exists(
     271        18150 :         &self,
     272        18150 :         tag: RelTag,
     273        18150 :         version: Version<'_>,
     274        18150 :         ctx: &RequestContext,
     275        18150 :     ) -> Result<bool, PageReconstructError> {
     276        18150 :         if tag.relnode == 0 {
     277            0 :             return Err(PageReconstructError::Other(
     278            0 :                 RelationError::InvalidRelnode.into(),
     279            0 :             ));
     280        18150 :         }
     281              : 
      282              :         // First, try to look up the relation in the cache.
     283        18150 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     284        18096 :             return Ok(true);
     285           54 :         }
     286              :         // then check if the database was already initialized.
     287              :         // get_rel_exists can be called before dbdir is created.
     288           54 :         let buf = version.get(self, DBDIR_KEY, ctx).await?;
     289           54 :         let dbdirs = DbDirectory::des(&buf)?.dbdirs;
     290           54 :         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
     291            0 :             return Ok(false);
     292           54 :         }
     293           54 :         // fetch directory listing
     294           54 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     295           54 :         let buf = version.get(self, key, ctx).await?;
     296              : 
     297           54 :         let dir = RelDirectory::des(&buf)?;
     298           54 :         Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
     299        18150 :     }
     300              : 
      301              :     /// Get a list of all existing relations in the given tablespace and database.
     302              :     ///
     303              :     /// # Cancel-Safety
     304              :     ///
     305              :     /// This method is cancellation-safe.
     306            0 :     pub(crate) async fn list_rels(
     307            0 :         &self,
     308            0 :         spcnode: Oid,
     309            0 :         dbnode: Oid,
     310            0 :         version: Version<'_>,
     311            0 :         ctx: &RequestContext,
     312            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     313            0 :         // fetch directory listing
     314            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     315            0 :         let buf = version.get(self, key, ctx).await?;
     316              : 
     317            0 :         let dir = RelDirectory::des(&buf)?;
     318            0 :         let rels: HashSet<RelTag> =
     319            0 :             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     320            0 :                 spcnode,
     321            0 :                 dbnode,
     322            0 :                 relnode: *relnode,
     323            0 :                 forknum: *forknum,
     324            0 :             }));
     325            0 : 
     326            0 :         Ok(rels)
     327            0 :     }
     328              : 
     329              :     /// Get the whole SLRU segment
     330            0 :     pub(crate) async fn get_slru_segment(
     331            0 :         &self,
     332            0 :         kind: SlruKind,
     333            0 :         segno: u32,
     334            0 :         lsn: Lsn,
     335            0 :         ctx: &RequestContext,
     336            0 :     ) -> Result<Bytes, PageReconstructError> {
     337            0 :         let n_blocks = self
     338            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     339            0 :             .await?;
     340            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     341            0 :         for blkno in 0..n_blocks {
     342            0 :             let block = self
     343            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     344            0 :                 .await?;
     345            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     346              :         }
     347            0 :         Ok(segment.freeze())
     348            0 :     }
     349              : 
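The segment assembly above is plain buffer concatenation; a self-contained sketch of the same pattern with the `bytes` crate (constants and names here are stand-ins, and each input block is assumed to be at least BLCKSZ bytes):

```rust
use bytes::{Bytes, BytesMut};

const BLCKSZ: usize = 8192;

/// Concatenate fixed-size blocks into one contiguous segment buffer,
/// mirroring how get_slru_segment stitches pages together.
fn assemble_segment(blocks: &[Bytes]) -> Bytes {
    let mut segment = BytesMut::with_capacity(blocks.len() * BLCKSZ);
    for block in blocks {
        segment.extend_from_slice(&block[..BLCKSZ]);
    }
    segment.freeze()
}
```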
     350              :     /// Look up given SLRU page version.
     351            0 :     pub(crate) async fn get_slru_page_at_lsn(
     352            0 :         &self,
     353            0 :         kind: SlruKind,
     354            0 :         segno: u32,
     355            0 :         blknum: BlockNumber,
     356            0 :         lsn: Lsn,
     357            0 :         ctx: &RequestContext,
     358            0 :     ) -> Result<Bytes, PageReconstructError> {
     359            0 :         let key = slru_block_to_key(kind, segno, blknum);
     360            0 :         self.get(key, lsn, ctx).await
     361            0 :     }
     362              : 
     363              :     /// Get size of an SLRU segment
     364            0 :     pub(crate) async fn get_slru_segment_size(
     365            0 :         &self,
     366            0 :         kind: SlruKind,
     367            0 :         segno: u32,
     368            0 :         version: Version<'_>,
     369            0 :         ctx: &RequestContext,
     370            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     371            0 :         let key = slru_segment_size_to_key(kind, segno);
     372            0 :         let mut buf = version.get(self, key, ctx).await?;
     373            0 :         Ok(buf.get_u32_le())
     374            0 :     }
     375              : 
      376              :     /// Does the SLRU segment exist?
     377            0 :     pub(crate) async fn get_slru_segment_exists(
     378            0 :         &self,
     379            0 :         kind: SlruKind,
     380            0 :         segno: u32,
     381            0 :         version: Version<'_>,
     382            0 :         ctx: &RequestContext,
     383            0 :     ) -> Result<bool, PageReconstructError> {
     384            0 :         // fetch directory listing
     385            0 :         let key = slru_dir_to_key(kind);
     386            0 :         let buf = version.get(self, key, ctx).await?;
     387              : 
     388            0 :         let dir = SlruSegmentDirectory::des(&buf)?;
     389            0 :         Ok(dir.segments.contains(&segno))
     390            0 :     }
     391              : 
      392              :     /// Locate the LSN such that all transactions that committed before
     393              :     /// 'search_timestamp' are visible, but nothing newer is.
     394              :     ///
     395              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     396              :     /// so it's not well defined which LSN you get if there were multiple commits
     397              :     /// "in flight" at that point in time.
     398              :     ///
     399            0 :     pub(crate) async fn find_lsn_for_timestamp(
     400            0 :         &self,
     401            0 :         search_timestamp: TimestampTz,
     402            0 :         cancel: &CancellationToken,
     403            0 :         ctx: &RequestContext,
     404            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     405              :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     406              : 
     407            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     408            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     409            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     410            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     411            0 :         // on the safe side.
     412            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     413            0 :         let max_lsn = self.get_last_record_lsn();
     414            0 : 
     415            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     416            0 :         // LSN divided by 8.
     417            0 :         let mut low = min_lsn.0 / 8;
     418            0 :         let mut high = max_lsn.0 / 8 + 1;
     419            0 : 
     420            0 :         let mut found_smaller = false;
     421            0 :         let mut found_larger = false;
     422              : 
     423            0 :         while low < high {
     424            0 :             if cancel.is_cancelled() {
     425            0 :                 return Err(PageReconstructError::Cancelled);
     426            0 :             }
     427            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     428            0 :             let mid = (high + low) / 2;
     429              : 
     430            0 :             let cmp = self
     431            0 :                 .is_latest_commit_timestamp_ge_than(
     432            0 :                     search_timestamp,
     433            0 :                     Lsn(mid * 8),
     434            0 :                     &mut found_smaller,
     435            0 :                     &mut found_larger,
     436            0 :                     ctx,
     437            0 :                 )
     438            0 :                 .await?;
     439              : 
     440            0 :             if cmp {
     441            0 :                 high = mid;
     442            0 :             } else {
     443            0 :                 low = mid + 1;
     444            0 :             }
     445              :         }
     446              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
      447              :         // i.e., the LSN of the last commit record before or at `search_timestamp`.
     448              :         // Remove one from `low` to get `t`.
     449              :         //
     450              :         // FIXME: it would be better to get the LSN of the previous commit.
     451              :         // Otherwise, if you restore to the returned LSN, the database will
     452              :         // include physical changes from later commits that will be marked
     453              :         // as aborted, and will need to be vacuumed away.
     454            0 :         let commit_lsn = Lsn((low - 1) * 8);
     455            0 :         match (found_smaller, found_larger) {
     456              :             (false, false) => {
     457              :                 // This can happen if no commit records have been processed yet, e.g.
     458              :                 // just after importing a cluster.
     459            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     460              :             }
     461              :             (false, true) => {
     462              :                 // Didn't find any commit timestamps smaller than the request
     463            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     464              :             }
     465            0 :             (true, _) if commit_lsn < min_lsn => {
     466            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     467            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     468            0 :                 // below the min_lsn. We should never do that.
     469            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     470              :             }
     471              :             (true, false) => {
     472              :                 // Only found commits with timestamps smaller than the request.
     473              :                 // It's still a valid case for branch creation, return it.
     474              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     475              :                 // case, anyway.
     476            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     477              :             }
     478            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     479              :         }
     480            0 :     }
     481              : 
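The loop above is a standard lower-bound binary search over a monotone predicate ("is the latest commit timestamp at this LSN >= search_timestamp?"), operating on LSNs divided by 8 since they are 8-byte aligned. A self-contained model of the invariant, assuming a synchronous predicate in place of the async probe:

```rust
/// Lower-bound binary search: returns the smallest `x` in [low, high)
/// for which `pred(x)` is true, or `high` if none is.
/// find_lsn_for_timestamp uses the same shape with x = lsn / 8.
fn lower_bound(mut low: u64, mut high: u64, pred: impl Fn(u64) -> bool) -> u64 {
    while low < high {
        // Cannot overflow as long as both bounds stay below u64::MAX / 2,
        // the same assumption the code above documents.
        let mid = (high + low) / 2;
        if pred(mid) {
            high = mid;
        } else {
            low = mid + 1;
        }
    }
    low
}

fn main() {
    // The predicate flips from false to true at 42, so 42 is the lower bound.
    assert_eq!(lower_bound(0, 100, |x| x >= 42), 42);
}
```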
      482              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if, at LSN
      483              :     /// 'probe_lsn', there are any commits that committed after 'search_timestamp'.
      484              :     ///
      485              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any
      486              :     /// commits with a smaller/larger timestamp.
     487              :     ///
     488            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     489            0 :         &self,
     490            0 :         search_timestamp: TimestampTz,
     491            0 :         probe_lsn: Lsn,
     492            0 :         found_smaller: &mut bool,
     493            0 :         found_larger: &mut bool,
     494            0 :         ctx: &RequestContext,
     495            0 :     ) -> Result<bool, PageReconstructError> {
     496            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     497            0 :             if timestamp >= search_timestamp {
     498            0 :                 *found_larger = true;
     499            0 :                 return ControlFlow::Break(true);
     500            0 :             } else {
     501            0 :                 *found_smaller = true;
     502            0 :             }
     503            0 :             ControlFlow::Continue(())
     504            0 :         })
     505            0 :         .await
     506            0 :     }
     507              : 
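The flag-setting closure relies on `std::ops::ControlFlow` to short-circuit the scan; a self-contained illustration of that idiom (the `scan` helper and sample data are stand-ins for `map_all_timestamps`):

```rust
use std::ops::ControlFlow;

/// Visit items until the closure breaks, mirroring map_all_timestamps'
/// early-exit contract: Break(value) stops the scan, Continue keeps going.
fn scan<T>(items: &[i64], mut f: impl FnMut(i64) -> ControlFlow<T>) -> Option<T> {
    for &item in items {
        if let ControlFlow::Break(v) = f(item) {
            return Some(v);
        }
    }
    None
}

fn main() {
    let mut found_smaller = false;
    let search = 100;
    let hit = scan(&[40, 90, 150], |ts| {
        if ts >= search {
            return ControlFlow::Break(true); // the "found_larger" case
        }
        found_smaller = true;
        ControlFlow::Continue(())
    });
    assert_eq!(hit, Some(true));
    assert!(found_smaller);
}
```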
      508              :     /// Obtain the latest commit timestamp at the given lsn.
      509              :     ///
      510              :     /// If the lsn has no timestamps, returns None; otherwise returns the maximum timestamp found.
     511            0 :     pub(crate) async fn get_timestamp_for_lsn(
     512            0 :         &self,
     513            0 :         probe_lsn: Lsn,
     514            0 :         ctx: &RequestContext,
     515            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     516            0 :         let mut max: Option<TimestampTz> = None;
     517            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
     518            0 :             if let Some(max_prev) = max {
     519            0 :                 max = Some(max_prev.max(timestamp));
     520            0 :             } else {
     521            0 :                 max = Some(timestamp);
     522            0 :             }
     523            0 :             ControlFlow::Continue(())
     524            0 :         })
     525            0 :         .await?;
     526              : 
     527            0 :         Ok(max)
     528            0 :     }
     529              : 
     530              :     /// Runs the given function on all the timestamps for a given lsn
     531              :     ///
     532              :     /// The return value is either given by the closure, or set to the `Default`
     533              :     /// impl's output.
     534            0 :     async fn map_all_timestamps<T: Default>(
     535            0 :         &self,
     536            0 :         probe_lsn: Lsn,
     537            0 :         ctx: &RequestContext,
     538            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     539            0 :     ) -> Result<T, PageReconstructError> {
     540            0 :         for segno in self
     541            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     542            0 :             .await?
     543              :         {
     544            0 :             let nblocks = self
     545            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     546            0 :                 .await?;
     547            0 :             for blknum in (0..nblocks).rev() {
     548            0 :                 let clog_page = self
     549            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     550            0 :                     .await?;
     551              : 
     552            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     553            0 :                     let mut timestamp_bytes = [0u8; 8];
     554            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     555            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     556            0 : 
     557            0 :                     match f(timestamp) {
     558            0 :                         ControlFlow::Break(b) => return Ok(b),
     559            0 :                         ControlFlow::Continue(()) => (),
     560              :                     }
     561            0 :                 }
     562              :             }
     563              :         }
     564            0 :         Ok(Default::default())
     565            0 :     }
     566              : 
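Each CLOG page carrying a timestamp is BLCKSZ + 8 bytes, with the big-endian timestamp in the trailing 8 bytes. A self-contained sketch of that decode step, using `i64` as a stand-in for `TimestampTz`:

```rust
const BLCKSZ: usize = 8192;

/// Extract the trailing big-endian timestamp from a CLOG page, if present,
/// as map_all_timestamps does for pages of length BLCKSZ + 8.
fn clog_page_timestamp(page: &[u8]) -> Option<i64> {
    if page.len() != BLCKSZ + 8 {
        return None;
    }
    let mut ts = [0u8; 8];
    ts.copy_from_slice(&page[BLCKSZ..]);
    Some(i64::from_be_bytes(ts))
}

fn main() {
    let mut page = vec![0u8; BLCKSZ + 8];
    page[BLCKSZ..].copy_from_slice(&12345i64.to_be_bytes());
    assert_eq!(clog_page_timestamp(&page), Some(12345));
}
```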
     567            0 :     pub(crate) async fn get_slru_keyspace(
     568            0 :         &self,
     569            0 :         version: Version<'_>,
     570            0 :         ctx: &RequestContext,
     571            0 :     ) -> Result<KeySpace, PageReconstructError> {
     572            0 :         let mut accum = KeySpaceAccum::new();
     573              : 
     574            0 :         for kind in SlruKind::iter() {
     575            0 :             let mut segments: Vec<u32> = self
     576            0 :                 .list_slru_segments(kind, version, ctx)
     577            0 :                 .await?
     578            0 :                 .into_iter()
     579            0 :                 .collect();
     580            0 :             segments.sort_unstable();
     581              : 
     582            0 :             for seg in segments {
     583            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     584              : 
     585            0 :                 accum.add_range(
     586            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     587            0 :                 );
     588              :             }
     589              :         }
     590              : 
     591            0 :         Ok(accum.to_keyspace())
     592            0 :     }
     593              : 
     594              :     /// Get a list of SLRU segments
     595            0 :     pub(crate) async fn list_slru_segments(
     596            0 :         &self,
     597            0 :         kind: SlruKind,
     598            0 :         version: Version<'_>,
     599            0 :         ctx: &RequestContext,
     600            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     601            0 :         // fetch directory entry
     602            0 :         let key = slru_dir_to_key(kind);
     603              : 
     604            0 :         let buf = version.get(self, key, ctx).await?;
     605            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
     606            0 :     }
     607              : 
     608            0 :     pub(crate) async fn get_relmap_file(
     609            0 :         &self,
     610            0 :         spcnode: Oid,
     611            0 :         dbnode: Oid,
     612            0 :         version: Version<'_>,
     613            0 :         ctx: &RequestContext,
     614            0 :     ) -> Result<Bytes, PageReconstructError> {
     615            0 :         let key = relmap_file_key(spcnode, dbnode);
     616              : 
     617            0 :         let buf = version.get(self, key, ctx).await?;
     618            0 :         Ok(buf)
     619            0 :     }
     620              : 
     621          864 :     pub(crate) async fn list_dbdirs(
     622          864 :         &self,
     623          864 :         lsn: Lsn,
     624          864 :         ctx: &RequestContext,
     625          864 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     626              :         // fetch directory entry
     627         9382 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     628              : 
     629          864 :         Ok(DbDirectory::des(&buf)?.dbdirs)
     630          864 :     }
     631              : 
     632            0 :     pub(crate) async fn get_twophase_file(
     633            0 :         &self,
     634            0 :         xid: TransactionId,
     635            0 :         lsn: Lsn,
     636            0 :         ctx: &RequestContext,
     637            0 :     ) -> Result<Bytes, PageReconstructError> {
     638            0 :         let key = twophase_file_key(xid);
     639            0 :         let buf = self.get(key, lsn, ctx).await?;
     640            0 :         Ok(buf)
     641            0 :     }
     642              : 
     643            6 :     pub(crate) async fn list_twophase_files(
     644            6 :         &self,
     645            6 :         lsn: Lsn,
     646            6 :         ctx: &RequestContext,
     647            6 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     648              :         // fetch directory entry
     649            6 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     650              : 
     651            6 :         Ok(TwoPhaseDirectory::des(&buf)?.xids)
     652            6 :     }
     653              : 
     654            0 :     pub(crate) async fn get_control_file(
     655            0 :         &self,
     656            0 :         lsn: Lsn,
     657            0 :         ctx: &RequestContext,
     658            0 :     ) -> Result<Bytes, PageReconstructError> {
     659            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     660            0 :     }
     661              : 
     662           36 :     pub(crate) async fn get_checkpoint(
     663           36 :         &self,
     664           36 :         lsn: Lsn,
     665           36 :         ctx: &RequestContext,
     666           36 :     ) -> Result<Bytes, PageReconstructError> {
     667           36 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     668           36 :     }
     669              : 
     670           48 :     async fn list_aux_files_v1(
     671           48 :         &self,
     672           48 :         lsn: Lsn,
     673           48 :         ctx: &RequestContext,
     674           48 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     675           48 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     676           30 :             Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
     677           18 :             Err(e) => {
     678           18 :                 // This is expected: historical databases do not have the key.
     679           18 :                 debug!("Failed to get info about AUX files: {}", e);
     680           18 :                 Ok(HashMap::new())
     681              :             }
     682              :         }
     683           48 :     }
     684              : 
     685           72 :     async fn list_aux_files_v2(
     686           72 :         &self,
     687           72 :         lsn: Lsn,
     688           72 :         ctx: &RequestContext,
     689           72 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     690           72 :         let kv = self
     691           72 :             .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
     692            0 :             .await?;
     693           72 :         let mut result = HashMap::new();
     694           72 :         let mut sz = 0;
     695          180 :         for (_, v) in kv {
     696          108 :             let v = v?;
     697          108 :             let v = aux_file::decode_file_value_bytes(&v)
     698          108 :                 .context("value decode")
     699          108 :                 .map_err(PageReconstructError::Other)?;
     700          210 :             for (fname, content) in v {
     701          102 :                 sz += fname.len();
     702          102 :                 sz += content.len();
     703          102 :                 result.insert(fname, content);
     704          102 :             }
     705              :         }
     706           72 :         self.aux_file_size_estimator.on_initial(sz);
     707           72 :         Ok(result)
     708           72 :     }
     709              : 
     710            0 :     pub(crate) async fn trigger_aux_file_size_computation(
     711            0 :         &self,
     712            0 :         lsn: Lsn,
     713            0 :         ctx: &RequestContext,
     714            0 :     ) -> Result<(), PageReconstructError> {
     715            0 :         let current_policy = self.last_aux_file_policy.load();
     716            0 :         if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
     717            0 :             self.list_aux_files_v2(lsn, ctx).await?;
     718            0 :         }
     719            0 :         Ok(())
     720            0 :     }
     721              : 
     722           78 :     pub(crate) async fn list_aux_files(
     723           78 :         &self,
     724           78 :         lsn: Lsn,
     725           78 :         ctx: &RequestContext,
     726           78 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     727           78 :         let current_policy = self.last_aux_file_policy.load();
     728           78 :         match current_policy {
     729              :             Some(AuxFilePolicy::V1) => {
     730            6 :                 warn!("this timeline is using deprecated aux file policy V1 (policy=V1)");
     731            6 :                 self.list_aux_files_v1(lsn, ctx).await
     732              :             }
     733              :             None => {
     734            0 :                 let res = self.list_aux_files_v1(lsn, ctx).await?;
     735            0 :                 if !res.is_empty() {
     736            0 :                     warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
     737            0 :                 }
     738            0 :                 Ok(res)
     739              :             }
     740           66 :             Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
     741              :             Some(AuxFilePolicy::CrossValidation) => {
     742            6 :                 let v1_result = self.list_aux_files_v1(lsn, ctx).await;
     743            6 :                 let v2_result = self.list_aux_files_v2(lsn, ctx).await;
     744            6 :                 match (v1_result, v2_result) {
     745            6 :                     (Ok(v1), Ok(v2)) => {
     746            6 :                         if v1 != v2 {
     747            0 :                             tracing::error!(
     748            0 :                                 "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
     749              :                             );
     750            0 :                             return Err(PageReconstructError::Other(anyhow::anyhow!(
     751            0 :                                 "unmatched aux file v1 v2 result"
     752            0 :                             )));
     753            6 :                         }
     754            6 :                         Ok(v1)
     755              :                     }
     756            0 :                     (Ok(_), Err(v2)) => {
     757            0 :                         tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
     758            0 :                         Err(v2)
     759              :                     }
     760            0 :                     (Err(v1), Ok(_)) => {
     761            0 :                         tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
     762            0 :                         Err(v1)
     763              :                     }
     764            0 :                     (Err(_), Err(v2)) => Err(v2),
     765              :                 }
     766              :             }
     767              :         }
     768           78 :     }
     769              : 
     770            0 :     pub(crate) async fn get_replorigins(
     771            0 :         &self,
     772            0 :         lsn: Lsn,
     773            0 :         ctx: &RequestContext,
     774            0 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
     775            0 :         let kv = self
     776            0 :             .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
     777            0 :             .await?;
     778            0 :         let mut result = HashMap::new();
     779            0 :         for (k, v) in kv {
     780            0 :             let v = v?;
     781            0 :             let origin_id = k.field6 as RepOriginId;
     782            0 :             let origin_lsn = Lsn::des(&v).unwrap();
     783            0 :             if origin_lsn != Lsn::INVALID {
     784            0 :                 result.insert(origin_id, origin_lsn);
     785            0 :             }
     786              :         }
     787            0 :         Ok(result)
     788            0 :     }
     789              : 
      790              :     /// Does the same as get_current_logical_size, but computes the size on demand.
     791              :     /// Used to initialize the logical size tracking on startup.
     792              :     ///
     793              :     /// Only relation blocks are counted currently. That excludes metadata,
     794              :     /// SLRUs, twophase files etc.
     795              :     ///
     796              :     /// # Cancel-Safety
     797              :     ///
     798              :     /// This method is cancellation-safe.
     799            0 :     pub(crate) async fn get_current_logical_size_non_incremental(
     800            0 :         &self,
     801            0 :         lsn: Lsn,
     802            0 :         ctx: &RequestContext,
     803            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     804            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     805              : 
     806              :         // Fetch list of database dirs and iterate them
     807            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     808            0 :         let dbdir = DbDirectory::des(&buf)?;
     809              : 
     810            0 :         let mut total_size: u64 = 0;
     811            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     812            0 :             for rel in self
     813            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     814            0 :                 .await?
     815              :             {
     816            0 :                 if self.cancel.is_cancelled() {
     817            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     818            0 :                 }
     819            0 :                 let relsize_key = rel_size_to_key(rel);
     820            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     821            0 :                 let relsize = buf.get_u32_le();
     822            0 : 
     823            0 :                 total_size += relsize as u64;
     824              :             }
     825              :         }
     826            0 :         Ok(total_size * BLCKSZ as u64)
     827            0 :     }
     828              : 
     829              :     ///
     830              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
      831              :     /// Anything that's not listed may be removed from the underlying storage (from
     832              :     /// that LSN forwards).
     833              :     ///
     834              :     /// The return value is (dense keyspace, sparse keyspace).
     835          864 :     pub(crate) async fn collect_keyspace(
     836          864 :         &self,
     837          864 :         lsn: Lsn,
     838          864 :         ctx: &RequestContext,
     839          864 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     840          864 :         // Iterate through key ranges, greedily packing them into partitions
     841          864 :         let mut result = KeySpaceAccum::new();
     842          864 : 
     843          864 :         // The dbdir metadata always exists
     844          864 :         result.add_key(DBDIR_KEY);
     845              : 
     846              :         // Fetch list of database dirs and iterate them
     847         9382 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
     848          864 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
     849          864 : 
     850          864 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
     851          864 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
     852            0 :             if has_relmap_file {
     853            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
     854            0 :             }
     855            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     856              : 
     857            0 :             let mut rels: Vec<RelTag> = self
     858            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     859            0 :                 .await?
     860            0 :                 .into_iter()
     861            0 :                 .collect();
     862            0 :             rels.sort_unstable();
     863            0 :             for rel in rels {
     864            0 :                 let relsize_key = rel_size_to_key(rel);
     865            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     866            0 :                 let relsize = buf.get_u32_le();
     867            0 : 
     868            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     869            0 :                 result.add_key(relsize_key);
     870              :             }
     871              :         }
     872              : 
     873              :         // Iterate SLRUs next
     874         2592 :         for kind in [
     875          864 :             SlruKind::Clog,
     876          864 :             SlruKind::MultiXactMembers,
     877          864 :             SlruKind::MultiXactOffsets,
     878              :         ] {
     879         2592 :             let slrudir_key = slru_dir_to_key(kind);
     880         2592 :             result.add_key(slrudir_key);
     881        28707 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     882         2592 :             let dir = SlruSegmentDirectory::des(&buf)?;
     883         2592 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     884         2592 :             segments.sort_unstable();
     885         2592 :             for segno in segments {
     886            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     887            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     888            0 :                 let segsize = buf.get_u32_le();
     889            0 : 
     890            0 :                 result.add_range(
     891            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     892            0 :                 );
     893            0 :                 result.add_key(segsize_key);
     894              :             }
     895              :         }
     896              : 
     897              :         // Then pg_twophase
     898          864 :         result.add_key(TWOPHASEDIR_KEY);
     899         9401 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     900          864 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     901          864 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     902          864 :         xids.sort_unstable();
     903          864 :         for xid in xids {
     904            0 :             result.add_key(twophase_file_key(xid));
     905            0 :         }
     906              : 
     907          864 :         result.add_key(CONTROLFILE_KEY);
     908          864 :         result.add_key(CHECKPOINT_KEY);
     909          864 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     910           12 :             result.add_key(AUX_FILES_KEY);
     911          852 :         }
     912              : 
     913              :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
     914              :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
      915              :         // and the keys will not be garbage-collected.
     916              :         #[cfg(test)]
     917              :         {
     918          864 :             let guard = self.extra_test_dense_keyspace.load();
     919          864 :             for kr in &guard.ranges {
     920            0 :                 result.add_range(kr.clone());
     921            0 :             }
     922              :         }
     923              : 
     924          864 :         let dense_keyspace = result.to_keyspace();
     925          864 :         let sparse_keyspace = SparseKeySpace(KeySpace {
     926          864 :             ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
     927          864 :         });
     928          864 : 
     929          864 :         if cfg!(debug_assertions) {
     930              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
     931              : 
     932              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
     933              :             // category of sparse keys are split into their own image/delta files. If there
     934              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
     935              :             // and we want the developer to keep the keyspaces separated.
     936              : 
     937          864 :             let ranges = &sparse_keyspace.0.ranges;
     938              : 
     939              :             // TODO: use a single overlaps_with across the codebase
     940          864 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
     941          864 :                 !(a.end <= b.start || b.end <= a.start)
     942          864 :             }
     943         1728 :             for i in 0..ranges.len() {
     944         1728 :                 for j in 0..i {
     945          864 :                     if overlaps_with(&ranges[i], &ranges[j]) {
     946            0 :                         panic!(
     947            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
     948            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
     949            0 :                         );
     950          864 :                     }
     951              :                 }
     952              :             }
     953          864 :             for i in 1..ranges.len() {
     954          864 :                 assert!(
     955          864 :                     ranges[i - 1].end <= ranges[i].start,
     956            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
     957            0 :                     ranges[i - 1].start,
     958            0 :                     ranges[i - 1].end,
     959            0 :                     ranges[i].start,
     960            0 :                     ranges[i].end
     961              :                 );
     962              :             }
     963            0 :         }
     964              : 
     965          864 :         Ok((dense_keyspace, sparse_keyspace))
     966          864 :     }
     967              : 
     968              :     /// Get the cached size of a relation, if it has not been updated after the specified LSN
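                      :     ///
                      :     /// A hedged lookup sketch (`timeline`, `tag` and `lsn` are assumed to be in
                      :     /// scope, and `fetch_rel_size_from_storage` is a hypothetical fallback; not a
                      :     /// doctest):
                      :     ///
                      :     /// ```ignore
                      :     /// let nblocks = match timeline.get_cached_rel_size(&tag, lsn) {
                      :     ///     Some(n) => n, // cached size is valid at or before `lsn`
                      :     ///     None => fetch_rel_size_from_storage(&tag, lsn)?, // hypothetical slow path
                      :     /// };
                      :     /// ```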
     969      1345620 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     970      1345620 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     971      1345620 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
     972      1345554 :             if lsn >= *cached_lsn {
     973      1330116 :                 return Some(*nblocks);
     974        15438 :             }
     975           66 :         }
     976        15504 :         None
     977      1345620 :     }
     978              : 
     979              :     /// Update cached relation size if there is no more recent update
     980        15408 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     981        15408 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     982        15408 : 
     983        15408 :         if lsn < rel_size_cache.complete_as_of {
     984              :             // Do not cache old values. It's safe to cache the size on read, as long as
     985              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
     986              :             // never evict values from the cache, so if the relation size changed after
     987              :             // 'lsn', the new value is already in the cache.
     988            0 :             return;
     989        15408 :         }
     990        15408 : 
     991        15408 :         match rel_size_cache.map.entry(tag) {
     992        15408 :             hash_map::Entry::Occupied(mut entry) => {
     993        15408 :                 let cached_lsn = entry.get_mut();
     994        15408 :                 if lsn >= cached_lsn.0 {
     995            0 :                     *cached_lsn = (lsn, nblocks);
     996        15408 :                 }
     997              :             }
     998            0 :             hash_map::Entry::Vacant(entry) => {
     999            0 :                 entry.insert((lsn, nblocks));
    1000            0 :             }
    1001              :         }
    1002        15408 :     }
    1003              : 
    1004              :     /// Store cached relation size
    1005       866196 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1006       866196 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1007       866196 :         rel_size_cache.map.insert(tag, (lsn, nblocks));
    1008       866196 :     }
    1009              : 
    1010              :     /// Remove cached relation size
    1011            6 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1012            6 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1013            6 :         rel_size_cache.map.remove(tag);
    1014            6 :     }
    1015              : }
    1016              : 
    1017              : /// DatadirModification represents an operation to ingest an atomic set of
    1018              : /// updates to the repository. It is created by the 'begin_modification'
    1019              : /// function. It is called for each WAL record, so that all the modifications
    1020              : /// by a single WAL record appear atomic.
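                      : ///
                      : /// A minimal usage sketch, assuming the `begin_modification` constructor on
                      : /// `Timeline` and a `ctx: &RequestContext` in scope (illustrative, not a doctest):
                      : ///
                      : /// ```ignore
                      : /// let mut modification = timeline.begin_modification(lsn);
                      : /// modification.put_rel_page_image(rel, blknum, img)?; // buffered, not yet visible
                      : /// modification.commit(ctx).await?;                    // applied atomically at `lsn`
                      : /// ```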
    1021              : pub struct DatadirModification<'a> {
    1022              :     /// The timeline this modification applies to. You can access this to
    1023              :     /// read the state, but note that any pending updates are *not* reflected
    1024              :     /// in the state in 'tline' yet.
    1025              :     pub tline: &'a Timeline,
    1026              : 
    1027              :     /// Current LSN of the modification
    1028              :     lsn: Lsn,
    1029              : 
    1030              :     // The modifications are not applied directly to the underlying key-value store.
    1031              :     // The put-functions add the modifications here, and they are flushed to the
    1032              :     // underlying key-value store by the 'commit' function.
    1033              :     pending_lsns: Vec<Lsn>,
    1034              :     pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
    1035              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1036              :     pending_nblocks: i64,
    1037              : 
    1038              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1039              :     /// if it was updated in this modification.
    1040              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
    1041              : 
    1042              :     /// An **approximation** of how large our EphemeralFile write will be when committed.
    1043              :     pending_bytes: usize,
    1044              : }
    1045              : 
    1046              : impl<'a> DatadirModification<'a> {
    1047              :     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    1048              :     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    1049              :     // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    1050              :     pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
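                      : 
                      :     // A hedged sketch of the check a caller is expected to make on the write
                      :     // path (the surrounding ingest loop is assumed, not shown here):
                      :     //
                      :     //     if modification.approx_pending_bytes() >= DatadirModification::MAX_PENDING_BYTES {
                      :     //         modification.commit(ctx).await?;
                      :     //     }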
    1051              : 
    1052              :     /// Get the current lsn
    1053      1254168 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1054      1254168 :         self.lsn
    1055      1254168 :     }
    1056              : 
    1057            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1058            0 :         self.pending_bytes
    1059            0 :     }
    1060              : 
    1061              :     /// Set the current lsn
    1062       437574 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
    1063       437574 :         ensure!(
    1064       437574 :             lsn >= self.lsn,
    1065            0 :             "setting an older lsn {} than {} is not allowed",
    1066              :             lsn,
    1067              :             self.lsn
    1068              :         );
    1069       437574 :         if lsn > self.lsn {
    1070       437574 :             self.pending_lsns.push(self.lsn);
    1071       437574 :             self.lsn = lsn;
    1072       437574 :         }
    1073       437574 :         Ok(())
    1074       437574 :     }
    1075              : 
    1076              :     /// Initialize a completely new repository.
    1077              :     ///
    1078              :     /// This inserts the directory metadata entries that are assumed to
    1079              :     /// always exist.
    1080          522 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1081          522 :         let buf = DbDirectory::ser(&DbDirectory {
    1082          522 :             dbdirs: HashMap::new(),
    1083          522 :         })?;
    1084          522 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
    1085          522 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1086          522 : 
    1087          522 :         // Create AuxFilesDirectory
    1088          522 :         self.init_aux_dir()?;
    1089              : 
    1090          522 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1091          522 :             xids: HashSet::new(),
    1092          522 :         })?;
    1093          522 :         self.pending_directory_entries
    1094          522 :             .push((DirectoryKind::TwoPhase, 0));
    1095          522 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1096              : 
    1097          522 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1098          522 :         let empty_dir = Value::Image(buf);
    1099          522 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1100          522 :         self.pending_directory_entries
    1101          522 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1102          522 :         self.put(
    1103          522 :             slru_dir_to_key(SlruKind::MultiXactMembers),
    1104          522 :             empty_dir.clone(),
    1105          522 :         );
    1106          522 :         self.pending_directory_entries
    1107          522 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1108          522 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1109          522 :         self.pending_directory_entries
    1110          522 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
    1111          522 : 
    1112          522 :         Ok(())
    1113          522 :     }
    1114              : 
    1115              :     #[cfg(test)]
    1116          516 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1117          516 :         self.init_empty()?;
    1118          516 :         self.put_control_file(bytes::Bytes::from_static(
    1119          516 :             b"control_file contents do not matter",
    1120          516 :         ))
    1121          516 :         .context("put_control_file")?;
    1122          516 :         self.put_checkpoint(bytes::Bytes::from_static(
    1123          516 :             b"checkpoint_file contents do not matter",
    1124          516 :         ))
    1125          516 :         .context("put_checkpoint_file")?;
    1126          516 :         Ok(())
    1127          516 :     }
    1128              : 
    1129              :     /// Put a new page version that can be constructed from a WAL record
    1130              :     ///
    1131              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1132              :     /// current end-of-file. It's up to the caller to check that the relation size
    1133              :     /// matches the blocks inserted!
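                      :     ///
                      :     /// A hedged sketch of the caller's obligation (`modification`, `rel`, `blknum`,
                      :     /// `rec` and `current_nblocks` are assumed; illustrative, not a doctest):
                      :     ///
                      :     /// ```ignore
                      :     /// if blknum >= current_nblocks {
                      :     ///     modification.put_rel_extend(rel, blknum + 1, ctx).await?; // grow the relation first
                      :     /// }
                      :     /// modification.put_rel_wal_record(rel, blknum, rec)?;
                      :     /// ```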
    1134       436890 :     pub fn put_rel_wal_record(
    1135       436890 :         &mut self,
    1136       436890 :         rel: RelTag,
    1137       436890 :         blknum: BlockNumber,
    1138       436890 :         rec: NeonWalRecord,
    1139       436890 :     ) -> anyhow::Result<()> {
    1140       436890 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1141       436890 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1142       436890 :         Ok(())
    1143       436890 :     }
    1144              : 
    1145              :     // Same, but for an SLRU.
    1146           24 :     pub fn put_slru_wal_record(
    1147           24 :         &mut self,
    1148           24 :         kind: SlruKind,
    1149           24 :         segno: u32,
    1150           24 :         blknum: BlockNumber,
    1151           24 :         rec: NeonWalRecord,
    1152           24 :     ) -> anyhow::Result<()> {
    1153           24 :         self.put(
    1154           24 :             slru_block_to_key(kind, segno, blknum),
    1155           24 :             Value::WalRecord(rec),
    1156           24 :         );
    1157           24 :         Ok(())
    1158           24 :     }
    1159              : 
    1160              :     /// Like put_rel_wal_record, but with a ready-made image of the page.
    1161       842592 :     pub fn put_rel_page_image(
    1162       842592 :         &mut self,
    1163       842592 :         rel: RelTag,
    1164       842592 :         blknum: BlockNumber,
    1165       842592 :         img: Bytes,
    1166       842592 :     ) -> anyhow::Result<()> {
    1167       842592 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1168       842592 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1169       842592 :         Ok(())
    1170       842592 :     }
    1171              : 
    1172           18 :     pub fn put_slru_page_image(
    1173           18 :         &mut self,
    1174           18 :         kind: SlruKind,
    1175           18 :         segno: u32,
    1176           18 :         blknum: BlockNumber,
    1177           18 :         img: Bytes,
    1178           18 :     ) -> anyhow::Result<()> {
    1179           18 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
    1180           18 :         Ok(())
    1181           18 :     }
    1182              : 
    1183              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1184           48 :     pub async fn put_relmap_file(
    1185           48 :         &mut self,
    1186           48 :         spcnode: Oid,
    1187           48 :         dbnode: Oid,
    1188           48 :         img: Bytes,
    1189           48 :         ctx: &RequestContext,
    1190           48 :     ) -> anyhow::Result<()> {
    1191              :         // Add it to the directory (if it doesn't exist already)
    1192           48 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1193           48 :         let mut dbdir = DbDirectory::des(&buf)?;
    1194              : 
    1195           48 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1196           48 :         if r.is_none() || r == Some(false) {
    1197              :             // The dbdir entry didn't exist, or it contained a
    1198              :             // 'false'. The 'insert' call already updated it with
    1199              :             // 'true', now write the updated 'dbdirs' map back.
    1200           48 :             let buf = DbDirectory::ser(&dbdir)?;
    1201           48 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1202           48 : 
    1203           48 :             // Create AuxFilesDirectory as well
    1204           48 :             self.init_aux_dir()?;
    1205            0 :         }
    1206           48 :         if r.is_none() {
    1207           24 :             // Create RelDirectory
    1208           24 :             let buf = RelDirectory::ser(&RelDirectory {
    1209           24 :                 rels: HashSet::new(),
    1210           24 :             })?;
    1211           24 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1212           24 :             self.put(
    1213           24 :                 rel_dir_to_key(spcnode, dbnode),
    1214           24 :                 Value::Image(Bytes::from(buf)),
    1215           24 :             );
    1216           24 :         }
    1217              : 
    1218           48 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1219           48 :         Ok(())
    1220           48 :     }
    1221              : 
    1222            0 :     pub async fn put_twophase_file(
    1223            0 :         &mut self,
    1224            0 :         xid: TransactionId,
    1225            0 :         img: Bytes,
    1226            0 :         ctx: &RequestContext,
    1227            0 :     ) -> anyhow::Result<()> {
    1228              :         // Add it to the directory entry
    1229            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1230            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1231            0 :         if !dir.xids.insert(xid) {
    1232            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1233            0 :         }
    1234            0 :         self.pending_directory_entries
    1235            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1236            0 :         self.put(
    1237            0 :             TWOPHASEDIR_KEY,
    1238            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1239              :         );
    1240              : 
    1241            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1242            0 :         Ok(())
    1243            0 :     }
    1244              : 
    1245            0 :     pub async fn set_replorigin(
    1246            0 :         &mut self,
    1247            0 :         origin_id: RepOriginId,
    1248            0 :         origin_lsn: Lsn,
    1249            0 :     ) -> anyhow::Result<()> {
    1250            0 :         let key = repl_origin_key(origin_id);
    1251            0 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    1252            0 :         Ok(())
    1253            0 :     }
    1254              : 
    1255            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
    1256            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    1257            0 :     }
    1258              : 
    1259          522 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1260          522 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1261          522 :         Ok(())
    1262          522 :     }
    1263              : 
    1264          564 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1265          564 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1266          564 :         Ok(())
    1267          564 :     }
    1268              : 
    1269            0 :     pub async fn drop_dbdir(
    1270            0 :         &mut self,
    1271            0 :         spcnode: Oid,
    1272            0 :         dbnode: Oid,
    1273            0 :         ctx: &RequestContext,
    1274            0 :     ) -> anyhow::Result<()> {
    1275            0 :         let total_blocks = self
    1276            0 :             .tline
    1277            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1278            0 :             .await?;
    1279              : 
    1280              :         // Remove entry from dbdir
    1281            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1282            0 :         let mut dir = DbDirectory::des(&buf)?;
    1283            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1284            0 :             let buf = DbDirectory::ser(&dir)?;
    1285            0 :             self.pending_directory_entries
    1286            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1287            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1288              :         } else {
    1289            0 :             warn!(
    1290            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1291              :                 spcnode, dbnode
    1292              :             );
    1293              :         }
    1294              : 
    1295              :         // Update logical database size.
    1296            0 :         self.pending_nblocks -= total_blocks as i64;
    1297            0 : 
    1298            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1299            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1300            0 :         Ok(())
    1301            0 :     }
    1302              : 
    1303              :     /// Create a relation fork.
    1304              :     ///
    1305              :     /// 'nblocks' is the initial size.
    1306         5760 :     pub async fn put_rel_creation(
    1307         5760 :         &mut self,
    1308         5760 :         rel: RelTag,
    1309         5760 :         nblocks: BlockNumber,
    1310         5760 :         ctx: &RequestContext,
    1311         5760 :     ) -> Result<(), RelationError> {
    1312         5760 :         if rel.relnode == 0 {
    1313            0 :             return Err(RelationError::InvalidRelnode);
    1314         5760 :         }
    1315              :         // It's possible that this is the first rel for this db in this
    1316              :         // tablespace.  Create the reldir entry for it if so.
    1317         5760 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1318         5760 :             .context("deserialize db")?;
    1319         5760 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1320         5760 :         let mut rel_dir =
    1321         5760 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1322              :                 // Didn't exist. Update dbdir
    1323           24 :                 e.insert(false);
    1324           24 :                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1325           24 :                 self.pending_directory_entries
    1326           24 :                     .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1327           24 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1328           24 : 
    1329           24 :                 // and create the RelDirectory
    1330           24 :                 RelDirectory::default()
    1331              :             } else {
    1332              :                 // reldir already exists, fetch it
    1333         5736 :                 RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1334         5736 :                     .context("deserialize db")?
    1335              :             };
    1336              : 
    1337              :         // Add the new relation to the rel directory entry, and write it back
    1338         5760 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1339            0 :             return Err(RelationError::AlreadyExists);
    1340         5760 :         }
    1341         5760 : 
    1342         5760 :         self.pending_directory_entries
    1343         5760 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1344         5760 : 
    1345         5760 :         self.put(
    1346         5760 :             rel_dir_key,
    1347         5760 :             Value::Image(Bytes::from(
    1348         5760 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1349              :             )),
    1350              :         );
    1351              : 
    1352              :         // Put size
    1353         5760 :         let size_key = rel_size_to_key(rel);
    1354         5760 :         let buf = nblocks.to_le_bytes();
    1355         5760 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1356         5760 : 
    1357         5760 :         self.pending_nblocks += nblocks as i64;
    1358         5760 : 
    1359         5760 :         // Update relation size cache
    1360         5760 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1361         5760 : 
    1362         5760 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1363         5760 :         // caller.
    1364         5760 :         Ok(())
    1365         5760 :     }
    1366              : 
    1367              :     /// Truncate relation
    1368        18036 :     pub async fn put_rel_truncation(
    1369        18036 :         &mut self,
    1370        18036 :         rel: RelTag,
    1371        18036 :         nblocks: BlockNumber,
    1372        18036 :         ctx: &RequestContext,
    1373        18036 :     ) -> anyhow::Result<()> {
    1374        18036 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1375        18036 :         if self
    1376        18036 :             .tline
    1377        18036 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1378            0 :             .await?
    1379              :         {
    1380        18036 :             let size_key = rel_size_to_key(rel);
    1381              :             // Fetch the old size first
    1382        18036 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1383        18036 : 
    1384        18036 :             // Update the entry with the new size.
    1388        18036 :             // Update relation size cache
    1389        18036 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1390        18036 : 
    1391        18036 :             // Update relation size cache
    1392        18036 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1393        18036 : 
    1394        18036 :             // Update logical database size.
    1395        18036 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1396            0 :         }
    1397        18036 :         Ok(())
    1398        18036 :     }
    1399              : 
    1400              :     /// Extend relation
    1401              :     /// If the new size is smaller, do nothing.
    1402       830040 :     pub async fn put_rel_extend(
    1403       830040 :         &mut self,
    1404       830040 :         rel: RelTag,
    1405       830040 :         nblocks: BlockNumber,
    1406       830040 :         ctx: &RequestContext,
    1407       830040 :     ) -> anyhow::Result<()> {
    1408       830040 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1409              : 
    1410              :         // Put size
    1411       830040 :         let size_key = rel_size_to_key(rel);
    1412       830040 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1413       830040 : 
    1414       830040 :         // only extend relation here. never decrease the size
    1415       830040 :         if nblocks > old_size {
    1416       824364 :             let buf = nblocks.to_le_bytes();
    1417       824364 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1418       824364 : 
    1419       824364 :             // Update relation size cache
    1420       824364 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1421       824364 : 
    1422       824364 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1423       824364 :         }
    1424       830040 :         Ok(())
    1425       830040 :     }
    1426              : 
    1427              :     /// Drop a relation.
    1428            6 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1429            6 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1430              : 
    1431              :         // Remove it from the directory entry
    1432            6 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1433            6 :         let buf = self.get(dir_key, ctx).await?;
    1434            6 :         let mut dir = RelDirectory::des(&buf)?;
    1435              : 
    1436            6 :         self.pending_directory_entries
    1437            6 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1438            6 : 
    1439            6 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1440            6 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1441              :         } else {
    1442            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1443              :         }
    1444              : 
    1445              :         // update logical size
    1446            6 :         let size_key = rel_size_to_key(rel);
    1447            6 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1448            6 :         self.pending_nblocks -= old_size as i64;
    1449            6 : 
    1450            6 :         // Remove entry from relation size cache
    1451            6 :         self.tline.remove_cached_rel_size(&rel);
    1452            6 : 
    1453            6 :         // Delete size entry, as well as all blocks
    1454            6 :         self.delete(rel_key_range(rel));
    1455            6 : 
    1456            6 :         Ok(())
    1457            6 :     }
    1458              : 
    1459           18 :     pub async fn put_slru_segment_creation(
    1460           18 :         &mut self,
    1461           18 :         kind: SlruKind,
    1462           18 :         segno: u32,
    1463           18 :         nblocks: BlockNumber,
    1464           18 :         ctx: &RequestContext,
    1465           18 :     ) -> anyhow::Result<()> {
    1466           18 :         // Add it to the directory entry
    1467           18 :         let dir_key = slru_dir_to_key(kind);
    1468           18 :         let buf = self.get(dir_key, ctx).await?;
    1469           18 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1470              : 
    1471           18 :         if !dir.segments.insert(segno) {
    1472            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1473           18 :         }
    1474           18 :         self.pending_directory_entries
    1475           18 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1476           18 :         self.put(
    1477           18 :             dir_key,
    1478           18 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1479              :         );
    1480              : 
    1481              :         // Put size
    1482           18 :         let size_key = slru_segment_size_to_key(kind, segno);
    1483           18 :         let buf = nblocks.to_le_bytes();
    1484           18 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1485           18 : 
    1486           18 :         // even if nblocks > 0, we don't insert any actual blocks here
    1487           18 : 
    1488           18 :         Ok(())
    1489           18 :     }
    1490              : 
    1491              :     /// Extend SLRU segment
    1492            0 :     pub fn put_slru_extend(
    1493            0 :         &mut self,
    1494            0 :         kind: SlruKind,
    1495            0 :         segno: u32,
    1496            0 :         nblocks: BlockNumber,
    1497            0 :     ) -> anyhow::Result<()> {
    1498            0 :         // Put size
    1499            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1500            0 :         let buf = nblocks.to_le_bytes();
    1501            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1502            0 :         Ok(())
    1503            0 :     }
    1504              : 
    1505              :     /// This method is used for marking truncated SLRU files
    1506            0 :     pub async fn drop_slru_segment(
    1507            0 :         &mut self,
    1508            0 :         kind: SlruKind,
    1509            0 :         segno: u32,
    1510            0 :         ctx: &RequestContext,
    1511            0 :     ) -> anyhow::Result<()> {
    1512            0 :         // Remove it from the directory entry
    1513            0 :         let dir_key = slru_dir_to_key(kind);
    1514            0 :         let buf = self.get(dir_key, ctx).await?;
    1515            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1516              : 
    1517            0 :         if !dir.segments.remove(&segno) {
    1518            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1519            0 :         }
    1520            0 :         self.pending_directory_entries
    1521            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1522            0 :         self.put(
    1523            0 :             dir_key,
    1524            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1525              :         );
    1526              : 
    1527              :         // Delete size entry, as well as all blocks
    1528            0 :         self.delete(slru_segment_key_range(kind, segno));
    1529            0 : 
    1530            0 :         Ok(())
    1531            0 :     }
    1532              : 
    1533              :     /// Drop a relmapper file (pg_filenode.map)
    1534            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1535            0 :         // TODO
    1536            0 :         Ok(())
    1537            0 :     }
    1538              : 
    1539              :     /// Drop a twophase file, removing it from the directory entry and deleting its data
    1540            0 :     pub async fn drop_twophase_file(
    1541            0 :         &mut self,
    1542            0 :         xid: TransactionId,
    1543            0 :         ctx: &RequestContext,
    1544            0 :     ) -> anyhow::Result<()> {
    1545              :         // Remove it from the directory entry
    1546            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1547            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1548              : 
    1549            0 :         if !dir.xids.remove(&xid) {
    1550            0 :             warn!("twophase file for xid {} does not exist", xid);
    1551            0 :         }
    1552            0 :         self.pending_directory_entries
    1553            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1554            0 :         self.put(
    1555            0 :             TWOPHASEDIR_KEY,
    1556            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1557              :         );
    1558              : 
    1559              :         // Delete it
    1560            0 :         self.delete(twophase_key_range(xid));
    1561            0 : 
    1562            0 :         Ok(())
    1563            0 :     }
    1564              : 
    1565          570 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1566          570 :         if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
    1567          558 :             return Ok(());
    1568           12 :         }
    1569           12 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1570           12 :             files: HashMap::new(),
    1571           12 :         })?;
    1572           12 :         self.pending_directory_entries
    1573           12 :             .push((DirectoryKind::AuxFiles, 0));
    1574           12 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1575           12 :         Ok(())
    1576          570 :     }
    1577              : 
    1578           90 :     pub async fn put_file(
    1579           90 :         &mut self,
    1580           90 :         path: &str,
    1581           90 :         content: &[u8],
    1582           90 :         ctx: &RequestContext,
    1583           90 :     ) -> anyhow::Result<()> {
    1584           90 :         let switch_policy = self.tline.get_switch_aux_file_policy();
    1585              : 
    1586           90 :         let policy = {
    1587           90 :             let current_policy = self.tline.last_aux_file_policy.load();
    1588              :             // Allowed switch path:
    1589              :             // * no aux files -> v1/v2/cross-validation
    1590              :             // * cross-validation->v2
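                      :             //
                      :             // A hedged reading of these rules: `None -> V2` and `CrossValidation -> V2`
                      :             // pass `is_valid_migration_path`, while e.g. `V1 -> V2` does not, in which
                      :             // case the current policy is kept (see the `else` branch below).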
    1591              : 
    1592           90 :             let current_policy = if current_policy.is_none() {
    1593              :                 // This path will only be hit once per tenant: we will decide the final policy in this code block.
    1594              :                 // The next call to `put_file` will always have `last_aux_file_policy != None`.
    1595           36 :                 let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1596           36 :                 let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
    1597           36 :                 if aux_files_key_v1.is_empty() {
    1598           30 :                     None
    1599              :                 } else {
    1600            6 :                     warn!("this timeline is using deprecated aux file policy V1");
    1601            6 :                     self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
    1602            6 :                     Some(AuxFilePolicy::V1)
    1603              :                 }
    1604              :             } else {
    1605           54 :                 current_policy
    1606              :             };
    1607              : 
    1608           90 :             if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
    1609           36 :                 self.tline.do_switch_aux_policy(switch_policy)?;
    1610           36 :                 info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
    1611           36 :                 switch_policy
    1612              :             } else {
    1613              :                 // This branch handles invalid migration paths, as well as the case where switch_policy == current_policy.
    1614              :                 // In fact, because the migration rules always allow unspecified -> *, this unwrap_or will never be hit.
    1615           54 :                 current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
    1616              :             }
    1617              :         };
    1618              : 
    1619           90 :         if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
    1620           78 :             let key = aux_file::encode_aux_file_key(path);
    1621              :             // retrieve the key from the engine
    1622           78 :             let old_val = match self.get(key, ctx).await {
    1623           18 :                 Ok(val) => Some(val),
    1624           60 :                 Err(PageReconstructError::MissingKey(_)) => None,
    1625            0 :                 Err(e) => return Err(e.into()),
    1626              :             };
    1627           78 :             let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    1628           18 :                 aux_file::decode_file_value(old_val)?
    1629              :             } else {
    1630           60 :                 Vec::new()
    1631              :             };
    1632           78 :             let mut other_files = Vec::with_capacity(files.len());
    1633           78 :             let mut modifying_file = None;
    1634           96 :             for file @ (p, content) in files {
    1635           18 :                 if path == p {
    1636           18 :                     assert!(
    1637           18 :                         modifying_file.is_none(),
    1638            0 :                         "duplicated entries found for {}",
    1639              :                         path
    1640              :                     );
    1641           18 :                     modifying_file = Some(content);
    1642            0 :                 } else {
    1643            0 :                     other_files.push(file);
    1644            0 :                 }
    1645              :             }
    1646           78 :             let mut new_files = other_files;
    1647           78 :             match (modifying_file, content.is_empty()) {
    1648           12 :                 (Some(old_content), false) => {
    1649           12 :                     self.tline
    1650           12 :                         .aux_file_size_estimator
    1651           12 :                         .on_update(old_content.len(), content.len());
    1652           12 :                     new_files.push((path, content));
    1653           12 :                 }
    1654            6 :                 (Some(old_content), true) => {
    1655            6 :                     self.tline
    1656            6 :                         .aux_file_size_estimator
    1657            6 :                         .on_remove(old_content.len());
    1658            6 :                     // not adding the file key to the final `new_files` vec.
    1659            6 :                 }
    1660           60 :                 (None, false) => {
    1661           60 :                     self.tline.aux_file_size_estimator.on_add(content.len());
    1662           60 :                     new_files.push((path, content));
    1663           60 :                 }
    1664            0 :                 (None, true) => warn!("removing non-existing aux file: {}", path),
    1665              :             }
    1666           78 :             let new_val = aux_file::encode_file_value(&new_files)?;
    1667           78 :             self.put(key, Value::Image(new_val.into()));
    1668           12 :         }
    1669              : 
    1670           90 :         if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
    1671           18 :             let file_path = path.to_string();
    1672           18 :             let content = if content.is_empty() {
    1673            0 :                 None
    1674              :             } else {
    1675           18 :                 Some(Bytes::copy_from_slice(content))
    1676              :             };
    1677              : 
    1678              :             let n_files;
    1679           18 :             let mut aux_files = self.tline.aux_files.lock().await;
    1680           18 :             if let Some(mut dir) = aux_files.dir.take() {
    1681              :                 // We already updated aux files in `self`: emit a delta and update our latest value.
    1682            0 :                 dir.upsert(file_path.clone(), content.clone());
    1683            0 :                 n_files = dir.files.len();
    1684            0 :                 if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
    1685            0 :                     self.put(
    1686            0 :                         AUX_FILES_KEY,
    1687            0 :                         Value::Image(Bytes::from(
    1688            0 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1689              :                         )),
    1690              :                     );
    1691            0 :                     aux_files.n_deltas = 0;
    1692            0 :                 } else {
    1693            0 :                     self.put(
    1694            0 :                         AUX_FILES_KEY,
    1695            0 :                         Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
    1696            0 :                     );
    1697            0 :                     aux_files.n_deltas += 1;
    1698            0 :                 }
    1699            0 :                 aux_files.dir = Some(dir);
    1700              :             } else {
    1701              :                 // Check if the AUX_FILES_KEY is initialized
    1702           18 :                 match self.get(AUX_FILES_KEY, ctx).await {
    1703           18 :                     Ok(dir_bytes) => {
    1704           18 :                         let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
    1705              :                         // Key is already set, we may append a delta
    1706           18 :                         self.put(
    1707           18 :                             AUX_FILES_KEY,
    1708           18 :                             Value::WalRecord(NeonWalRecord::AuxFile {
    1709           18 :                                 file_path: file_path.clone(),
    1710           18 :                                 content: content.clone(),
    1711           18 :                             }),
    1712           18 :                         );
    1713           18 :                         dir.upsert(file_path, content);
    1714           18 :                         n_files = dir.files.len();
    1715           18 :                         aux_files.dir = Some(dir);
    1716              :                     }
    1717              :                     Err(
    1718            0 :                         e @ (PageReconstructError::Cancelled
    1719            0 :                         | PageReconstructError::AncestorLsnTimeout(_)),
    1720            0 :                     ) => {
    1721            0 :                         // Important that we do not interpret a shutdown error as "not found" and thereby
    1722            0 :                         // reset the map.
    1723            0 :                         return Err(e.into());
    1724              :                     }
    1725              :                     // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
    1726              :                     // the original code assumes all other errors are missing keys. Therefore, we keep the code path
    1727              :                     // the same for now, though in theory, we should only match the `MissingKey` variant.
    1728              :                     Err(
    1729            0 :                         e @ (PageReconstructError::Other(_)
    1730              :                         | PageReconstructError::WalRedo(_)
    1731              :                         | PageReconstructError::MissingKey(_)),
    1732              :                     ) => {
    1733              :                         // Key is missing, we must insert an image as the basis for subsequent deltas.
    1734              : 
    1735            0 :                         if !matches!(e, PageReconstructError::MissingKey(_)) {
    1736            0 :                             let e = utils::error::report_compact_sources(&e);
    1737            0 :                             tracing::warn!("treating error as if it was a missing key: {}", e);
    1738            0 :                         }
    1739              : 
    1740            0 :                         let mut dir = AuxFilesDirectory {
    1741            0 :                             files: HashMap::new(),
    1742            0 :                         };
    1743            0 :                         dir.upsert(file_path, content);
    1744            0 :                         self.put(
    1745            0 :                             AUX_FILES_KEY,
    1746            0 :                             Value::Image(Bytes::from(
    1747            0 :                                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1748              :                             )),
    1749              :                         );
    1750            0 :                         n_files = 1;
    1751            0 :                         aux_files.dir = Some(dir);
    1752              :                     }
    1753              :                 }
    1754              :             }
    1755              : 
    1756           18 :             self.pending_directory_entries
    1757           18 :                 .push((DirectoryKind::AuxFiles, n_files));
    1758           72 :         }
    1759              : 
    1760           90 :         Ok(())
    1761           90 :     }
    1762              : 
    1763              :     ///
    1764              :     /// Flush changes accumulated so far to the underlying repository.
    1765              :     ///
    1766              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1767              :     /// you to flush them to the underlying repository before the final `commit`.
    1768              :     /// That allows to free up the memory used to hold the pending changes.
    1769              :     ///
    1770              :     /// Currently only used during bulk import of a data directory. In that
    1771              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1772              :     /// whole import fails and the timeline will be deleted anyway.
    1773              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1774              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1775              :     ///
    1776              :     /// Note: A consequence of flushing the pending operations is that they
    1777              :     /// won't be visible to subsequent operations until `commit`. The function
    1778              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1779              :     /// for bulk import, where you are just loading data pages and won't try to
    1780              :     /// modify the same pages twice.
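                      :     ///
                      :     /// A hedged bulk-import sketch (`modification` and the `pages` iterator are
                      :     /// assumed; illustrative, not a doctest):
                      :     ///
                      :     /// ```ignore
                      :     /// for (rel, blknum, img) in pages {
                      :     ///     modification.put_rel_page_image(rel, blknum, img)?;
                      :     ///     modification.flush(ctx).await?; // may spill buffered data pages early
                      :     /// }
                      :     /// modification.commit(ctx).await?; // metadata and remaining pages become visible
                      :     /// ```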
    1781         5790 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1782         5790 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1783         5790 :         // to scan through the pending_updates list.
    1784         5790 :         let pending_nblocks = self.pending_nblocks;
    1785         5790 :         if pending_nblocks < 10000 {
    1786         5790 :             return Ok(());
    1787            0 :         }
    1788              : 
    1789            0 :         let mut writer = self.tline.writer().await;
    1790              : 
    1791              :         // Flush relation and SLRU data blocks, keep metadata.
    1792            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1793            0 :         for (key, values) in self.pending_updates.drain() {
    1794            0 :             if !key.is_valid_key_on_write_path() {
    1795            0 :                 bail!(
    1796            0 :                     "the request contains data not supported by pageserver at TimelineWriter::put: {}", key
    1797            0 :                 );
    1798            0 :             }
    1799            0 :             let mut write_batch = Vec::new();
    1800            0 :             for (lsn, value_ser_size, value) in values {
    1801            0 :                 if key.is_rel_block_key() || key.is_slru_block_key() {
    1802            0 :                     // This bails out on first error without modifying pending_updates.
    1803            0 :                     // That's Ok, cf this function's doc comment.
    1804            0 :                     write_batch.push((key.to_compact(), lsn, value_ser_size, value));
    1805            0 :                 } else {
    1806            0 :                     retained_pending_updates.entry(key).or_default().push((
    1807            0 :                         lsn,
    1808            0 :                         value_ser_size,
    1809            0 :                         value,
    1810            0 :                     ));
    1811            0 :                 }
    1812              :             }
    1813            0 :             writer.put_batch(write_batch, ctx).await?;
    1814              :         }
    1815              : 
    1816            0 :         self.pending_updates = retained_pending_updates;
    1817            0 :         self.pending_bytes = 0;
    1818            0 : 
    1819            0 :         if pending_nblocks != 0 {
    1820            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1821            0 :             self.pending_nblocks = 0;
    1822            0 :         }
    1823              : 
    1824            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1825            0 :             writer.update_directory_entries_count(kind, count as u64);
    1826            0 :         }
    1827              : 
    1828            0 :         Ok(())
    1829         5790 :     }
    1830              : 
    1831              :     ///
    1832              :     /// Finish this atomic update, writing all the updated keys to the
    1833              :     /// underlying timeline.
    1834              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1835              :     ///
    1836      2229228 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1837      2229228 :         let mut writer = self.tline.writer().await;
    1838              : 
    1839      2229228 :         let pending_nblocks = self.pending_nblocks;
    1840      2229228 :         self.pending_nblocks = 0;
    1841      2229228 : 
    1842      2229228 :         if !self.pending_updates.is_empty() {
    1843              :             // Ordering: the items in this batch do not need to be in any global order, but values for
    1844              :             // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    1845              :             // this to do efficient updates to its index.
    1846      1242180 :             let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
    1847      1242180 :                 .pending_updates
    1848      1242180 :                 .drain()
    1849      2100984 :                 .flat_map(|(key, values)| {
    1850      2100984 :                     values.into_iter().map(move |(lsn, val_ser_size, value)| {
    1851      2100984 :                         if !key.is_valid_key_on_write_path() {
    1852            0 :                             bail!("the request contains data not supported by pageserver at TimelineWriter::put: {}", key);
    1853      2100984 :                         }
    1854      2100984 :                         Ok((key.to_compact(), lsn, val_ser_size, value))
    1855      2100984 :                     })
    1856      2100984 :                 })
    1857      1242180 :                 .collect::<anyhow::Result<Vec<_>>>()?;
    1858              : 
    1859      1242180 :             writer.put_batch(batch, ctx).await?;
    1860       987048 :         }
    1861              : 
    1862      2229228 :         if !self.pending_deletions.is_empty() {
    1863            6 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    1864            6 :             self.pending_deletions.clear();
    1865      2229222 :         }
    1866              : 
    1867      2229228 :         self.pending_lsns.push(self.lsn);
    1868      2666802 :         for pending_lsn in self.pending_lsns.drain(..) {
    1869      2666802 :             // Ideally, we should be able to call writer.finish_write() only once
    1870      2666802 :             // with the highest LSN. However, the last_record_lsn variable in the
    1871      2666802 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1872      2666802 :             // so we need to record every LSN to not leave a gap between them.
    1873      2666802 :             writer.finish_write(pending_lsn);
    1874      2666802 :         }
    1875              : 
    1876      2229228 :         if pending_nblocks != 0 {
    1877       811710 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1878      1417518 :         }
    1879              : 
    1880      2229228 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1881         8472 :             writer.update_directory_entries_count(kind, count as u64);
    1882         8472 :         }
    1883              : 
    1884      2229228 :         self.pending_bytes = 0;
    1885      2229228 : 
    1886      2229228 :         Ok(())
    1887      2229228 :     }
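// --- Illustrative sketch (not part of the measured source). The ordering
// invariant documented inside commit() above: the flattened batch may
// interleave keys arbitrarily, but the entries for any one key must stay in
// ascending Lsn order (strictly ascending, since put() collapses same-Lsn
// writes). A minimal check over hypothetical (key, lsn) pairs:
//
//     fn batch_is_key_ordered(batch: &[(u64, u64)]) -> bool {
//         use std::collections::HashMap;
//         let mut last_lsn: HashMap<u64, u64> = HashMap::new();
//         batch.iter().all(|&(key, lsn)| match last_lsn.insert(key, lsn) {
//             Some(prev) => prev < lsn, // per-key Lsns must increase
//             None => true,             // first occurrence of this key
//         })
//     }
//
//     // Interleaving across keys is fine; only per-key order matters:
//     assert!(batch_is_key_ordered(&[(1, 10), (2, 10), (1, 20), (2, 30)]));
//     assert!(!batch_is_key_ordered(&[(1, 20), (1, 10)]));
// ---------------------------------------------------------------------------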
    1888              : 
    1889       875112 :     pub(crate) fn len(&self) -> usize {
    1890       875112 :         self.pending_updates.len() + self.pending_deletions.len()
    1891       875112 :     }
    1892              : 
    1893              :     // Internal helper functions to batch the modifications
    1894              : 
    1895       859806 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    1896              :         // Have we already updated the same key? Read the latest pending
    1897              :         // version in that case.
    1898              :         //
    1899              :         // Note: we don't check pending_deletions. It is an error to request a
    1900              :         // value that has been removed; deletion only avoids leaking storage.
    1901       859806 :         if let Some(values) = self.pending_updates.get(&key) {
    1902        47784 :             if let Some((_, _, value)) = values.last() {
    1903        47784 :                 return if let Value::Image(img) = value {
    1904        47784 :                     Ok(img.clone())
    1905              :                 } else {
    1906              :                     // Currently, we never need to read back a WAL record that we
    1907              :                     // inserted in the same "transaction". All the metadata updates
    1908              :                     // work directly with Images, and we never need to read actual
    1909              :                     // data pages. We could handle this if we had to, by calling
    1910              :                     // the walredo manager, but let's keep it simple for now.
    1911            0 :                     Err(PageReconstructError::Other(anyhow::anyhow!(
    1912            0 :                         "unexpected pending WAL record"
    1913            0 :                     )))
    1914              :                 };
    1915            0 :             }
    1916       812022 :         }
    1917       812022 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1918       812022 :         self.tline.get(key, lsn, ctx).await
    1919       859806 :     }
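// --- Illustrative sketch (not part of the measured source). The
// read-your-writes pattern used by get() above, reduced to std types; the
// PendingStore type and the `committed` fallback are hypothetical stand-ins
// for the pending_updates map and the timeline lookup:
//
//     use std::collections::HashMap;
//
//     struct PendingStore {
//         pending: HashMap<u64, Vec<(u64 /* lsn */, Vec<u8> /* image */)>>,
//     }
//
//     impl PendingStore {
//         fn get(&self, key: u64, committed: impl Fn(u64) -> Vec<u8>) -> Vec<u8> {
//             // The latest pending value for the key wins ...
//             if let Some(values) = self.pending.get(&key) {
//                 if let Some((_lsn, img)) = values.last() {
//                     return img.clone();
//                 }
//             }
//             // ... otherwise fall back to the committed store.
//             committed(key)
//         }
//     }
// ---------------------------------------------------------------------------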
    1920              : 
    1921              :     /// Only used during unit tests; force-puts a key into the modification.
    1922              :     #[cfg(test)]
    1923            6 :     pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
    1924            6 :         self.put(key, val);
    1925            6 :     }
    1926              : 
    1927      2137440 :     fn put(&mut self, key: Key, val: Value) {
    1928      2137440 :         let values = self.pending_updates.entry(key).or_default();
    1929              :         // Replace the previous value if it exists at the same lsn
    1930      2137440 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    1931        36456 :             if *last_lsn == self.lsn {
    1932        36456 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    1933        36456 :                 *last_value = val;
    1934        36456 :                 return;
    1935            0 :             }
    1936      2100984 :         }
    1937              : 
    1938      2100984 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    1939      2100984 :         self.pending_bytes += val_serialized_size;
    1940      2100984 :         values.push((self.lsn, val_serialized_size, val));
    1941      2137440 :     }
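// --- Illustrative sketch (not part of the measured source). put()'s same-Lsn
// replacement semantics over a simplified per-key vector: writing the same key
// twice at one Lsn keeps only the last value, so the vector holds at most one
// entry per Lsn:
//
//     fn push(values: &mut Vec<(u64, &'static str)>, lsn: u64, val: &'static str) {
//         if let Some((last_lsn, last_val)) = values.last_mut() {
//             if *last_lsn == lsn {
//                 *last_val = val; // replace in place at the same Lsn
//                 return;
//             }
//         }
//         values.push((lsn, val));
//     }
//
//     let mut values = Vec::new();
//     push(&mut values, 0x10, "a");
//     push(&mut values, 0x10, "b"); // replaces "a"
//     push(&mut values, 0x20, "c");
//     assert_eq!(values, vec![(0x10, "b"), (0x20, "c")]);
// ---------------------------------------------------------------------------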
    1942              : 
    1943            6 :     fn delete(&mut self, key_range: Range<Key>) {
    1944            6 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1945            6 :         self.pending_deletions.push((key_range, self.lsn));
    1946            6 :     }
    1947              : }
    1948              : 
    1949              : /// This struct facilitates accessing either a committed key from the timeline at a
    1950              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1951              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1952              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1953              : /// need to look up keys in the modification first, and only then in the
    1954              : /// timeline, so they do not miss the latest updates.
    1955              : #[derive(Clone, Copy)]
    1956              : pub enum Version<'a> {
    1957              :     Lsn(Lsn),
    1958              :     Modified(&'a DatadirModification<'a>),
    1959              : }
    1960              : 
    1961              : impl<'a> Version<'a> {
    1962        70680 :     async fn get(
    1963        70680 :         &self,
    1964        70680 :         timeline: &Timeline,
    1965        70680 :         key: Key,
    1966        70680 :         ctx: &RequestContext,
    1967        70680 :     ) -> Result<Bytes, PageReconstructError> {
    1968        70680 :         match self {
    1969        70620 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1970           60 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1971              :         }
    1972        70680 :     }
    1973              : 
    1974       106860 :     fn get_lsn(&self) -> Lsn {
    1975       106860 :         match self {
    1976        88722 :             Version::Lsn(lsn) => *lsn,
    1977        18138 :             Version::Modified(modification) => modification.lsn,
    1978              :         }
    1979       106860 :     }
    1980              : }
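// --- Illustrative sketch (not part of the measured source). How ingest-style
// code might use Version, assuming an async context with `timeline`, `key`,
// `ctx`, and an open `modification` in scope:
//
//     // While a batch is open, read through the modification so the latest
//     // uncommitted value is visible:
//     let img = Version::Modified(&modification).get(timeline, key, ctx).await?;
//
//     // After commit, read the timeline directly at a committed LSN:
//     let img = Version::Lsn(Lsn(0x2008)).get(timeline, key, ctx).await?;
// ---------------------------------------------------------------------------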
    1981              : 
    1982              : //--- Metadata structs stored in key-value pairs in the repository.
    1983              : 
    1984         6726 : #[derive(Debug, Serialize, Deserialize)]
    1985              : struct DbDirectory {
    1986              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1987              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1988              : }
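// --- Illustrative sketch (not part of the measured source). These directory
// structs round-trip through binary serialization; assuming the ser()/des()
// methods of the BeSer trait imported above, and hypothetical OID values:
//
//     let dir = DbDirectory {
//         dbdirs: HashMap::from([((1663u32, 16384u32), true)]),
//     };
//     let buf = dir.ser().unwrap();
//     let back = DbDirectory::des(&buf).unwrap();
//     assert_eq!(back.dbdirs.get(&(1663, 16384)), Some(&true));
// ---------------------------------------------------------------------------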
    1989              : 
    1990          870 : #[derive(Debug, Serialize, Deserialize)]
    1991              : struct TwoPhaseDirectory {
    1992              :     xids: HashSet<TransactionId>,
    1993              : }
    1994              : 
    1995         5796 : #[derive(Debug, Serialize, Deserialize, Default)]
    1996              : struct RelDirectory {
    1997              :     // Set of relations that exist. (relfilenode, forknum)
    1998              :     //
    1999              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    2000              :     // key-value pairs, if you have a lot of relations
    2001              :     rels: HashSet<(Oid, u8)>,
    2002              : }
    2003              : 
    2004           84 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
    2005              : pub(crate) struct AuxFilesDirectory {
    2006              :     pub(crate) files: HashMap<String, Bytes>,
    2007              : }
    2008              : 
    2009              : impl AuxFilesDirectory {
    2010           48 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    2011           48 :         if let Some(value) = value {
    2012           42 :             self.files.insert(key, value);
    2013           42 :         } else {
    2014            6 :             self.files.remove(&key);
    2015            6 :         }
    2016           48 :     }
    2017              : }
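// --- Illustrative sketch (not part of the measured source). upsert()
// semantics: Some(value) inserts or overwrites, None deletes. The aux-file
// path used here is hypothetical:
//
//     let mut dir = AuxFilesDirectory { files: HashMap::new() };
//     dir.upsert("pg_logical/x".to_string(), Some(Bytes::from_static(b"v1")));
//     dir.upsert("pg_logical/x".to_string(), Some(Bytes::from_static(b"v2")));
//     assert_eq!(dir.files.get("pg_logical/x"), Some(&Bytes::from_static(b"v2")));
//     dir.upsert("pg_logical/x".to_string(), None);
//     assert!(dir.files.is_empty());
// ---------------------------------------------------------------------------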
    2018              : 
    2019            0 : #[derive(Debug, Serialize, Deserialize)]
    2020              : struct RelSizeEntry {
    2021              :     nblocks: u32,
    2022              : }
    2023              : 
    2024         2610 : #[derive(Debug, Serialize, Deserialize, Default)]
    2025              : struct SlruSegmentDirectory {
    2026              :     // Set of SLRU segments that exist.
    2027              :     segments: HashSet<u32>,
    2028              : }
    2029              : 
    2030              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2031              : #[repr(u8)]
    2032              : pub(crate) enum DirectoryKind {
    2033              :     Db,
    2034              :     TwoPhase,
    2035              :     Rel,
    2036              :     AuxFiles,
    2037              :     SlruSegment(SlruKind),
    2038              : }
    2039              : 
    2040              : impl DirectoryKind {
    2041              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2042        16944 :     pub(crate) fn offset(&self) -> usize {
    2043        16944 :         self.into_usize()
    2044        16944 :     }
    2045              : }
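// --- Illustrative sketch (not part of the measured source). offset() yields a
// dense index derived from enum_map::Enum, so per-kind counters can live in a
// fixed-size array of length KINDS_NUM:
//
//     let mut counts = [0u64; DirectoryKind::KINDS_NUM];
//     counts[DirectoryKind::Rel.offset()] += 1;
//     counts[DirectoryKind::SlruSegment(SlruKind::Clog).offset()] += 1;
//     assert_eq!(counts.iter().sum::<u64>(), 2);
// ---------------------------------------------------------------------------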
    2046              : 
    2047              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2048              : 
    2049              : #[allow(clippy::bool_assert_comparison)]
    2050              : #[cfg(test)]
    2051              : mod tests {
    2052              :     use hex_literal::hex;
    2053              :     use utils::id::TimelineId;
    2054              : 
    2055              :     use super::*;
    2056              : 
    2057              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    2058              : 
    2059              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2060              :     #[tokio::test]
    2061            6 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2062            6 :         let name = "aux_files_round_trip";
    2063            6 :         let harness = TenantHarness::create(name).await?;
    2064            6 : 
    2065            6 :         pub const TIMELINE_ID: TimelineId =
    2066            6 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2067            6 : 
    2068           23 :         let (tenant, ctx) = harness.load().await;
    2069            6 :         let tline = tenant
    2070            6 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2071            6 :             .await?;
    2072            6 :         let tline = tline.raw_timeline().unwrap();
    2073            6 : 
    2074            6 :         // First modification: insert two keys
    2075            6 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2076            6 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2077            6 :         modification.set_lsn(Lsn(0x1008))?;
    2078            6 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2079            6 :         modification.commit(&ctx).await?;
    2080            6 :         let expect_1008 = HashMap::from([
    2081            6 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2082            6 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2083            6 :         ]);
    2084            6 : 
    2085            6 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2086            6 :         assert_eq!(readback, expect_1008);
    2087            6 : 
    2088            6 :         // Second modification: update one key, remove the other
    2089            6 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2090            6 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2091            6 :         modification.set_lsn(Lsn(0x2008))?;
    2092            6 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2093            6 :         modification.commit(&ctx).await?;
    2094            6 :         let expect_2008 =
    2095            6 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2096            6 : 
    2097            6 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    2098            6 :         assert_eq!(readback, expect_2008);
    2099            6 : 
    2100            6 :         // Reading back in time works
    2101            6 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2102            6 :         assert_eq!(readback, expect_1008);
    2103            6 : 
    2104            6 :         Ok(())
    2105            6 :     }
    2106              : 
    2107              :     /*
    2108              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    2109              :             let incremental = timeline.get_current_logical_size();
    2110              :             let non_incremental = timeline
    2111              :                 .get_current_logical_size_non_incremental(lsn)
    2112              :                 .unwrap();
    2113              :             assert_eq!(incremental, non_incremental);
    2114              :         }
    2115              :     */
    2116              : 
    2117              :     /*
    2118              :     ///
    2119              :     /// Test list_rels() function, with branches and dropped relations
    2120              :     ///
    2121              :     #[test]
    2122              :     fn test_list_rels_drop() -> Result<()> {
    2123              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    2124              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    2125              :         const TESTDB: u32 = 111;
    2126              : 
    2127              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    2128              :         // after branching fails below
    2129              :         let mut writer = tline.begin_record(Lsn(0x10));
    2130              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    2131              :         writer.finish()?;
    2132              : 
    2133              :         // Create a relation on the timeline
    2134              :         let mut writer = tline.begin_record(Lsn(0x20));
    2135              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    2136              :         writer.finish()?;
    2137              : 
    2138              :         let writer = tline.begin_record(Lsn(0x00));
    2139              :         writer.finish()?;
    2140              : 
    2141              :         // Check that list_rels() lists it at LSN 0x20 and later, but not before
    2142              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    2143              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    2144              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    2145              : 
    2146              :         // Create a branch, check that the relation is visible there
    2147              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    2148              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    2149              :             Some(timeline) => timeline,
    2150              :             None => panic!("Should have a local timeline"),
    2151              :         };
    2152              :         let newtline = DatadirTimelineImpl::new(newtline);
    2153              :         assert!(newtline
    2154              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2155              :             .contains(&TESTREL_A));
    2156              : 
    2157              :         // Drop it on the branch
    2158              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    2159              :         new_writer.drop_relation(TESTREL_A)?;
    2160              :         new_writer.finish()?;
    2161              : 
    2162              :         // Check that it's no longer listed on the branch after the point where it was dropped
    2163              :         assert!(newtline
    2164              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2165              :             .contains(&TESTREL_A));
    2166              :         assert!(!newtline
    2167              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2168              :             .contains(&TESTREL_A));
    2169              : 
    2170              :         // Run checkpoint and garbage collection and check that it's still not visible
    2171              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    2172              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    2173              : 
    2174              :         assert!(!newtline
    2175              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2176              :             .contains(&TESTREL_A));
    2177              : 
    2178              :         Ok(())
    2179              :     }
    2180              :      */
    2181              : 
    2182              :     /*
    2183              :     #[test]
    2184              :     fn test_read_beyond_eof() -> Result<()> {
    2185              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    2186              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    2187              : 
    2188              :         make_some_layers(&tline, Lsn(0x20))?;
    2189              :         let mut writer = tline.begin_record(Lsn(0x60));
    2190              :         walingest.put_rel_page_image(
    2191              :             &mut writer,
    2192              :             TESTREL_A,
    2193              :             0,
    2194              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    2195              :         )?;
    2196              :         writer.finish()?;
    2197              : 
    2198              :         // Test read before rel creation. Should error out.
    2199              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2200              : 
    2201              :         // Read block beyond end of relation at different points in time.
    2202              :         // These reads should fall into different delta, image, and in-memory layers.
    2203              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2204              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2205              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2206              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2207              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2208              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2209              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2210              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2211              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2212              : 
    2213              :         // Test on an in-memory layer with no preceding layer
    2214              :         let mut writer = tline.begin_record(Lsn(0x70));
    2215              :         walingest.put_rel_page_image(
    2216              :             &mut writer,
    2217              :             TESTREL_B,
    2218              :             0,
    2219              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2220              :         )?;
    2221              :         writer.finish()?;
    2222              : 
    2223              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    2224              : 
    2225              :         Ok(())
    2226              :     }
    2227              :      */
    2228              : }
        

Generated by: LCOV version 2.1-beta