LCOV - differential code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions) Coverage Total Hit LBC UBC GBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 91.8 % 1197 1099 2 96 1 1098
Current Date: 2024-01-09 02:06:09 Functions: 63.1 % 222 140 1 81 1 139
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //!
       2                 : //! This provides an abstraction to store PostgreSQL relations and other files
       3                 : //! in the key-value store that implements the Repository interface.
       4                 : //!
       5                 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6                 : //! walingest.rs handles a few things like implicit relation creation and extension.
       7                 : //! Clarify that)
       8                 : //!
       9                 : use super::tenant::{PageReconstructError, Timeline};
      10                 : use crate::context::RequestContext;
      11                 : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12                 : use crate::repository::*;
      13                 : use crate::walrecord::NeonWalRecord;
      14                 : use anyhow::{ensure, Context};
      15                 : use bytes::{Buf, Bytes};
      16                 : use pageserver_api::key::is_rel_block_key;
      17                 : use pageserver_api::reltag::{RelTag, SlruKind};
      18                 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      19                 : use postgres_ffi::BLCKSZ;
      20                 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      21                 : use serde::{Deserialize, Serialize};
      22                 : use std::collections::{hash_map, HashMap, HashSet};
      23                 : use std::ops::ControlFlow;
      24                 : use std::ops::Range;
      25                 : use tokio_util::sync::CancellationToken;
      26                 : use tracing::{debug, trace, warn};
      27                 : use utils::bin_ser::DeserializeError;
      28                 : use utils::{bin_ser::BeSer, lsn::Lsn};
      29                 : 
      30                 : /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
      31                 : pub type BlockNumber = u32;
      32                 : 
      33 UBC           0 : #[derive(Debug)]
      34                 : pub enum LsnForTimestamp {
      35                 :     /// Found commits both before and after the given timestamp
      36                 :     Present(Lsn),
      37                 : 
      38                 :     /// Found no commits after the given timestamp, this means
      39                 :     /// that the newest data in the branch is older than the given
      40                 :     /// timestamp.
      41                 :     ///
      42                 :     /// All commits <= LSN happened before the given timestamp
      43                 :     Future(Lsn),
      44                 : 
      45                 :     /// The queried timestamp is past our horizon we look back at (PITR)
      46                 :     ///
      47                 :     /// All commits > LSN happened after the given timestamp,
      48                 :     /// but any commits < LSN might have happened before or after
      49                 :     /// the given timestamp. We don't know because no data before
      50                 :     /// the given lsn is available.
      51                 :     Past(Lsn),
      52                 : 
      53                 :     /// We have found no commit with a timestamp,
      54                 :     /// so we can't return anything meaningful.
      55                 :     ///
      56                 :     /// The associated LSN is the lower bound value we can safely
      57                 :     /// create branches on, but no statement is made if it is
      58                 :     /// older or newer than the timestamp.
      59                 :     ///
      60                 :     /// This variant can e.g. be returned right after a
      61                 :     /// cluster import.
      62                 :     NoData(Lsn),
      63                 : }
      64                 : 
      65               0 : #[derive(Debug, thiserror::Error)]
      66                 : pub enum CalculateLogicalSizeError {
      67                 :     #[error("cancelled")]
      68                 :     Cancelled,
      69                 :     #[error(transparent)]
      70                 :     Other(#[from] anyhow::Error),
      71                 : }
      72                 : 
      73 LBC         (4) : #[derive(Debug, thiserror::Error)]
      74                 : pub(crate) enum CollectKeySpaceError {
      75                 :     #[error(transparent)]
      76                 :     Decode(#[from] DeserializeError),
      77                 :     #[error(transparent)]
      78                 :     PageRead(PageReconstructError),
      79                 :     #[error("cancelled")]
      80                 :     Cancelled,
      81                 : }
      82                 : 
      83                 : impl From<PageReconstructError> for CollectKeySpaceError {
      84 CBC           1 :     fn from(err: PageReconstructError) -> Self {
      85               1 :         match err {
      86               1 :             PageReconstructError::Cancelled => Self::Cancelled,
      87 LBC         (2) :             err => Self::PageRead(err),
      88                 :         }
      89 CBC           1 :     }
      90                 : }
      91                 : 
      92                 : impl From<PageReconstructError> for CalculateLogicalSizeError {
      93              18 :     fn from(pre: PageReconstructError) -> Self {
      94              18 :         match pre {
      95                 :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
      96              16 :                 Self::Cancelled
      97                 :             }
      98               2 :             _ => Self::Other(pre.into()),
      99                 :         }
     100              18 :     }
     101                 : }
     102                 : 
     103 UBC           0 : #[derive(Debug, thiserror::Error)]
     104                 : pub enum RelationError {
     105                 :     #[error("Relation Already Exists")]
     106                 :     AlreadyExists,
     107                 :     #[error("invalid relnode")]
     108                 :     InvalidRelnode,
     109                 :     #[error(transparent)]
     110                 :     Other(#[from] anyhow::Error),
     111                 : }
     112                 : 
     113                 : ///
     114                 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     115                 : /// and other special kinds of files, in a versioned key-value store. The
     116                 : /// Timeline struct provides the key-value store.
     117                 : ///
     118                 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
     119                 : /// implementation, and might be moved into a separate struct later.
     120                 : impl Timeline {
     121                 :     /// Start ingesting a WAL record, or other atomic modification of
     122                 :     /// the timeline.
     123                 :     ///
     124                 :     /// This provides a transaction-like interface to perform a bunch
     125                 :     /// of modifications atomically.
     126                 :     ///
     127                 :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     128                 :     /// DatadirModification object. Use the functions in the object to
     129                 :     /// modify the repository state, updating all the pages and metadata
     130                 :     /// that the WAL record affects. When you're done, call commit() to
     131                 :     /// commit the changes.
     132                 :     ///
     133                 :     /// Lsn stored in modification is advanced by `ingest_record` and
     134                 :     /// is used by `commit()` to update `last_record_lsn`.
     135                 :     ///
     136                 :     /// Calling commit() will flush all the changes and reset the state,
     137                 :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     138                 :     ///
     139                 :     /// Note that any pending modifications you make through the
     140                 :     /// modification object won't be visible to calls to the 'get' and list
     141                 :     /// functions of the timeline until you finish! And if you update the
     142                 :     /// same page twice, the last update wins.
     143                 :     ///
     144 CBC      697658 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     145          697658 :     where
     146          697658 :         Self: Sized,
     147          697658 :     {
     148          697658 :         DatadirModification {
     149          697658 :             tline: self,
     150          697658 :             pending_lsns: Vec::new(),
     151          697658 :             pending_updates: HashMap::new(),
     152          697658 :             pending_deletions: Vec::new(),
     153          697658 :             pending_nblocks: 0,
     154          697658 :             lsn,
     155          697658 :         }
     156          697658 :     }
     157                 : 
     158                 :     //------------------------------------------------------------------------------
     159                 :     // Public GET functions
     160                 :     //------------------------------------------------------------------------------
     161                 : 
     162                 :     /// Look up given page version.
     163         3648033 :     pub(crate) async fn get_rel_page_at_lsn(
     164         3648033 :         &self,
     165         3648033 :         tag: RelTag,
     166         3648033 :         blknum: BlockNumber,
     167         3648033 :         version: Version<'_>,
     168         3648033 :         latest: bool,
     169         3648033 :         ctx: &RequestContext,
     170         3648033 :     ) -> Result<Bytes, PageReconstructError> {
     171         3648033 :         if tag.relnode == 0 {
     172 UBC           0 :             return Err(PageReconstructError::Other(
     173               0 :                 RelationError::InvalidRelnode.into(),
     174               0 :             ));
     175 CBC     3648033 :         }
     176                 : 
     177         3648033 :         let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
     178         3648033 :         if blknum >= nblocks {
     179 UBC           0 :             debug!(
     180               0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     181               0 :                 tag,
     182               0 :                 blknum,
     183               0 :                 version.get_lsn(),
     184               0 :                 nblocks
     185               0 :             );
     186 CBC      118268 :             return Ok(ZERO_PAGE.clone());
     187         3529765 :         }
     188         3529765 : 
     189         3529765 :         let key = rel_block_to_key(tag, blknum);
     190         3529765 :         version.get(self, key, ctx).await
     191         3648007 :     }
     192                 : 
     193                 :     // Get size of a database in blocks
     194               8 :     pub(crate) async fn get_db_size(
     195               8 :         &self,
     196               8 :         spcnode: Oid,
     197               8 :         dbnode: Oid,
     198               8 :         version: Version<'_>,
     199               8 :         latest: bool,
     200               8 :         ctx: &RequestContext,
     201               8 :     ) -> Result<usize, PageReconstructError> {
     202               8 :         let mut total_blocks = 0;
     203                 : 
     204               8 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     205                 : 
     206            2351 :         for rel in rels {
     207            2343 :             let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
     208            2343 :             total_blocks += n_blocks as usize;
     209                 :         }
     210               8 :         Ok(total_blocks)
     211               8 :     }
     212                 : 
     213                 :     /// Get size of a relation file
     214         3777105 :     pub(crate) async fn get_rel_size(
     215         3777105 :         &self,
     216         3777105 :         tag: RelTag,
     217         3777105 :         version: Version<'_>,
     218         3777105 :         latest: bool,
     219         3777105 :         ctx: &RequestContext,
     220         3777105 :     ) -> Result<BlockNumber, PageReconstructError> {
     221         3777105 :         if tag.relnode == 0 {
     222 UBC           0 :             return Err(PageReconstructError::Other(
     223               0 :                 RelationError::InvalidRelnode.into(),
     224               0 :             ));
     225 CBC     3777105 :         }
     226                 : 
     227         3777105 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     228         3476775 :             return Ok(nblocks);
     229          300330 :         }
     230          300330 : 
     231          300330 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     232            7323 :             && !self.get_rel_exists(tag, version, latest, ctx).await?
     233                 :         {
     234                 :             // FIXME: Postgres sometimes calls smgrcreate() to create
     235                 :             // FSM, and smgrnblocks() on it immediately afterwards,
     236                 :             // without extending it.  Tolerate that by claiming that
     237                 :             // any non-existent FSM fork has size 0.
     238              11 :             return Ok(0);
     239          300319 :         }
     240          300319 : 
     241          300319 :         let key = rel_size_to_key(tag);
     242          300319 :         let mut buf = version.get(self, key, ctx).await?;
     243          300317 :         let nblocks = buf.get_u32_le();
     244          300317 : 
     245          300317 :         if latest {
     246          151988 :             // Update the relation size cache only if the "latest" flag is set.
     247          151988 :             // This flag is set by compute when it is working with the most recent version of the relation.
     248          151988 :             // Typically the master compute node always sets latest=true.
     249          151988 :             // Note that even if a compute node "by mistake" specifies an old LSN but sets
     250          151988 :             // latest=true, it cannot cause cache corruption, because with latest=true the
     251          151988 :             // pageserver chooses max(request_lsn, last_written_lsn), so the cached value will be
     252          151988 :             // associated with the most recent value of LSN.
     253          151988 :             self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     254          151988 :         }
     255          300317 :         Ok(nblocks)
     256         3777105 :     }
     257                 : 
     258                 :     /// Does relation exist?
     259          212940 :     pub(crate) async fn get_rel_exists(
     260          212940 :         &self,
     261          212940 :         tag: RelTag,
     262          212940 :         version: Version<'_>,
     263          212940 :         _latest: bool,
     264          212940 :         ctx: &RequestContext,
     265          212940 :     ) -> Result<bool, PageReconstructError> {
     266          212940 :         if tag.relnode == 0 {
     267 UBC           0 :             return Err(PageReconstructError::Other(
     268               0 :                 RelationError::InvalidRelnode.into(),
     269               0 :             ));
     270 CBC      212940 :         }
     271                 : 
     272                 :         // first try to lookup relation in cache
     273          212940 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     274          113638 :             return Ok(true);
     275           99302 :         }
     276           99302 :         // fetch directory listing
     277           99302 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     278           99302 :         let buf = version.get(self, key, ctx).await?;
     279                 : 
     280           99302 :         match RelDirectory::des(&buf).context("deserialization failure") {
     281           99302 :             Ok(dir) => {
     282           99302 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     283           99302 :                 Ok(exists)
     284                 :             }
     285 UBC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     286                 :         }
     287 CBC      212940 :     }
     288                 : 
     289                 :     /// Get a list of all existing relations in given tablespace and database.
     290                 :     ///
     291                 :     /// # Cancel-Safety
     292                 :     ///
     293                 :     /// This method is cancellation-safe.
     294            7375 :     pub(crate) async fn list_rels(
     295            7375 :         &self,
     296            7375 :         spcnode: Oid,
     297            7375 :         dbnode: Oid,
     298            7375 :         version: Version<'_>,
     299            7375 :         ctx: &RequestContext,
     300            7375 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     301            7375 :         // fetch directory listing
     302            7375 :         let key = rel_dir_to_key(spcnode, dbnode);
     303            7375 :         let buf = version.get(self, key, ctx).await?;
     304                 : 
     305            7375 :         match RelDirectory::des(&buf).context("deserialization failure") {
     306            7375 :             Ok(dir) => {
     307            7375 :                 let rels: HashSet<RelTag> =
     308         1735987 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     309         1735987 :                         spcnode,
     310         1735987 :                         dbnode,
     311         1735987 :                         relnode: *relnode,
     312         1735987 :                         forknum: *forknum,
     313         1735987 :                     }));
     314            7375 : 
     315            7375 :                 Ok(rels)
     316                 :             }
     317 UBC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     318                 :         }
     319 CBC        7375 :     }
     320                 : 
     321                 :     /// Look up given SLRU page version.
     322            5607 :     pub(crate) async fn get_slru_page_at_lsn(
     323            5607 :         &self,
     324            5607 :         kind: SlruKind,
     325            5607 :         segno: u32,
     326            5607 :         blknum: BlockNumber,
     327            5607 :         lsn: Lsn,
     328            5607 :         ctx: &RequestContext,
     329            5607 :     ) -> Result<Bytes, PageReconstructError> {
     330            5607 :         let key = slru_block_to_key(kind, segno, blknum);
     331          255502 :         self.get(key, lsn, ctx).await
     332            5606 :     }
     333                 : 
     334                 :     /// Get size of an SLRU segment
     335            5622 :     pub(crate) async fn get_slru_segment_size(
     336            5622 :         &self,
     337            5622 :         kind: SlruKind,
     338            5622 :         segno: u32,
     339            5622 :         version: Version<'_>,
     340            5622 :         ctx: &RequestContext,
     341            5622 :     ) -> Result<BlockNumber, PageReconstructError> {
     342            5622 :         let key = slru_segment_size_to_key(kind, segno);
     343            5622 :         let mut buf = version.get(self, key, ctx).await?;
     344            5622 :         Ok(buf.get_u32_le())
     345            5622 :     }
     346                 : 
     347                 :     /// Does the SLRU segment exist?
     348             664 :     pub(crate) async fn get_slru_segment_exists(
     349             664 :         &self,
     350             664 :         kind: SlruKind,
     351             664 :         segno: u32,
     352             664 :         version: Version<'_>,
     353             664 :         ctx: &RequestContext,
     354             664 :     ) -> Result<bool, PageReconstructError> {
     355             664 :         // fetch directory listing
     356             664 :         let key = slru_dir_to_key(kind);
     357             664 :         let buf = version.get(self, key, ctx).await?;
     358                 : 
     359             664 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     360             664 :             Ok(dir) => {
     361             664 :                 let exists = dir.segments.get(&segno).is_some();
     362             664 :                 Ok(exists)
     363                 :             }
     364 UBC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     365                 :         }
     366 CBC         664 :     }
     367                 : 
     368                 :     /// Locate LSN, such that all transactions that committed before
     369                 :     /// 'search_timestamp' are visible, but nothing newer is.
     370                 :     ///
     371                 :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     372                 :     /// so it's not well defined which LSN you get if there were multiple commits
     373                 :     /// "in flight" at that point in time.
     374                 :     ///
     375             201 :     pub(crate) async fn find_lsn_for_timestamp(
     376             201 :         &self,
     377             201 :         search_timestamp: TimestampTz,
     378             201 :         cancel: &CancellationToken,
     379             201 :         ctx: &RequestContext,
     380             201 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     381             201 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     382             201 :         // We use this method to figure out the branching LSN for the new branch, but the
     383             201 :         // GC cutoff could be before the branching point and we cannot create a new branch
     384             201 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     385             201 :         // on the safe side.
     386             201 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     387             201 :         let max_lsn = self.get_last_record_lsn();
     388             201 : 
     389             201 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     390             201 :         // LSN divided by 8.
     391             201 :         let mut low = min_lsn.0 / 8;
     392             201 :         let mut high = max_lsn.0 / 8 + 1;
     393             201 : 
     394             201 :         let mut found_smaller = false;
     395             201 :         let mut found_larger = false;
     396            3474 :         while low < high {
     397            3274 :             if cancel.is_cancelled() {
     398 UBC           0 :                 return Err(PageReconstructError::Cancelled);
     399 CBC        3274 :             }
     400            3274 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     401            3274 :             let mid = (high + low) / 2;
     402                 : 
     403            3274 :             let cmp = self
     404            3274 :                 .is_latest_commit_timestamp_ge_than(
     405            3274 :                     search_timestamp,
     406            3274 :                     Lsn(mid * 8),
     407            3274 :                     &mut found_smaller,
     408            3274 :                     &mut found_larger,
     409            3274 :                     ctx,
     410            3274 :                 )
     411          237224 :                 .await?;
     412                 : 
     413            3273 :             if cmp {
     414             794 :                 high = mid;
     415            2479 :             } else {
     416            2479 :                 low = mid + 1;
     417            2479 :             }
     418                 :         }
     419                 :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     420                 :         // so the LSN of the last commit record before or at `search_timestamp`.
     421                 :         // Remove one from `low` to get `t`.
     422                 :         //
     423                 :         // FIXME: it would be better to get the LSN of the previous commit.
     424                 :         // Otherwise, if you restore to the returned LSN, the database will
     425                 :         // include physical changes from later commits that will be marked
     426                 :         // as aborted, and will need to be vacuumed away.
     427             200 :         let commit_lsn = Lsn((low - 1) * 8);
     428             200 :         match (found_smaller, found_larger) {
     429                 :             (false, false) => {
     430                 :                 // This can happen if no commit records have been processed yet, e.g.
     431                 :                 // just after importing a cluster.
     432              23 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     433                 :             }
     434                 :             (false, true) => {
     435                 :                 // Didn't find any commit timestamps smaller than the request
     436              19 :                 Ok(LsnForTimestamp::Past(min_lsn))
     437                 :             }
     438                 :             (true, false) => {
     439                 :                 // Only found commits with timestamps smaller than the request.
     440                 :                 // It's still a valid case for branch creation, return it.
     441                 :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     442                 :                 // case, anyway.
     443              90 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     444                 :             }
     445              68 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     446                 :         }
     447             200 :     }
     448                 : 
     449                 :     /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
     450                 :     /// commits that committed at or after 'search_timestamp', at LSN 'probe_lsn'.
     451                 :     ///
     452                 :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
     453                 :     /// with a smaller/larger-or-equal timestamp.
     454                 :     ///
     455            3274 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     456            3274 :         &self,
     457            3274 :         search_timestamp: TimestampTz,
     458            3274 :         probe_lsn: Lsn,
     459            3274 :         found_smaller: &mut bool,
     460            3274 :         found_larger: &mut bool,
     461            3274 :         ctx: &RequestContext,
     462            3274 :     ) -> Result<bool, PageReconstructError> {
     463            3274 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     464            3026 :             if timestamp >= search_timestamp {
     465             794 :                 *found_larger = true;
     466             794 :                 return ControlFlow::Break(true);
     467            2232 :             } else {
     468            2232 :                 *found_smaller = true;
     469            2232 :             }
     470            2232 :             ControlFlow::Continue(())
     471            3274 :         })
     472          237224 :         .await
     473            3273 :     }
     474                 : 
     475                 :     /// Obtain the largest timestamp found for the given lsn.
     476                 :     ///
     477                 :     /// If the lsn has no timestamps, returns `None`. Otherwise returns the maximum of the timestamps found.
     478              12 :     pub(crate) async fn get_timestamp_for_lsn(
     479              12 :         &self,
     480              12 :         probe_lsn: Lsn,
     481              12 :         ctx: &RequestContext,
     482              12 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     483              12 :         let mut max: Option<TimestampTz> = None;
     484              12 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     485              10 :             if let Some(max_prev) = max {
     486 UBC           0 :                 max = Some(max_prev.max(timestamp));
     487 CBC          10 :             } else {
     488              10 :                 max = Some(timestamp);
     489              10 :             }
     490              10 :             ControlFlow::Continue(())
     491              12 :         })
     492              80 :         .await?;
     493                 : 
     494              10 :         Ok(max)
     495              12 :     }
     496                 : 
     497                 :     /// Runs the given function on all the timestamps for a given lsn
     498                 :     ///
     499                 :     /// The return value is either given by the closure, or set to the `Default`
     500                 :     /// impl's output.
     501            3286 :     async fn map_all_timestamps<T: Default>(
     502            3286 :         &self,
     503            3286 :         probe_lsn: Lsn,
     504            3286 :         ctx: &RequestContext,
     505            3286 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     506            3286 :     ) -> Result<T, PageReconstructError> {
     507            3286 :         for segno in self
     508            3286 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     509            1027 :             .await?
     510                 :         {
     511            3284 :             let nblocks = self
     512            3284 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     513             949 :                 .await?;
     514            3284 :             for blknum in (0..nblocks).rev() {
     515            3284 :                 let clog_page = self
     516            3284 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     517          235328 :                     .await?;
     518                 : 
     519            3283 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     520            3036 :                     let mut timestamp_bytes = [0u8; 8];
     521            3036 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     522            3036 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     523            3036 : 
     524            3036 :                     match f(timestamp) {
     525             794 :                         ControlFlow::Break(b) => return Ok(b),
     526            2242 :                         ControlFlow::Continue(()) => (),
     527                 :                     }
     528             247 :                 }
     529                 :             }
     530                 :         }
     531            2489 :         Ok(Default::default())
     532            3285 :     }
     533                 : 
     534                 :     /// Get a list of SLRU segments
     535            4962 :     pub(crate) async fn list_slru_segments(
     536            4962 :         &self,
     537            4962 :         kind: SlruKind,
     538            4962 :         version: Version<'_>,
     539            4962 :         ctx: &RequestContext,
     540            4962 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     541            4962 :         // fetch directory entry
     542            4962 :         let key = slru_dir_to_key(kind);
     543                 : 
     544            4962 :         let buf = version.get(self, key, ctx).await?;
     545            4959 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     546            4959 :             Ok(dir) => Ok(dir.segments),
     547 UBC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     548                 :         }
     549 CBC        4962 :     }
     550                 : 
     551            2249 :     pub(crate) async fn get_relmap_file(
     552            2249 :         &self,
     553            2249 :         spcnode: Oid,
     554            2249 :         dbnode: Oid,
     555            2249 :         version: Version<'_>,
     556            2249 :         ctx: &RequestContext,
     557            2249 :     ) -> Result<Bytes, PageReconstructError> {
     558            2249 :         let key = relmap_file_key(spcnode, dbnode);
     559                 : 
     560            2249 :         let buf = version.get(self, key, ctx).await?;
     561            2249 :         Ok(buf)
     562            2249 :     }
     563                 : 
     564             558 :     pub(crate) async fn list_dbdirs(
     565             558 :         &self,
     566             558 :         lsn: Lsn,
     567             558 :         ctx: &RequestContext,
     568             558 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     569                 :         // fetch directory entry
     570             558 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     571                 : 
     572             558 :         match DbDirectory::des(&buf).context("deserialization failure") {
     573             558 :             Ok(dir) => Ok(dir.dbdirs),
     574 UBC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     575                 :         }
     576 CBC         558 :     }
     577                 : 
     578               2 :     pub(crate) async fn get_twophase_file(
     579               2 :         &self,
     580               2 :         xid: TransactionId,
     581               2 :         lsn: Lsn,
     582               2 :         ctx: &RequestContext,
     583               2 :     ) -> Result<Bytes, PageReconstructError> {
     584               2 :         let key = twophase_file_key(xid);
     585               2 :         let buf = self.get(key, lsn, ctx).await?;
     586               2 :         Ok(buf)
     587               2 :     }
     588                 : 
     589             558 :     pub(crate) async fn list_twophase_files(
     590             558 :         &self,
     591             558 :         lsn: Lsn,
     592             558 :         ctx: &RequestContext,
     593             558 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     594                 :         // fetch directory entry
     595             558 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     596                 : 
     597             558 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     598             558 :             Ok(dir) => Ok(dir.xids),
     599 UBC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     600                 :         }
     601 CBC         558 :     }
     602                 : 
     603             557 :     pub(crate) async fn get_control_file(
     604             557 :         &self,
     605             557 :         lsn: Lsn,
     606             557 :         ctx: &RequestContext,
     607             557 :     ) -> Result<Bytes, PageReconstructError> {
     608             557 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     609             557 :     }
     610                 : 
     611            1798 :     pub(crate) async fn get_checkpoint(
     612            1798 :         &self,
     613            1798 :         lsn: Lsn,
     614            1798 :         ctx: &RequestContext,
     615            1798 :     ) -> Result<Bytes, PageReconstructError> {
     616            1798 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     617            1798 :     }
     618                 : 
     619            2241 :     pub(crate) async fn list_aux_files(
     620            2241 :         &self,
     621            2241 :         lsn: Lsn,
     622            2241 :         ctx: &RequestContext,
     623            2241 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     624            2241 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     625            1305 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     626            1305 :                 Ok(dir) => Ok(dir.files),
     627 UBC           0 :                 Err(e) => Err(PageReconstructError::from(e)),
     628                 :             },
     629 CBC         936 :             Err(e) => {
     630                 :                 // This is expected: historical databases do not have the key.
     631 UBC           0 :                 debug!("Failed to get info about AUX files: {}", e);
     632 CBC         936 :                 Ok(HashMap::new())
     633                 :             }
     634                 :         }
     635            2241 :     }
     636                 : 
     637                 :     /// Does the same as get_current_logical_size but counted on demand.
     638                 :     /// Used to initialize the logical size tracking on startup.
     639                 :     ///
     640                 :     /// Only relation blocks are counted currently. That excludes metadata,
     641                 :     /// SLRUs, twophase files etc.
     642                 :     ///
     643                 :     /// # Cancel-Safety
     644                 :     ///
     645                 :     /// This method is cancellation-safe.
     646             651 :     pub async fn get_current_logical_size_non_incremental(
     647             651 :         &self,
     648             651 :         lsn: Lsn,
     649             651 :         ctx: &RequestContext,
     650             651 :     ) -> Result<u64, CalculateLogicalSizeError> {
     651             651 :         crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
     652                 : 
     653                 :         // Fetch list of database dirs and iterate them
     654             985 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     655             639 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     656                 : 
     657             639 :         let mut total_size: u64 = 0;
     658            2537 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     659          589751 :             for rel in self
     660            2537 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     661             350 :                 .await?
     662                 :             {
     663          589751 :                 if self.cancel.is_cancelled() {
     664 GBC           1 :                     return Err(CalculateLogicalSizeError::Cancelled);
     665 CBC      589750 :                 }
     666          589750 :                 let relsize_key = rel_size_to_key(rel);
     667          589750 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     668          589734 :                 let relsize = buf.get_u32_le();
     669          589734 : 
     670          589734 :                 total_size += relsize as u64;
     671                 :             }
     672                 :         }
     673             622 :         Ok(total_size * BLCKSZ as u64)
     674             641 :     }
     675                 : 
     676                 :     ///
     677                 :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     678                 :     /// Anything that's not listed may be removed from the underlying storage (from
     679                 :     /// that LSN forwards).
     680             726 :     pub(crate) async fn collect_keyspace(
     681             726 :         &self,
     682             726 :         lsn: Lsn,
     683             726 :         ctx: &RequestContext,
     684             726 :     ) -> Result<KeySpace, CollectKeySpaceError> {
     685             726 :         // Iterate through key ranges, greedily packing them into partitions
     686             726 :         let mut result = KeySpaceAccum::new();
     687             726 : 
     688             726 :         // The dbdir metadata always exists
     689             726 :         result.add_key(DBDIR_KEY);
     690                 : 
     691                 :         // Fetch list of database dirs and iterate them
     692             726 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     693             726 :         let dbdir = DbDirectory::des(&buf)?;
     694                 : 
     695             726 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     696             726 :         dbs.sort_unstable();
     697            3302 :         for (spcnode, dbnode) in dbs {
     698            2579 :             result.add_key(relmap_file_key(spcnode, dbnode));
     699            2579 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     700                 : 
     701            2579 :             let mut rels: Vec<RelTag> = self
     702            2579 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     703             434 :                 .await?
     704            2579 :                 .into_iter()
     705            2579 :                 .collect();
     706            2579 :             rels.sort_unstable();
     707          614555 :             for rel in rels {
     708          611979 :                 let relsize_key = rel_size_to_key(rel);
     709          611979 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     710          611976 :                 let relsize = buf.get_u32_le();
     711          611976 : 
     712          611976 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     713          611976 :                 result.add_key(relsize_key);
     714                 :             }
     715                 :         }
     716                 : 
     717                 :         // Iterate SLRUs next
     718            2169 :         for kind in [
     719             723 :             SlruKind::Clog,
     720             723 :             SlruKind::MultiXactMembers,
     721             723 :             SlruKind::MultiXactOffsets,
     722                 :         ] {
     723            2169 :             let slrudir_key = slru_dir_to_key(kind);
     724            2169 :             result.add_key(slrudir_key);
     725            2169 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     726            2169 :             let dir = SlruSegmentDirectory::des(&buf)?;
     727            2169 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     728            2169 :             segments.sort_unstable();
     729            4097 :             for segno in segments {
     730            1928 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     731            1928 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     732            1928 :                 let segsize = buf.get_u32_le();
     733            1928 : 
     734            1928 :                 result.add_range(
     735            1928 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     736            1928 :                 );
     737            1928 :                 result.add_key(segsize_key);
     738                 :             }
     739                 :         }
     740                 : 
     741                 :         // Then pg_twophase
     742             723 :         result.add_key(TWOPHASEDIR_KEY);
     743             723 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     744             723 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     745             723 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     746             723 :         xids.sort_unstable();
     747             723 :         for xid in xids {
     748 UBC           0 :             result.add_key(twophase_file_key(xid));
     749               0 :         }
     750                 : 
     751 CBC         723 :         result.add_key(CONTROLFILE_KEY);
     752             723 :         result.add_key(CHECKPOINT_KEY);
     753             723 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     754             515 :             result.add_key(AUX_FILES_KEY);
     755             515 :         }
     756             723 :         Ok(result.to_keyspace())
     757             724 :     }
     758                 : 
     759                 :     /// Get the cached size of a relation if it was not updated after the specified LSN
     760        51218414 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     761        51218414 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     762        51218414 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     763        51028779 :             if lsn >= *cached_lsn {
     764        50816009 :                 return Some(*nblocks);
     765          212770 :             }
     766          189635 :         }
     767          402405 :         None
     768        51218414 :     }
     769                 : 
     770                 :     /// Update cached relation size if there is no more recent update
     771          151988 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     772          151988 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     773          151988 :         match rel_size_cache.entry(tag) {
     774          128440 :             hash_map::Entry::Occupied(mut entry) => {
     775          128440 :                 let cached_lsn = entry.get_mut();
     776          128440 :                 if lsn >= cached_lsn.0 {
     777               8 :                     *cached_lsn = (lsn, nblocks);
     778          128432 :                 }
     779                 :             }
     780           23548 :             hash_map::Entry::Vacant(entry) => {
     781           23548 :                 entry.insert((lsn, nblocks));
     782           23548 :             }
     783                 :         }
     784          151988 :     }
     785                 : 
     786                 :     /// Store cached relation size
     787         1492536 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     788         1492536 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     789         1492536 :         rel_size_cache.insert(tag, (lsn, nblocks));
     790         1492536 :     }
     791                 : 
     792                 :     /// Remove cached relation size
     793           17594 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     794           17594 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     795           17594 :         rel_size_cache.remove(tag);
     796           17594 :     }
     797                 : }
     798                 : 
     799                 : /// DatadirModification represents an operation to ingest an atomic set of
     800                 : /// updates to the repository. It is created by the 'begin_record'
     801                 : /// function. It is called for each WAL record, so that all the modifications
     802                 : /// made by a single WAL record appear atomic.
     803                 : pub struct DatadirModification<'a> {
     804                 :     /// The timeline this modification applies to. You can access this to
     805                 :     /// read the state, but note that any pending updates are *not* reflected
     806                 :     /// in the state in 'tline' yet.
     807                 :     pub tline: &'a Timeline,
     808                 : 
     809                 :     /// Current LSN of the modification
     810                 :     lsn: Lsn,
     811                 : 
     812                 :     // The modifications are not applied directly to the underlying key-value store.
     813                 :     // The put-functions add the modifications here, and they are flushed to the
     814                 :     // underlying key-value store by the 'finish' function.
     815                 :     pending_lsns: Vec<Lsn>,
     816                 :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     817                 :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     818                 :     pending_nblocks: i64,
     819                 : }
     820                 : 
     821                 : impl<'a> DatadirModification<'a> {
     822                 :     /// Get the current lsn
     823        47228369 :     pub(crate) fn get_lsn(&self) -> Lsn {
     824        47228369 :         self.lsn
     825        47228369 :     }
     826                 : 
     827                 :     /// Set the current lsn
     828        47422616 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     829        47422616 :         ensure!(
     830        47422616 :             lsn >= self.lsn,
     831 UBC           0 :             "setting an older lsn {} than {} is not allowed",
     832                 :             lsn,
     833                 :             self.lsn
     834                 :         );
     835 CBC    47422616 :         if lsn > self.lsn {
     836        47422616 :             self.pending_lsns.push(self.lsn);
     837        47422616 :             self.lsn = lsn;
     838        47422616 :         }
     839        47422616 :         Ok(())
     840        47422616 :     }
     841                 : 
     842                 :     /// Initialize a completely new repository.
     843                 :     ///
     844                 :     /// This inserts the directory metadata entries that are assumed to
     845                 :     /// always exist.
     846             570 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     847             570 :         let buf = DbDirectory::ser(&DbDirectory {
     848             570 :             dbdirs: HashMap::new(),
     849             570 :         })?;
     850             570 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     851             570 : 
     852             570 :         // Create AuxFilesDirectory
     853             570 :         self.init_aux_dir()?;
     854                 : 
     855             570 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     856             570 :             xids: HashSet::new(),
     857             570 :         })?;
     858             570 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     859                 : 
     860             570 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     861             570 :         let empty_dir = Value::Image(buf);
     862             570 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     863             570 :         self.put(
     864             570 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     865             570 :             empty_dir.clone(),
     866             570 :         );
     867             570 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     868             570 : 
     869             570 :         Ok(())
     870             570 :     }
     871                 : 
     872                 :     #[cfg(test)]
     873              35 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     874              35 :         self.init_empty()?;
     875              35 :         self.put_control_file(bytes::Bytes::from_static(
     876              35 :             b"control_file contents do not matter",
     877              35 :         ))
     878              35 :         .context("put_control_file")?;
     879              35 :         self.put_checkpoint(bytes::Bytes::from_static(
     880              35 :             b"checkpoint_file contents do not matter",
     881              35 :         ))
     882              35 :         .context("put_checkpoint_file")?;
     883              35 :         Ok(())
     884              35 :     }
     885                 : 
     886                 :     /// Put a new page version that can be constructed from a WAL record
     887                 :     ///
     888                 :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     889                 :     /// current end-of-file. It's up to the caller to check that the relation size
     890                 :     /// matches the blocks inserted!
     891        46962169 :     pub fn put_rel_wal_record(
     892        46962169 :         &mut self,
     893        46962169 :         rel: RelTag,
     894        46962169 :         blknum: BlockNumber,
     895        46962169 :         rec: NeonWalRecord,
     896        46962169 :     ) -> anyhow::Result<()> {
     897        46962169 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     898        46962169 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     899        46962169 :         Ok(())
     900        46962169 :     }
     901                 : 
     902                 :     // Same, but for an SLRU.
     903         1849938 :     pub fn put_slru_wal_record(
     904         1849938 :         &mut self,
     905         1849938 :         kind: SlruKind,
     906         1849938 :         segno: u32,
     907         1849938 :         blknum: BlockNumber,
     908         1849938 :         rec: NeonWalRecord,
     909         1849938 :     ) -> anyhow::Result<()> {
     910         1849938 :         self.put(
     911         1849938 :             slru_block_to_key(kind, segno, blknum),
     912         1849938 :             Value::WalRecord(rec),
     913         1849938 :         );
     914         1849938 :         Ok(())
     915         1849938 :     }
     916                 : 
     917                 :     /// Like put_rel_wal_record, but with a ready-made image of the page.
     918         2165845 :     pub fn put_rel_page_image(
     919         2165845 :         &mut self,
     920         2165845 :         rel: RelTag,
     921         2165845 :         blknum: BlockNumber,
     922         2165845 :         img: Bytes,
     923         2165845 :     ) -> anyhow::Result<()> {
     924         2165845 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     925         2165845 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     926         2165845 :         Ok(())
     927         2165845 :     }
     928                 : 
     929            2266 :     pub fn put_slru_page_image(
     930            2266 :         &mut self,
     931            2266 :         kind: SlruKind,
     932            2266 :         segno: u32,
     933            2266 :         blknum: BlockNumber,
     934            2266 :         img: Bytes,
     935            2266 :     ) -> anyhow::Result<()> {
     936            2266 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
     937            2266 :         Ok(())
     938            2266 :     }
     939                 : 
     940                 :     /// Store a relmapper file (pg_filenode.map) in the repository
     941            2203 :     pub async fn put_relmap_file(
     942            2203 :         &mut self,
     943            2203 :         spcnode: Oid,
     944            2203 :         dbnode: Oid,
     945            2203 :         img: Bytes,
     946            2203 :         ctx: &RequestContext,
     947            2203 :     ) -> anyhow::Result<()> {
     948                 :         // Add it to the directory (if it doesn't exist already)
     949            2203 :         let buf = self.get(DBDIR_KEY, ctx).await?;
     950            2203 :         let mut dbdir = DbDirectory::des(&buf)?;
     951                 : 
     952            2203 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
     953            2203 :         if r.is_none() || r == Some(false) {
     954                 :             // The dbdir entry didn't exist, or it contained a
     955                 :             // 'false'. The 'insert' call already updated it with
     956                 :             // 'true', now write the updated 'dbdirs' map back.
     957            2149 :             let buf = DbDirectory::ser(&dbdir)?;
     958            2149 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
     959            2149 : 
     960            2149 :             // Create AuxFilesDirectory as well
     961            2149 :             self.init_aux_dir()?;
     962              54 :         }
     963            2203 :         if r.is_none() {
     964              17 :             // Create RelDirectory
     965              17 :             let buf = RelDirectory::ser(&RelDirectory {
     966              17 :                 rels: HashSet::new(),
     967              17 :             })?;
     968              17 :             self.put(
     969              17 :                 rel_dir_to_key(spcnode, dbnode),
     970              17 :                 Value::Image(Bytes::from(buf)),
     971              17 :             );
     972            2186 :         }
     973                 : 
     974            2203 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
     975            2203 :         Ok(())
     976            2203 :     }
     977                 : 
     978               4 :     pub async fn put_twophase_file(
     979               4 :         &mut self,
     980               4 :         xid: TransactionId,
     981               4 :         img: Bytes,
     982               4 :         ctx: &RequestContext,
     983               4 :     ) -> anyhow::Result<()> {
     984                 :         // Add it to the directory entry
     985               4 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
     986               4 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
     987               4 :         if !dir.xids.insert(xid) {
     988 UBC           0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
     989 CBC           4 :         }
     990               4 :         self.put(
     991               4 :             TWOPHASEDIR_KEY,
     992               4 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
     993                 :         );
     994                 : 
     995               4 :         self.put(twophase_file_key(xid), Value::Image(img));
     996               4 :         Ok(())
     997               4 :     }
     998                 : 
     999             568 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1000             568 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1001             568 :         Ok(())
    1002             568 :     }
    1003                 : 
    1004           28367 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1005           28367 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1006           28367 :         Ok(())
    1007           28367 :     }
    1008                 : 
    1009               3 :     pub async fn drop_dbdir(
    1010               3 :         &mut self,
    1011               3 :         spcnode: Oid,
    1012               3 :         dbnode: Oid,
    1013               3 :         ctx: &RequestContext,
    1014               3 :     ) -> anyhow::Result<()> {
    1015               3 :         let total_blocks = self
    1016               3 :             .tline
    1017               3 :             .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
    1018 UBC           0 :             .await?;
    1019                 : 
    1020                 :         // Remove entry from dbdir
    1021 CBC           3 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1022               3 :         let mut dir = DbDirectory::des(&buf)?;
    1023               3 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1024               3 :             let buf = DbDirectory::ser(&dir)?;
    1025               3 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1026                 :         } else {
    1027 UBC           0 :             warn!(
    1028               0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1029               0 :                 spcnode, dbnode
    1030               0 :             );
    1031                 :         }
    1032                 : 
    1033                 :         // Update logical database size.
    1034 CBC           3 :         self.pending_nblocks -= total_blocks as i64;
    1035               3 : 
    1036               3 :         // Delete all relations and metadata files for the spcnode/dbnode
    1037               3 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1038               3 :         Ok(())
    1039               3 :     }
    1040                 : 
    1041                 :     /// Create a relation fork.
    1042                 :     ///
    1043                 :     /// 'nblocks' is the initial size.
    1044          523217 :     pub async fn put_rel_creation(
    1045          523217 :         &mut self,
    1046          523217 :         rel: RelTag,
    1047          523217 :         nblocks: BlockNumber,
    1048          523217 :         ctx: &RequestContext,
    1049          523217 :     ) -> Result<(), RelationError> {
    1050          523217 :         if rel.relnode == 0 {
    1051 UBC           0 :             return Err(RelationError::InvalidRelnode);
    1052 CBC      523217 :         }
    1053                 :         // It's possible that this is the first rel for this db in this
    1054                 :         // tablespace.  Create the reldir entry for it if so.
    1055          523217 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1056          523216 :             .context("deserialize db")?;
    1057          523216 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1058          523216 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
    1059                 :             // Didn't exist. Update dbdir
    1060            2133 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
    1061            2133 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1062            2133 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1063            2133 : 
    1064            2133 :             // and create the RelDirectory
    1065            2133 :             RelDirectory::default()
    1066                 :         } else {
    1067                 :             // reldir already exists, fetch it
    1068          521083 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1069          521083 :                 .context("deserialize db")?
    1070                 :         };
    1071                 : 
    1072                 :         // Add the new relation to the rel directory entry, and write it back
    1073          523216 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1074 UBC           0 :             return Err(RelationError::AlreadyExists);
    1075 CBC      523216 :         }
    1076          523216 :         self.put(
    1077          523216 :             rel_dir_key,
    1078          523216 :             Value::Image(Bytes::from(
    1079          523216 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1080                 :             )),
    1081                 :         );
    1082                 : 
    1083                 :         // Put size
    1084          523216 :         let size_key = rel_size_to_key(rel);
    1085          523216 :         let buf = nblocks.to_le_bytes();
    1086          523216 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1087          523216 : 
    1088          523216 :         self.pending_nblocks += nblocks as i64;
    1089          523216 : 
    1090          523216 :         // Update relation size cache
    1091          523216 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1092          523216 : 
    1093          523216 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1094          523216 :         // caller.
    1095          523216 :         Ok(())
    1096          523217 :     }
    1097                 : 
    1098                 :     /// Truncate relation
    1099            3092 :     pub async fn put_rel_truncation(
    1100            3092 :         &mut self,
    1101            3092 :         rel: RelTag,
    1102            3092 :         nblocks: BlockNumber,
    1103            3092 :         ctx: &RequestContext,
    1104            3092 :     ) -> anyhow::Result<()> {
    1105            3092 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1106            3092 :         if self
    1107            3092 :             .tline
    1108            3092 :             .get_rel_exists(rel, Version::Modified(self), true, ctx)
    1109 UBC           0 :             .await?
    1110                 :         {
    1111 CBC        3092 :             let size_key = rel_size_to_key(rel);
    1112                 :             // Fetch the old size first
    1113            3092 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1114            3092 : 
    1115            3092 :             // Update the entry with the new size.
    1116            3092 :             let buf = nblocks.to_le_bytes();
    1117            3092 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1118            3092 : 
    1119            3092 :             // Update relation size cache
    1120            3092 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1121            3092 : 
    1122            3092 :             // Update relation size cache
    1123            3092 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1124            3092 : 
    1125            3092 :             // Update logical database size.
    1126            3092 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1127 UBC           0 :         }
    1128 CBC        3092 :         Ok(())
    1129            3092 :     }
    1130                 : 
    1131                 :     /// Extend relation
    1132                 :     /// If new size is smaller, do nothing.
    1133         1460341 :     pub async fn put_rel_extend(
    1134         1460341 :         &mut self,
    1135         1460341 :         rel: RelTag,
    1136         1460341 :         nblocks: BlockNumber,
    1137         1460341 :         ctx: &RequestContext,
    1138         1460341 :     ) -> anyhow::Result<()> {
    1139         1460341 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1140                 : 
    1141                 :         // Put size
    1142         1460341 :         let size_key = rel_size_to_key(rel);
    1143         1460341 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1144         1460337 : 
    1145         1460337 :         // only extend relation here. never decrease the size
    1146         1460337 :         if nblocks > old_size {
    1147          963136 :             let buf = nblocks.to_le_bytes();
    1148          963136 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1149          963136 : 
    1150          963136 :             // Update relation size cache
    1151          963136 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1152          963136 : 
    1153          963136 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1154          963136 :         }
    1155         1460337 :         Ok(())
    1156         1460341 :     }
    1157                 : 
    1158                 :     /// Drop a relation.
    1159           17594 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1160           17594 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1161                 : 
    1162                 :         // Remove it from the directory entry
    1163           17594 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1164           17594 :         let buf = self.get(dir_key, ctx).await?;
    1165           17594 :         let mut dir = RelDirectory::des(&buf)?;
    1166                 : 
    1167           17594 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1168           17594 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1169                 :         } else {
    1170 UBC           0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1171                 :         }
    1172                 : 
    1173                 :         // update logical size
    1174 CBC       17594 :         let size_key = rel_size_to_key(rel);
    1175           17594 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1176           17594 :         self.pending_nblocks -= old_size as i64;
    1177           17594 : 
    1178           17594 :         // Remove entry from relation size cache
    1179           17594 :         self.tline.remove_cached_rel_size(&rel);
    1180           17594 : 
    1181           17594 :         // Delete size entry, as well as all blocks
    1182           17594 :         self.delete(rel_key_range(rel));
    1183           17594 : 
    1184           17594 :         Ok(())
    1185           17594 :     }
    1186                 : 
    1187            1620 :     pub async fn put_slru_segment_creation(
    1188            1620 :         &mut self,
    1189            1620 :         kind: SlruKind,
    1190            1620 :         segno: u32,
    1191            1620 :         nblocks: BlockNumber,
    1192            1620 :         ctx: &RequestContext,
    1193            1620 :     ) -> anyhow::Result<()> {
    1194            1620 :         // Add it to the directory entry
    1195            1620 :         let dir_key = slru_dir_to_key(kind);
    1196            1620 :         let buf = self.get(dir_key, ctx).await?;
    1197            1620 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1198                 : 
    1199            1620 :         if !dir.segments.insert(segno) {
    1200 UBC           0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1201 CBC        1620 :         }
    1202            1620 :         self.put(
    1203            1620 :             dir_key,
    1204            1620 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1205                 :         );
    1206                 : 
    1207                 :         // Put size
    1208            1620 :         let size_key = slru_segment_size_to_key(kind, segno);
    1209            1620 :         let buf = nblocks.to_le_bytes();
    1210            1620 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1211            1620 : 
    1212            1620 :         // even if nblocks > 0, we don't insert any actual blocks here
    1213            1620 : 
    1214            1620 :         Ok(())
    1215            1620 :     }
    1216                 : 
    1217                 :     /// Extend SLRU segment
    1218             658 :     pub fn put_slru_extend(
    1219             658 :         &mut self,
    1220             658 :         kind: SlruKind,
    1221             658 :         segno: u32,
    1222             658 :         nblocks: BlockNumber,
    1223             658 :     ) -> anyhow::Result<()> {
    1224             658 :         // Put size
    1225             658 :         let size_key = slru_segment_size_to_key(kind, segno);
    1226             658 :         let buf = nblocks.to_le_bytes();
    1227             658 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1228             658 :         Ok(())
    1229             658 :     }
    1230                 : 
    1231                 :     /// This method is used for marking truncated SLRU files
    1232               9 :     pub async fn drop_slru_segment(
    1233               9 :         &mut self,
    1234               9 :         kind: SlruKind,
    1235               9 :         segno: u32,
    1236               9 :         ctx: &RequestContext,
    1237               9 :     ) -> anyhow::Result<()> {
    1238               9 :         // Remove it from the directory entry
    1239               9 :         let dir_key = slru_dir_to_key(kind);
    1240               9 :         let buf = self.get(dir_key, ctx).await?;
    1241               9 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1242                 : 
    1243               9 :         if !dir.segments.remove(&segno) {
    1244 UBC           0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1245 CBC           9 :         }
    1246               9 :         self.put(
    1247               9 :             dir_key,
    1248               9 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1249                 :         );
    1250                 : 
    1251                 :         // Delete size entry, as well as all blocks
    1252               9 :         self.delete(slru_segment_key_range(kind, segno));
    1253               9 : 
    1254               9 :         Ok(())
    1255               9 :     }
    1256                 : 
    1257                 :     /// Drop a relmapper file (pg_filenode.map)
    1258 UBC           0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1259               0 :         // TODO
    1260               0 :         Ok(())
    1261               0 :     }
    1262                 : 
    1263                 :     /// Drop the twophase file for `xid` and remove it from the twophase directory
    1264 CBC           2 :     pub async fn drop_twophase_file(
    1265               2 :         &mut self,
    1266               2 :         xid: TransactionId,
    1267               2 :         ctx: &RequestContext,
    1268               2 :     ) -> anyhow::Result<()> {
    1269                 :         // Remove it from the directory entry
    1270               2 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1271               2 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1272                 : 
    1273               2 :         if !dir.xids.remove(&xid) {
    1274 UBC           0 :             warn!("twophase file for xid {} does not exist", xid);
    1275 CBC           2 :         }
    1276               2 :         self.put(
    1277               2 :             TWOPHASEDIR_KEY,
    1278               2 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1279                 :         );
    1280                 : 
    1281                 :         // Delete it
    1282               2 :         self.delete(twophase_key_range(xid));
    1283               2 : 
    1284               2 :         Ok(())
    1285               2 :     }
    1286                 : 
    1287            2719 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1288            2719 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1289            2719 :             files: HashMap::new(),
    1290            2719 :         })?;
    1291            2719 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1292            2719 :         Ok(())
    1293            2719 :     }
    1294                 : 
    1295             105 :     pub async fn put_file(
    1296             105 :         &mut self,
    1297             105 :         path: &str,
    1298             105 :         content: &[u8],
    1299             105 :         ctx: &RequestContext,
    1300             105 :     ) -> anyhow::Result<()> {
    1301             105 :         let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
    1302             103 :             Ok(buf) => AuxFilesDirectory::des(&buf)?,
    1303               2 :             Err(e) => {
    1304                 :                 // This is expected: historical databases do not have the key.
    1305 UBC           0 :                 debug!("Failed to get info about AUX files: {}", e);
    1306 CBC           2 :                 AuxFilesDirectory {
    1307               2 :                     files: HashMap::new(),
    1308               2 :                 }
    1309                 :             }
    1310                 :         };
    1311             105 :         let path = path.to_string();
    1312             105 :         if content.is_empty() {
    1313               4 :             dir.files.remove(&path);
    1314             101 :         } else {
    1315             101 :             dir.files.insert(path, Bytes::copy_from_slice(content));
    1316             101 :         }
    1317             105 :         self.put(
    1318             105 :             AUX_FILES_KEY,
    1319             105 :             Value::Image(Bytes::from(
    1320             105 :                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1321                 :             )),
    1322                 :         );
    1323             105 :         Ok(())
    1324             105 :     }
    1325                 : 
    1326                 :     ///
    1327                 :     /// Flush changes accumulated so far to the underlying repository.
    1328                 :     ///
    1329                 :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1330                 :     /// you to flush them to the underlying repository before the final `commit`.
    1331                 :     /// That frees up the memory used to hold the pending changes.
    1332                 :     ///
    1333                 :     /// Currently only used during bulk import of a data directory. In that
    1334                 :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1335                 :     /// whole import fails and the timeline will be deleted anyway.
    1336                 :     /// (Or to be precise, it will be left behind for debugging purposes and
    1337                 :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1338                 :     ///
    1339                 :     /// Note: A consequence of flushing the pending operations is that they
    1340                 :     /// won't be visible to subsequent operations until `commit`. The function
    1341                 :     /// retains all the metadata, but data pages are flushed. That's again OK
    1342                 :     /// for bulk import, where you are just loading data pages and won't try to
    1343                 :     /// modify the same pages twice.
    1344          506812 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1345          506812 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1346          506812 :         // to scan through the pending_updates list.
    1347          506812 :         let pending_nblocks = self.pending_nblocks;
    1348          506812 :         if pending_nblocks < 10000 {
    1349          506812 :             return Ok(());
    1350 UBC           0 :         }
    1351                 : 
    1352               0 :         let writer = self.tline.writer().await;
    1353                 : 
    1354                 :         // Flush relation and SLRU data blocks, keep metadata.
    1355               0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1356               0 :         for (key, values) in self.pending_updates.drain() {
    1357               0 :             for (lsn, value) in values {
    1358               0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1359                 :                     // This bails out on first error without modifying pending_updates.
    1360                 :                     // That's Ok, cf this function's doc comment.
    1361               0 :                     writer.put(key, lsn, &value, ctx).await?;
    1362               0 :                 } else {
    1363               0 :                     retained_pending_updates
    1364               0 :                         .entry(key)
    1365               0 :                         .or_default()
    1366               0 :                         .push((lsn, value));
    1367               0 :                 }
    1368                 :             }
    1369                 :         }
    1370                 : 
    1371               0 :         self.pending_updates = retained_pending_updates;
    1372               0 : 
    1373               0 :         if pending_nblocks != 0 {
    1374               0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1375               0 :             self.pending_nblocks = 0;
    1376               0 :         }
    1377                 : 
    1378               0 :         Ok(())
    1379 CBC      506812 :     }
    1380                 : 
    1381                 :     ///
    1382                 :     /// Finish this atomic update, writing all the updated keys to the
    1383                 :     /// underlying timeline.
    1384                 :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1385                 :     ///
    1386         1333218 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1387         1333217 :         let writer = self.tline.writer().await;
    1388                 : 
    1389         1333217 :         let pending_nblocks = self.pending_nblocks;
    1390         1333217 :         self.pending_nblocks = 0;
    1391         1333217 : 
    1392         1333217 :         if !self.pending_updates.is_empty() {
    1393         1168369 :             writer.put_batch(&self.pending_updates, ctx).await?;
    1394         1168369 :             self.pending_updates.clear();
    1395          164848 :         }
    1396                 : 
    1397         1333217 :         if !self.pending_deletions.is_empty() {
    1398            6569 :             writer.delete_batch(&self.pending_deletions).await?;
    1399            6569 :             self.pending_deletions.clear();
    1400         1326648 :         }
    1401                 : 
    1402         1333217 :         self.pending_lsns.push(self.lsn);
    1403        48755534 :         for pending_lsn in self.pending_lsns.drain(..) {
    1404        48755534 :             // Ideally, we should be able to call writer.finish_write() only once
    1405        48755534 :             // with the highest LSN. However, the last_record_lsn variable in the
    1406        48755534 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1407        48755534 :             // so we need to record every LSN to not leave a gap between them.
    1408        48755534 :             writer.finish_write(pending_lsn);
    1409        48755534 :         }
    1410                 : 
    1411         1333217 :         if pending_nblocks != 0 {
    1412          427894 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1413          905323 :         }
    1414                 : 
    1415         1333217 :         Ok(())
    1416         1333217 :     }
    1417                 : 
    1418        94845227 :     pub(crate) fn len(&self) -> usize {
    1419        94845227 :         self.pending_updates.len() + self.pending_deletions.len()
    1420        94845227 :     }
    1421                 : 
    1422                 :     // Internal helper functions to batch the modifications
    1423                 : 
    1424         2614147 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    1425                 :         // Have we already updated the same key? Read the latest pending updated
    1426                 :         // version in that case.
    1427                 :         //
    1428                 :         // Note: we don't check pending_deletions. It is an error to request a
    1429                 :         // value that has been removed; deletion only avoids leaking storage.
    1430         2614147 :         if let Some(values) = self.pending_updates.get(&key) {
    1431         2107182 :             if let Some((_, value)) = values.last() {
    1432         2107182 :                 return if let Value::Image(img) = value {
    1433         2107182 :                     Ok(img.clone())
    1434                 :                 } else {
    1435                 :                     // Currently, we never need to read back a WAL record that we
    1436                 :                     // inserted in the same "transaction". All the metadata updates
    1437                 :                     // work directly with Images, and we never need to read actual
    1438                 :                     // data pages. We could handle this if we had to, by calling
    1439                 :                     // the walredo manager, but let's keep it simple for now.
    1440 UBC           0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1441               0 :                         "unexpected pending WAL record"
    1442               0 :                     )))
    1443                 :                 };
    1444               0 :             }
    1445 CBC      506965 :         }
    1446          506965 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1447          506965 :         self.tline.get(key, lsn, ctx).await
    1448         2614147 :     }
    1449                 : 
    1450        53055503 :     fn put(&mut self, key: Key, val: Value) {
    1451        53055503 :         let values = self.pending_updates.entry(key).or_default();
    1452                 :         // Replace the previous value if it exists at the same lsn
    1453        53055503 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1454        44848600 :             if *last_lsn == self.lsn {
    1455          526141 :                 *last_value = val;
    1456          526141 :                 return;
    1457        44322459 :             }
    1458         8206903 :         }
    1459        52529362 :         values.push((self.lsn, val));
    1460        53055503 :     }
    1461                 : 
    1462           17608 :     fn delete(&mut self, key_range: Range<Key>) {
    1463           17608 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1464           17608 :         self.pending_deletions.push((key_range, self.lsn));
    1465           17608 :     }
    1466                 : }
    1467                 : 
    1468                 : /// This struct facilitates accessing either a committed key from the timeline at a
    1469                 : /// specific LSN, or the latest uncommitted key from a pending modification.
    1470                 : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1471                 : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1472                 : /// need to look up the keys in the modification first before looking them up in the
    1473                 : /// timeline to not miss the latest updates.
    1474 UBC           0 : #[derive(Clone, Copy)]
    1475                 : pub enum Version<'a> {
    1476                 :     Lsn(Lsn),
    1477                 :     Modified(&'a DatadirModification<'a>),
    1478                 : }
    1479                 : 
    1480                 : impl<'a> Version<'a> {
    1481 CBC     3950258 :     async fn get(
    1482         3950258 :         &self,
    1483         3950258 :         timeline: &Timeline,
    1484         3950258 :         key: Key,
    1485         3950258 :         ctx: &RequestContext,
    1486         3950258 :     ) -> Result<Bytes, PageReconstructError> {
    1487         3950258 :         match self {
    1488         3882978 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1489           67280 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1490                 :         }
    1491         3950232 :     }
    1492                 : 
    1493         4142033 :     fn get_lsn(&self) -> Lsn {
    1494         4142033 :         match self {
    1495         3871042 :             Version::Lsn(lsn) => *lsn,
    1496          270991 :             Version::Modified(modification) => modification.lsn,
    1497                 :         }
    1498         4142033 :     }
    1499                 : }
    1500                 : 
    1501                 : //--- Metadata structs stored in key-value pairs in the repository.
    1502                 : 
    1503          527345 : #[derive(Debug, Serialize, Deserialize)]
    1504                 : struct DbDirectory {
    1505                 :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1506                 :     dbdirs: HashMap<(Oid, Oid), bool>,
    1507                 : }
    1508                 : 
    1509            1287 : #[derive(Debug, Serialize, Deserialize)]
    1510                 : struct TwoPhaseDirectory {
    1511                 :     xids: HashSet<TransactionId>,
    1512                 : }
    1513                 : 
    1514         1081654 : #[derive(Debug, Serialize, Deserialize, Default)]
    1515                 : struct RelDirectory {
    1516                 :     // Set of relations that exist. (relfilenode, forknum)
    1517                 :     //
    1518                 :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1519                 :     // key-value pairs, if you have a lot of relations
    1520                 :     rels: HashSet<(Oid, u8)>,
    1521                 : }
    1522                 : 
    1523            5648 : #[derive(Debug, Serialize, Deserialize, Default)]
    1524                 : struct AuxFilesDirectory {
    1525                 :     files: HashMap<String, Bytes>,
    1526                 : }
    1527                 : 
    1528 UBC           0 : #[derive(Debug, Serialize, Deserialize)]
    1529                 : struct RelSizeEntry {
    1530                 :     nblocks: u32,
    1531                 : }
    1532                 : 
    1533 CBC        9421 : #[derive(Debug, Serialize, Deserialize, Default)]
    1534                 : struct SlruSegmentDirectory {
    1535                 :     // Set of SLRU segments that exist.
    1536                 :     segments: HashSet<u32>,
    1537                 : }
    1538                 : 
    1539                 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1540                 : 
    1541                 : // Layout of the Key address space
    1542                 : //
    1543                 : // The Key struct, used to address the underlying key-value store, consists of
    1544                 : // 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
    1545                 : // all the data and metadata keys into those 18 bytes.
    1546                 : //
    1547                 : // Principles for the mapping:
    1548                 : //
    1549                 : // - Things that are often accessed or modified together, should be close to
    1550                 : //   each other in the key space. For example, if a relation is extended by one
    1551                 : //   block, we create a new key-value pair for the block data, and update the
    1552                 : //   relation size entry. Because of that, the RelSize key comes after all the
    1553                 : //   RelBlocks of a relation: the RelSize and the last RelBlock are always next
    1554                 : //   to each other.
    1555                 : //
    1556                 : // The key space is divided into four major sections, identified by the first
    1557                 : // byte, and they form a hierarchy:
    1558                 : //
    1559                 : // 00 Relation data and metadata
    1560                 : //
    1561                 : //   DbDir    () -> (dbnode, spcnode)
    1562                 : //   Filenodemap
    1563                 : //   RelDir   -> relnode forknum
    1564                 : //       RelBlocks
    1565                 : //       RelSize
    1566                 : //
    1567                 : // 01 SLRUs
    1568                 : //
    1569                 : //   SlruDir  kind
    1570                 : //   SlruSegBlocks segno
    1571                 : //   SlruSegSize
    1572                 : //
    1573                 : // 02 pg_twophase
    1574                 : //
    1575                 : // 03 misc
    1576                 : //    Controlfile
    1577                 : //    checkpoint
    1578                 : //    pg_version
    1579                 : //    aux files
    1580                 : //
    1581                 : //
    1582                 : // Below is a full list of the keyspace allocation:
    1583                 : //
    1584                 : // DbDir:
    1585                 : // 00 00000000 00000000 00000000 00   00000000
    1586                 : //
    1587                 : // Filenodemap:
    1588                 : // 00 SPCNODE  DBNODE   00000000 00   00000000
    1589                 : //
    1590                 : // RelDir:
    1591                 : // 00 SPCNODE  DBNODE   00000000 00   00000001 (Postgres never uses relfilenode 0)
    1592                 : //
    1593                 : // RelBlock:
    1594                 : // 00 SPCNODE  DBNODE   RELNODE  FORK BLKNUM
    1595                 : //
    1596                 : // RelSize:
    1597                 : // 00 SPCNODE  DBNODE   RELNODE  FORK FFFFFFFF
    1598                 : //
    1599                 : // SlruDir:
    1600                 : // 01 kind     00000000 00000000 00   00000000
    1601                 : //
    1602                 : // SlruSegBlock:
    1603                 : // 01 kind     00000001 SEGNO    00   BLKNUM
    1604                 : //
    1605                 : // SlruSegSize:
    1606                 : // 01 kind     00000001 SEGNO    00   FFFFFFFF
    1607                 : //
    1608                 : // TwoPhaseDir:
    1609                 : // 02 00000000 00000000 00000000 00   00000000
    1610                 : //
    1611                 : // TwoPhaseFile:
    1612                 : // 02 00000000 00000000 00000000 00   XID
    1613                 : //
    1614                 : // ControlFile:
    1615                 : // 03 00000000 00000000 00000000 00   00000000
    1616                 : //
    1617                 : // Checkpoint:
    1618                 : // 03 00000000 00000000 00000000 00   00000001
    1619                 : //
    1620                 : // AuxFiles:
    1621                 : // 03 00000000 00000000 00000000 00   00000002
    1622                 : //
    1623                 : 
    1624                 : //-- Section 01: relation data and metadata
    1625                 : 
    1626                 : const DBDIR_KEY: Key = Key {
    1627                 :     field1: 0x00,
    1628                 :     field2: 0,
    1629                 :     field3: 0,
    1630                 :     field4: 0,
    1631                 :     field5: 0,
    1632                 :     field6: 0,
    1633                 : };
    1634                 : 
    1635               3 : fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
    1636               3 :     Key {
    1637               3 :         field1: 0x00,
    1638               3 :         field2: spcnode,
    1639               3 :         field3: dbnode,
    1640               3 :         field4: 0,
    1641               3 :         field5: 0,
    1642               3 :         field6: 0,
    1643               3 :     }..Key {
    1644               3 :         field1: 0x00,
    1645               3 :         field2: spcnode,
    1646               3 :         field3: dbnode,
    1647               3 :         field4: 0xffffffff,
    1648               3 :         field5: 0xff,
    1649               3 :         field6: 0xffffffff,
    1650               3 :     }
    1651               3 : }
    1652                 : 
    1653            7031 : fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
    1654            7031 :     Key {
    1655            7031 :         field1: 0x00,
    1656            7031 :         field2: spcnode,
    1657            7031 :         field3: dbnode,
    1658            7031 :         field4: 0,
    1659            7031 :         field5: 0,
    1660            7031 :         field6: 0,
    1661            7031 :     }
    1662            7031 : }
    1663                 : 
    1664          650083 : fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
    1665          650083 :     Key {
    1666          650083 :         field1: 0x00,
    1667          650083 :         field2: spcnode,
    1668          650083 :         field3: dbnode,
    1669          650083 :         field4: 0,
    1670          650083 :         field5: 0,
    1671          650083 :         field6: 1,
    1672          650083 :     }
    1673          650083 : }
    1674                 : 
    1675       105429802 : pub(crate) fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
    1676       105429802 :     Key {
    1677       105429802 :         field1: 0x00,
    1678       105429802 :         field2: rel.spcnode,
    1679       105429802 :         field3: rel.dbnode,
    1680       105429802 :         field4: rel.relnode,
    1681       105429802 :         field5: rel.forknum,
    1682       105429802 :         field6: blknum,
    1683       105429802 :     }
    1684       105429802 : }
    1685                 : 
    1686         3506303 : fn rel_size_to_key(rel: RelTag) -> Key {
    1687         3506303 :     Key {
    1688         3506303 :         field1: 0x00,
    1689         3506303 :         field2: rel.spcnode,
    1690         3506303 :         field3: rel.dbnode,
    1691         3506303 :         field4: rel.relnode,
    1692         3506303 :         field5: rel.forknum,
    1693         3506303 :         field6: 0xffffffff,
    1694         3506303 :     }
    1695         3506303 : }
    1696                 : 
    1697           17594 : fn rel_key_range(rel: RelTag) -> Range<Key> {
    1698           17594 :     Key {
    1699           17594 :         field1: 0x00,
    1700           17594 :         field2: rel.spcnode,
    1701           17594 :         field3: rel.dbnode,
    1702           17594 :         field4: rel.relnode,
    1703           17594 :         field5: rel.forknum,
    1704           17594 :         field6: 0,
    1705           17594 :     }..Key {
    1706           17594 :         field1: 0x00,
    1707           17594 :         field2: rel.spcnode,
    1708           17594 :         field3: rel.dbnode,
    1709           17594 :         field4: rel.relnode,
    1710           17594 :         field5: rel.forknum + 1,
    1711           17594 :         field6: 0,
    1712           17594 :     }
    1713           17594 : }
    1714                 : 
    1715                 : //-- Section 02: SLRUs
    1716                 : 
    1717           11134 : fn slru_dir_to_key(kind: SlruKind) -> Key {
    1718           11134 :     Key {
    1719           11134 :         field1: 0x01,
    1720           11134 :         field2: match kind {
    1721            6050 :             SlruKind::Clog => 0x00,
    1722            2685 :             SlruKind::MultiXactMembers => 0x01,
    1723            2399 :             SlruKind::MultiXactOffsets => 0x02,
    1724                 :         },
    1725                 :         field3: 0,
    1726                 :         field4: 0,
    1727                 :         field5: 0,
    1728                 :         field6: 0,
    1729                 :     }
    1730           11134 : }
    1731                 : 
    1732         1861667 : fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
    1733         1861667 :     Key {
    1734         1861667 :         field1: 0x01,
    1735         1861667 :         field2: match kind {
    1736         1807672 :             SlruKind::Clog => 0x00,
    1737           27564 :             SlruKind::MultiXactMembers => 0x01,
    1738           26431 :             SlruKind::MultiXactOffsets => 0x02,
    1739                 :         },
    1740                 :         field3: 1,
    1741         1861667 :         field4: segno,
    1742         1861667 :         field5: 0,
    1743         1861667 :         field6: blknum,
    1744         1861667 :     }
    1745         1861667 : }
    1746                 : 
    1747            9828 : fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
    1748            9828 :     Key {
    1749            9828 :         field1: 0x01,
    1750            9828 :         field2: match kind {
    1751            5732 :             SlruKind::Clog => 0x00,
    1752            2341 :             SlruKind::MultiXactMembers => 0x01,
    1753            1755 :             SlruKind::MultiXactOffsets => 0x02,
    1754                 :         },
    1755                 :         field3: 1,
    1756            9828 :         field4: segno,
    1757            9828 :         field5: 0,
    1758            9828 :         field6: 0xffffffff,
    1759            9828 :     }
    1760            9828 : }
    1761                 : 
    1762               9 : fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
    1763               9 :     let field2 = match kind {
    1764               9 :         SlruKind::Clog => 0x00,
    1765 UBC           0 :         SlruKind::MultiXactMembers => 0x01,
    1766               0 :         SlruKind::MultiXactOffsets => 0x02,
    1767                 :     };
    1768                 : 
    1769 CBC           9 :     Key {
    1770               9 :         field1: 0x01,
    1771               9 :         field2,
    1772               9 :         field3: 1,
    1773               9 :         field4: segno,
    1774               9 :         field5: 0,
    1775               9 :         field6: 0,
    1776               9 :     }..Key {
    1777               9 :         field1: 0x01,
    1778               9 :         field2,
    1779               9 :         field3: 1,
    1780               9 :         field4: segno,
    1781               9 :         field5: 1,
    1782               9 :         field6: 0,
    1783               9 :     }
    1784               9 : }
    1785                 : 
    1786                 : //-- Section 03: pg_twophase
    1787                 : 
    1788                 : const TWOPHASEDIR_KEY: Key = Key {
    1789                 :     field1: 0x02,
    1790                 :     field2: 0,
    1791                 :     field3: 0,
    1792                 :     field4: 0,
    1793                 :     field5: 0,
    1794                 :     field6: 0,
    1795                 : };
    1796                 : 
    1797               6 : fn twophase_file_key(xid: TransactionId) -> Key {
    1798               6 :     Key {
    1799               6 :         field1: 0x02,
    1800               6 :         field2: 0,
    1801               6 :         field3: 0,
    1802               6 :         field4: 0,
    1803               6 :         field5: 0,
    1804               6 :         field6: xid,
    1805               6 :     }
    1806               6 : }
    1807                 : 
    1808               2 : fn twophase_key_range(xid: TransactionId) -> Range<Key> {
    1809               2 :     let (next_xid, overflowed) = xid.overflowing_add(1);
    1810               2 : 
    1811               2 :     Key {
    1812               2 :         field1: 0x02,
    1813               2 :         field2: 0,
    1814               2 :         field3: 0,
    1815               2 :         field4: 0,
    1816               2 :         field5: 0,
    1817               2 :         field6: xid,
    1818               2 :     }..Key {
    1819               2 :         field1: 0x02,
    1820               2 :         field2: 0,
    1821               2 :         field3: 0,
    1822               2 :         field4: 0,
    1823               2 :         field5: u8::from(overflowed),
    1824               2 :         field6: next_xid,
    1825               2 :     }
    1826               2 : }
    1827                 : 
    1828                 : //-- Section 04: Control file, checkpoint and aux files (key prefix 03)
    1829                 : const CONTROLFILE_KEY: Key = Key {
    1830                 :     field1: 0x03,
    1831                 :     field2: 0,
    1832                 :     field3: 0,
    1833                 :     field4: 0,
    1834                 :     field5: 0,
    1835                 :     field6: 0,
    1836                 : };
    1837                 : 
    1838                 : const CHECKPOINT_KEY: Key = Key {
    1839                 :     field1: 0x03,
    1840                 :     field2: 0,
    1841                 :     field3: 0,
    1842                 :     field4: 0,
    1843                 :     field5: 0,
    1844                 :     field6: 1,
    1845                 : };
    1846                 : 
    1847                 : const AUX_FILES_KEY: Key = Key {
    1848                 :     field1: 0x03,
    1849                 :     field2: 0,
    1850                 :     field3: 0,
    1851                 :     field4: 0,
    1852                 :     field5: 0,
    1853                 :     field6: 2,
    1854                 : };
    1855                 : 
    1856                 : // Reverse mappings for a few Keys.
    1857                 : // These are needed by WAL redo manager.
    1858                 : 
    1859                 : // AUX_FILES currently stores only data for logical replication (slots etc), and
    1860                 : // we don't preserve these on a branch because safekeepers can't follow timeline
    1861                 : // switch (and generally it likely should be optional), so ignore these.
    1862        21465031 : pub fn is_inherited_key(key: Key) -> bool {
    1863        21465031 :     key != AUX_FILES_KEY
    1864        21465031 : }
    1865                 : 
    1866                 : /// Guaranteed to return `Ok()` if [`is_rel_block_key`] returns `true` for `key`.
    1867         2057120 : pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
    1868         2057120 :     Ok(match key.field1 {
    1869         2057120 :         0x00 => (
    1870         2057120 :             RelTag {
    1871         2057120 :                 spcnode: key.field2,
    1872         2057120 :                 dbnode: key.field3,
    1873         2057120 :                 relnode: key.field4,
    1874         2057120 :                 forknum: key.field5,
    1875         2057120 :             },
    1876         2057120 :             key.field6,
    1877         2057120 :         ),
    1878 UBC           0 :         _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
    1879                 :     })
    1880 CBC     2057120 : }
    1881 UBC           0 : pub fn is_rel_fsm_block_key(key: Key) -> bool {
    1882               0 :     key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
    1883               0 : }
    1884                 : 
    1885               0 : pub fn is_rel_vm_block_key(key: Key) -> bool {
    1886               0 :     key.field1 == 0x00
    1887               0 :         && key.field4 != 0
    1888               0 :         && key.field5 == VISIBILITYMAP_FORKNUM
    1889               0 :         && key.field6 != 0xffffffff
    1890               0 : }
    1891                 : 
    1892 CBC    16158944 : pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
    1893        16158944 :     Ok(match key.field1 {
    1894                 :         0x01 => {
    1895        16158944 :             let kind = match key.field2 {
    1896        16063072 :                 0x00 => SlruKind::Clog,
    1897           48210 :                 0x01 => SlruKind::MultiXactMembers,
    1898           47662 :                 0x02 => SlruKind::MultiXactOffsets,
    1899 UBC           0 :                 _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
    1900                 :             };
    1901 CBC    16158944 :             let segno = key.field4;
    1902        16158944 :             let blknum = key.field6;
    1903        16158944 : 
    1904        16158944 :             (kind, segno, blknum)
    1905                 :         }
    1906 UBC           0 :         _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
    1907                 :     })
    1908 CBC    16158944 : }
    1909                 : 
    1910 UBC           0 : fn is_slru_block_key(key: Key) -> bool {
    1911               0 :     key.field1 == 0x01                // SLRU-related
    1912               0 :         && key.field3 == 0x00000001   // but not SlruDir
    1913               0 :         && key.field6 != 0xffffffff // and not SlruSegSize
    1914               0 : }
    1915                 : 
    1916                 : #[allow(clippy::bool_assert_comparison)]
    1917                 : #[cfg(test)]
    1918                 : mod tests {
    1919                 :     //use super::repo_harness::*;
    1920                 :     //use super::*;
    1921                 : 
    1922                 :     /*
    1923                 :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1924                 :             let incremental = timeline.get_current_logical_size();
    1925                 :             let non_incremental = timeline
    1926                 :                 .get_current_logical_size_non_incremental(lsn)
    1927                 :                 .unwrap();
    1928                 :             assert_eq!(incremental, non_incremental);
    1929                 :         }
    1930                 :     */
    1931                 : 
    1932                 :     /*
    1933                 :     ///
    1934                 :     /// Test list_rels() function, with branches and dropped relations
    1935                 :     ///
    1936                 :     #[test]
    1937                 :     fn test_list_rels_drop() -> Result<()> {
    1938                 :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1939                 :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1940                 :         const TESTDB: u32 = 111;
    1941                 : 
    1942                 :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1943                 :         // after branching fails below
    1944                 :         let mut writer = tline.begin_record(Lsn(0x10));
    1945                 :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1946                 :         writer.finish()?;
    1947                 : 
    1948                 :         // Create a relation on the timeline
    1949                 :         let mut writer = tline.begin_record(Lsn(0x20));
    1950                 :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1951                 :         writer.finish()?;
    1952                 : 
    1953                 :         let writer = tline.begin_record(Lsn(0x00));
    1954                 :         writer.finish()?;
    1955                 : 
    1956                 :         // Check that list_rels() lists it after LSN 2, but not before it
    1957                 :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1958                 :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1959                 :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1960                 : 
    1961                 :         // Create a branch, check that the relation is visible there
    1962                 :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1963                 :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1964                 :             Some(timeline) => timeline,
    1965                 :             None => panic!("Should have a local timeline"),
    1966                 :         };
    1967                 :         let newtline = DatadirTimelineImpl::new(newtline);
    1968                 :         assert!(newtline
    1969                 :             .list_rels(0, TESTDB, Lsn(0x30))?
    1970                 :             .contains(&TESTREL_A));
    1971                 : 
    1972                 :         // Drop it on the branch
    1973                 :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1974                 :         new_writer.drop_relation(TESTREL_A)?;
    1975                 :         new_writer.finish()?;
    1976                 : 
    1977                 :         // Check that it's no longer listed on the branch after the point where it was dropped
    1978                 :         assert!(newtline
    1979                 :             .list_rels(0, TESTDB, Lsn(0x30))?
    1980                 :             .contains(&TESTREL_A));
    1981                 :         assert!(!newtline
    1982                 :             .list_rels(0, TESTDB, Lsn(0x40))?
    1983                 :             .contains(&TESTREL_A));
    1984                 : 
    1985                 :         // Run checkpoint and garbage collection and check that it's still not visible
    1986                 :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1987                 :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1988                 : 
    1989                 :         assert!(!newtline
    1990                 :             .list_rels(0, TESTDB, Lsn(0x40))?
    1991                 :             .contains(&TESTREL_A));
    1992                 : 
    1993                 :         Ok(())
    1994                 :     }
    1995                 :      */
    1996                 : 
    1997                 :     /*
    1998                 :     #[test]
    1999                 :     fn test_read_beyond_eof() -> Result<()> {
    2000                 :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    2001                 :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    2002                 : 
    2003                 :         make_some_layers(&tline, Lsn(0x20))?;
    2004                 :         let mut writer = tline.begin_record(Lsn(0x60));
    2005                 :         walingest.put_rel_page_image(
    2006                 :             &mut writer,
    2007                 :             TESTREL_A,
    2008                 :             0,
    2009                 :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    2010                 :         )?;
    2011                 :         writer.finish()?;
    2012                 : 
    2013                 :         // Test read before rel creation. Should error out.
    2014                 :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2015                 : 
    2016                 :         // Read block beyond end of relation at different points in time.
    2017                 :         // These reads should fall into different delta, image, and in-memory layers.
    2018                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2019                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2020                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2021                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2022                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2023                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2024                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2025                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2026                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2027                 : 
    2028                 :         // Test on an in-memory layer with no preceding layer
    2029                 :         let mut writer = tline.begin_record(Lsn(0x70));
    2030                 :         walingest.put_rel_page_image(
    2031                 :             &mut writer,
    2032                 :             TESTREL_B,
    2033                 :             0,
    2034                 :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2035                 :         )?;
    2036                 :         writer.finish()?;
    2037                 : 
    2038                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    2039                 : 
    2040                 :         Ok(())
    2041                 :     }
    2042                 :      */
    2043                 : }
        

Generated by: LCOV version 2.1-beta