LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 55.4 % 1365 756
Test Date: 2024-10-22 22:13:45 Functions: 39.8 % 191 76

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      13              : use crate::walrecord::NeonWalRecord;
      14              : use crate::{aux_file, repository::*};
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use enum_map::Enum;
      18              : use pageserver_api::key::{
      19              :     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
      20              :     relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
      21              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      22              :     CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      23              : };
      24              : use pageserver_api::keyspace::SparseKeySpace;
      25              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      26              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      27              : use postgres_ffi::BLCKSZ;
      28              : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
      29              : use serde::{Deserialize, Serialize};
      30              : use std::collections::{hash_map, HashMap, HashSet};
      31              : use std::ops::ControlFlow;
      32              : use std::ops::Range;
      33              : use strum::IntoEnumIterator;
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::{debug, trace, warn};
      36              : use utils::bin_ser::DeserializeError;
      37              : use utils::pausable_failpoint;
      38              : use utils::{bin_ser::BeSer, lsn::Lsn};
      39              : 
      40              : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
      41              : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
      42              : 
      43              : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
      44              : pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
      45              : 
      46              : #[derive(Debug)]
      47              : pub enum LsnForTimestamp {
      48              :     /// Found commits both before and after the given timestamp
      49              :     Present(Lsn),
      50              : 
      51              :     /// Found no commits after the given timestamp, this means
      52              :     /// that the newest data in the branch is older than the given
      53              :     /// timestamp.
      54              :     ///
      55              :     /// All commits <= LSN happened before the given timestamp
      56              :     Future(Lsn),
      57              : 
      58              :     /// The queried timestamp is older than the horizon we can look back to (PITR)
      59              :     ///
      60              :     /// All commits > LSN happened after the given timestamp,
      61              :     /// but any commits < LSN might have happened before or after
      62              :     /// the given timestamp. We don't know because no data before
      63              :     /// the given lsn is available.
      64              :     Past(Lsn),
      65              : 
      66              :     /// We have found no commit with a timestamp,
      67              :     /// so we can't return anything meaningful.
      68              :     ///
      69              :     /// The associated LSN is the lower bound value we can safely
      70              :     /// create branches on, but no statement is made if it is
      71              :     /// older or newer than the timestamp.
      72              :     ///
      73              :     /// This variant can e.g. be returned right after a
      74              :     /// cluster import.
      75              :     NoData(Lsn),
      76              : }
      77              : 
      78            0 : #[derive(Debug, thiserror::Error)]
      79              : pub(crate) enum CalculateLogicalSizeError {
      80              :     #[error("cancelled")]
      81              :     Cancelled,
      82              : 
      83              :     /// Something went wrong while reading the metadata we use to calculate logical size
      84              :     /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
      85              :     /// in the `From` implementation for this variant.
      86              :     #[error(transparent)]
      87              :     PageRead(PageReconstructError),
      88              : 
      89              :     /// Something went wrong deserializing metadata that we read to calculate logical size
      90              :     #[error("decode error: {0}")]
      91              :     Decode(#[from] DeserializeError),
      92              : }
      93              : 
      94            0 : #[derive(Debug, thiserror::Error)]
      95              : pub(crate) enum CollectKeySpaceError {
      96              :     #[error(transparent)]
      97              :     Decode(#[from] DeserializeError),
      98              :     #[error(transparent)]
      99              :     PageRead(PageReconstructError),
     100              :     #[error("cancelled")]
     101              :     Cancelled,
     102              : }
     103              : 
     104              : impl From<PageReconstructError> for CollectKeySpaceError {
     105            0 :     fn from(err: PageReconstructError) -> Self {
     106            0 :         match err {
     107            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     108            0 :             err => Self::PageRead(err),
     109              :         }
     110            0 :     }
     111              : }
     112              : 
     113              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     114            0 :     fn from(pre: PageReconstructError) -> Self {
     115            0 :         match pre {
     116            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     117            0 :             _ => Self::PageRead(pre),
     118              :         }
     119            0 :     }
     120              : }
     121              : 
     122            0 : #[derive(Debug, thiserror::Error)]
     123              : pub enum RelationError {
     124              :     #[error("Relation Already Exists")]
     125              :     AlreadyExists,
     126              :     #[error("invalid relnode")]
     127              :     InvalidRelnode,
     128              :     #[error(transparent)]
     129              :     Other(#[from] anyhow::Error),
     130              : }
     131              : 
     132              : ///
     133              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     134              : /// and other special kinds of files, in a versioned key-value store. The
     135              : /// Timeline struct provides the key-value store.
     136              : ///
     137              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
     138              : /// implementation, and might be moved into a separate struct later.
     139              : impl Timeline {
     140              :     /// Start ingesting a WAL record, or other atomic modification of
     141              :     /// the timeline.
     142              :     ///
     143              :     /// This provides a transaction-like interface to perform a bunch
     144              :     /// of modifications atomically.
     145              :     ///
     146              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     147              :     /// DatadirModification object. Use the functions in the object to
     148              :     /// modify the repository state, updating all the pages and metadata
     149              :     /// that the WAL record affects. When you're done, call commit() to
     150              :     /// commit the changes.
     151              :     ///
     152              :     /// Lsn stored in modification is advanced by `ingest_record` and
     153              :     /// is used by `commit()` to update `last_record_lsn`.
     154              :     ///
     155              :     /// Calling commit() will flush all the changes and reset the state,
     156              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     157              :     ///
     158              :     /// Note that any pending modifications you make through the
     159              :     /// modification object won't be visible to calls to the 'get' and list
     160              :     /// functions of the timeline until you finish! And if you update the
     161              :     /// same page twice, the last update wins.
     162              :     ///
     163       268374 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     164       268374 :     where
     165       268374 :         Self: Sized,
     166       268374 :     {
     167       268374 :         DatadirModification {
     168       268374 :             tline: self,
     169       268374 :             pending_lsns: Vec::new(),
     170       268374 :             pending_metadata_pages: HashMap::new(),
     171       268374 :             pending_data_pages: Vec::new(),
     172       268374 :             pending_zero_data_pages: Default::default(),
     173       268374 :             pending_deletions: Vec::new(),
     174       268374 :             pending_nblocks: 0,
     175       268374 :             pending_directory_entries: Vec::new(),
     176       268374 :             pending_bytes: 0,
     177       268374 :             lsn,
     178       268374 :         }
     179       268374 :     }
     180              : 
     181              :     //------------------------------------------------------------------------------
     182              :     // Public GET functions
     183              :     //------------------------------------------------------------------------------
     184              : 
     185              :     /// Look up given page version.
     186        18384 :     pub(crate) async fn get_rel_page_at_lsn(
     187        18384 :         &self,
     188        18384 :         tag: RelTag,
     189        18384 :         blknum: BlockNumber,
     190        18384 :         version: Version<'_>,
     191        18384 :         ctx: &RequestContext,
     192        18384 :     ) -> Result<Bytes, PageReconstructError> {
     193        18384 :         if tag.relnode == 0 {
     194            0 :             return Err(PageReconstructError::Other(
     195            0 :                 RelationError::InvalidRelnode.into(),
     196            0 :             ));
     197        18384 :         }
     198              : 
     199        18384 :         let nblocks = self.get_rel_size(tag, version, ctx).await?;
     200        18384 :         if blknum >= nblocks {
     201            0 :             debug!(
     202            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     203            0 :                 tag,
     204            0 :                 blknum,
     205            0 :                 version.get_lsn(),
     206              :                 nblocks
     207              :             );
     208            0 :             return Ok(ZERO_PAGE.clone());
     209        18384 :         }
     210        18384 : 
     211        18384 :         let key = rel_block_to_key(tag, blknum);
     212        18384 :         version.get(self, key, ctx).await
     213        18384 :     }
     214              : 
     215              :     // Get size of a database in blocks
     216            0 :     pub(crate) async fn get_db_size(
     217            0 :         &self,
     218            0 :         spcnode: Oid,
     219            0 :         dbnode: Oid,
     220            0 :         version: Version<'_>,
     221            0 :         ctx: &RequestContext,
     222            0 :     ) -> Result<usize, PageReconstructError> {
     223            0 :         let mut total_blocks = 0;
     224              : 
     225            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     226              : 
     227            0 :         for rel in rels {
     228            0 :             let n_blocks = self.get_rel_size(rel, version, ctx).await?;
     229            0 :             total_blocks += n_blocks as usize;
     230              :         }
     231            0 :         Ok(total_blocks)
     232            0 :     }
     233              : 
     234              :     /// Get size of a relation file
     235        24434 :     pub(crate) async fn get_rel_size(
     236        24434 :         &self,
     237        24434 :         tag: RelTag,
     238        24434 :         version: Version<'_>,
     239        24434 :         ctx: &RequestContext,
     240        24434 :     ) -> Result<BlockNumber, PageReconstructError> {
     241        24434 :         if tag.relnode == 0 {
     242            0 :             return Err(PageReconstructError::Other(
     243            0 :                 RelationError::InvalidRelnode.into(),
     244            0 :             ));
     245        24434 :         }
     246              : 
     247        24434 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     248        19294 :             return Ok(nblocks);
     249         5140 :         }
     250         5140 : 
     251         5140 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     252            0 :             && !self.get_rel_exists(tag, version, ctx).await?
     253              :         {
     254              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     255              :             // FSM, and smgrnblocks() on it immediately afterwards,
     256              :             // without extending it.  Tolerate that by claiming that
     257              :             // any non-existent FSM fork has size 0.
     258            0 :             return Ok(0);
     259         5140 :         }
     260         5140 : 
     261         5140 :         let key = rel_size_to_key(tag);
     262         5140 :         let mut buf = version.get(self, key, ctx).await?;
     263         5136 :         let nblocks = buf.get_u32_le();
     264         5136 : 
     265         5136 :         self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     266         5136 : 
     267         5136 :         Ok(nblocks)
     268        24434 :     }
     269              : 
     270              :     /// Does relation exist?
     271         6050 :     pub(crate) async fn get_rel_exists(
     272         6050 :         &self,
     273         6050 :         tag: RelTag,
     274         6050 :         version: Version<'_>,
     275         6050 :         ctx: &RequestContext,
     276         6050 :     ) -> Result<bool, PageReconstructError> {
     277         6050 :         if tag.relnode == 0 {
     278            0 :             return Err(PageReconstructError::Other(
     279            0 :                 RelationError::InvalidRelnode.into(),
     280            0 :             ));
     281         6050 :         }
     282              : 
     283              :         // first try to lookup relation in cache
     284         6050 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     285         6032 :             return Ok(true);
     286           18 :         }
     287              :         // then check if the database was already initialized.
     288              :         // get_rel_exists can be called before dbdir is created.
     289           18 :         let buf = version.get(self, DBDIR_KEY, ctx).await?;
     290           18 :         let dbdirs = DbDirectory::des(&buf)?.dbdirs;
     291           18 :         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
     292            0 :             return Ok(false);
     293           18 :         }
     294           18 :         // fetch directory listing
     295           18 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     296           18 :         let buf = version.get(self, key, ctx).await?;
     297              : 
     298           18 :         let dir = RelDirectory::des(&buf)?;
     299           18 :         Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
     300         6050 :     }
     301              : 
     302              :     /// Get a list of all existing relations in given tablespace and database.
     303              :     ///
     304              :     /// # Cancel-Safety
     305              :     ///
     306              :     /// This method is cancellation-safe.
     307            0 :     pub(crate) async fn list_rels(
     308            0 :         &self,
     309            0 :         spcnode: Oid,
     310            0 :         dbnode: Oid,
     311            0 :         version: Version<'_>,
     312            0 :         ctx: &RequestContext,
     313            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     314            0 :         // fetch directory listing
     315            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     316            0 :         let buf = version.get(self, key, ctx).await?;
     317              : 
     318            0 :         let dir = RelDirectory::des(&buf)?;
     319            0 :         let rels: HashSet<RelTag> =
     320            0 :             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     321            0 :                 spcnode,
     322            0 :                 dbnode,
     323            0 :                 relnode: *relnode,
     324            0 :                 forknum: *forknum,
     325            0 :             }));
     326            0 : 
     327            0 :         Ok(rels)
     328            0 :     }
     329              : 
     330              :     /// Get the whole SLRU segment
     331            0 :     pub(crate) async fn get_slru_segment(
     332            0 :         &self,
     333            0 :         kind: SlruKind,
     334            0 :         segno: u32,
     335            0 :         lsn: Lsn,
     336            0 :         ctx: &RequestContext,
     337            0 :     ) -> Result<Bytes, PageReconstructError> {
     338            0 :         let n_blocks = self
     339            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     340            0 :             .await?;
     341            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     342            0 :         for blkno in 0..n_blocks {
     343            0 :             let block = self
     344            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     345            0 :                 .await?;
     346            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     347              :         }
     348            0 :         Ok(segment.freeze())
     349            0 :     }
     350              : 
     351              :     /// Look up given SLRU page version.
     352            0 :     pub(crate) async fn get_slru_page_at_lsn(
     353            0 :         &self,
     354            0 :         kind: SlruKind,
     355            0 :         segno: u32,
     356            0 :         blknum: BlockNumber,
     357            0 :         lsn: Lsn,
     358            0 :         ctx: &RequestContext,
     359            0 :     ) -> Result<Bytes, PageReconstructError> {
     360            0 :         let key = slru_block_to_key(kind, segno, blknum);
     361            0 :         self.get(key, lsn, ctx).await
     362            0 :     }
     363              : 
     364              :     /// Get size of an SLRU segment
     365            0 :     pub(crate) async fn get_slru_segment_size(
     366            0 :         &self,
     367            0 :         kind: SlruKind,
     368            0 :         segno: u32,
     369            0 :         version: Version<'_>,
     370            0 :         ctx: &RequestContext,
     371            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     372            0 :         let key = slru_segment_size_to_key(kind, segno);
     373            0 :         let mut buf = version.get(self, key, ctx).await?;
     374            0 :         Ok(buf.get_u32_le())
     375            0 :     }
     376              : 
     377              :     /// Does the SLRU segment exist?
     378            0 :     pub(crate) async fn get_slru_segment_exists(
     379            0 :         &self,
     380            0 :         kind: SlruKind,
     381            0 :         segno: u32,
     382            0 :         version: Version<'_>,
     383            0 :         ctx: &RequestContext,
     384            0 :     ) -> Result<bool, PageReconstructError> {
     385            0 :         // fetch directory listing
     386            0 :         let key = slru_dir_to_key(kind);
     387            0 :         let buf = version.get(self, key, ctx).await?;
     388              : 
     389            0 :         let dir = SlruSegmentDirectory::des(&buf)?;
     390            0 :         Ok(dir.segments.contains(&segno))
     391            0 :     }
     392              : 
     393              :     /// Locate LSN, such that all transactions that committed before
     394              :     /// 'search_timestamp' are visible, but nothing newer is.
     395              :     ///
     396              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     397              :     /// so it's not well defined which LSN you get if there were multiple commits
     398              :     /// "in flight" at that point in time.
     399              :     ///
     400            0 :     pub(crate) async fn find_lsn_for_timestamp(
     401            0 :         &self,
     402            0 :         search_timestamp: TimestampTz,
     403            0 :         cancel: &CancellationToken,
     404            0 :         ctx: &RequestContext,
     405            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     406            0 :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     407              : 
     408            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     409            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     410            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     411            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     412            0 :         // on the safe side.
     413            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     414            0 :         let max_lsn = self.get_last_record_lsn();
     415            0 : 
     416            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     417            0 :         // LSN divided by 8.
     418            0 :         let mut low = min_lsn.0 / 8;
     419            0 :         let mut high = max_lsn.0 / 8 + 1;
     420            0 : 
     421            0 :         let mut found_smaller = false;
     422            0 :         let mut found_larger = false;
     423              : 
     424            0 :         while low < high {
     425            0 :             if cancel.is_cancelled() {
     426            0 :                 return Err(PageReconstructError::Cancelled);
     427            0 :             }
     428            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     429            0 :             let mid = (high + low) / 2;
     430              : 
     431            0 :             let cmp = self
     432            0 :                 .is_latest_commit_timestamp_ge_than(
     433            0 :                     search_timestamp,
     434            0 :                     Lsn(mid * 8),
     435            0 :                     &mut found_smaller,
     436            0 :                     &mut found_larger,
     437            0 :                     ctx,
     438            0 :                 )
     439            0 :                 .await?;
     440              : 
     441            0 :             if cmp {
     442            0 :                 high = mid;
     443            0 :             } else {
     444            0 :                 low = mid + 1;
     445            0 :             }
     446              :         }
     447              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     448              :         // so the LSN of the last commit record before or at `search_timestamp`.
     449              :         // Remove one from `low` to get `t`.
     450              :         //
     451              :         // FIXME: it would be better to get the LSN of the previous commit.
     452              :         // Otherwise, if you restore to the returned LSN, the database will
     453              :         // include physical changes from later commits that will be marked
     454              :         // as aborted, and will need to be vacuumed away.
     455            0 :         let commit_lsn = Lsn((low - 1) * 8);
     456            0 :         match (found_smaller, found_larger) {
     457              :             (false, false) => {
     458              :                 // This can happen if no commit records have been processed yet, e.g.
     459              :                 // just after importing a cluster.
     460            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     461              :             }
     462              :             (false, true) => {
     463              :                 // Didn't find any commit timestamps smaller than the request
     464            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     465              :             }
     466            0 :             (true, _) if commit_lsn < min_lsn => {
     467            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     468            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     469            0 :                 // below the min_lsn. We should never do that.
     470            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     471              :             }
     472              :             (true, false) => {
     473              :                 // Only found commits with timestamps smaller than the request.
     474              :                 // It's still a valid case for branch creation, return it.
     475              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     476              :                 // case, anyway.
     477            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     478              :             }
     479            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     480              :         }
     481            0 :     }
     482              : 
     483              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if, as of LSN 'probe_lsn',
     484              :     /// there are any commits with a timestamp at or after 'search_timestamp'.
     485              :     ///
     486              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
     487              :     /// with a smaller/larger timestamp.
     488              :     ///
     489            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     490            0 :         &self,
     491            0 :         search_timestamp: TimestampTz,
     492            0 :         probe_lsn: Lsn,
     493            0 :         found_smaller: &mut bool,
     494            0 :         found_larger: &mut bool,
     495            0 :         ctx: &RequestContext,
     496            0 :     ) -> Result<bool, PageReconstructError> {
     497            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     498            0 :             if timestamp >= search_timestamp {
     499            0 :                 *found_larger = true;
     500            0 :                 return ControlFlow::Break(true);
     501            0 :             } else {
     502            0 :                 *found_smaller = true;
     503            0 :             }
     504            0 :             ControlFlow::Continue(())
     505            0 :         })
     506            0 :         .await
     507            0 :     }
     508              : 
     509              :     /// Obtain the largest timestamp found for the given lsn.
     510              :     ///
     511              :     /// If the lsn has no timestamps, returns `None`; otherwise returns `Some(max)`, the largest timestamp found.
     512            0 :     pub(crate) async fn get_timestamp_for_lsn(
     513            0 :         &self,
     514            0 :         probe_lsn: Lsn,
     515            0 :         ctx: &RequestContext,
     516            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     517            0 :         let mut max: Option<TimestampTz> = None;
     518            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
     519            0 :             if let Some(max_prev) = max {
     520            0 :                 max = Some(max_prev.max(timestamp));
     521            0 :             } else {
     522            0 :                 max = Some(timestamp);
     523            0 :             }
     524            0 :             ControlFlow::Continue(())
     525            0 :         })
     526            0 :         .await?;
     527              : 
     528            0 :         Ok(max)
     529            0 :     }
     530              : 
     531              :     /// Runs the given function on all the timestamps for a given lsn
     532              :     ///
     533              :     /// The return value is either given by the closure, or set to the `Default`
     534              :     /// impl's output.
     535            0 :     async fn map_all_timestamps<T: Default>(
     536            0 :         &self,
     537            0 :         probe_lsn: Lsn,
     538            0 :         ctx: &RequestContext,
     539            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     540            0 :     ) -> Result<T, PageReconstructError> {
     541            0 :         for segno in self
     542            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     543            0 :             .await?
     544              :         {
     545            0 :             let nblocks = self
     546            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     547            0 :                 .await?;
     548            0 :             for blknum in (0..nblocks).rev() {
     549            0 :                 let clog_page = self
     550            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     551            0 :                     .await?;
     552              : 
     553            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     554            0 :                     let mut timestamp_bytes = [0u8; 8];
     555            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     556            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     557            0 : 
     558            0 :                     match f(timestamp) {
     559            0 :                         ControlFlow::Break(b) => return Ok(b),
     560            0 :                         ControlFlow::Continue(()) => (),
     561              :                     }
     562            0 :                 }
     563              :             }
     564              :         }
     565            0 :         Ok(Default::default())
     566            0 :     }
     567              : 
     568            0 :     pub(crate) async fn get_slru_keyspace(
     569            0 :         &self,
     570            0 :         version: Version<'_>,
     571            0 :         ctx: &RequestContext,
     572            0 :     ) -> Result<KeySpace, PageReconstructError> {
     573            0 :         let mut accum = KeySpaceAccum::new();
     574              : 
     575            0 :         for kind in SlruKind::iter() {
     576            0 :             let mut segments: Vec<u32> = self
     577            0 :                 .list_slru_segments(kind, version, ctx)
     578            0 :                 .await?
     579            0 :                 .into_iter()
     580            0 :                 .collect();
     581            0 :             segments.sort_unstable();
     582              : 
     583            0 :             for seg in segments {
     584            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     585              : 
     586            0 :                 accum.add_range(
     587            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     588            0 :                 );
     589              :             }
     590              :         }
     591              : 
     592            0 :         Ok(accum.to_keyspace())
     593            0 :     }
     594              : 
     595              :     /// Get a list of SLRU segments
     596            0 :     pub(crate) async fn list_slru_segments(
     597            0 :         &self,
     598            0 :         kind: SlruKind,
     599            0 :         version: Version<'_>,
     600            0 :         ctx: &RequestContext,
     601            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     602            0 :         // fetch directory entry
     603            0 :         let key = slru_dir_to_key(kind);
     604              : 
     605            0 :         let buf = version.get(self, key, ctx).await?;
     606            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
     607            0 :     }
     608              : 
     609            0 :     pub(crate) async fn get_relmap_file(
     610            0 :         &self,
     611            0 :         spcnode: Oid,
     612            0 :         dbnode: Oid,
     613            0 :         version: Version<'_>,
     614            0 :         ctx: &RequestContext,
     615            0 :     ) -> Result<Bytes, PageReconstructError> {
     616            0 :         let key = relmap_file_key(spcnode, dbnode);
     617              : 
     618            0 :         let buf = version.get(self, key, ctx).await?;
     619            0 :         Ok(buf)
     620            0 :     }
     621              : 
     622          286 :     pub(crate) async fn list_dbdirs(
     623          286 :         &self,
     624          286 :         lsn: Lsn,
     625          286 :         ctx: &RequestContext,
     626          286 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     627              :         // fetch directory entry
     628         3087 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     629              : 
     630          286 :         Ok(DbDirectory::des(&buf)?.dbdirs)
     631          286 :     }
     632              : 
     633            0 :     pub(crate) async fn get_twophase_file(
     634            0 :         &self,
     635            0 :         xid: u64,
     636            0 :         lsn: Lsn,
     637            0 :         ctx: &RequestContext,
     638            0 :     ) -> Result<Bytes, PageReconstructError> {
     639            0 :         let key = twophase_file_key(xid);
     640            0 :         let buf = self.get(key, lsn, ctx).await?;
     641            0 :         Ok(buf)
     642            0 :     }
     643              : 
     644          288 :     pub(crate) async fn list_twophase_files(
     645          288 :         &self,
     646          288 :         lsn: Lsn,
     647          288 :         ctx: &RequestContext,
     648          288 :     ) -> Result<HashSet<u64>, PageReconstructError> {
     649              :         // fetch directory entry
     650         3120 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     651              : 
     652          288 :         if self.pg_version >= 17 {
     653            0 :             Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
     654              :         } else {
     655          288 :             Ok(TwoPhaseDirectory::des(&buf)?
     656              :                 .xids
     657          288 :                 .iter()
     658          288 :                 .map(|x| u64::from(*x))
     659          288 :                 .collect())
     660              :         }
     661          288 :     }
     662              : 
     663            0 :     pub(crate) async fn get_control_file(
     664            0 :         &self,
     665            0 :         lsn: Lsn,
     666            0 :         ctx: &RequestContext,
     667            0 :     ) -> Result<Bytes, PageReconstructError> {
     668            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     669            0 :     }
     670              : 
     671           12 :     pub(crate) async fn get_checkpoint(
     672           12 :         &self,
     673           12 :         lsn: Lsn,
     674           12 :         ctx: &RequestContext,
     675           12 :     ) -> Result<Bytes, PageReconstructError> {
     676           12 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     677           12 :     }
     678              : 
     679           12 :     async fn list_aux_files_v2(
     680           12 :         &self,
     681           12 :         lsn: Lsn,
     682           12 :         ctx: &RequestContext,
     683           12 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     684           12 :         let kv = self
     685           12 :             .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
     686            0 :             .await?;
     687           12 :         let mut result = HashMap::new();
     688           12 :         let mut sz = 0;
     689           30 :         for (_, v) in kv {
     690           18 :             let v = v?;
     691           18 :             let v = aux_file::decode_file_value_bytes(&v)
     692           18 :                 .context("value decode")
     693           18 :                 .map_err(PageReconstructError::Other)?;
     694           34 :             for (fname, content) in v {
     695           16 :                 sz += fname.len();
     696           16 :                 sz += content.len();
     697           16 :                 result.insert(fname, content);
     698           16 :             }
     699              :         }
     700           12 :         self.aux_file_size_estimator.on_initial(sz);
     701           12 :         Ok(result)
     702           12 :     }
     703              : 
     704            0 :     pub(crate) async fn trigger_aux_file_size_computation(
     705            0 :         &self,
     706            0 :         lsn: Lsn,
     707            0 :         ctx: &RequestContext,
     708            0 :     ) -> Result<(), PageReconstructError> {
     709            0 :         self.list_aux_files_v2(lsn, ctx).await?;
     710            0 :         Ok(())
     711            0 :     }
     712              : 
     713           12 :     pub(crate) async fn list_aux_files(
     714           12 :         &self,
     715           12 :         lsn: Lsn,
     716           12 :         ctx: &RequestContext,
     717           12 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     718           12 :         self.list_aux_files_v2(lsn, ctx).await
     719           12 :     }
     720              : 
     721            0 :     pub(crate) async fn get_replorigins(
     722            0 :         &self,
     723            0 :         lsn: Lsn,
     724            0 :         ctx: &RequestContext,
     725            0 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
     726            0 :         let kv = self
     727            0 :             .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
     728            0 :             .await?;
     729            0 :         let mut result = HashMap::new();
     730            0 :         for (k, v) in kv {
     731            0 :             let v = v?;
     732            0 :             let origin_id = k.field6 as RepOriginId;
     733            0 :             let origin_lsn = Lsn::des(&v).unwrap();
     734            0 :             if origin_lsn != Lsn::INVALID {
     735            0 :                 result.insert(origin_id, origin_lsn);
     736            0 :             }
     737              :         }
     738            0 :         Ok(result)
     739            0 :     }
     740              : 
     741              :     /// Does the same as get_current_logical_size but counted on demand.
     742              :     /// Used to initialize the logical size tracking on startup.
     743              :     ///
     744              :     /// Only relation blocks are counted currently. That excludes metadata,
     745              :     /// SLRUs, twophase files etc.
     746              :     ///
     747              :     /// # Cancel-Safety
     748              :     ///
     749              :     /// This method is cancellation-safe.
     750            0 :     pub(crate) async fn get_current_logical_size_non_incremental(
     751            0 :         &self,
     752            0 :         lsn: Lsn,
     753            0 :         ctx: &RequestContext,
     754            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     755            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     756              : 
     757              :         // Fetch list of database dirs and iterate them
     758            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     759            0 :         let dbdir = DbDirectory::des(&buf)?;
     760              : 
     761            0 :         let mut total_size: u64 = 0;
     762            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     763            0 :             for rel in self
     764            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     765            0 :                 .await?
     766              :             {
     767            0 :                 if self.cancel.is_cancelled() {
     768            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     769            0 :                 }
     770            0 :                 let relsize_key = rel_size_to_key(rel);
     771            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     772            0 :                 let relsize = buf.get_u32_le();
     773            0 : 
     774            0 :                 total_size += relsize as u64;
     775              :             }
     776              :         }
     777            0 :         Ok(total_size * BLCKSZ as u64)
     778            0 :     }
     779              : 
     780              :     /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
     781              :     /// for gc-compaction.
     782              :     ///
     783              :     /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
     784              :     /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
     785              :     /// be kept only for a specific range of LSN.
     786              :     ///
     787              :     /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
     788              :     /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
     789              :     /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
     790              :     /// determine which keys to retain/drop for gc-compaction.
     791              :     ///
     792              :     /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
     793              :     /// to be retained for each of the branch LSNs.
     794              :     ///
     795              :     /// The return value is (dense keyspace, sparse keyspace).
     796           26 :     pub(crate) async fn collect_gc_compaction_keyspace(
     797           26 :         &self,
     798           26 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     799           26 :         let metadata_key_begin = Key::metadata_key_range().start;
     800           26 :         let aux_v1_key = AUX_FILES_KEY;
     801           26 :         let dense_keyspace = KeySpace {
     802           26 :             ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
     803           26 :         };
     804           26 :         Ok((
     805           26 :             dense_keyspace,
     806           26 :             SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
     807           26 :         ))
     808           26 :     }
     809              : 
     810              :     ///
     811              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     812              :     /// Anything that's not listed may be removed from the underlying storage (from
     813              :     /// that LSN forwards).
     814              :     ///
     815              :     /// The return value is (dense keyspace, sparse keyspace).
     816          286 :     pub(crate) async fn collect_keyspace(
     817          286 :         &self,
     818          286 :         lsn: Lsn,
     819          286 :         ctx: &RequestContext,
     820          286 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     821          286 :         // Iterate through key ranges, greedily packing them into partitions
     822          286 :         let mut result = KeySpaceAccum::new();
     823          286 : 
     824          286 :         // The dbdir metadata always exists
     825          286 :         result.add_key(DBDIR_KEY);
     826              : 
     827              :         // Fetch list of database dirs and iterate them
     828         3087 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
     829          286 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
     830          286 : 
     831          286 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
     832          286 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
     833            0 :             if has_relmap_file {
     834            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
     835            0 :             }
     836            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     837              : 
     838            0 :             let mut rels: Vec<RelTag> = self
     839            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     840            0 :                 .await?
     841            0 :                 .into_iter()
     842            0 :                 .collect();
     843            0 :             rels.sort_unstable();
     844            0 :             for rel in rels {
     845            0 :                 let relsize_key = rel_size_to_key(rel);
     846            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     847            0 :                 let relsize = buf.get_u32_le();
     848            0 : 
     849            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     850            0 :                 result.add_key(relsize_key);
     851              :             }
     852              :         }
     853              : 
     854              :         // Iterate SLRUs next
     855          858 :         for kind in [
     856          286 :             SlruKind::Clog,
     857          286 :             SlruKind::MultiXactMembers,
     858          286 :             SlruKind::MultiXactOffsets,
     859              :         ] {
     860          858 :             let slrudir_key = slru_dir_to_key(kind);
     861          858 :             result.add_key(slrudir_key);
     862         9543 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     863          858 :             let dir = SlruSegmentDirectory::des(&buf)?;
     864          858 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     865          858 :             segments.sort_unstable();
     866          858 :             for segno in segments {
     867            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     868            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     869            0 :                 let segsize = buf.get_u32_le();
     870            0 : 
     871            0 :                 result.add_range(
     872            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     873            0 :                 );
     874            0 :                 result.add_key(segsize_key);
     875              :             }
     876              :         }
     877              : 
     878              :         // Then pg_twophase
     879          286 :         result.add_key(TWOPHASEDIR_KEY);
     880              : 
     881          286 :         let mut xids: Vec<u64> = self
     882          286 :             .list_twophase_files(lsn, ctx)
     883         3120 :             .await?
     884          286 :             .iter()
     885          286 :             .cloned()
     886          286 :             .collect();
     887          286 :         xids.sort_unstable();
     888          286 :         for xid in xids {
     889            0 :             result.add_key(twophase_file_key(xid));
     890            0 :         }
     891              : 
     892          286 :         result.add_key(CONTROLFILE_KEY);
     893          286 :         result.add_key(CHECKPOINT_KEY);
     894          286 : 
     895          286 :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
     896          286 :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
     897          286 :         // and the keys will not be garbage-collected.
     898          286 :         #[cfg(test)]
     899          286 :         {
     900          286 :             let guard = self.extra_test_dense_keyspace.load();
     901          286 :             for kr in &guard.ranges {
     902            0 :                 result.add_range(kr.clone());
     903            0 :             }
     904            0 :         }
     905            0 : 
     906          286 :         let dense_keyspace = result.to_keyspace();
     907          286 :         let sparse_keyspace = SparseKeySpace(KeySpace {
     908          286 :             ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
     909          286 :         });
     910          286 : 
     911          286 :         if cfg!(debug_assertions) {
     912              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
     913              : 
     914              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
     915              :             // category of sparse keys is split into its own image/delta files. If there
     916              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
     917              :             // and we want the developer to keep the keyspaces separated.
     918              : 
     919          286 :             let ranges = &sparse_keyspace.0.ranges;
     920              : 
     921              :             // TODO: use a single overlaps_with across the codebase
     922          286 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
     923          286 :                 !(a.end <= b.start || b.end <= a.start)
     924          286 :             }
     925          572 :             for i in 0..ranges.len() {
     926          572 :                 for j in 0..i {
     927          286 :                     if overlaps_with(&ranges[i], &ranges[j]) {
     928            0 :                         panic!(
     929            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
     930            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
     931            0 :                         );
     932          286 :                     }
     933              :                 }
     934              :             }
     935          286 :             for i in 1..ranges.len() {
     936          286 :                 assert!(
     937          286 :                     ranges[i - 1].end <= ranges[i].start,
     938            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
     939            0 :                     ranges[i - 1].start,
     940            0 :                     ranges[i - 1].end,
     941            0 :                     ranges[i].start,
     942            0 :                     ranges[i].end
     943              :                 );
     944              :             }
     945            0 :         }
     946              : 
     947          286 :         Ok((dense_keyspace, sparse_keyspace))
     948          286 :     }
     949              : 
     950              :     /// Get the cached size of a relation if it was not updated after the specified LSN
     951       448540 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     952       448540 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     953       448540 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
     954       448518 :             if lsn >= *cached_lsn {
     955       443372 :                 return Some(*nblocks);
     956         5146 :             }
     957           22 :         }
     958         5168 :         None
     959       448540 :     }
     960              : 
     961              :     /// Update cached relation size if there is no more recent update
     962         5136 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     963         5136 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     964         5136 : 
     965         5136 :         if lsn < rel_size_cache.complete_as_of {
     966              :             // Do not cache old values. It's safe to cache the size on read, as long as
     967              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
     968              :             // never evict values from the cache, so if the relation size changed after
     969              :             // 'lsn', the new value is already in the cache.
     970            0 :             return;
     971         5136 :         }
     972         5136 : 
     973         5136 :         match rel_size_cache.map.entry(tag) {
     974         5136 :             hash_map::Entry::Occupied(mut entry) => {
     975         5136 :                 let cached_lsn = entry.get_mut();
     976         5136 :                 if lsn >= cached_lsn.0 {
     977            0 :                     *cached_lsn = (lsn, nblocks);
     978         5136 :                 }
     979              :             }
     980            0 :             hash_map::Entry::Vacant(entry) => {
     981            0 :                 entry.insert((lsn, nblocks));
     982            0 :             }
     983              :         }
     984         5136 :     }
     985              : 
     986              :     /// Store cached relation size
     987       282720 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     988       282720 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     989       282720 :         rel_size_cache.map.insert(tag, (lsn, nblocks));
     990       282720 :     }
     991              : 
     992              :     /// Remove cached relation size
     993            2 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     994            2 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     995            2 :         rel_size_cache.map.remove(tag);
     996            2 :     }
     997              : }
     998              : 
     999              : /// DatadirModification represents an operation to ingest an atomic set of
    1000              : /// updates to the repository.
    1001              : ///
    1002              : /// It is created by the 'begin_record' function. It is called for each WAL
    1003              : /// record, so that all the modifications made by one WAL record appear atomic.
    1004              : pub struct DatadirModification<'a> {
    1005              :     /// The timeline this modification applies to. You can access this to
    1006              :     /// read the state, but note that any pending updates are *not* reflected
    1007              :     /// in the state in 'tline' yet.
    1008              :     pub tline: &'a Timeline,
    1009              : 
    1010              :     /// Current LSN of the modification
    1011              :     lsn: Lsn,
    1012              : 
    1013              :     // The modifications are not applied directly to the underlying key-value store.
    1014              :     // The put-functions add the modifications here, and they are flushed to the
    1015              :     // underlying key-value store by the 'finish' function.
    1016              :     pending_lsns: Vec<Lsn>,
    1017              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1018              :     pending_nblocks: i64,
    1019              : 
    1020              :     /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    1021              :     /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    1022              :     pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
    1023              : 
    1024              :     /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    1025              :     /// which keys are stored here.
    1026              :     pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,
    1027              : 
    1028              :     // Sometimes during ingest, for example when extending a relation, we would like to write a zero page.  However,
    1029              :     // if we encounter a write from postgres in the same wal record, we will drop this entry.
    1030              :     //
    1031              :     // Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
    1032              :     // at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
    1033              :     pending_zero_data_pages: HashSet<CompactKey>,
    1034              : 
    1035              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1036              :     /// if it was updated in this modification.
    1037              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
    1038              : 
    1039              :     /// An **approximation** of how large our EphemeralFile write will be when committed.
    1040              :     pending_bytes: usize,
    1041              : }
    1042              : 
    1043              : impl<'a> DatadirModification<'a> {
    1044              :     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    1045              :     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    1046              :     // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    1047              :     pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
    1048              : 
    1049              :     /// Get the current lsn
    1050       418056 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1051       418056 :         self.lsn
    1052       418056 :     }
    1053              : 
    1054            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1055            0 :         self.pending_bytes
    1056            0 :     }
    1057              : 
    1058            0 :     pub(crate) fn has_dirty_data_pages(&self) -> bool {
    1059            0 :         (!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
    1060            0 :     }
    1061              : 
    1062              :     /// Set the current lsn
    1063       145858 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
    1064       145858 :         ensure!(
    1065       145858 :             lsn >= self.lsn,
    1066            0 :             "setting an older lsn {} than {} is not allowed",
    1067              :             lsn,
    1068              :             self.lsn
    1069              :         );
    1070              : 
    1071              :         // If we are advancing LSN, then state from previous wal record should have been flushed.
    1072       145858 :         assert!(self.pending_zero_data_pages.is_empty());
    1073              : 
    1074       145858 :         if lsn > self.lsn {
    1075       145858 :             self.pending_lsns.push(self.lsn);
    1076       145858 :             self.lsn = lsn;
    1077       145858 :         }
    1078       145858 :         Ok(())
    1079       145858 :     }
    1080              : 
    1081              :     /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
    1082              :     /// keys that represent literal blocks that postgres can read.  So data includes relation blocks and
    1083              :     /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
    1084              :     ///
    1085              :     /// The distinction is important because data keys are handled on a fast path where dirty writes are
    1086              :     /// not readable until this modification is committed, whereas metadata keys are visible for read
    1087              :     /// via [`Self::get`] as soon as their record has been ingested.
    1088       996032 :     fn is_data_key(key: &Key) -> bool {
    1089       996032 :         key.is_rel_block_key() || key.is_slru_block_key()
    1090       996032 :     }
    1091              : 
    1092              :     /// Initialize a completely new repository.
    1093              :     ///
    1094              :     /// This inserts the directory metadata entries that are assumed to
    1095              :     /// always exist.
    1096          172 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1097          172 :         let buf = DbDirectory::ser(&DbDirectory {
    1098          172 :             dbdirs: HashMap::new(),
    1099          172 :         })?;
    1100          172 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
    1101          172 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1102              : 
    1103          172 :         let buf = if self.tline.pg_version >= 17 {
    1104            0 :             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
    1105            0 :                 xids: HashSet::new(),
    1106            0 :             })
    1107              :         } else {
    1108          172 :             TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1109          172 :                 xids: HashSet::new(),
    1110          172 :             })
    1111            0 :         }?;
    1112          172 :         self.pending_directory_entries
    1113          172 :             .push((DirectoryKind::TwoPhase, 0));
    1114          172 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1115              : 
    1116          172 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1117          172 :         let empty_dir = Value::Image(buf);
    1118          172 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1119          172 :         self.pending_directory_entries
    1120          172 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1121          172 :         self.put(
    1122          172 :             slru_dir_to_key(SlruKind::MultiXactMembers),
    1123          172 :             empty_dir.clone(),
    1124          172 :         );
    1125          172 :         self.pending_directory_entries
    1126          172 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1127          172 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1128          172 :         self.pending_directory_entries
    1129          172 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
    1130          172 : 
    1131          172 :         Ok(())
    1132          172 :     }
    1133              : 
    /// Test-only initialization: runs [`Self::init_empty`] and then stores placeholder control
    /// file and checkpoint images.
    #[cfg(test)]
    pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
        self.init_empty()?;

        let control_file = bytes::Bytes::from_static(b"control_file contents do not matter");
        self.put_control_file(control_file)
            .context("put_control_file")?;

        let checkpoint = bytes::Bytes::from_static(b"checkpoint_file contents do not matter");
        self.put_checkpoint(checkpoint)
            .context("put_checkpoint_file")?;

        Ok(())
    }
    1147              : 
    1148              :     /// Put a new page version that can be constructed from a WAL record
    1149              :     ///
    1150              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1151              :     /// current end-of-file. It's up to the caller to check that the relation size
    1152              :     /// matches the blocks inserted!
    1153       145630 :     pub fn put_rel_wal_record(
    1154       145630 :         &mut self,
    1155       145630 :         rel: RelTag,
    1156       145630 :         blknum: BlockNumber,
    1157       145630 :         rec: NeonWalRecord,
    1158       145630 :     ) -> anyhow::Result<()> {
    1159       145630 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1160       145630 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1161       145630 :         Ok(())
    1162       145630 :     }
    1163              : 
    1164              :     // Same, but for an SLRU.
    1165            8 :     pub fn put_slru_wal_record(
    1166            8 :         &mut self,
    1167            8 :         kind: SlruKind,
    1168            8 :         segno: u32,
    1169            8 :         blknum: BlockNumber,
    1170            8 :         rec: NeonWalRecord,
    1171            8 :     ) -> anyhow::Result<()> {
    1172            8 :         self.put(
    1173            8 :             slru_block_to_key(kind, segno, blknum),
    1174            8 :             Value::WalRecord(rec),
    1175            8 :         );
    1176            8 :         Ok(())
    1177            8 :     }
    1178              : 
    1179              :     /// Like put_wal_record, but with ready-made image of the page.
    1180       277866 :     pub fn put_rel_page_image(
    1181       277866 :         &mut self,
    1182       277866 :         rel: RelTag,
    1183       277866 :         blknum: BlockNumber,
    1184       277866 :         img: Bytes,
    1185       277866 :     ) -> anyhow::Result<()> {
    1186       277866 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1187       277866 :         let key = rel_block_to_key(rel, blknum);
    1188       277866 :         if !key.is_valid_key_on_write_path() {
    1189            0 :             anyhow::bail!(
    1190            0 :                 "the request contains data not supported by pageserver at {}",
    1191            0 :                 key
    1192            0 :             );
    1193       277866 :         }
    1194       277866 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1195       277866 :         Ok(())
    1196       277866 :     }
    1197              : 
    1198            6 :     pub fn put_slru_page_image(
    1199            6 :         &mut self,
    1200            6 :         kind: SlruKind,
    1201            6 :         segno: u32,
    1202            6 :         blknum: BlockNumber,
    1203            6 :         img: Bytes,
    1204            6 :     ) -> anyhow::Result<()> {
    1205            6 :         let key = slru_block_to_key(kind, segno, blknum);
    1206            6 :         if !key.is_valid_key_on_write_path() {
    1207            0 :             anyhow::bail!(
    1208            0 :                 "the request contains data not supported by pageserver at {}",
    1209            0 :                 key
    1210            0 :             );
    1211            6 :         }
    1212            6 :         self.put(key, Value::Image(img));
    1213            6 :         Ok(())
    1214            6 :     }
    1215              : 
    1216         2998 :     pub(crate) fn put_rel_page_image_zero(
    1217         2998 :         &mut self,
    1218         2998 :         rel: RelTag,
    1219         2998 :         blknum: BlockNumber,
    1220         2998 :     ) -> anyhow::Result<()> {
    1221         2998 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1222         2998 :         let key = rel_block_to_key(rel, blknum);
    1223         2998 :         if !key.is_valid_key_on_write_path() {
    1224            0 :             anyhow::bail!(
    1225            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1226            0 :                 key,
    1227            0 :                 self.lsn
    1228            0 :             );
    1229         2998 :         }
    1230         2998 :         self.pending_zero_data_pages.insert(key.to_compact());
    1231         2998 :         self.pending_bytes += ZERO_PAGE.len();
    1232         2998 :         Ok(())
    1233         2998 :     }
    1234              : 
    1235            0 :     pub(crate) fn put_slru_page_image_zero(
    1236            0 :         &mut self,
    1237            0 :         kind: SlruKind,
    1238            0 :         segno: u32,
    1239            0 :         blknum: BlockNumber,
    1240            0 :     ) -> anyhow::Result<()> {
    1241            0 :         let key = slru_block_to_key(kind, segno, blknum);
    1242            0 :         if !key.is_valid_key_on_write_path() {
    1243            0 :             anyhow::bail!(
    1244            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1245            0 :                 key,
    1246            0 :                 self.lsn
    1247            0 :             );
    1248            0 :         }
    1249            0 :         self.pending_zero_data_pages.insert(key.to_compact());
    1250            0 :         self.pending_bytes += ZERO_PAGE.len();
    1251            0 :         Ok(())
    1252            0 :     }
    1253              : 
    1254              :     /// Call this at the end of each WAL record.
    1255       145864 :     pub(crate) fn on_record_end(&mut self) {
    1256       145864 :         let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
    1257       148862 :         for key in pending_zero_data_pages {
    1258         2998 :             self.put_data(key, Value::Image(ZERO_PAGE.clone()));
    1259         2998 :         }
    1260       145864 :     }
    1261              : 
    1262              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1263           16 :     pub async fn put_relmap_file(
    1264           16 :         &mut self,
    1265           16 :         spcnode: Oid,
    1266           16 :         dbnode: Oid,
    1267           16 :         img: Bytes,
    1268           16 :         ctx: &RequestContext,
    1269           16 :     ) -> anyhow::Result<()> {
    1270              :         // Add it to the directory (if it doesn't exist already)
    1271           16 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1272           16 :         let mut dbdir = DbDirectory::des(&buf)?;
    1273              : 
    1274           16 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1275           16 :         if r.is_none() || r == Some(false) {
    1276              :             // The dbdir entry didn't exist, or it contained a
    1277              :             // 'false'. The 'insert' call already updated it with
    1278              :             // 'true', now write the updated 'dbdirs' map back.
    1279           16 :             let buf = DbDirectory::ser(&dbdir)?;
    1280           16 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1281            0 :         }
    1282           16 :         if r.is_none() {
    1283              :             // Create RelDirectory
    1284            8 :             let buf = RelDirectory::ser(&RelDirectory {
    1285            8 :                 rels: HashSet::new(),
    1286            8 :             })?;
    1287            8 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1288            8 :             self.put(
    1289            8 :                 rel_dir_to_key(spcnode, dbnode),
    1290            8 :                 Value::Image(Bytes::from(buf)),
    1291            8 :             );
    1292            8 :         }
    1293              : 
    1294           16 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1295           16 :         Ok(())
    1296           16 :     }
    1297              : 
    1298            0 :     pub async fn put_twophase_file(
    1299            0 :         &mut self,
    1300            0 :         xid: u64,
    1301            0 :         img: Bytes,
    1302            0 :         ctx: &RequestContext,
    1303            0 :     ) -> anyhow::Result<()> {
    1304              :         // Add it to the directory entry
    1305            0 :         let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1306            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1307            0 :             let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
    1308            0 :             if !dir.xids.insert(xid) {
    1309            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1310            0 :             }
    1311            0 :             self.pending_directory_entries
    1312            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1313            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1314              :         } else {
    1315            0 :             let xid = xid as u32;
    1316            0 :             let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
    1317            0 :             if !dir.xids.insert(xid) {
    1318            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1319            0 :             }
    1320            0 :             self.pending_directory_entries
    1321            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1322            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1323              :         };
    1324            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1325            0 : 
    1326            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1327            0 :         Ok(())
    1328            0 :     }
    1329              : 
    1330            0 :     pub async fn set_replorigin(
    1331            0 :         &mut self,
    1332            0 :         origin_id: RepOriginId,
    1333            0 :         origin_lsn: Lsn,
    1334            0 :     ) -> anyhow::Result<()> {
    1335            0 :         let key = repl_origin_key(origin_id);
    1336            0 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    1337            0 :         Ok(())
    1338            0 :     }
    1339              : 
    1340            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
    1341            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    1342            0 :     }
    1343              : 
    1344          172 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1345          172 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1346          172 :         Ok(())
    1347          172 :     }
    1348              : 
    1349          186 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1350          186 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1351          186 :         Ok(())
    1352          186 :     }
    1353              : 
    1354            0 :     pub async fn drop_dbdir(
    1355            0 :         &mut self,
    1356            0 :         spcnode: Oid,
    1357            0 :         dbnode: Oid,
    1358            0 :         ctx: &RequestContext,
    1359            0 :     ) -> anyhow::Result<()> {
    1360            0 :         let total_blocks = self
    1361            0 :             .tline
    1362            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1363            0 :             .await?;
    1364              : 
    1365              :         // Remove entry from dbdir
    1366            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1367            0 :         let mut dir = DbDirectory::des(&buf)?;
    1368            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1369            0 :             let buf = DbDirectory::ser(&dir)?;
    1370            0 :             self.pending_directory_entries
    1371            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1372            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1373              :         } else {
    1374            0 :             warn!(
    1375            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1376              :                 spcnode, dbnode
    1377              :             );
    1378              :         }
    1379              : 
    1380              :         // Update logical database size.
    1381            0 :         self.pending_nblocks -= total_blocks as i64;
    1382            0 : 
    1383            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1384            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1385            0 :         Ok(())
    1386            0 :     }
    1387              : 
    1388              :     /// Create a relation fork.
    1389              :     ///
    1390              :     /// 'nblocks' is the initial size.
    1391         1920 :     pub async fn put_rel_creation(
    1392         1920 :         &mut self,
    1393         1920 :         rel: RelTag,
    1394         1920 :         nblocks: BlockNumber,
    1395         1920 :         ctx: &RequestContext,
    1396         1920 :     ) -> Result<(), RelationError> {
    1397         1920 :         if rel.relnode == 0 {
    1398            0 :             return Err(RelationError::InvalidRelnode);
    1399         1920 :         }
    1400              :         // It's possible that this is the first rel for this db in this
    1401              :         // tablespace.  Create the reldir entry for it if so.
    1402         1920 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1403         1920 :             .context("deserialize db")?;
    1404         1920 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1405         1920 :         let mut rel_dir =
    1406         1920 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1407              :                 // Didn't exist. Update dbdir
    1408            8 :                 e.insert(false);
    1409            8 :                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1410            8 :                 self.pending_directory_entries
    1411            8 :                     .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1412            8 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1413            8 : 
    1414            8 :                 // and create the RelDirectory
    1415            8 :                 RelDirectory::default()
    1416              :             } else {
    1417              :                 // reldir already exists, fetch it
    1418         1912 :                 RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1419         1912 :                     .context("deserialize db")?
    1420              :             };
    1421              : 
    1422              :         // Add the new relation to the rel directory entry, and write it back
    1423         1920 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1424            0 :             return Err(RelationError::AlreadyExists);
    1425         1920 :         }
    1426         1920 : 
    1427         1920 :         self.pending_directory_entries
    1428         1920 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1429         1920 : 
    1430         1920 :         self.put(
    1431         1920 :             rel_dir_key,
    1432         1920 :             Value::Image(Bytes::from(
    1433         1920 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1434              :             )),
    1435              :         );
    1436              : 
    1437              :         // Put size
    1438         1920 :         let size_key = rel_size_to_key(rel);
    1439         1920 :         let buf = nblocks.to_le_bytes();
    1440         1920 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1441         1920 : 
    1442         1920 :         self.pending_nblocks += nblocks as i64;
    1443         1920 : 
    1444         1920 :         // Update relation size cache
    1445         1920 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1446         1920 : 
    1447         1920 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1448         1920 :         // caller.
    1449         1920 :         Ok(())
    1450         1920 :     }
    1451              : 
    1452              :     /// Truncate relation
    1453         6012 :     pub async fn put_rel_truncation(
    1454         6012 :         &mut self,
    1455         6012 :         rel: RelTag,
    1456         6012 :         nblocks: BlockNumber,
    1457         6012 :         ctx: &RequestContext,
    1458         6012 :     ) -> anyhow::Result<()> {
    1459         6012 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1460         6012 :         if self
    1461         6012 :             .tline
    1462         6012 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1463            0 :             .await?
    1464              :         {
    1465         6012 :             let size_key = rel_size_to_key(rel);
    1466              :             // Fetch the old size first
    1467         6012 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1468         6012 : 
    1469         6012 :             // Update the entry with the new size.
    1470         6012 :             let buf = nblocks.to_le_bytes();
    1471         6012 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1472         6012 : 
    1473         6012 :             // Update relation size cache
    1474         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1475         6012 : 
    1476         6012 :             // Update logical database size.
    1477         6012 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1478            0 :         }
    1479         6012 :         Ok(())
    1480         6012 :     }
    1481              : 
    1482              :     /// Extend relation
    1483              :     /// If new size is smaller, do nothing.
    1484       276680 :     pub async fn put_rel_extend(
    1485       276680 :         &mut self,
    1486       276680 :         rel: RelTag,
    1487       276680 :         nblocks: BlockNumber,
    1488       276680 :         ctx: &RequestContext,
    1489       276680 :     ) -> anyhow::Result<()> {
    1490       276680 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1491              : 
    1492              :         // Put size
    1493       276680 :         let size_key = rel_size_to_key(rel);
    1494       276680 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1495       276680 : 
    1496       276680 :         // only extend relation here. never decrease the size
    1497       276680 :         if nblocks > old_size {
    1498       274788 :             let buf = nblocks.to_le_bytes();
    1499       274788 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1500       274788 : 
    1501       274788 :             // Update relation size cache
    1502       274788 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1503       274788 : 
    1504       274788 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1505       274788 :         }
    1506       276680 :         Ok(())
    1507       276680 :     }
    1508              : 
    1509              :     /// Drop a relation.
    1510            2 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1511            2 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1512              : 
    1513              :         // Remove it from the directory entry
    1514            2 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1515            2 :         let buf = self.get(dir_key, ctx).await?;
    1516            2 :         let mut dir = RelDirectory::des(&buf)?;
    1517              : 
    1518            2 :         self.pending_directory_entries
    1519            2 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1520            2 : 
    1521            2 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1522            2 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1523              :         } else {
    1524            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1525              :         }
    1526              : 
    1527              :         // update logical size
    1528            2 :         let size_key = rel_size_to_key(rel);
    1529            2 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1530            2 :         self.pending_nblocks -= old_size as i64;
    1531            2 : 
    1532            2 :         // Remove enty from relation size cache
    1533            2 :         self.tline.remove_cached_rel_size(&rel);
    1534            2 : 
    1535            2 :         // Delete size entry, as well as all blocks
    1536            2 :         self.delete(rel_key_range(rel));
    1537            2 : 
    1538            2 :         Ok(())
    1539            2 :     }
    1540              : 
    1541            6 :     pub async fn put_slru_segment_creation(
    1542            6 :         &mut self,
    1543            6 :         kind: SlruKind,
    1544            6 :         segno: u32,
    1545            6 :         nblocks: BlockNumber,
    1546            6 :         ctx: &RequestContext,
    1547            6 :     ) -> anyhow::Result<()> {
    1548            6 :         // Add it to the directory entry
    1549            6 :         let dir_key = slru_dir_to_key(kind);
    1550            6 :         let buf = self.get(dir_key, ctx).await?;
    1551            6 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1552              : 
    1553            6 :         if !dir.segments.insert(segno) {
    1554            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1555            6 :         }
    1556            6 :         self.pending_directory_entries
    1557            6 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1558            6 :         self.put(
    1559            6 :             dir_key,
    1560            6 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1561              :         );
    1562              : 
    1563              :         // Put size
    1564            6 :         let size_key = slru_segment_size_to_key(kind, segno);
    1565            6 :         let buf = nblocks.to_le_bytes();
    1566            6 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1567            6 : 
    1568            6 :         // even if nblocks > 0, we don't insert any actual blocks here
    1569            6 : 
    1570            6 :         Ok(())
    1571            6 :     }
    1572              : 
    1573              :     /// Extend SLRU segment
    1574            0 :     pub fn put_slru_extend(
    1575            0 :         &mut self,
    1576            0 :         kind: SlruKind,
    1577            0 :         segno: u32,
    1578            0 :         nblocks: BlockNumber,
    1579            0 :     ) -> anyhow::Result<()> {
    1580            0 :         // Put size
    1581            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1582            0 :         let buf = nblocks.to_le_bytes();
    1583            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1584            0 :         Ok(())
    1585            0 :     }
    1586              : 
    1587              :     /// This method is used for marking truncated SLRU files
    1588            0 :     pub async fn drop_slru_segment(
    1589            0 :         &mut self,
    1590            0 :         kind: SlruKind,
    1591            0 :         segno: u32,
    1592            0 :         ctx: &RequestContext,
    1593            0 :     ) -> anyhow::Result<()> {
    1594            0 :         // Remove it from the directory entry
    1595            0 :         let dir_key = slru_dir_to_key(kind);
    1596            0 :         let buf = self.get(dir_key, ctx).await?;
    1597            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1598              : 
    1599            0 :         if !dir.segments.remove(&segno) {
    1600            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1601            0 :         }
    1602            0 :         self.pending_directory_entries
    1603            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1604            0 :         self.put(
    1605            0 :             dir_key,
    1606            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1607              :         );
    1608              : 
    1609              :         // Delete size entry, as well as all blocks
    1610            0 :         self.delete(slru_segment_key_range(kind, segno));
    1611            0 : 
    1612            0 :         Ok(())
    1613            0 :     }
    1614              : 
    1615              :     /// Drop a relmapper file (pg_filenode.map)
    1616            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1617            0 :         // TODO
    1618            0 :         Ok(())
    1619            0 :     }
    1620              : 
    1621              :     /// This method is used for marking truncated SLRU files
    1622            0 :     pub async fn drop_twophase_file(
    1623            0 :         &mut self,
    1624            0 :         xid: u64,
    1625            0 :         ctx: &RequestContext,
    1626            0 :     ) -> anyhow::Result<()> {
    1627              :         // Remove it from the directory entry
    1628            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1629            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1630            0 :             let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
    1631              : 
    1632            0 :             if !dir.xids.remove(&xid) {
    1633            0 :                 warn!("twophase file for xid {} does not exist", xid);
    1634            0 :             }
    1635            0 :             self.pending_directory_entries
    1636            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1637            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1638              :         } else {
    1639            0 :             let xid: u32 = u32::try_from(xid)?;
    1640            0 :             let mut dir = TwoPhaseDirectory::des(&buf)?;
    1641              : 
    1642            0 :             if !dir.xids.remove(&xid) {
    1643            0 :                 warn!("twophase file for xid {} does not exist", xid);
    1644            0 :             }
    1645            0 :             self.pending_directory_entries
    1646            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1647            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1648              :         };
    1649            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1650            0 : 
    1651            0 :         // Delete it
    1652            0 :         self.delete(twophase_key_range(xid));
    1653            0 : 
    1654            0 :         Ok(())
    1655            0 :     }
    1656              : 
    1657           16 :     pub async fn put_file(
    1658           16 :         &mut self,
    1659           16 :         path: &str,
    1660           16 :         content: &[u8],
    1661           16 :         ctx: &RequestContext,
    1662           16 :     ) -> anyhow::Result<()> {
    1663           16 :         let key = aux_file::encode_aux_file_key(path);
    1664              :         // retrieve the key from the engine
    1665           16 :         let old_val = match self.get(key, ctx).await {
    1666            4 :             Ok(val) => Some(val),
    1667           12 :             Err(PageReconstructError::MissingKey(_)) => None,
    1668            0 :             Err(e) => return Err(e.into()),
    1669              :         };
    1670           16 :         let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    1671            4 :             aux_file::decode_file_value(old_val)?
    1672              :         } else {
    1673           12 :             Vec::new()
    1674              :         };
    1675           16 :         let mut other_files = Vec::with_capacity(files.len());
    1676           16 :         let mut modifying_file = None;
    1677           20 :         for file @ (p, content) in files {
    1678            4 :             if path == p {
    1679            4 :                 assert!(
    1680            4 :                     modifying_file.is_none(),
    1681            0 :                     "duplicated entries found for {}",
    1682              :                     path
    1683              :                 );
    1684            4 :                 modifying_file = Some(content);
    1685            0 :             } else {
    1686            0 :                 other_files.push(file);
    1687            0 :             }
    1688              :         }
    1689           16 :         let mut new_files = other_files;
    1690           16 :         match (modifying_file, content.is_empty()) {
    1691            2 :             (Some(old_content), false) => {
    1692            2 :                 self.tline
    1693            2 :                     .aux_file_size_estimator
    1694            2 :                     .on_update(old_content.len(), content.len());
    1695            2 :                 new_files.push((path, content));
    1696            2 :             }
    1697            2 :             (Some(old_content), true) => {
    1698            2 :                 self.tline
    1699            2 :                     .aux_file_size_estimator
    1700            2 :                     .on_remove(old_content.len());
    1701            2 :                 // not adding the file key to the final `new_files` vec.
    1702            2 :             }
    1703           12 :             (None, false) => {
    1704           12 :                 self.tline.aux_file_size_estimator.on_add(content.len());
    1705           12 :                 new_files.push((path, content));
    1706           12 :             }
    1707            0 :             (None, true) => warn!("removing non-existing aux file: {}", path),
    1708              :         }
    1709           16 :         let new_val = aux_file::encode_file_value(&new_files)?;
    1710           16 :         self.put(key, Value::Image(new_val.into()));
    1711           16 : 
    1712           16 :         Ok(())
    1713           16 :     }
    1714              : 
    1715              :     ///
    1716              :     /// Flush changes accumulated so far to the underlying repository.
    1717              :     ///
    1718              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1719              :     /// you to flush them to the underlying repository before the final `commit`.
    1720              :     /// That allows to free up the memory used to hold the pending changes.
    1721              :     ///
    1722              :     /// Currently only used during bulk import of a data directory. In that
    1723              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1724              :     /// whole import fails and the timeline will be deleted anyway.
    1725              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1726              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1727              :     ///
    1728              :     /// Note: A consequence of flushing the pending operations is that they
    1729              :     /// won't be visible to subsequent operations until `commit`. The function
    1730              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1731              :     /// for bulk import, where you are just loading data pages and won't try to
    1732              :     /// modify the same pages twice.
    1733         1930 :     pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1734         1930 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1735         1930 :         // to scan through the pending_updates list.
    1736         1930 :         let pending_nblocks = self.pending_nblocks;
    1737         1930 :         if pending_nblocks < 10000 {
    1738         1930 :             return Ok(());
    1739            0 :         }
    1740              : 
    1741            0 :         let mut writer = self.tline.writer().await;
    1742              : 
    1743              :         // Flush relation and  SLRU data blocks, keep metadata.
    1744            0 :         let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
    1745            0 : 
    1746            0 :         // This bails out on first error without modifying pending_updates.
    1747            0 :         // That's Ok, cf this function's doc comment.
    1748            0 :         writer.put_batch(pending_data_pages, ctx).await?;
    1749            0 :         self.pending_bytes = 0;
    1750            0 : 
    1751            0 :         if pending_nblocks != 0 {
    1752            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1753            0 :             self.pending_nblocks = 0;
    1754            0 :         }
    1755              : 
    1756            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1757            0 :             writer.update_directory_entries_count(kind, count as u64);
    1758            0 :         }
    1759              : 
    1760            0 :         Ok(())
    1761         1930 :     }
    1762              : 
    1763              :     ///
    1764              :     /// Finish this atomic update, writing all the updated keys to the
    1765              :     /// underlying timeline.
    1766              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1767              :     ///
    1768       743058 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1769       743058 :         // Commit should never be called mid-wal-record
    1770       743058 :         assert!(self.pending_zero_data_pages.is_empty());
    1771              : 
    1772       743058 :         let mut writer = self.tline.writer().await;
    1773              : 
    1774       743058 :         let pending_nblocks = self.pending_nblocks;
    1775       743058 :         self.pending_nblocks = 0;
    1776       743058 : 
    1777       743058 :         // Ordering: the items in this batch do not need to be in any global order, but values for
    1778       743058 :         // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    1779       743058 :         // this to do efficient updates to its index.
    1780       743058 :         let mut write_batch = std::mem::take(&mut self.pending_data_pages);
    1781       743058 : 
    1782       743058 :         write_batch.extend(
    1783       743058 :             self.pending_metadata_pages
    1784       743058 :                 .drain()
    1785       743058 :                 .flat_map(|(key, values)| {
    1786       273784 :                     values
    1787       273784 :                         .into_iter()
    1788       273784 :                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
    1789       743058 :                 }),
    1790       743058 :         );
    1791       743058 : 
    1792       743058 :         if !write_batch.is_empty() {
    1793       414042 :             writer.put_batch(write_batch, ctx).await?;
    1794       329016 :         }
    1795              : 
    1796       743058 :         if !self.pending_deletions.is_empty() {
    1797            2 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    1798            2 :             self.pending_deletions.clear();
    1799       743056 :         }
    1800              : 
    1801       743058 :         self.pending_lsns.push(self.lsn);
    1802       888916 :         for pending_lsn in self.pending_lsns.drain(..) {
    1803       888916 :             // Ideally, we should be able to call writer.finish_write() only once
    1804       888916 :             // with the highest LSN. However, the last_record_lsn variable in the
    1805       888916 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1806       888916 :             // so we need to record every LSN to not leave a gap between them.
    1807       888916 :             writer.finish_write(pending_lsn);
    1808       888916 :         }
    1809              : 
    1810       743058 :         if pending_nblocks != 0 {
    1811       270570 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1812       472488 :         }
    1813              : 
    1814       743058 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1815         2804 :             writer.update_directory_entries_count(kind, count as u64);
    1816         2804 :         }
    1817              : 
    1818       743058 :         self.pending_bytes = 0;
    1819       743058 : 
    1820       743058 :         Ok(())
    1821       743058 :     }
    1822              : 
    1823       291704 :     pub(crate) fn len(&self) -> usize {
    1824       291704 :         self.pending_metadata_pages.len()
    1825       291704 :             + self.pending_data_pages.len()
    1826       291704 :             + self.pending_deletions.len()
    1827       291704 :     }
    1828              : 
    1829              :     /// Read a page from the Timeline we are writing to.  For metadata pages, this passes through
    1830              :     /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
    1831              :     /// in the modification.
    1832              :     ///
    1833              :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
    1834              :     /// page must ensure that the pages they read are already committed in Timeline, for example
    1835              :     /// DB create operations are always preceded by a call to commit().  This is special cased because
    1836              :     /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
    1837              :     /// and not data pages.
    1838       286586 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    1839       286586 :         if !Self::is_data_key(&key) {
    1840              :             // Have we already updated the same key? Read the latest pending updated
    1841              :             // version in that case.
    1842              :             //
    1843              :             // Note: we don't check pending_deletions. It is an error to request a
    1844              :             // value that has been removed, deletion only avoids leaking storage.
    1845       286586 :             if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
    1846        15928 :                 if let Some((_, _, value)) = values.last() {
    1847        15928 :                     return if let Value::Image(img) = value {
    1848        15928 :                         Ok(img.clone())
    1849              :                     } else {
    1850              :                         // Currently, we never need to read back a WAL record that we
    1851              :                         // inserted in the same "transaction". All the metadata updates
    1852              :                         // work directly with Images, and we never need to read actual
    1853              :                         // data pages. We could handle this if we had to, by calling
    1854              :                         // the walredo manager, but let's keep it simple for now.
    1855            0 :                         Err(PageReconstructError::Other(anyhow::anyhow!(
    1856            0 :                             "unexpected pending WAL record"
    1857            0 :                         )))
    1858              :                     };
    1859            0 :                 }
    1860       270658 :             }
    1861              :         } else {
    1862              :             // This is an expensive check, so we only do it in debug mode. If reading a data key,
    1863              :             // this key should never be present in pending_data_pages. We ensure this by committing
    1864              :             // modifications before ingesting DB create operations, which are the only kind that reads
    1865              :             // data pages during ingest.
    1866            0 :             if cfg!(debug_assertions) {
    1867            0 :                 for (dirty_key, _, _, _) in &self.pending_data_pages {
    1868            0 :                     debug_assert!(&key.to_compact() != dirty_key);
    1869              :                 }
    1870              : 
    1871            0 :                 debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
    1872            0 :             }
    1873              :         }
    1874              : 
    1875              :         // Metadata page cache miss, or we're reading a data page.
    1876       270658 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1877       270658 :         self.tline.get(key, lsn, ctx).await
    1878       286586 :     }
    1879              : 
    1880       709446 :     fn put(&mut self, key: Key, val: Value) {
    1881       709446 :         if Self::is_data_key(&key) {
    1882       423510 :             self.put_data(key.to_compact(), val)
    1883              :         } else {
    1884       285936 :             self.put_metadata(key.to_compact(), val)
    1885              :         }
    1886       709446 :     }
    1887              : 
    1888       426508 :     fn put_data(&mut self, key: CompactKey, val: Value) {
    1889       426508 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    1890       426508 : 
    1891       426508 :         // If this page was previously zero'd in the same WalRecord, then drop the previous zero page write.  This
    1892       426508 :         // is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
    1893       426508 :         // and the subsequent postgres-originating write
    1894       426508 :         if self.pending_zero_data_pages.remove(&key) {
    1895            0 :             self.pending_bytes -= ZERO_PAGE.len();
    1896       426508 :         }
    1897              : 
    1898       426508 :         self.pending_bytes += val_serialized_size;
    1899       426508 :         self.pending_data_pages
    1900       426508 :             .push((key, self.lsn, val_serialized_size, val))
    1901       426508 :     }
    1902              : 
    1903       285936 :     fn put_metadata(&mut self, key: CompactKey, val: Value) {
    1904       285936 :         let values = self.pending_metadata_pages.entry(key).or_default();
    1905              :         // Replace the previous value if it exists at the same lsn
    1906       285936 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    1907        12152 :             if *last_lsn == self.lsn {
    1908              :                 // Update the pending_bytes contribution from this entry, and update the serialized size in place
    1909        12152 :                 self.pending_bytes -= *last_value_ser_size;
    1910        12152 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    1911        12152 :                 self.pending_bytes += *last_value_ser_size;
    1912        12152 : 
    1913        12152 :                 // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
    1914        12152 :                 // have been generated by synthesized zero page writes prior to the first real write to a page.
    1915        12152 :                 *last_value = val;
    1916        12152 :                 return;
    1917            0 :             }
    1918       273784 :         }
    1919              : 
    1920       273784 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    1921       273784 :         self.pending_bytes += val_serialized_size;
    1922       273784 :         values.push((self.lsn, val_serialized_size, val));
    1923       285936 :     }
    1924              : 
    1925            2 :     fn delete(&mut self, key_range: Range<Key>) {
    1926            2 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1927            2 :         self.pending_deletions.push((key_range, self.lsn));
    1928            2 :     }
    1929              : }
    1930              : 
    1931              : /// This struct facilitates accessing either a committed key from the timeline at a
    1932              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1933              : ///
    1934              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1935              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1936              : /// need to look up the keys in the modification first before looking them up in the
    1937              : /// timeline to not miss the latest updates.
    1938              : #[derive(Clone, Copy)]
    1939              : pub enum Version<'a> {
    1940              :     Lsn(Lsn),
    1941              :     Modified(&'a DatadirModification<'a>),
    1942              : }
    1943              : 
    1944              : impl<'a> Version<'a> {
    1945        23560 :     async fn get(
    1946        23560 :         &self,
    1947        23560 :         timeline: &Timeline,
    1948        23560 :         key: Key,
    1949        23560 :         ctx: &RequestContext,
    1950        23560 :     ) -> Result<Bytes, PageReconstructError> {
    1951        23560 :         match self {
    1952        23540 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1953           20 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1954              :         }
    1955        23560 :     }
    1956              : 
    1957        35620 :     fn get_lsn(&self) -> Lsn {
    1958        35620 :         match self {
    1959        29574 :             Version::Lsn(lsn) => *lsn,
    1960         6046 :             Version::Modified(modification) => modification.lsn,
    1961              :         }
    1962        35620 :     }
    1963              : }
    1964              : 
    1965              : //--- Metadata structs stored in key-value pairs in the repository.
    1966              : 
    1967         2240 : #[derive(Debug, Serialize, Deserialize)]
    1968              : struct DbDirectory {
    1969              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1970              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1971              : }
    1972              : 
