LCOV - differential code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC CBC EUB ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 91.8 % 1113 1022 31 49 11 40 281 701 38 261
Current Date: 2023-10-19 02:04:12 Functions: 62.7 % 209 131 78 129 2 77 122
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! This provides an abstraction to store PostgreSQL relations and other files
       3                 : //! in the key-value store that implements the Repository interface.
       4                 : //!
       5                 : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6                 : //! walingest.rs handles a few things like implicit relation creation and extension.
       7                 : //! Clarify that)
       8                 : //!
       9                 : use super::tenant::{PageReconstructError, Timeline};
      10                 : use crate::context::RequestContext;
      11                 : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12                 : use crate::repository::*;
      13                 : use crate::walrecord::NeonWalRecord;
      14                 : use anyhow::Context;
      15                 : use bytes::{Buf, Bytes};
      16                 : use pageserver_api::reltag::{RelTag, SlruKind};
      17                 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      18                 : use postgres_ffi::BLCKSZ;
      19                 : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      20                 : use serde::{Deserialize, Serialize};
      21                 : use std::collections::{hash_map, HashMap, HashSet};
      22                 : use std::ops::ControlFlow;
      23                 : use std::ops::Range;
      24                 : use tokio_util::sync::CancellationToken;
      25                 : use tracing::{debug, trace, warn};
      26                 : use utils::{bin_ser::BeSer, lsn::Lsn};
      27                 : 
      28                 : /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
      29                 : pub type BlockNumber = u32;
      30 EUB             : 
      31 UIC           0 : #[derive(Debug)]
      32                 : pub enum LsnForTimestamp {
      33                 :     Present(Lsn),
      34                 :     Future(Lsn),
      35                 :     Past(Lsn),
      36                 :     NoData(Lsn),
      37                 : }
      38 ECB         (3) : 
      39 GIC           3 : #[derive(Debug, thiserror::Error)]
      40                 : pub enum CalculateLogicalSizeError {
      41                 :     #[error("cancelled")]
      42                 :     Cancelled,
      43                 :     #[error(transparent)]
      44                 :     Other(#[from] anyhow::Error),
      45                 : }
      46 EUB             : 
      47 UIC           0 : #[derive(Debug, thiserror::Error)]
      48                 : pub enum RelationError {
      49                 :     #[error("Relation Already Exists")]
      50                 :     AlreadyExists,
      51                 :     #[error("invalid relnode")]
      52                 :     InvalidRelnode,
      53                 :     #[error(transparent)]
      54                 :     Other(#[from] anyhow::Error),
      55                 : }
      56                 : 
      57                 : ///
      58                 : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
      59                 : /// and other special kinds of files, in a versioned key-value store. The
      60                 : /// Timeline struct provides the key-value store.
      61                 : ///
      62                 : /// This is a separate impl, so that we can easily include all these functions in a Timeline
      63                 : /// implementation, and might be moved into a separate struct later.
      64                 : impl Timeline {
      65                 :     /// Start ingesting a WAL record, or other atomic modification of
      66                 :     /// the timeline.
      67                 :     ///
      68                 :     /// This provides a transaction-like interface to perform a bunch
      69                 :     /// of modifications atomically.
      70                 :     ///
      71                 :     /// To ingest a WAL record, call begin_modification(lsn) to get a
      72                 :     /// DatadirModification object. Use the functions in the object to
      73                 :     /// modify the repository state, updating all the pages and metadata
      74                 :     /// that the WAL record affects. When you're done, call commit() to
      75                 :     /// commit the changes.
      76                 :     ///
      77                 :     /// Lsn stored in modification is advanced by `ingest_record` and
      78                 :     /// is used by `commit()` to update `last_record_lsn`.
      79                 :     ///
      80                 :     /// Calling commit() will flush all the changes and reset the state,
      81                 :     /// so the `DatadirModification` struct can be reused to perform the next modification.
      82                 :     ///
      83                 :     /// Note that any pending modifications you make through the
      84                 :     /// modification object won't be visible to calls to the 'get' and list
      85                 :     /// functions of the timeline until you finish! And if you update the
      86                 :     /// same page twice, the last update wins.
      87 ECB    (948596) :     ///
      88 CBC      910264 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
      89          910264 :     where
      90          910264 :         Self: Sized,
      91          910264 :     {
      92          910264 :         DatadirModification {
      93          910264 :             tline: self,
      94          910264 :             pending_updates: HashMap::new(),
      95          910264 :             pending_deletions: Vec::new(),
      96          910264 :             pending_nblocks: 0,
      97          910264 :             lsn,
      98          910264 :         }
      99 GIC      910264 :     }
     100                 : 
     101                 :     //------------------------------------------------------------------------------
     102                 :     // Public GET functions
     103                 :     //------------------------------------------------------------------------------
     104                 : 
     105 ECB   (3738587) :     /// Look up given page version.
     106 CBC     3765663 :     pub async fn get_rel_page_at_lsn(
     107         3765663 :         &self,
     108         3765663 :         tag: RelTag,
     109         3765663 :         blknum: BlockNumber,
     110         3765663 :         lsn: Lsn,
     111         3765663 :         latest: bool,
     112         3765663 :         ctx: &RequestContext,
     113         3765663 :     ) -> Result<Bytes, PageReconstructError> {
     114 GBC     3765662 :         if tag.relnode == 0 {
     115 UBC           0 :             return Err(PageReconstructError::Other(
     116               0 :                 RelationError::InvalidRelnode.into(),
     117 LBC   (3738587) :             ));
     118 GIC     3765662 :         }
     119 ECB   (3738587) : 
     120 CBC     3765662 :         let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
     121 GBC     3765662 :         if blknum >= nblocks {
     122 UBC           0 :             debug!(
     123               0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     124               0 :                 tag, blknum, lsn, nblocks
     125 LBC    (162580) :             );
     126 CBC      166513 :             return Ok(ZERO_PAGE.clone());
     127         3599149 :         }
     128         3599149 : 
     129         3599149 :         let key = rel_block_to_key(tag, blknum);
     130         5291916 :         self.get(key, lsn, ctx).await
     131 GIC     3765632 :     }
     132                 : 
     133 ECB         (8) :     // Get size of a database in blocks
     134 CBC           8 :     pub async fn get_db_size(
     135               8 :         &self,
     136               8 :         spcnode: Oid,
     137               8 :         dbnode: Oid,
     138               8 :         lsn: Lsn,
     139               8 :         latest: bool,
     140               8 :         ctx: &RequestContext,
     141               8 :     ) -> Result<usize, PageReconstructError> {
     142 GIC           8 :         let mut total_blocks = 0;
     143 ECB         (8) : 
     144 GIC           8 :         let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?;
     145 ECB      (2351) : 
     146 CBC        2351 :         for rel in rels {
     147            2343 :             let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?;
     148 GIC        2343 :             total_blocks += n_blocks as usize;
     149 ECB         (8) :         }
     150 CBC           8 :         Ok(total_blocks)
     151 GIC           8 :     }
     152                 : 
     153 ECB  (73842396) :     /// Get size of a relation file
     154 CBC    74248149 :     pub async fn get_rel_size(
     155        74248149 :         &self,
     156        74248149 :         tag: RelTag,
     157        74248149 :         lsn: Lsn,
     158        74248149 :         latest: bool,
     159        74248149 :         ctx: &RequestContext,
     160        74248149 :     ) -> Result<BlockNumber, PageReconstructError> {
     161 GBC    74248056 :         if tag.relnode == 0 {
     162 UBC           0 :             return Err(PageReconstructError::Other(
     163               0 :                 RelationError::InvalidRelnode.into(),
     164 LBC  (73842336) :             ));
     165 GIC    74248056 :         }
     166 ECB  (73842336) : 
     167 CBC    74248056 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
     168        73989791 :             return Ok(nblocks);
     169          258265 :         }
     170          258265 : 
     171          258265 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     172 GIC        7329 :             && !self.get_rel_exists(tag, lsn, latest, ctx).await?
     173                 :         {
     174                 :             // FIXME: Postgres sometimes calls smgrcreate() to create
     175                 :             // FSM, and smgrnblocks() on it immediately afterwards,
     176                 :             // without extending it.  Tolerate that by claiming that
     177 EUB             :             // any non-existent FSM fork has size 0.
     178 LBC    (272635) :             return Ok(0);
     179 CBC      258265 :         }
     180          258265 : 
     181          258265 :         let key = rel_size_to_key(tag);
     182          258265 :         let mut buf = self.get(key, lsn, ctx).await?;
     183          258263 :         let nblocks = buf.get_u32_le();
     184          258263 : 
     185          258263 :         if latest {
     186          139850 :             // Update the relation size cache only if the "latest" flag is set.
     187          139850 :             // This flag is set by compute when it is working with the most recent version of the relation.
     188          139850 :             // Typically the master compute node always sets latest=true.
     189          139850 :             // Note that even if a compute node "by mistake" specifies an old LSN but sets
     190          139850 :             // latest=true, it cannot cause cache corruption, because with latest=true the
     191          139850 :             // pageserver chooses max(request_lsn, last_written_lsn), so the cached value will be
     192          139850 :             // associated with the most recent value of LSN.
     193          139850 :             self.update_cached_rel_size(tag, lsn, nblocks);
     194          139850 :         }
     195          258263 :         Ok(nblocks)
     196 GIC    74248056 :     }
     197                 : 
     198 ECB  (70195304) :     /// Does relation exist?
     199 CBC    70574056 :     pub async fn get_rel_exists(
     200        70574056 :         &self,
     201        70574056 :         tag: RelTag,
     202        70574056 :         lsn: Lsn,
     203        70574056 :         _latest: bool,
     204        70574056 :         ctx: &RequestContext,
     205        70574056 :     ) -> Result<bool, PageReconstructError> {
     206 GBC    70573964 :         if tag.relnode == 0 {
     207 UBC           0 :             return Err(PageReconstructError::Other(
     208               0 :                 RelationError::InvalidRelnode.into(),
     209 LBC  (70195244) :             ));
     210 GIC    70573964 :         }
     211                 : 
     212 ECB  (70195244) :         // first try to lookup relation in cache
     213 CBC    70573964 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
     214        70470746 :             return Ok(true);
     215          103218 :         }
     216          103218 :         // fetch directory listing
     217          103218 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     218 GIC      103218 :         let buf = self.get(key, lsn, ctx).await?;
     219 ECB    (103123) : 
     220 CBC      103218 :         match RelDirectory::des(&buf).context("deserialization failure") {
     221          103218 :             Ok(dir) => {
     222          103218 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     223 GIC      103218 :                 Ok(exists)
     224 EUB             :             }
     225 UIC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     226 ECB  (70195244) :         }
     227 GIC    70573964 :     }
     228                 : 
     229 ECB      (6649) :     /// Get a list of all existing relations in given tablespace and database.
     230 CBC        6641 :     pub async fn list_rels(
     231            6641 :         &self,
     232            6641 :         spcnode: Oid,
     233            6641 :         dbnode: Oid,
     234            6641 :         lsn: Lsn,
     235            6641 :         ctx: &RequestContext,
     236            6641 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     237            6641 :         // fetch directory listing
     238            6641 :         let key = rel_dir_to_key(spcnode, dbnode);
     239 GIC        6641 :         let buf = self.get(key, lsn, ctx).await?;
     240 ECB      (6649) : 
     241 CBC        6641 :         match RelDirectory::des(&buf).context("deserialization failure") {
     242            6641 :             Ok(dir) => {
     243            6641 :                 let rels: HashSet<RelTag> =
     244         1561081 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     245         1561081 :                         spcnode,
     246         1561081 :                         dbnode,
     247         1561081 :                         relnode: *relnode,
     248         1561081 :                         forknum: *forknum,
     249         1561081 :                     }));
     250            6641 : 
     251 GIC        6641 :                 Ok(rels)
     252 EUB             :             }
     253 UIC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     254 ECB      (6649) :         }
     255 GIC        6641 :     }
     256                 : 
     257 ECB      (5726) :     /// Look up given SLRU page version.
     258 CBC        5927 :     pub async fn get_slru_page_at_lsn(
     259            5927 :         &self,
     260            5927 :         kind: SlruKind,
     261            5927 :         segno: u32,
     262            5927 :         blknum: BlockNumber,
     263            5927 :         lsn: Lsn,
     264            5927 :         ctx: &RequestContext,
     265            5927 :     ) -> Result<Bytes, PageReconstructError> {
     266            5927 :         let key = slru_block_to_key(kind, segno, blknum);
     267          365153 :         self.get(key, lsn, ctx).await
     268 GIC        5927 :     }
     269                 : 
     270 ECB      (5633) :     /// Get size of an SLRU segment
     271 CBC        5869 :     pub async fn get_slru_segment_size(
     272            5869 :         &self,
     273            5869 :         kind: SlruKind,
     274            5869 :         segno: u32,
     275            5869 :         lsn: Lsn,
     276            5869 :         ctx: &RequestContext,
     277            5869 :     ) -> Result<BlockNumber, PageReconstructError> {
     278            5869 :         let key = slru_segment_size_to_key(kind, segno);
     279            5869 :         let mut buf = self.get(key, lsn, ctx).await?;
     280            5869 :         Ok(buf.get_u32_le())
     281 GIC        5869 :     }
     282                 : 
     283 ECB       (667) :     /// Does the SLRU segment exist?
     284 CBC         667 :     pub async fn get_slru_segment_exists(
     285             667 :         &self,
     286             667 :         kind: SlruKind,
     287             667 :         segno: u32,
     288             667 :         lsn: Lsn,
     289             667 :         ctx: &RequestContext,
     290             667 :     ) -> Result<bool, PageReconstructError> {
     291             667 :         // fetch directory listing
     292             667 :         let key = slru_dir_to_key(kind);
     293 GIC         667 :         let buf = self.get(key, lsn, ctx).await?;
     294 ECB       (667) : 
     295 CBC         667 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     296             667 :             Ok(dir) => {
     297             667 :                 let exists = dir.segments.get(&segno).is_some();
     298 GIC         667 :                 Ok(exists)
     299 EUB             :             }
     300 UIC           0 :             Err(e) => Err(PageReconstructError::from(e)),
     301 ECB       (667) :         }
     302 GIC         667 :     }
     303                 : 
     304                 :     /// Locate LSN, such that all transactions that committed before
     305                 :     /// 'search_timestamp' are visible, but nothing newer is.
     306                 :     ///
     307                 :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     308                 :     /// so it's not well defined which LSN you get if there were multiple commits
     309                 :     /// "in flight" at that point in time.
     310 ECB       (181) :     ///
     311 CBC         195 :     pub async fn find_lsn_for_timestamp(
     312             195 :         &self,
     313             195 :         search_timestamp: TimestampTz,
     314             195 :         ctx: &RequestContext,
     315             195 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     316             195 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     317             195 :         let min_lsn = *gc_cutoff_lsn_guard;
     318             195 :         let max_lsn = self.get_last_record_lsn();
     319             195 : 
     320             195 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     321             195 :         // LSN divided by 8.
     322             195 :         let mut low = min_lsn.0 / 8;
     323             195 :         let mut high = max_lsn.0 / 8 + 1;
     324             195 : 
     325             195 :         let mut found_smaller = false;
     326             195 :         let mut found_larger = false;
     327 GIC        3470 :         while low < high {
     328 ECB      (3058) :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     329 GIC        3275 :             let mid = (high + low) / 2;
     330 ECB      (3058) : 
     331 CBC        3275 :             let cmp = self
     332            3275 :                 .is_latest_commit_timestamp_ge_than(
     333            3275 :                     search_timestamp,
     334            3275 :                     Lsn(mid * 8),
     335            3275 :                     &mut found_smaller,
     336            3275 :                     &mut found_larger,
     337            3275 :                     ctx,
     338            3275 :                 )
     339 GIC      338184 :                 .await?;
     340 ECB      (3058) : 
     341 CBC        3275 :             if cmp {
     342             896 :                 high = mid;
     343            2379 :             } else {
     344            2379 :                 low = mid + 1;
     345 GIC        2379 :             }
     346 ECB       (181) :         }
     347 GIC         195 :         match (found_smaller, found_larger) {
     348                 :             (false, false) => {
     349                 :                 // This can happen if no commit records have been processed yet, e.g.
     350 ECB        (21) :                 // just after importing a cluster.
     351 GIC          23 :                 Ok(LsnForTimestamp::NoData(max_lsn))
     352                 :             }
     353                 :             (true, false) => {
     354 ECB        (73) :                 // Didn't find any commit timestamps larger than the request
     355 GIC          78 :                 Ok(LsnForTimestamp::Future(max_lsn))
     356                 :             }
     357                 :             (false, true) => {
     358 ECB        (12) :                 // Didn't find any commit timestamps smaller than the request
     359 GIC          13 :                 Ok(LsnForTimestamp::Past(max_lsn))
     360                 :             }
     361                 :             (true, true) => {
     362                 :                 // low is the LSN of the first commit record *after* the search_timestamp.
     363                 :                 // Back off by one to get to the point just before the commit.
     364                 :                 //
     365                 :                 // FIXME: it would be better to get the LSN of the previous commit.
     366                 :                 // Otherwise, if you restore to the returned LSN, the database will
     367                 :                 // include physical changes from later commits that will be marked
     368 ECB        (75) :                 // as aborted, and will need to be vacuumed away.
     369 GIC          81 :                 Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
     370                 :             }
     371 ECB       (181) :         }
     372 GIC         195 :     }
     373                 : 
     374                 :     /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
     375                 :     /// commits that committed at or after 'search_timestamp', at LSN 'probe_lsn'.
     376                 :     ///
     377                 :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
     378                 :     /// with a smaller/larger (or equal) timestamp.
     379                 :     ///
     380 CBC        3275 :     pub async fn is_latest_commit_timestamp_ge_than(
     381            3275 :         &self,
     382            3275 :         search_timestamp: TimestampTz,
     383            3275 :         probe_lsn: Lsn,
     384            3275 :         found_smaller: &mut bool,
     385            3275 :         found_larger: &mut bool,
     386            3275 :         ctx: &RequestContext,
     387            3275 :     ) -> Result<bool, PageReconstructError> {
     388            3275 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     389            3084 :             if timestamp >= search_timestamp {
     390             896 :                 *found_larger = true;
     391 GIC         896 :                 return ControlFlow::Break(true);
     392 CBC        2188 :             } else {
     393            2188 :                 *found_smaller = true;
     394            2188 :             }
     395            2188 :             ControlFlow::Continue(())
     396            3275 :         })
     397          338184 :         .await
     398            3275 :     }
     399                 : 
     400 ECB      (3154) :     /// Obtain the largest timestamp found for the given lsn.
     401          (2923) :     ///
     402          (2923) :     /// If the lsn has no timestamps, returns None; otherwise returns the maximum of the timestamps found.
     403 CBC          12 :     pub async fn get_timestamp_for_lsn(
     404              12 :         &self,
     405              12 :         probe_lsn: Lsn,
     406              12 :         ctx: &RequestContext,
     407              12 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     408              12 :         let mut max: Option<TimestampTz> = None;
     409              12 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     410              10 :             if let Some(max_prev) = max {
     411 LBC       (231) :                 max = Some(max_prev.max(timestamp));
     412 GIC          10 :             } else {
     413              10 :                 max = Some(timestamp);
     414 CBC          10 :             }
     415              10 :             ControlFlow::Continue(())
     416 GIC          12 :         })
     417              80 :         .await?;
     418 ECB      (4968) : 
     419 CBC          10 :         Ok(max)
     420              12 :     }
     421 ECB      (4968) : 
     422          (4968) :     /// Runs the given function on all the timestamps for a given lsn
     423          (4968) :     ///
     424          (4968) :     /// The return value is either given by the closure, or set to the `Default`
     425          (4968) :     /// impl's output.
     426 GIC        3287 :     async fn map_all_timestamps<T: Default>(
     427 CBC        3287 :         &self,
     428            3287 :         probe_lsn: Lsn,
     429            3287 :         ctx: &RequestContext,
     430 GBC        3287 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     431 GIC        3287 :     ) -> Result<T, PageReconstructError> {
     432 CBC        3287 :         for segno in self
     433 GIC        3287 :             .list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
     434 CBC        2583 :             .await?
     435 ECB      (2569) :         {
     436 CBC        3285 :             let nblocks = self
     437            3285 :                 .get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx)
     438            2137 :                 .await?;
     439            3346 :             for blknum in (0..nblocks).rev() {
     440            3346 :                 let clog_page = self
     441            3346 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     442 GIC      333544 :                     .await?;
     443 ECB      (2569) : 
     444 CBC        3346 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     445            3094 :                     let mut timestamp_bytes = [0u8; 8];
     446 GIC        3094 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     447 CBC        3094 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     448            3094 : 
     449            3094 :                     match f(timestamp) {
     450             896 :                         ControlFlow::Break(b) => return Ok(b),
     451            2198 :                         ControlFlow::Continue(()) => (),
     452                 :                     }
     453             252 :                 }
     454                 :             }
     455 ECB       (636) :         }
     456 CBC        2389 :         Ok(Default::default())
     457 GBC        3287 :     }
     458                 : 
     459 ECB       (636) :     /// Get a list of SLRU segments
     460 GIC        5206 :     pub async fn list_slru_segments(
     461 CBC        5206 :         &self,
     462            5206 :         kind: SlruKind,
     463            5206 :         lsn: Lsn,
     464            5206 :         ctx: &RequestContext,
     465            5206 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     466            5206 :         // fetch directory entry
     467            5206 :         let key = slru_dir_to_key(kind);
     468 ECB         (2) : 
     469 CBC        5206 :         let buf = self.get(key, lsn, ctx).await?;
     470            5203 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     471 GIC        5203 :             Ok(dir) => Ok(dir.segments),
     472 LBC       (636) :             Err(e) => Err(PageReconstructError::from(e)),
     473 ECB       (636) :         }
     474 CBC        5206 :     }
     475 ECB       (636) : 
     476 CBC        2579 :     pub async fn get_relmap_file(
     477 GIC        2579 :         &self,
     478 CBC        2579 :         spcnode: Oid,
     479 GIC        2579 :         dbnode: Oid,
     480 CBC        2579 :         lsn: Lsn,
     481            2579 :         ctx: &RequestContext,
     482 GBC        2579 :     ) -> Result<Bytes, PageReconstructError> {
     483 GIC        2579 :         let key = relmap_file_key(spcnode, dbnode);
     484 ECB       (636) : 
     485 GIC        2579 :         let buf = self.get(key, lsn, ctx).await?;
     486 CBC        2579 :         Ok(buf)
     487            2579 :     }
     488 ECB       (635) : 
     489 CBC         639 :     pub async fn list_dbdirs(
     490             639 :         &self,
     491             639 :         lsn: Lsn,
     492             639 :         ctx: &RequestContext,
     493 GIC         639 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     494 ECB      (1881) :         // fetch directory entry
     495 CBC         639 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     496 ECB      (1881) : 
     497 CBC         639 :         match DbDirectory::des(&buf).context("deserialization failure") {
     498             639 :             Ok(dir) => Ok(dir.dbdirs),
     499 LBC      (1881) :             Err(e) => Err(PageReconstructError::from(e)),
     500 ECB      (1881) :         }
     501 GIC         639 :     }
     502 ECB      (2557) : 
     503 CBC           2 :     pub async fn get_twophase_file(
     504               2 :         &self,
     505               2 :         xid: TransactionId,
     506               2 :         lsn: Lsn,
     507               2 :         ctx: &RequestContext,
     508               2 :     ) -> Result<Bytes, PageReconstructError> {
     509               2 :         let key = twophase_file_key(xid);
     510 GBC           2 :         let buf = self.get(key, lsn, ctx).await?;
     511 GIC           2 :         Ok(buf)
     512 CBC           2 :     }
     513 ECB         (4) : 
     514 CBC         639 :     pub async fn list_twophase_files(
     515 GIC         639 :         &self,
     516             639 :         lsn: Lsn,
     517 CBC         639 :         ctx: &RequestContext,
     518 GIC         639 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     519                 :         // fetch directory entry
     520             639 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     521                 : 
     522             639 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     523             639 :             Ok(dir) => Ok(dir.xids),
     524 LBC       (428) :             Err(e) => Err(PageReconstructError::from(e)),
     525 ECB       (428) :         }
     526 CBC         639 :     }
     527 ECB       (428) : 
     528 CBC         638 :     pub async fn get_control_file(
     529             638 :         &self,
     530             638 :         lsn: Lsn,
     531 GIC         638 :         ctx: &RequestContext,
     532             638 :     ) -> Result<Bytes, PageReconstructError> {
     533 CBC         638 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     534             638 :     }
     535                 : 
     536            1895 :     pub async fn get_checkpoint(
     537            1895 :         &self,
     538            1895 :         lsn: Lsn,
     539            1895 :         ctx: &RequestContext,
     540            1895 :     ) -> Result<Bytes, PageReconstructError> {
     541            1895 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     542 GIC        1895 :     }
     543 ECB    (395532) : 
     544 CBC        2567 :     pub async fn list_aux_files(
     545            2567 :         &self,
     546            2567 :         lsn: Lsn,
     547            2567 :         ctx: &RequestContext,
     548            2567 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     549            2567 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     550            2563 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     551            2563 :                 Ok(dir) => Ok(dir.files),
     552 LBC    (395530) :                 Err(e) => Err(PageReconstructError::from(e)),
     553 ECB    (395530) :             },
     554 GIC           4 :             Err(e) => {
     555               4 :                 warn!("Failed to get info about AUX files: {}", e);
     556 CBC           4 :                 Ok(HashMap::new())
     557 ECB       (426) :             }
     558                 :         }
     559 GIC        2567 :     }
     560                 : 
     561                 :     /// Does the same as get_current_logical_size but counted on demand.
     562                 :     /// Used to initialize the logical size tracking on startup.
     563 ECB       (677) :     ///
     564           (677) :     /// Only relation blocks are counted currently. That excludes metadata,
     565           (677) :     /// SLRUs, twophase files etc.
     566 CBC         431 :     pub async fn get_current_logical_size_non_incremental(
     567             431 :         &self,
     568             431 :         lsn: Lsn,
     569             431 :         cancel: CancellationToken,
     570             431 :         ctx: &RequestContext,
     571             431 :     ) -> Result<u64, CalculateLogicalSizeError> {
     572             431 :         crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
     573                 : 
     574                 :         // Fetch list of database dirs and iterate them
     575             431 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
     576             425 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     577                 : 
     578             425 :         let mut total_size: u64 = 0;
     579            1709 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     580          399146 :             for rel in self
     581            1709 :                 .list_rels(*spcnode, *dbnode, lsn, ctx)
     582             949 :                 .await
     583 GIC        1709 :                 .context("list rels")?
     584 ECB      (2376) :             {
     585 CBC      399146 :                 if cancel.is_cancelled() {
     586 LBC      (1360) :                     return Err(CalculateLogicalSizeError::Cancelled);
     587 CBC      399146 :                 }
     588          399146 :                 let relsize_key = rel_size_to_key(rel);
     589          399146 :                 let mut buf = self
     590          399146 :                     .get(relsize_key, lsn, ctx)
     591          197707 :                     .await
     592          399146 :                     .with_context(|| format!("read relation size of {rel:?}"))?;
     593          399146 :                 let relsize = buf.get_u32_le();
     594          399146 : 
     595          399146 :                 total_size += relsize as u64;
     596 ECB    (560968) :             }
     597                 :         }
     598 GIC         425 :         Ok(total_size * BLCKSZ as u64)
     599             429 :     }
     600                 : 
     601 ECB      (2031) :     ///
     602           (677) :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     603           (677) :     /// Anything that's not listed maybe removed from the underlying storage (from
     604           (677) :     /// that LSN forwards).
     605 GIC         669 :     pub async fn collect_keyspace(
     606 CBC         669 :         &self,
     607             669 :         lsn: Lsn,
     608             669 :         ctx: &RequestContext,
     609             669 :     ) -> anyhow::Result<KeySpace> {
     610             669 :         // Iterate through key ranges, greedily packing them into partitions
     611             669 :         let mut result = KeySpaceAccum::new();
     612             669 : 
     613             669 :         // The dbdir metadata always exists
     614             669 :         result.add_key(DBDIR_KEY);
     615 ECB      (1781) : 
     616          (1781) :         // Fetch list of database dirs and iterate them
     617 CBC         669 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     618             668 :         let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
     619 ECB      (1781) : 
     620 CBC         668 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     621 GIC         668 :         dbs.sort_unstable();
     622            3011 :         for (spcnode, dbnode) in dbs {
     623            2343 :             result.add_key(relmap_file_key(spcnode, dbnode));
     624            2343 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     625 ECB       (677) : 
     626 CBC        2343 :             let mut rels: Vec<RelTag> = self
     627            2343 :                 .list_rels(spcnode, dbnode, lsn, ctx)
     628            1121 :                 .await?
     629            2343 :                 .into_iter()
     630            2343 :                 .collect();
     631 GBC        2343 :             rels.sort_unstable();
     632          556534 :             for rel in rels {
     633 GIC      554191 :                 let relsize_key = rel_size_to_key(rel);
     634 CBC      554191 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     635          554191 :                 let relsize = buf.get_u32_le();
     636          554191 : 
     637          554191 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     638          554191 :                 result.add_key(relsize_key);
     639 ECB       (677) :             }
     640                 :         }
     641                 : 
     642     (144037700) :         // Iterate SLRUs next
     643 CBC        2004 :         for kind in [
     644             668 :             SlruKind::Clog,
     645             668 :             SlruKind::MultiXactMembers,
     646             668 :             SlruKind::MultiXactOffsets,
     647 ECB    (137872) :         ] {
     648 CBC        2004 :             let slrudir_key = slru_dir_to_key(kind);
     649            2004 :             result.add_key(slrudir_key);
     650            2004 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     651 GIC        2004 :             let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
     652            2004 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     653 CBC        2004 :             segments.sort_unstable();
     654            3756 :             for segno in segments {
     655            1752 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     656            1752 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     657            1752 :                 let segsize = buf.get_u32_le();
     658            1752 : 
     659            1752 :                 result.add_range(
     660            1752 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     661 GIC        1752 :                 );
     662 CBC        1752 :                 result.add_key(segsize_key);
     663 ECB     (20036) :             }
     664         (20036) :         }
     665                 : 
     666        (153446) :         // Then pg_twophase
     667 GIC         668 :         result.add_key(TWOPHASEDIR_KEY);
     668             668 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     669 CBC         668 :         let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
     670             668 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     671             668 :         xids.sort_unstable();
     672             668 :         for xid in xids {
     673 UIC           0 :             result.add_key(twophase_file_key(xid));
     674               0 :         }
     675 ECB     (17672) : 
     676 CBC         668 :         result.add_key(CONTROLFILE_KEY);
     677             668 :         result.add_key(CHECKPOINT_KEY);
     678             668 :         result.add_key(AUX_FILES_KEY);
     679 GIC         668 : 
     680             668 :         Ok(result.to_keyspace())
     681             668 :     }
     682                 : 
     683                 :     /// Get cached size of relation if it not updated after specified LSN
     684       144822205 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     685       144822205 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     686       144822205 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     687       144584882 :             if lsn >= *cached_lsn {
     688       144460722 :                 return Some(*nblocks);
     689          124160 :             }
     690          237323 :         }
     691          361483 :         None
     692       144822205 :     }
     693                 : 
     694                 :     /// Update cached relation size if there is no more recent update
     695          139850 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     696          139850 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     697          139850 :         match rel_size_cache.entry(tag) {
     698          119708 :             hash_map::Entry::Occupied(mut entry) => {
     699          119708 :                 let cached_lsn = entry.get_mut();
     700          119708 :                 if lsn >= cached_lsn.0 {
     701               3 :                     *cached_lsn = (lsn, nblocks);
     702          119705 :                 }
     703                 :             }
     704           20142 :             hash_map::Entry::Vacant(entry) => {
     705           20142 :                 entry.insert((lsn, nblocks));
     706           20142 :             }
     707 ECB       (613) :         }
     708 CBC      139850 :     }
     709 ECB       (613) : 
     710           (613) :     /// Store cached relation size
     711 CBC     1598602 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     712 GIC     1598602 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     713         1598602 :         rel_size_cache.insert(tag, (lsn, nblocks));
     714 CBC     1598602 :     }
     715 ECB       (613) : 
     716           (613) :     /// Remove cached relation size
     717 CBC       17673 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     718 GIC       17673 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     719 CBC       17673 :         rel_size_cache.remove(tag);
     720           17673 :     }
     721 ECB       (613) : }
     722           (613) : 
     723                 : /// DatadirModification represents an operation to ingest an atomic set of
     724           (613) : /// updates to the repository. It is created by the 'begin_record'
     725           (613) : /// function. It is called for each WAL record, so that all the modifications
     726           (613) : /// by a one WAL record appear atomic.
     727           (613) : pub struct DatadirModification<'a> {
     728           (613) :     /// The timeline this modification applies to. You can access this to
     729           (613) :     /// read the state, but note that any pending updates are *not* reflected
     730           (613) :     /// in the state in 'tline' yet.
     731           (613) :     pub tline: &'a Timeline,
     732           (613) : 
     733           (613) :     /// Lsn assigned by begin_modification
     734           (613) :     pub lsn: Lsn,
     735                 : 
     736                 :     // The modifications are not applied directly to the underlying key-value store.
     737                 :     // The put-functions add the modifications here, and they are flushed to the
     738            (36) :     // underlying key-value store by the 'finish' function.
     739            (36) :     pending_updates: HashMap<Key, Value>,
     740            (36) :     pending_deletions: Vec<Range<Key>>,
     741            (36) :     pending_nblocks: i64,
     742            (36) : }
     743            (36) : 
     744            (36) : impl<'a> DatadirModification<'a> {
     745            (36) :     /// Initialize a completely new repository.
     746            (36) :     ///
     747            (36) :     /// This inserts the directory metadata entries that are assumed to
     748            (36) :     /// always exist.
     749 GIC         615 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     750             615 :         let buf = DbDirectory::ser(&DbDirectory {
     751             615 :             dbdirs: HashMap::new(),
     752             615 :         })?;
     753             615 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     754                 : 
     755 ECB  (69675515) :         // Create AuxFilesDirectory
     756 CBC         615 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
     757             615 :             files: HashMap::new(),
     758             615 :         })?;
     759             615 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
     760 ECB  (69675515) : 
     761 CBC         615 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     762             615 :             xids: HashSet::new(),
     763             615 :         })?;
     764             615 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     765                 : 
     766 GIC         615 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     767 CBC         615 :         let empty_dir = Value::Image(buf);
     768             615 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     769             615 :         self.put(
     770             615 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     771             615 :             empty_dir.clone(),
     772             615 :         );
     773             615 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     774             615 : 
     775             615 :         Ok(())
     776             615 :     }
     777 ECB   (1991227) : 
     778       (1991227) :     #[cfg(test)]
     779       (1991227) :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     780 GIC          36 :         self.init_empty()?;
     781              36 :         self.put_control_file(bytes::Bytes::from_static(
     782 CBC          36 :             b"control_file contents do not matter",
     783              36 :         ))
     784              36 :         .context("put_control_file")?;
     785              36 :         self.put_checkpoint(bytes::Bytes::from_static(
     786              36 :             b"checkpoint_file contents do not matter",
     787              36 :         ))
     788              36 :         .context("put_checkpoint_file")?;
     789              36 :         Ok(())
     790              36 :     }
     791 ECB   (2302125) : 
     792                 :     /// Put a new page version that can be constructed from a WAL record
     793          (2395) :     ///
     794          (2395) :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     795          (2395) :     /// current end-of-file. It's up to the caller to check that the relation size
     796          (2395) :     /// matches the blocks inserted!
     797 CBC    70051708 :     pub fn put_rel_wal_record(
     798        70051708 :         &mut self,
     799        70051708 :         rel: RelTag,
     800        70051708 :         blknum: BlockNumber,
     801        70051708 :         rec: NeonWalRecord,
     802        70051708 :     ) -> anyhow::Result<()> {
     803 GIC    70051708 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     804        70051708 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     805 CBC    70051708 :         Ok(())
     806        70051708 :     }
     807 ECB      (2375) : 
     808          (2375) :     // Same, but for an SLRU.
     809 CBC     1998464 :     pub fn put_slru_wal_record(
     810         1998464 :         &mut self,
     811         1998464 :         kind: SlruKind,
     812 GIC     1998464 :         segno: u32,
     813 CBC     1998464 :         blknum: BlockNumber,
     814         1998464 :         rec: NeonWalRecord,
     815 GIC     1998464 :     ) -> anyhow::Result<()> {
     816 CBC     1998464 :         self.put(
     817         1998464 :             slru_block_to_key(kind, segno, blknum),
     818 GIC     1998464 :             Value::WalRecord(rec),
     819         1998464 :         );
     820         1998464 :         Ok(())
     821 CBC     1998464 :     }
     822 ECB      (2321) : 
     823                 :     /// Like put_wal_record, but with ready-made image of the page.
     824 GIC     2312475 :     pub fn put_rel_page_image(
     825 CBC     2312475 :         &mut self,
     826         2312475 :         rel: RelTag,
     827         2312475 :         blknum: BlockNumber,
     828         2312475 :         img: Bytes,
     829         2312475 :     ) -> anyhow::Result<()> {
     830         2312475 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     831         2312475 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     832         2312475 :         Ok(())
     833         2312475 :     }
     834 ECB        (21) : 
     835 CBC        2401 :     pub fn put_slru_page_image(
     836            2401 :         &mut self,
     837            2401 :         kind: SlruKind,
     838            2401 :         segno: u32,
     839            2401 :         blknum: BlockNumber,
     840 GIC        2401 :         img: Bytes,
     841 CBC        2401 :     ) -> anyhow::Result<()> {
     842            2401 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
     843            2401 :         Ok(())
     844 GIC        2401 :     }
     845 ECB         (4) : 
     846             (4) :     /// Store a relmapper file (pg_filenode.map) in the repository
     847 CBC        2383 :     pub async fn put_relmap_file(
     848            2383 :         &mut self,
     849            2383 :         spcnode: Oid,
     850            2383 :         dbnode: Oid,
     851 GIC        2383 :         img: Bytes,
     852 CBC        2383 :         ctx: &RequestContext,
     853            2383 :     ) -> anyhow::Result<()> {
     854 ECB         (4) :         // Add it to the directory (if it doesn't exist already)
     855 GBC        2383 :         let buf = self.get(DBDIR_KEY, ctx).await?;
     856 CBC        2383 :         let mut dbdir = DbDirectory::des(&buf)?;
     857 ECB         (4) : 
     858 CBC        2383 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
     859            2383 :         if r.is_none() || r == Some(false) {
     860                 :             // The dbdir entry didn't exist, or it contained a
     861                 :             // 'false'. The 'insert' call already updated it with
     862 ECB         (4) :             // 'true', now write the updated 'dbdirs' map back.
     863 CBC        2329 :             let buf = DbDirectory::ser(&dbdir)?;
     864            2329 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
     865                 : 
     866 ECB       (611) :             // Create AuxFilesDirectory as well
     867 CBC        2329 :             let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
     868            2329 :                 files: HashMap::new(),
     869            2329 :             })?;
     870 GIC        2329 :             self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
     871 CBC          54 :         }
     872            2383 :         if r.is_none() {
     873              21 :             // Create RelDirectory
     874              21 :             let buf = RelDirectory::ser(&RelDirectory {
     875 GIC          21 :                 rels: HashSet::new(),
     876 CBC          21 :             })?;
     877              21 :             self.put(
     878              21 :                 rel_dir_to_key(spcnode, dbnode),
     879              21 :                 Value::Image(Bytes::from(buf)),
     880              21 :             );
     881            2362 :         }
     882 ECB         (3) : 
     883 GIC        2383 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
     884 CBC        2383 :         Ok(())
     885            2383 :     }
     886 ECB         (3) : 
     887 GBC           4 :     pub async fn put_twophase_file(
     888 GIC           4 :         &mut self,
     889               4 :         xid: TransactionId,
     890 CBC           4 :         img: Bytes,
     891               4 :         ctx: &RequestContext,
     892               4 :     ) -> anyhow::Result<()> {
     893 ECB         (3) :         // Add it to the directory entry
     894 CBC           4 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
     895 GIC           4 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
     896 GBC           4 :         if !dir.xids.insert(xid) {
     897 UBC           0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
     898 GBC           4 :         }
     899               4 :         self.put(
     900 GIC           4 :             TWOPHASEDIR_KEY,
     901               4 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
     902                 :         );
     903 ECB         (3) : 
     904 CBC           4 :         self.put(twophase_file_key(xid), Value::Image(img));
     905               4 :         Ok(())
     906               4 :     }
     907 ECB         (3) : 
     908 CBC         613 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
     909 GIC         613 :         self.put(CONTROLFILE_KEY, Value::Image(img));
     910             613 :         Ok(())
     911             613 :     }
     912                 : 
     913 CBC       28659 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
     914           28659 :         self.put(CHECKPOINT_KEY, Value::Image(img));
     915           28659 :         Ok(())
     916           28659 :     }
     917 ECB    (563733) : 
     918 CBC           3 :     pub async fn drop_dbdir(
     919               3 :         &mut self,
     920 GBC           3 :         spcnode: Oid,
     921 CBC           3 :         dbnode: Oid,
     922 GIC           3 :         ctx: &RequestContext,
     923               3 :     ) -> anyhow::Result<()> {
     924 CBC           3 :         let req_lsn = self.tline.get_last_record_lsn();
     925 ECB    (563733) : 
     926 CBC           3 :         let total_blocks = self
     927               3 :             .tline
     928 GIC           3 :             .get_db_size(spcnode, dbnode, req_lsn, true, ctx)
     929 LBC      (2301) :             .await?;
     930 ECB      (2301) : 
     931          (2301) :         // Remove entry from dbdir
     932 CBC           3 :         let buf = self.get(DBDIR_KEY, ctx).await?;
     933               3 :         let mut dir = DbDirectory::des(&buf)?;
     934               3 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
     935 GIC           3 :             let buf = DbDirectory::ser(&dir)?;
     936               3 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
     937 ECB    (561432) :         } else {
     938 LBC    (561432) :             warn!(
     939 UIC           0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
     940               0 :                 spcnode, dbnode
     941               0 :             );
     942 ECB    (563733) :         }
     943 EUB             : 
     944 ECB    (563733) :         // Update logical database size.
     945 CBC           3 :         self.pending_nblocks -= total_blocks as i64;
     946               3 : 
     947               3 :         // Delete all relations and metadata files for the spcnode/dnode
     948               3 :         self.delete(dbdir_key_range(spcnode, dbnode));
     949 GIC           3 :         Ok(())
     950               3 :     }
     951                 : 
     952                 :     /// Create a relation fork.
     953 ECB    (563733) :     ///
     954        (563733) :     /// 'nblocks' is the initial size.
     955 CBC      565601 :     pub async fn put_rel_creation(
     956          565601 :         &mut self,
     957          565601 :         rel: RelTag,
     958          565601 :         nblocks: BlockNumber,
     959          565601 :         ctx: &RequestContext,
     960          565601 :     ) -> Result<(), RelationError> {
     961          565601 :         if rel.relnode == 0 {
     962 LBC    (563733) :             return Err(RelationError::InvalidRelnode);
     963 CBC      565601 :         }
     964 ECB    (563733) :         // It's possible that this is the first rel for this db in this
     965        (563733) :         // tablespace.  Create the reldir entry for it if so.
     966 GIC      565601 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
     967          565601 :             .context("deserialize db")?;
     968 CBC      565601 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
     969          565601 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
     970 ECB      (3097) :             // Didn't exist. Update dbdir
     971 CBC        2309 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
     972            2309 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
     973            2309 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
     974            2309 : 
     975            2309 :             // and create the RelDirectory
     976            2309 :             RelDirectory::default()
     977 ECB      (3097) :         } else {
     978                 :             // reldir already exists, fetch it
     979 CBC      563292 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
     980          563292 :                 .context("deserialize db")?
     981 ECB      (3097) :         };
     982          (3097) : 
     983          (3097) :         // Add the new relation to the rel directory entry, and write it back
     984 CBC      565601 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
     985 LBC      (3097) :             return Err(RelationError::AlreadyExists);
     986 CBC      565601 :         }
     987          565601 :         self.put(
     988          565601 :             rel_dir_key,
     989          565601 :             Value::Image(Bytes::from(
     990          565601 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
     991 ECB      (3097) :             )),
     992          (3097) :         );
     993 EUB             : 
     994 ECB      (3097) :         // Put size
     995 CBC      565601 :         let size_key = rel_size_to_key(rel);
     996 GIC      565601 :         let buf = nblocks.to_le_bytes();
     997          565601 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
     998          565601 : 
     999 CBC      565601 :         self.pending_nblocks += nblocks as i64;
    1000          565601 : 
    1001          565601 :         // Update relation size cache
    1002          565601 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1003          565601 : 
    1004          565601 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1005          565601 :         // caller.
    1006 GIC      565601 :         Ok(())
    1007          565601 :     }
    1008 ECB   (1564545) : 
    1009       (1564545) :     /// Truncate relation
    1010 CBC        3094 :     pub async fn put_rel_truncation(
    1011            3094 :         &mut self,
    1012            3094 :         rel: RelTag,
    1013            3094 :         nblocks: BlockNumber,
    1014            3094 :         ctx: &RequestContext,
    1015            3094 :     ) -> anyhow::Result<()> {
    1016            3094 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1017            3094 :         let last_lsn = self.tline.get_last_record_lsn();
    1018            3094 :         if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
    1019            3094 :             let size_key = rel_size_to_key(rel);
    1020 ECB   (1028277) :             // Fetch the old size first
    1021 CBC        3094 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1022            3094 : 
    1023 GIC        3094 :             // Update the entry with the new size.
    1024            3094 :             let buf = nblocks.to_le_bytes();
    1025 CBC        3094 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1026            3094 : 
    1027 GIC        3094 :             // Update relation size cache
    1028            3094 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1029 CBC        3094 : 
    1030            3094 :             // Update relation size cache
    1031            3094 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1032 GIC        3094 : 
    1033 CBC        3094 :             // Update logical database size.
    1034            3094 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1035 UIC           0 :         }
    1036 GBC        3094 :         Ok(())
    1037 GIC        3094 :     }
    1038                 : 
    1039                 :     /// Extend relation
    1040 ECB     (17672) :     /// If new size is smaller, do nothing.
    1041 CBC     1564943 :     pub async fn put_rel_extend(
    1042         1564943 :         &mut self,
    1043         1564943 :         rel: RelTag,
    1044         1564943 :         nblocks: BlockNumber,
    1045         1564943 :         ctx: &RequestContext,
    1046         1564943 :     ) -> anyhow::Result<()> {
    1047         1564943 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1048 ECB     (17672) : 
    1049         (17672) :         // Put size
    1050 CBC     1564943 :         let size_key = rel_size_to_key(rel);
    1051         1564943 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1052 GIC     1564943 : 
    1053 CBC     1564943 :         // only extend relation here. never decrease the size
    1054         1564943 :         if nblocks > old_size {
    1055         1026813 :             let buf = nblocks.to_le_bytes();
    1056         1026813 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1057         1026813 : 
    1058         1026813 :             // Update relation size cache
    1059         1026813 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1060         1026813 : 
    1061         1026813 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1062         1026813 :         }
    1063         1564943 :         Ok(())
    1064 GIC     1564943 :     }
    1065 ECB      (1746) : 
    1066 EUB             :     /// Drop a relation.
    1067 CBC       17673 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1068           17673 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1069 ECB      (1746) : 
    1070          (1746) :         // Remove it from the directory entry
    1071 GIC       17673 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1072           17673 :         let buf = self.get(dir_key, ctx).await?;
    1073           17673 :         let mut dir = RelDirectory::des(&buf)?;
    1074 ECB      (1746) : 
    1075 CBC       17673 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1076           17673 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1077 ECB      (1746) :         } else {
    1078 LBC      (1746) :             warn!("dropped rel {} did not exist in rel directory", rel);
    1079 ECB      (1746) :         }
    1080          (1746) : 
    1081          (1746) :         // update logical size
    1082 GIC       17673 :         let size_key = rel_size_to_key(rel);
    1083           17673 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1084 CBC       17673 :         self.pending_nblocks -= old_size as i64;
    1085           17673 : 
    1086           17673 :         // Remove entry from relation size cache
    1087           17673 :         self.tline.remove_cached_rel_size(&rel);
    1088           17673 : 
    1089           17673 :         // Delete size entry, as well as all blocks
    1090           17673 :         self.delete(rel_key_range(rel));
    1091           17673 : 
    1092           17673 :         Ok(())
    1093           17673 :     }
    1094 ECB       (661) : 
    1095 CBC        1752 :     pub async fn put_slru_segment_creation(
    1096 GIC        1752 :         &mut self,
    1097            1752 :         kind: SlruKind,
    1098 CBC        1752 :         segno: u32,
    1099            1752 :         nblocks: BlockNumber,
    1100            1752 :         ctx: &RequestContext,
    1101            1752 :     ) -> anyhow::Result<()> {
    1102            1752 :         // Add it to the directory entry
    1103            1752 :         let dir_key = slru_dir_to_key(kind);
    1104            1752 :         let buf = self.get(dir_key, ctx).await?;
    1105            1752 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1106 ECB         (9) : 
    1107 CBC        1752 :         if !dir.segments.insert(segno) {
    1108 UIC           0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1109 CBC        1752 :         }
    1110 GBC        1752 :         self.put(
    1111 CBC        1752 :             dir_key,
    1112            1752 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1113 ECB         (9) :         );
    1114             (9) : 
    1115                 :         // Put size
    1116 GIC        1752 :         let size_key = slru_segment_size_to_key(kind, segno);
    1117            1752 :         let buf = nblocks.to_le_bytes();
    1118 CBC        1752 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1119            1752 : 
    1120            1752 :         // even if nblocks > 0, we don't insert any actual blocks here
    1121            1752 : 
    1122 GIC        1752 :         Ok(())
    1123            1752 :     }
    1124 EUB             : 
    1125                 :     /// Extend SLRU segment
    1126 GBC         661 :     pub fn put_slru_extend(
    1127             661 :         &mut self,
    1128 GIC         661 :         kind: SlruKind,
    1129             661 :         segno: u32,
    1130 CBC         661 :         nblocks: BlockNumber,
    1131             661 :     ) -> anyhow::Result<()> {
    1132             661 :         // Put size
    1133             661 :         let size_key = slru_segment_size_to_key(kind, segno);
    1134             661 :         let buf = nblocks.to_le_bytes();
    1135 GIC         661 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1136 CBC         661 :         Ok(())
    1137             661 :     }
    1138                 : 
    1139 ECB         (2) :     /// This method is used for marking truncated SLRU files
    1140 GBC           9 :     pub async fn drop_slru_segment(
    1141 CBC           9 :         &mut self,
    1142               9 :         kind: SlruKind,
    1143               9 :         segno: u32,
    1144               9 :         ctx: &RequestContext,
    1145 GIC           9 :     ) -> anyhow::Result<()> {
    1146               9 :         // Remove it from the directory entry
    1147               9 :         let dir_key = slru_dir_to_key(kind);
    1148 CBC           9 :         let buf = self.get(dir_key, ctx).await?;
    1149               9 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1150 ECB         (2) : 
    1151 CBC           9 :         if !dir.segments.remove(&segno) {
    1152 UIC           0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1153 CBC           9 :         }
    1154               9 :         self.put(
    1155               9 :             dir_key,
    1156               9 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1157 ECB        (94) :         );
    1158            (94) : 
    1159            (94) :         // Delete size entry, as well as all blocks
    1160 CBC           9 :         self.delete(slru_segment_key_range(kind, segno));
    1161 GBC           9 : 
    1162               9 :         Ok(())
    1163               9 :     }
    1164 EUB             : 
    1165                 :     /// Drop a relmapper file (pg_filenode.map)
    1166 UIC           0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1167               0 :         // TODO
    1168 LBC        (94) :         Ok(())
    1169            (94) :     }
    1170 ECB         (3) : 
    1171            (91) :     /// Drop a two-phase state file: remove its xid from the twophase directory and delete its keys
    1172 CBC           2 :     pub async fn drop_twophase_file(
    1173               2 :         &mut self,
    1174               2 :         xid: TransactionId,
    1175               2 :         ctx: &RequestContext,
    1176               2 :     ) -> anyhow::Result<()> {
    1177 ECB        (94) :         // Remove it from the directory entry
    1178 GIC           2 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1179               2 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1180 ECB        (94) : 
    1181 CBC           2 :         if !dir.xids.remove(&xid) {
    1182 UIC           0 :             warn!("twophase file for xid {} does not exist", xid);
    1183 GIC           2 :         }
    1184               2 :         self.put(
    1185               2 :             TWOPHASEDIR_KEY,
    1186               2 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1187                 :         );
    1188                 : 
    1189                 :         // Delete it
    1190               2 :         self.delete(twophase_key_range(xid));
    1191               2 : 
    1192               2 :         Ok(())
    1193               2 :     }
    1194                 : 
    1195              94 :     pub async fn put_file(
    1196              94 :         &mut self,
    1197              94 :         path: &str,
    1198              94 :         content: &[u8],
    1199              94 :         ctx: &RequestContext,
    1200              94 :     ) -> anyhow::Result<()> {
    1201 CBC          94 :         let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
    1202              94 :             Ok(buf) => AuxFilesDirectory::des(&buf)?,
    1203 LBC    (546640) :             Err(e) => {
    1204        (546640) :                 warn!("Failed to get info about AUX files: {}", e);
    1205        (546640) :                 AuxFilesDirectory {
    1206        (546640) :                     files: HashMap::new(),
    1207 UBC           0 :                 }
    1208                 :             }
    1209 EUB             :         };
    1210 GIC          94 :         let path = path.to_string();
    1211              94 :         if content.is_empty() {
    1212 GBC           3 :             dir.files.remove(&path);
    1213              91 :         } else {
    1214              91 :             dir.files.insert(path, Bytes::copy_from_slice(content));
    1215 GIC          91 :         }
    1216              94 :         self.put(
    1217 GBC          94 :             AUX_FILES_KEY,
    1218              94 :             Value::Image(Bytes::from(
    1219              94 :                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1220 EUB             :             )),
    1221                 :         );
    1222 GBC          94 :         Ok(())
    1223              94 :     }
    1224 EUB             : 
    1225                 :     ///
    1226                 :     /// Flush changes accumulated so far to the underlying repository.
    1227                 :     ///
    1228                 :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1229                 :     /// you to flush them to the underlying repository before the final `commit`.
    1230 ECB    (546640) :     /// That allows freeing up the memory used to hold the pending changes.
    1231                 :     ///
    1232                 :     /// Currently only used during bulk import of a data directory. In that
    1233                 :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1234                 :     /// whole import fails and the timeline will be deleted anyway.
    1235                 :     /// (Or to be precise, it will be left behind for debugging purposes and
    1236                 :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1237      (68371807) :     ///
    1238      (68371793) :     /// Note: A consequence of flushing the pending operations is that they
    1239      (68371793) :     /// won't be visible to subsequent operations until `commit`. The function
    1240      (68371793) :     /// retains all the metadata, but data pages are flushed. That's again OK
    1241      (68371793) :     /// for bulk import, where you are just loading data pages and won't try to
    1242                 :     /// modify the same pages twice.
    1243 CBC      548538 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1244          548538 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1245 GIC      548538 :         // to scan through the pending_updates list.
    1246 CBC      548538 :         let pending_nblocks = self.pending_nblocks;
    1247          548538 :         if pending_nblocks < 10000 {
    1248 GIC      548538 :             return Ok(());
    1249 UIC           0 :         }
    1250 ECB  (68371785) : 
    1251 LBC  (68371785) :         let writer = self.tline.writer().await;
    1252 ECB  (68371785) : 
    1253       (1030507) :         // Flush relation and SLRU data blocks, keep metadata.
    1254 LBC  (67341278) :         let mut retained_pending_updates = HashMap::new();
    1255 UIC           0 :         for (key, value) in self.pending_updates.drain() {
    1256 LBC  (68371785) :             if is_rel_block_key(key) || is_slru_block_key(key) {
    1257 ECB  (68371785) :                 // This bails out on first error without modifying pending_updates.
    1258                 :                 // That's Ok, cf this function's doc comment.
    1259 UIC           0 :                 writer.put(key, self.lsn, &value, ctx).await?;
    1260               0 :             } else {
    1261 LBC   (2732384) :                 retained_pending_updates.insert(key, value);
    1262 UIC           0 :             }
    1263                 :         }
    1264               0 :         self.pending_updates.extend(retained_pending_updates);
    1265               0 : 
    1266               0 :         if pending_nblocks != 0 {
    1267 LBC   (2732384) :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1268       (1638695) :             self.pending_nblocks = 0;
    1269       (1638695) :         }
    1270                 : 
    1271 UIC           0 :         Ok(())
    1272 GIC      548538 :     }
    1273                 : 
    1274                 :     ///
    1275                 :     /// Finish this atomic update, writing all the updated keys to the
    1276 EUB             :     /// underlying timeline.
    1277                 :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1278                 :     ///
    1279 GIC    68723403 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1280        68723388 :         let writer = self.tline.writer().await;
    1281 CBC    68723388 :         let lsn = self.lsn;
    1282        68723388 :         let pending_nblocks = self.pending_nblocks;
    1283 GIC    68723388 :         self.pending_nblocks = 0;
    1284 ECB   (2732384) : 
    1285 GIC    76014982 :         for (key, value) in self.pending_updates.drain() {
    1286 CBC    76014982 :             writer.put(key, lsn, &value, ctx).await?;
    1287 ECB  (76194320) :         }
    1288 CBC    68723381 :         for key_range in self.pending_deletions.drain(..) {
    1289 GIC       17687 :             writer.delete(key_range, lsn).await?;
    1290 ECB     (17686) :         }
    1291         (17686) : 
    1292 CBC    68723381 :         writer.finish_write(lsn);
    1293        68723381 : 
    1294 GIC    68723381 :         if pending_nblocks != 0 {
    1295         1029042 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1296        67694339 :         }
    1297                 : 
    1298 CBC    68723381 :         Ok(())
    1299 GIC    68723381 :     }
    1300                 : 
    1301                 :     // Internal helper functions to batch the modifications
    1302                 : 
    1303         2736523 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    1304 ECB      (1319) :         // Have we already updated the same key? Read the pending updated
    1305                 :         // version in that case.
    1306                 :         //
    1307                 :         // Note: we don't check pending_deletions. It is an error to request a
    1308                 :         // value that has been removed, deletion only avoids leaking storage.
    1309 CBC     2736523 :         if let Some(value) = self.pending_updates.get(&key) {
    1310 GIC     1644295 :             if let Value::Image(img) = value {
    1311         1644295 :                 Ok(img.clone())
    1312                 :             } else {
    1313                 :                 // Currently, we never need to read back a WAL record that we
    1314                 :                 // inserted in the same "transaction". All the metadata updates
    1315                 :                 // work directly with Images, and we never need to read actual
    1316                 :                 // data pages. We could handle this if we had to, by calling
    1317                 :                 // the walredo manager, but let's keep it simple for now.
    1318 LBC      (6056) :                 Err(PageReconstructError::from(anyhow::anyhow!(
    1319 UIC           0 :                     "unexpected pending WAL record"
    1320               0 :                 )))
    1321                 :             }
    1322                 :         } else {
    1323 GBC     1092228 :             let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1324 GIC     1092228 :             self.tline.get(key, lsn, ctx).await
    1325                 :         }
    1326         2736523 :     }
    1327                 : 
    1328 CBC    76590444 :     fn put(&mut self, key: Key, val: Value) {
    1329 GIC    76590444 :         self.pending_updates.insert(key, val);
    1330        76590444 :     }
    1331                 : 
    1332           17687 :     fn delete(&mut self, key_range: Range<Key>) {
    1333           17687 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1334           17687 :         self.pending_deletions.push(key_range);
    1335           17687 :     }
    1336                 : }
    1337                 : 
    1338                 : //--- Metadata structs stored in key-value pairs in the repository.
    1339                 : 
    1340          569719 : #[derive(Debug, Serialize, Deserialize)]
    1341                 : struct DbDirectory {
    1342                 :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1343                 :     dbdirs: HashMap<(Oid, Oid), bool>,
    1344                 : }
    1345                 : 
    1346            1313 : #[derive(Debug, Serialize, Deserialize)]
    1347                 : struct TwoPhaseDirectory {
    1348                 :     xids: HashSet<TransactionId>,
    1349                 : }
    1350                 : 
    1351         1166590 : #[derive(Debug, Serialize, Deserialize, Default)]
    1352                 : struct RelDirectory {
    1353                 :     // Set of relations that exist. (relfilenode, forknum)
    1354                 :     //
    1355                 :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1356                 :     // key-value pairs, if you have a lot of relations
    1357                 :     rels: HashSet<(Oid, u8)>,
    1358                 : }
    1359                 : 
    1360            6076 : #[derive(Debug, Serialize, Deserialize, Default)]
    1361                 : struct AuxFilesDirectory {
    1362                 :     files: HashMap<String, Bytes>,
    1363                 : }
    1364                 : 
    1365 UIC           0 : #[derive(Debug, Serialize, Deserialize)]
    1366                 : struct RelSizeEntry {
    1367                 :     nblocks: u32,
    1368                 : }
    1369                 : 
    1370 GIC        9635 : #[derive(Debug, Serialize, Deserialize, Default)]
    1371                 : struct SlruSegmentDirectory {
    1372                 :     // Set of SLRU segments that exist.
    1373                 :     segments: HashSet<u32>,
    1374                 : }
    1375                 : 
    1376                 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1377                 : 
    1378                 : // Layout of the Key address space
    1379                 : //
    1380                 : // The Key struct, used to address the underlying key-value store, consists of
    1381                 : // 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
    1382                 : // all the data and metadata keys into those 18 bytes.
    1383                 : //
    1384                 : // Principles for the mapping:
    1385                 : //
    1386                 : // - Things that are often accessed or modified together, should be close to
    1387                 : //   each other in the key space. For example, if a relation is extended by one
    1388                 : //   block, we create a new key-value pair for the block data, and update the
    1389                 : //   relation size entry. Because of that, the RelSize key comes after all the
    1390                 : //   RelBlocks of a relation: the RelSize and the last RelBlock are always next
    1391                 : //   to each other.
    1392                 : //
    1393                 : // The key space is divided into four major sections, identified by the first
    1394                 : // byte, and they form a hierarchy:
    1395                 : //
    1396                 : // 00 Relation data and metadata
    1397                 : //
    1398                 : //   DbDir    () -> (dbnode, spcnode)
    1399                 : //   Filenodemap
    1400                 : //   RelDir   -> relnode forknum
    1401                 : //       RelBlocks
    1402                 : //       RelSize
    1403                 : //
    1404                 : // 01 SLRUs
    1405                 : //
    1406                 : //   SlruDir  kind
    1407                 : //   SlruSegBlocks segno
    1408                 : //   SlruSegSize
    1409                 : //
    1410                 : // 02 pg_twophase
    1411                 : //
    1412                 : // 03 misc
    1413                 : //    Controlfile
    1414                 : //    checkpoint
    1415                 : //    pg_version
    1416                 : //
    1417                 : // 04 aux files
    1418                 : //
    1419                 : // Below is a full list of the keyspace allocation:
    1420                 : //
    1421                 : // DbDir:
    1422                 : // 00 00000000 00000000 00000000 00   00000000
    1423                 : //
    1424                 : // Filenodemap:
    1425                 : // 00 SPCNODE  DBNODE   00000000 00   00000000
    1426                 : //
    1427                 : // RelDir:
    1428                 : // 00 SPCNODE  DBNODE   00000000 00   00000001 (Postgres never uses relfilenode 0)
    1429                 : //
    1430 ECB         (3) : // RelBlock:
    1431             (3) : // 00 SPCNODE  DBNODE   RELNODE  FORK BLKNUM
    1432             (3) : //
    1433             (3) : // RelSize:
    1434             (3) : // 00 SPCNODE  DBNODE   RELNODE  FORK FFFFFFFF
    1435             (3) : //
    1436             (3) : // SlruDir:
    1437             (3) : // 01 kind     00000000 00000000 00   00000000
    1438             (3) : //
    1439             (3) : // SlruSegBlock:
    1440             (3) : // 01 kind     00000001 SEGNO    00   BLKNUM
    1441             (3) : //
    1442             (3) : // SlruSegSize:
    1443             (3) : // 01 kind     00000001 SEGNO    00   FFFFFFFF
    1444             (3) : //
    1445             (3) : // TwoPhaseDir:
    1446             (3) : // 02 00000000 00000000 00000000 00   00000000
    1447                 : //
    1448          (7320) : // TwoPhaseFile:
    1449          (7320) : // 02 00000000 00000000 00000000 00   XID
    1450          (7320) : //
    1451          (7320) : // ControlFile:
    1452          (7320) : // 03 00000000 00000000 00000000 00   00000000
    1453          (7320) : //
    1454          (7320) : // Checkpoint:
    1455          (7320) : // 03 00000000 00000000 00000000 00   00000001
    1456          (7320) : //
    1457          (7320) : // AuxFiles:
    1458                 : // 03 00000000 00000000 00000000 00   00000002
    1459        (693574) : //
    1460        (693574) : 
    1461        (693574) : //-- Section 01: relation data and metadata
    1462        (693574) : 
    1463        (693574) : const DBDIR_KEY: Key = Key {
    1464        (693574) :     field1: 0x00,
    1465        (693574) :     field2: 0,
    1466        (693574) :     field3: 0,
    1467        (693574) :     field4: 0,
    1468        (693574) :     field5: 0,
    1469                 :     field6: 0,
    1470      (76675583) : };
    1471      (76675583) : 
    1472 CBC           3 : fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
    1473               3 :     Key {
    1474               3 :         field1: 0x00,
    1475               3 :         field2: spcnode,
    1476               3 :         field3: dbnode,
    1477               3 :         field4: 0,
    1478               3 :         field5: 0,
    1479               3 :         field6: 0,
    1480 GIC           3 :     }..Key {
    1481 CBC           3 :         field1: 0x00,
    1482               3 :         field2: spcnode,
    1483               3 :         field3: dbnode,
    1484               3 :         field4: 0xffffffff,
    1485               3 :         field5: 0xff,
    1486               3 :         field6: 0xffffffff,
    1487               3 :     }
    1488               3 : }
    1489 ECB   (3378180) : 
    1490 CBC        7305 : fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
    1491 GIC        7305 :     Key {
    1492 CBC        7305 :         field1: 0x00,
    1493            7305 :         field2: spcnode,
    1494            7305 :         field3: dbnode,
    1495            7305 :         field4: 0,
    1496            7305 :         field5: 0,
    1497            7305 :         field6: 0,
    1498            7305 :     }
    1499            7305 : }
    1500 ECB     (17672) : 
    1501 CBC      695497 : fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
    1502          695497 :     Key {
    1503          695497 :         field1: 0x00,
    1504          695497 :         field2: spcnode,
    1505          695497 :         field3: dbnode,
    1506          695497 :         field4: 0,
    1507          695497 :         field5: 0,
    1508          695497 :         field6: 1,
    1509 GIC      695497 :     }
    1510          695497 : }
    1511                 : 
    1512 CBC    77071715 : fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
    1513        77071715 :     Key {
    1514        77071715 :         field1: 0x00,
    1515        77071715 :         field2: rel.spcnode,
    1516        77071715 :         field3: rel.dbnode,
    1517        77071715 :         field4: rel.relnode,
    1518        77071715 :         field5: rel.forknum,
    1519 GIC    77071715 :         field6: blknum,
    1520        77071715 :     }
    1521        77071715 : }
    1522                 : 
    1523         3362913 : fn rel_size_to_key(rel: RelTag) -> Key {
    1524         3362913 :     Key {
    1525 CBC     3362913 :         field1: 0x00,
    1526 GIC     3362913 :         field2: rel.spcnode,
    1527 CBC     3362913 :         field3: rel.dbnode,
    1528         3362913 :         field4: rel.relnode,
    1529         3362913 :         field5: rel.forknum,
    1530         3362913 :         field6: 0xffffffff,
    1531         3362913 :     }
    1532         3362913 : }
    1533 ECB     (26457) : 
    1534 GIC       17673 : fn rel_key_range(rel: RelTag) -> Range<Key> {
    1535           17673 :     Key {
    1536 CBC       17673 :         field1: 0x00,
    1537           17673 :         field2: rel.spcnode,
    1538           17673 :         field3: rel.dbnode,
    1539           17673 :         field4: rel.relnode,
    1540           17673 :         field5: rel.forknum,
    1541 GIC       17673 :         field6: 0,
    1542 CBC       17673 :     }..Key {
    1543           17673 :         field1: 0x00,
    1544           17673 :         field2: rel.spcnode,
    1545           17673 :         field3: rel.dbnode,
    1546           17673 :         field4: rel.relnode,
    1547           17673 :         field5: rel.forknum + 1,
    1548           17673 :         field6: 0,
    1549 GIC       17673 :     }
    1550           17673 : }
    1551 ECB      (9821) : 
    1552          (9821) : //-- Section 02: SLRUs
    1553          (9821) : 
    1554 CBC       11483 : fn slru_dir_to_key(kind: SlruKind) -> Key {
    1555           11483 :     Key {
    1556 GIC       11483 :         field1: 0x01,
    1557 CBC       11483 :         field2: match kind {
    1558            6169 :             SlruKind::Clog => 0x00,
    1559            2800 :             SlruKind::MultiXactMembers => 0x01,
    1560 GBC        2514 :             SlruKind::MultiXactOffsets => 0x02,
    1561 EUB             :         },
    1562                 :         field3: 0,
    1563                 :         field4: 0,
    1564 ECB         (9) :         field5: 0,
    1565             (9) :         field6: 0,
    1566             (9) :     }
    1567 CBC       11483 : }
    1568 ECB         (9) : 
    1569 CBC     2010296 : fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
    1570         2010296 :     Key {
    1571         2010296 :         field1: 0x01,
    1572         2010296 :         field2: match kind {
    1573         1956291 :             SlruKind::Clog => 0x00,
    1574           27561 :             SlruKind::MultiXactMembers => 0x01,
    1575           26444 :             SlruKind::MultiXactOffsets => 0x02,
    1576 ECB         (9) :         },
    1577             (9) :         field3: 1,
    1578 CBC     2010296 :         field4: segno,
    1579         2010296 :         field5: 0,
    1580 GIC     2010296 :         field6: blknum,
    1581         2010296 :     }
    1582         2010296 : }
    1583                 : 
    1584           10034 : fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
    1585           10034 :     Key {
    1586           10034 :         field1: 0x01,
    1587           10034 :         field2: match kind {
    1588            5808 :             SlruKind::Clog => 0x00,
    1589            2402 :             SlruKind::MultiXactMembers => 0x01,
    1590            1824 :             SlruKind::MultiXactOffsets => 0x02,
    1591                 :         },
    1592 ECB         (6) :         field3: 1,
    1593 CBC       10034 :         field4: segno,
    1594           10034 :         field5: 0,
    1595           10034 :         field6: 0xffffffff,
    1596           10034 :     }
    1597           10034 : }
    1598 ECB         (6) : 
    1599 CBC           9 : fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
    1600               9 :     let field2 = match kind {
    1601               9 :         SlruKind::Clog => 0x00,
    1602 UIC           0 :         SlruKind::MultiXactMembers => 0x01,
    1603 LBC         (2) :         SlruKind::MultiXactOffsets => 0x02,
    1604 ECB         (2) :     };
    1605             (2) : 
    1606 CBC           9 :     Key {
    1607               9 :         field1: 0x01,
    1608               9 :         field2,
    1609               9 :         field3: 1,
    1610               9 :         field4: segno,
    1611               9 :         field5: 0,
    1612               9 :         field6: 0,
    1613               9 :     }..Key {
    1614               9 :         field1: 0x01,
    1615               9 :         field2,
    1616               9 :         field3: 1,
    1617               9 :         field4: segno,
    1618               9 :         field5: 1,
    1619               9 :         field6: 0,
    1620               9 :     }
    1621               9 : }
    1622                 : 
    1623                 : //-- Section 03: pg_twophase
    1624                 : 
    1625                 : const TWOPHASEDIR_KEY: Key = Key {
    1626                 :     field1: 0x02,
    1627                 :     field2: 0,
    1628                 :     field3: 0,
    1629                 :     field4: 0,
    1630                 :     field5: 0,
    1631                 :     field6: 0,
    1632                 : };
    1633                 : 
    1634 GIC           6 : fn twophase_file_key(xid: TransactionId) -> Key {
    1635               6 :     Key {
    1636               6 :         field1: 0x02,
    1637               6 :         field2: 0,
    1638               6 :         field3: 0,
    1639               6 :         field4: 0,
    1640               6 :         field5: 0,
    1641               6 :         field6: xid,
    1642               6 :     }
    1643               6 : }
    1644                 : 
    1645               2 : fn twophase_key_range(xid: TransactionId) -> Range<Key> {
    1646               2 :     let (next_xid, overflowed) = xid.overflowing_add(1);
    1647               2 : 
    1648               2 :     Key {
    1649               2 :         field1: 0x02,
    1650               2 :         field2: 0,
    1651               2 :         field3: 0,
    1652               2 :         field4: 0,
    1653               2 :         field5: 0,
    1654 CBC           2 :         field6: xid,
    1655               2 :     }..Key {
    1656               2 :         field1: 0x02,
    1657               2 :         field2: 0,
    1658               2 :         field3: 0,
    1659               2 :         field4: 0,
    1660               2 :         field5: u8::from(overflowed),
    1661               2 :         field6: next_xid,
    1662               2 :     }
    1663               2 : }
    1664 ECB   (2268096) : 
    1665 EUB             : //-- Section 04: Control file
    1666                 : const CONTROLFILE_KEY: Key = Key {
    1667 ECB   (2268096) :     field1: 0x03,
    1668                 :     field2: 0,
    1669 EUB             :     field3: 0,
    1670                 :     field4: 0,
    1671                 :     field5: 0,
    1672                 :     field6: 0,
    1673                 : };
    1674                 : 
    1675                 : const CHECKPOINT_KEY: Key = Key {
    1676                 :     field1: 0x03,
    1677                 :     field2: 0,
    1678                 :     field3: 0,
    1679                 :     field4: 0,
    1680                 :     field5: 0,
    1681                 :     field6: 1,
    1682                 : };
    1683                 : 
    1684 ECB  (21637090) : const AUX_FILES_KEY: Key = Key {
    1685      (21637090) :     field1: 0x03,
    1686                 :     field2: 0,
    1687      (21637090) :     field3: 0,
    1688      (21541218) :     field4: 0,
    1689         (48210) :     field5: 0,
    1690         (47662) :     field6: 2,
    1691 EUB             : };
    1692                 : 
    1693 ECB  (21637090) : // Reverse mappings for a few Keys.
    1694      (21637090) : // These are needed by WAL redo manager.
    1695      (21637090) : 
    1696 CBC     2262451 : pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
    1697 GIC     2262451 :     Ok(match key.field1 {
    1698 GBC     2262451 :         0x00 => (
    1699 GIC     2262451 :             RelTag {
    1700 CBC     2262451 :                 spcnode: key.field2,
    1701 GIC     2262451 :                 dbnode: key.field3,
    1702 GBC     2262451 :                 relnode: key.field4,
    1703         2262451 :                 forknum: key.field5,
    1704         2262451 :             },
    1705         2262451 :             key.field6,
    1706         2262451 :         ),
    1707 UIC           0 :         _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
    1708                 :     })
    1709 GIC     2262451 : }
    1710                 : 
    1711 UIC           0 : fn is_rel_block_key(key: Key) -> bool {
    1712               0 :     key.field1 == 0x00 && key.field4 != 0
    1713               0 : }
    1714                 : 
    1715               0 : pub fn is_rel_fsm_block_key(key: Key) -> bool {
    1716               0 :     key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
    1717               0 : }
    1718                 : 
    1719               0 : pub fn is_rel_vm_block_key(key: Key) -> bool {
    1720               0 :     key.field1 == 0x00
    1721               0 :         && key.field4 != 0
    1722               0 :         && key.field5 == VISIBILITYMAP_FORKNUM
    1723               0 :         && key.field6 != 0xffffffff
    1724               0 : }
    1725                 : 
    1726 GIC    23039581 : pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
    1727        23039581 :     Ok(match key.field1 {
    1728                 :         0x01 => {
    1729        23039581 :             let kind = match key.field2 {
    1730        22943709 :                 0x00 => SlruKind::Clog,
    1731           48210 :                 0x01 => SlruKind::MultiXactMembers,
    1732           47662 :                 0x02 => SlruKind::MultiXactOffsets,
    1733 UIC           0 :                 _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
    1734                 :             };
    1735 GIC    23039581 :             let segno = key.field4;
    1736        23039581 :             let blknum = key.field6;
    1737        23039581 : 
    1738        23039581 :             (kind, segno, blknum)
    1739                 :         }
    1740 UIC           0 :         _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
    1741                 :     })
    1742 GIC    23039581 : }
    1743                 : 
    1744 UIC           0 : fn is_slru_block_key(key: Key) -> bool {
    1745               0 :     key.field1 == 0x01                // SLRU-related
    1746               0 :         && key.field3 == 0x00000001   // but not SlruDir
    1747               0 :         && key.field6 != 0xffffffff // and not SlruSegSize
    1748               0 : }
    1749                 : 
    1750                 : #[allow(clippy::bool_assert_comparison)]
    1751                 : #[cfg(test)]
    1752                 : mod tests {
    1753                 :     //use super::repo_harness::*;
    1754                 :     //use super::*;
    1755                 : 
    1756                 :     /*
    1757                 :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1758                 :             let incremental = timeline.get_current_logical_size();
    1759                 :             let non_incremental = timeline
    1760                 :                 .get_current_logical_size_non_incremental(lsn)
    1761                 :                 .unwrap();
    1762                 :             assert_eq!(incremental, non_incremental);
    1763                 :         }
    1764                 :     */
    1765                 : 
    1766                 :     /*
    1767                 :     ///
    1768                 :     /// Test list_rels() function, with branches and dropped relations
    1769                 :     ///
    1770                 :     #[test]
    1771                 :     fn test_list_rels_drop() -> Result<()> {
    1772                 :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1773                 :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1774                 :         const TESTDB: u32 = 111;
    1775                 : 
    1776                 :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1777                 :         // after branching fails below
    1778                 :         let mut writer = tline.begin_record(Lsn(0x10));
    1779                 :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1780                 :         writer.finish()?;
    1781                 : 
    1782                 :         // Create a relation on the timeline
    1783                 :         let mut writer = tline.begin_record(Lsn(0x20));
    1784                 :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1785                 :         writer.finish()?;
    1786                 : 
    1787                 :         let writer = tline.begin_record(Lsn(0x00));
    1788                 :         writer.finish()?;
    1789                 : 
    1790                 :         // Check that list_rels() lists it after LSN 2, but not before it
    1791                 :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1792                 :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1793                 :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1794                 : 
    1795                 :         // Create a branch, check that the relation is visible there
    1796                 :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1797                 :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1798                 :             Some(timeline) => timeline,
    1799                 :             None => panic!("Should have a local timeline"),
    1800                 :         };
    1801                 :         let newtline = DatadirTimelineImpl::new(newtline);
    1802                 :         assert!(newtline
    1803                 :             .list_rels(0, TESTDB, Lsn(0x30))?
    1804                 :             .contains(&TESTREL_A));
    1805                 : 
    1806                 :         // Drop it on the branch
    1807                 :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1808                 :         new_writer.drop_relation(TESTREL_A)?;
    1809                 :         new_writer.finish()?;
    1810                 : 
    1811                 :         // Check that it's no longer listed on the branch after the point where it was dropped
    1812                 :         assert!(newtline
    1813                 :             .list_rels(0, TESTDB, Lsn(0x30))?
    1814                 :             .contains(&TESTREL_A));
    1815                 :         assert!(!newtline
    1816                 :             .list_rels(0, TESTDB, Lsn(0x40))?
    1817                 :             .contains(&TESTREL_A));
    1818                 : 
    1819                 :         // Run checkpoint and garbage collection and check that it's still not visible
    1820                 :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1821                 :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1822                 : 
    1823                 :         assert!(!newtline
    1824                 :             .list_rels(0, TESTDB, Lsn(0x40))?
    1825                 :             .contains(&TESTREL_A));
    1826                 : 
    1827                 :         Ok(())
    1828                 :     }
    1829                 :      */
    1830                 : 
    1831                 :     /*
    1832                 :     #[test]
    1833                 :     fn test_read_beyond_eof() -> Result<()> {
    1834                 :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1835                 :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1836                 : 
    1837                 :         make_some_layers(&tline, Lsn(0x20))?;
    1838                 :         let mut writer = tline.begin_record(Lsn(0x60));
    1839                 :         walingest.put_rel_page_image(
    1840                 :             &mut writer,
    1841                 :             TESTREL_A,
    1842                 :             0,
    1843                 :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1844                 :         )?;
    1845                 :         writer.finish()?;
    1846                 : 
    1847                 :         // Test read before rel creation. Should error out.
    1848                 :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1849                 : 
    1850                 :         // Read block beyond end of relation at different points in time.
    1851                 :         // These reads should fall into different delta, image, and in-memory layers.
    1852                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1853                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1854                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1855                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1856                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1857                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1858                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1859                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1860                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1861                 : 
    1862                 :         // Test on an in-memory layer with no preceding layer
    1863                 :         let mut writer = tline.begin_record(Lsn(0x70));
    1864                 :         walingest.put_rel_page_image(
    1865                 :             &mut writer,
    1866                 :             TESTREL_B,
    1867                 :             0,
    1868                 :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1869                 :         )?;
    1870                 :         writer.finish()?;
    1871                 : 
    1872                 :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    1873                 : 
    1874                 :         Ok(())
    1875                 :     }
    1876                 :      */
    1877                 : }
        

Generated by: LCOV version 2.1-beta