// The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
// pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs.  Previously, the files
// were named like "pg_twophase/000002E5", now they're like
// "pg_twophase/0000000A000002E4".
    1977              : 
    1978          288 : #[derive(Debug, Serialize, Deserialize)]
    1979              : struct TwoPhaseDirectory {
    1980              :     xids: HashSet<TransactionId>,
    1981              : }
    1982              : 
    1983            0 : #[derive(Debug, Serialize, Deserialize)]
    1984              : struct TwoPhaseDirectoryV17 {
    1985              :     xids: HashSet<u64>,
    1986              : }
    1987              : 
    1988         1932 : #[derive(Debug, Serialize, Deserialize, Default)]
    1989              : struct RelDirectory {
    1990              :     // Set of relations that exist. (relfilenode, forknum)
    1991              :     //
    1992              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1993              :     // key-value pairs, if you have a lot of relations
    1994              :     rels: HashSet<(Oid, u8)>,
    1995              : }
    1996              : 
    1997            0 : #[derive(Debug, Serialize, Deserialize)]
    1998              : struct RelSizeEntry {
    1999              :     nblocks: u32,
    2000              : }
    2001              : 
    2002          864 : #[derive(Debug, Serialize, Deserialize, Default)]
    2003              : struct SlruSegmentDirectory {
    2004              :     // Set of SLRU segments that exist.
    2005              :     segments: HashSet<u32>,
    2006              : }
    2007              : 
    2008              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2009              : #[repr(u8)]
    2010              : pub(crate) enum DirectoryKind {
    2011              :     Db,
    2012              :     TwoPhase,
    2013              :     Rel,
    2014              :     AuxFiles,
    2015              :     SlruSegment(SlruKind),
    2016              : }
    2017              : 
    2018              : impl DirectoryKind {
    2019              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2020         5608 :     pub(crate) fn offset(&self) -> usize {
    2021         5608 :         self.into_usize()
    2022         5608 :     }
    2023              : }
    2024              : 
    2025              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2026              : 
    2027              : #[allow(clippy::bool_assert_comparison)]
    2028              : #[cfg(test)]
    2029              : mod tests {
    2030              :     use hex_literal::hex;
    2031              :     use utils::id::TimelineId;
    2032              : 
    2033              :     use super::*;
    2034              : 
    2035              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    2036              : 
    2037              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2038              :     #[tokio::test]
    2039            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2040            2 :         let name = "aux_files_round_trip";
    2041            2 :         let harness = TenantHarness::create(name).await?;
    2042            2 : 
    2043            2 :         pub const TIMELINE_ID: TimelineId =
    2044            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2045            2 : 
    2046            8 :         let (tenant, ctx) = harness.load().await;
    2047            2 :         let tline = tenant
    2048            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2049            2 :             .await?;
    2050            2 :         let tline = tline.raw_timeline().unwrap();
    2051            2 : 
    2052            2 :         // First modification: insert two keys
    2053            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2054            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2055            2 :         modification.set_lsn(Lsn(0x1008))?;
    2056            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2057            2 :         modification.commit(&ctx).await?;
    2058            2 :         let expect_1008 = HashMap::from([
    2059            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2060            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2061            2 :         ]);
    2062            2 : 
    2063            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2064            2 :         assert_eq!(readback, expect_1008);
    2065            2 : 
    2066            2 :         // Second modification: update one key, remove the other
    2067            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2068            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2069            2 :         modification.set_lsn(Lsn(0x2008))?;
    2070            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2071            2 :         modification.commit(&ctx).await?;
    2072            2 :         let expect_2008 =
    2073            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2074            2 : 
    2075            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    2076            2 :         assert_eq!(readback, expect_2008);
    2077            2 : 
    2078            2 :         // Reading back in time works
    2079            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2080            2 :         assert_eq!(readback, expect_1008);
    2081            2 : 
    2082            2 :         Ok(())
    2083            2 :     }
    2084              : 
    2085              :     /*
    2086              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    2087              :             let incremental = timeline.get_current_logical_size();
    2088              :             let non_incremental = timeline
    2089              :                 .get_current_logical_size_non_incremental(lsn)
    2090              :                 .unwrap();
    2091              :             assert_eq!(incremental, non_incremental);
    2092              :         }
    2093              :     */
    2094              : 
    2095              :     /*
    2096              :     ///
    2097              :     /// Test list_rels() function, with branches and dropped relations
    2098              :     ///
    2099              :     #[test]
    2100              :     fn test_list_rels_drop() -> Result<()> {
    2101              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    2102              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    2103              :         const TESTDB: u32 = 111;
    2104              : 
    2105              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    2106              :         // after branching fails below
    2107              :         let mut writer = tline.begin_record(Lsn(0x10));
    2108              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    2109              :         writer.finish()?;
    2110              : 
    2111              :         // Create a relation on the timeline
    2112              :         let mut writer = tline.begin_record(Lsn(0x20));
    2113              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    2114              :         writer.finish()?;
    2115              : 
    2116              :         let writer = tline.begin_record(Lsn(0x00));
    2117              :         writer.finish()?;
    2118              : 
    2119              :         // Check that list_rels() lists it at LSN 0x20 and later, but not before it
    2120              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    2121              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    2122              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    2123              : 
    2124              :         // Create a branch, check that the relation is visible there
    2125              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    2126              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    2127              :             Some(timeline) => timeline,
    2128              :             None => panic!("Should have a local timeline"),
    2129              :         };
    2130              :         let newtline = DatadirTimelineImpl::new(newtline);
    2131              :         assert!(newtline
    2132              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2133              :             .contains(&TESTREL_A));
    2134              : 
    2135              :         // Drop it on the branch
    2136              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    2137              :         new_writer.drop_relation(TESTREL_A)?;
    2138              :         new_writer.finish()?;
    2139              : 
    2140              :         // Check that it's no longer listed on the branch after the point where it was dropped
    2141              :         assert!(newtline
    2142              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2143              :             .contains(&TESTREL_A));
    2144              :         assert!(!newtline
    2145              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2146              :             .contains(&TESTREL_A));
    2147              : 
    2148              :         // Run checkpoint and garbage collection and check that it's still not visible
    2149              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    2150              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    2151              : 
    2152              :         assert!(!newtline
    2153              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2154              :             .contains(&TESTREL_A));
    2155              : 
    2156              :         Ok(())
    2157              :     }
    2158              :      */
    2159              : 
    2160              :     /*
    2161              :     #[test]
    2162              :     fn test_read_beyond_eof() -> Result<()> {
    2163              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    2164              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    2165              : 
    2166              :         make_some_layers(&tline, Lsn(0x20))?;
    2167              :         let mut writer = tline.begin_record(Lsn(0x60));
    2168              :         walingest.put_rel_page_image(
    2169              :             &mut writer,
    2170              :             TESTREL_A,
    2171              :             0,
    2172              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    2173              :         )?;
    2174              :         writer.finish()?;
    2175              : 
    2176              :         // Test read before rel creation. Should error out.
    2177              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2178              : 
    2179              :         // Read block beyond end of relation at different points in time.
    2180              :         // These reads should fall into different delta, image, and in-memory layers.
    2181              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2182              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2183              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2184              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2185              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2186              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2187              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2188              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2189              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2190              : 
    2191              :         // Test on an in-memory layer with no preceding layer
    2192              :         let mut writer = tline.begin_record(Lsn(0x70));
    2193              :         walingest.put_rel_page_image(
    2194              :             &mut writer,
    2195              :             TESTREL_B,
    2196              :             0,
    2197              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2198              :         )?;
    2199              :         writer.finish()?;
    2200              : 
    2201              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    2202              : 
    2203              :         Ok(())
    2204              :     }
    2205              :      */
    2206              : }
        

Generated by: LCOV version 2.1-beta