LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test:         32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date:    2024-02-07 07:37:29
Coverage:     Lines: 90.8 % (935 of 1030 hit)    Functions: 61.1 % (127 of 208 hit)

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::repository::*;
      13              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use pageserver_api::key::{
      18              :     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
      19              :     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      20              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      21              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      22              : };
      23              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      24              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      25              : use postgres_ffi::BLCKSZ;
      26              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      27              : use serde::{Deserialize, Serialize};
      28              : use std::collections::{hash_map, HashMap, HashSet};
      29              : use std::ops::ControlFlow;
      30              : use std::ops::Range;
      31              : use strum::IntoEnumIterator;
      32              : use tokio_util::sync::CancellationToken;
      33              : use tracing::{debug, trace, warn};
      34              : use utils::bin_ser::DeserializeError;
      35              : use utils::{bin_ser::BeSer, lsn::Lsn};
      36              : 
      37            0 : #[derive(Debug)]
      38              : pub enum LsnForTimestamp {
      39              :     /// Found commits both before and after the given timestamp
      40              :     Present(Lsn),
      41              : 
       42              :     /// Found no commits after the given timestamp. This means
      43              :     /// that the newest data in the branch is older than the given
      44              :     /// timestamp.
      45              :     ///
      46              :     /// All commits <= LSN happened before the given timestamp
      47              :     Future(Lsn),
      48              : 
       49              :     /// The queried timestamp is past the horizon that we look back at (PITR)
      50              :     ///
      51              :     /// All commits > LSN happened after the given timestamp,
      52              :     /// but any commits < LSN might have happened before or after
      53              :     /// the given timestamp. We don't know because no data before
      54              :     /// the given lsn is available.
      55              :     Past(Lsn),
      56              : 
      57              :     /// We have found no commit with a timestamp,
      58              :     /// so we can't return anything meaningful.
      59              :     ///
       60              :     /// The associated LSN is the lower bound value we can safely
       61              :     /// create branches on, but no statement is made about whether it
       62              :     /// is older or newer than the timestamp.
      63              :     ///
      64              :     /// This variant can e.g. be returned right after a
      65              :     /// cluster import.
      66              :     NoData(Lsn),
      67              : }
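// A minimal sketch of how a caller might consume these variants; `timeline`, `timestamp`,
// `cancel` and `ctx` are assumed to be in scope, and find_lsn_for_timestamp() is the
// method defined further down in this file that produces the value:
//
//     match timeline.find_lsn_for_timestamp(timestamp, &cancel, &ctx).await? {
//         LsnForTimestamp::Present(lsn) => { /* commits on both sides; branch at `lsn` */ }
//         LsnForTimestamp::Future(lsn) => { /* all data is older; `lsn` is still usable */ }
//         LsnForTimestamp::Past(_lsn) => { /* timestamp predates the PITR horizon */ }
//         LsnForTimestamp::NoData(lsn) => { /* no commit records; `lsn` is a safe lower bound */ }
//     }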
      68              : 
      69            0 : #[derive(Debug, thiserror::Error)]
      70              : pub enum CalculateLogicalSizeError {
      71              :     #[error("cancelled")]
      72              :     Cancelled,
      73              :     #[error(transparent)]
      74              :     Other(#[from] anyhow::Error),
      75              : }
      76              : 
      77            2 : #[derive(Debug, thiserror::Error)]
      78              : pub(crate) enum CollectKeySpaceError {
      79              :     #[error(transparent)]
      80              :     Decode(#[from] DeserializeError),
      81              :     #[error(transparent)]
      82              :     PageRead(PageReconstructError),
      83              :     #[error("cancelled")]
      84              :     Cancelled,
      85              : }
      86              : 
      87              : impl From<PageReconstructError> for CollectKeySpaceError {
      88            3 :     fn from(err: PageReconstructError) -> Self {
      89            3 :         match err {
      90            1 :             PageReconstructError::Cancelled => Self::Cancelled,
      91            2 :             err => Self::PageRead(err),
      92              :         }
      93            3 :     }
      94              : }
      95              : 
      96              : impl From<PageReconstructError> for CalculateLogicalSizeError {
      97           31 :     fn from(pre: PageReconstructError) -> Self {
      98           31 :         match pre {
      99              :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
     100           29 :                 Self::Cancelled
     101              :             }
     102            2 :             _ => Self::Other(pre.into()),
     103              :         }
     104           31 :     }
     105              : }
     106              : 
     107            0 : #[derive(Debug, thiserror::Error)]
     108              : pub enum RelationError {
     109              :     #[error("Relation Already Exists")]
     110              :     AlreadyExists,
     111              :     #[error("invalid relnode")]
     112              :     InvalidRelnode,
     113              :     #[error(transparent)]
     114              :     Other(#[from] anyhow::Error),
     115              : }
     116              : 
     117              : ///
     118              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     119              : /// and other special kinds of files, in a versioned key-value store. The
     120              : /// Timeline struct provides the key-value store.
     121              : ///
      122              : /// This is a separate impl so that we can easily include all these functions in a Timeline
      123              : /// implementation; it might be moved into a separate struct later.
     124              : impl Timeline {
     125              :     /// Start ingesting a WAL record, or other atomic modification of
     126              :     /// the timeline.
     127              :     ///
     128              :     /// This provides a transaction-like interface to perform a bunch
     129              :     /// of modifications atomically.
     130              :     ///
     131              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     132              :     /// DatadirModification object. Use the functions in the object to
     133              :     /// modify the repository state, updating all the pages and metadata
     134              :     /// that the WAL record affects. When you're done, call commit() to
     135              :     /// commit the changes.
     136              :     ///
     137              :     /// Lsn stored in modification is advanced by `ingest_record` and
     138              :     /// is used by `commit()` to update `last_record_lsn`.
     139              :     ///
     140              :     /// Calling commit() will flush all the changes and reset the state,
     141              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     142              :     ///
     143              :     /// Note that any pending modifications you make through the
     144              :     /// modification object won't be visible to calls to the 'get' and list
     145              :     /// functions of the timeline until you finish! And if you update the
     146              :     /// same page twice, the last update wins.
     147              :     ///
     148      1061791 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     149      1061791 :     where
     150      1061791 :         Self: Sized,
     151      1061791 :     {
     152      1061791 :         DatadirModification {
     153      1061791 :             tline: self,
     154      1061791 :             pending_lsns: Vec::new(),
     155      1061791 :             pending_updates: HashMap::new(),
     156      1061791 :             pending_deletions: Vec::new(),
     157      1061791 :             pending_nblocks: 0,
     158      1061791 :             lsn,
     159      1061791 :         }
     160      1061791 :     }
     161              : 
     162              :     //------------------------------------------------------------------------------
     163              :     // Public GET functions
     164              :     //------------------------------------------------------------------------------
     165              : 
     166              :     /// Look up given page version.
     167      5622574 :     pub(crate) async fn get_rel_page_at_lsn(
     168      5622574 :         &self,
     169      5622574 :         tag: RelTag,
     170      5622574 :         blknum: BlockNumber,
     171      5622574 :         version: Version<'_>,
     172      5622574 :         horizon: Lsn,
     173      5622574 :         ctx: &RequestContext,
     174      5622574 :     ) -> Result<Bytes, PageReconstructError> {
     175      5622574 :         if tag.relnode == 0 {
     176            0 :             return Err(PageReconstructError::Other(
     177            0 :                 RelationError::InvalidRelnode.into(),
     178            0 :             ));
     179      5622574 :         }
     180              : 
     181      5622574 :         let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?;
     182      5622574 :         if blknum >= nblocks {
     183            0 :             debug!(
     184            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     185            0 :                 tag,
     186            0 :                 blknum,
     187            0 :                 version.get_lsn(),
     188            0 :                 nblocks
     189            0 :             );
     190       129498 :             return Ok(ZERO_PAGE.clone());
     191      5493076 :         }
     192      5493076 : 
     193      5493076 :         let key = rel_block_to_key(tag, blknum);
     194      5493076 :         version.get(self, key, ctx).await
     195      5622574 :     }
     196              : 
     197              :     // Get size of a database in blocks
     198            8 :     pub(crate) async fn get_db_size(
     199            8 :         &self,
     200            8 :         spcnode: Oid,
     201            8 :         dbnode: Oid,
     202            8 :         version: Version<'_>,
     203            8 :         horizon: Lsn,
     204            8 :         ctx: &RequestContext,
     205            8 :     ) -> Result<usize, PageReconstructError> {
     206            8 :         let mut total_blocks = 0;
     207              : 
     208            8 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     209              : 
     210         2351 :         for rel in rels {
     211         2343 :             let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?;
     212         2343 :             total_blocks += n_blocks as usize;
     213              :         }
     214            8 :         Ok(total_blocks)
     215            8 :     }
     216              : 
     217              :     /// Get size of a relation file
     218      5775974 :     pub(crate) async fn get_rel_size(
     219      5775974 :         &self,
     220      5775974 :         tag: RelTag,
     221      5775974 :         version: Version<'_>,
     222      5775974 :         horizon: Lsn,
     223      5775974 :         ctx: &RequestContext,
     224      5775974 :     ) -> Result<BlockNumber, PageReconstructError> {
     225      5775974 :         if tag.relnode == 0 {
     226            0 :             return Err(PageReconstructError::Other(
     227            0 :                 RelationError::InvalidRelnode.into(),
     228            0 :             ));
     229      5775974 :         }
     230              : 
     231      5775974 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     232      5380575 :             return Ok(nblocks);
     233       395399 :         }
     234       395399 : 
     235       395399 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     236         7387 :             && !self.get_rel_exists(tag, version, horizon, ctx).await?
     237              :         {
     238              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     239              :             // FSM, and smgrnblocks() on it immediately afterwards,
     240              :             // without extending it.  Tolerate that by claiming that
     241              :             // any non-existent FSM fork has size 0.
     242           27 :             return Ok(0);
     243       395372 :         }
     244       395372 : 
     245       395372 :         let key = rel_size_to_key(tag);
     246       395372 :         let mut buf = version.get(self, key, ctx).await?;
     247       395368 :         let nblocks = buf.get_u32_le();
     248       395368 : 
     249       395368 :         if horizon == Lsn::MAX {
     250       155970 :             // Update relation size cache only if latest version is requested.
     251       155970 :             self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     252       239398 :         }
     253       395368 :         Ok(nblocks)
     254      5775974 :     }
     255              : 
     256              :     /// Does relation exist?
     257       441817 :     pub(crate) async fn get_rel_exists(
     258       441817 :         &self,
     259       441817 :         tag: RelTag,
     260       441817 :         version: Version<'_>,
     261       441817 :         _horizon: Lsn,
     262       441817 :         ctx: &RequestContext,
     263       441817 :     ) -> Result<bool, PageReconstructError> {
     264       441817 :         if tag.relnode == 0 {
     265            0 :             return Err(PageReconstructError::Other(
     266            0 :                 RelationError::InvalidRelnode.into(),
     267            0 :             ));
     268       441817 :         }
     269              : 
     270              :         // first try to lookup relation in cache
     271       441817 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     272       180049 :             return Ok(true);
     273       261768 :         }
     274       261768 :         // fetch directory listing
     275       261768 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     276       261768 :         let buf = version.get(self, key, ctx).await?;
     277              : 
     278       261768 :         match RelDirectory::des(&buf).context("deserialization failure") {
     279       261768 :             Ok(dir) => {
     280       261768 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     281       261768 :                 Ok(exists)
     282              :             }
     283            0 :             Err(e) => Err(PageReconstructError::from(e)),
     284              :         }
     285       441817 :     }
     286              : 
     287              :     /// Get a list of all existing relations in given tablespace and database.
     288              :     ///
     289              :     /// # Cancel-Safety
     290              :     ///
     291              :     /// This method is cancellation-safe.
     292         8142 :     pub(crate) async fn list_rels(
     293         8142 :         &self,
     294         8142 :         spcnode: Oid,
     295         8142 :         dbnode: Oid,
     296         8142 :         version: Version<'_>,
     297         8142 :         ctx: &RequestContext,
     298         8142 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     299         8142 :         // fetch directory listing
     300         8142 :         let key = rel_dir_to_key(spcnode, dbnode);
     301         8142 :         let buf = version.get(self, key, ctx).await?;
     302              : 
     303         8141 :         match RelDirectory::des(&buf).context("deserialization failure") {
     304         8141 :             Ok(dir) => {
     305         8141 :                 let rels: HashSet<RelTag> =
     306      1927910 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     307      1927910 :                         spcnode,
     308      1927910 :                         dbnode,
     309      1927910 :                         relnode: *relnode,
     310      1927910 :                         forknum: *forknum,
     311      1927910 :                     }));
     312         8141 : 
     313         8141 :                 Ok(rels)
     314              :             }
     315            0 :             Err(e) => Err(PageReconstructError::from(e)),
     316              :         }
     317         8142 :     }
     318              : 
     319              :     /// Get the whole SLRU segment
     320            0 :     pub(crate) async fn get_slru_segment(
     321            0 :         &self,
     322            0 :         kind: SlruKind,
     323            0 :         segno: u32,
     324            0 :         lsn: Lsn,
     325            0 :         ctx: &RequestContext,
     326            0 :     ) -> Result<Bytes, PageReconstructError> {
     327            0 :         let n_blocks = self
     328            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     329            0 :             .await?;
     330            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     331            0 :         for blkno in 0..n_blocks {
     332            0 :             let block = self
     333            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     334            0 :                 .await?;
     335            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     336              :         }
     337            0 :         Ok(segment.freeze())
     338            0 :     }
     339              : 
     340              :     /// Look up given SLRU page version.
     341         3174 :     pub(crate) async fn get_slru_page_at_lsn(
     342         3174 :         &self,
     343         3174 :         kind: SlruKind,
     344         3174 :         segno: u32,
     345         3174 :         blknum: BlockNumber,
     346         3174 :         lsn: Lsn,
     347         3174 :         ctx: &RequestContext,
     348         3174 :     ) -> Result<Bytes, PageReconstructError> {
     349         3174 :         let key = slru_block_to_key(kind, segno, blknum);
     350       262876 :         self.get(key, lsn, ctx).await
     351         3173 :     }
     352              : 
     353              :     /// Get size of an SLRU segment
     354         6323 :     pub(crate) async fn get_slru_segment_size(
     355         6323 :         &self,
     356         6323 :         kind: SlruKind,
     357         6323 :         segno: u32,
     358         6323 :         version: Version<'_>,
     359         6323 :         ctx: &RequestContext,
     360         6323 :     ) -> Result<BlockNumber, PageReconstructError> {
     361         6323 :         let key = slru_segment_size_to_key(kind, segno);
     362         6323 :         let mut buf = version.get(self, key, ctx).await?;
     363         6323 :         Ok(buf.get_u32_le())
     364         6323 :     }
     365              : 
      366              :     /// Does the SLRU segment exist?
     367         1339 :     pub(crate) async fn get_slru_segment_exists(
     368         1339 :         &self,
     369         1339 :         kind: SlruKind,
     370         1339 :         segno: u32,
     371         1339 :         version: Version<'_>,
     372         1339 :         ctx: &RequestContext,
     373         1339 :     ) -> Result<bool, PageReconstructError> {
     374         1339 :         // fetch directory listing
     375         1339 :         let key = slru_dir_to_key(kind);
     376         1339 :         let buf = version.get(self, key, ctx).await?;
     377              : 
     378         1339 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     379         1339 :             Ok(dir) => {
     380         1339 :                 let exists = dir.segments.get(&segno).is_some();
     381         1339 :                 Ok(exists)
     382              :             }
     383            0 :             Err(e) => Err(PageReconstructError::from(e)),
     384              :         }
     385         1339 :     }
     386              : 
     387              :     /// Locate LSN, such that all transactions that committed before
     388              :     /// 'search_timestamp' are visible, but nothing newer is.
     389              :     ///
     390              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     391              :     /// so it's not well defined which LSN you get if there were multiple commits
     392              :     /// "in flight" at that point in time.
     393              :     ///
     394          190 :     pub(crate) async fn find_lsn_for_timestamp(
     395          190 :         &self,
     396          190 :         search_timestamp: TimestampTz,
     397          190 :         cancel: &CancellationToken,
     398          190 :         ctx: &RequestContext,
     399          190 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     400          190 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     401          190 :         // We use this method to figure out the branching LSN for the new branch, but the
     402          190 :         // GC cutoff could be before the branching point and we cannot create a new branch
     403          190 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     404          190 :         // on the safe side.
     405          190 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     406          190 :         let max_lsn = self.get_last_record_lsn();
     407          190 : 
     408          190 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     409          190 :         // LSN divided by 8.
     410          190 :         let mut low = min_lsn.0 / 8;
     411          190 :         let mut high = max_lsn.0 / 8 + 1;
     412          190 : 
     413          190 :         let mut found_smaller = false;
     414          190 :         let mut found_larger = false;
     415         3353 :         while low < high {
     416         3164 :             if cancel.is_cancelled() {
     417            0 :                 return Err(PageReconstructError::Cancelled);
     418         3164 :             }
     419         3164 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     420         3164 :             let mid = (high + low) / 2;
     421              : 
     422         3164 :             let cmp = self
     423         3164 :                 .is_latest_commit_timestamp_ge_than(
     424         3164 :                     search_timestamp,
     425         3164 :                     Lsn(mid * 8),
     426         3164 :                     &mut found_smaller,
     427         3164 :                     &mut found_larger,
     428         3164 :                     ctx,
     429         3164 :                 )
     430       265761 :                 .await?;
     431              : 
     432         3163 :             if cmp {
     433          889 :                 high = mid;
     434         2274 :             } else {
     435         2274 :                 low = mid + 1;
     436         2274 :             }
     437              :         }
     438              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     439              :         // so the LSN of the last commit record before or at `search_timestamp`.
     440              :         // Remove one from `low` to get `t`.
     441              :         //
     442              :         // FIXME: it would be better to get the LSN of the previous commit.
     443              :         // Otherwise, if you restore to the returned LSN, the database will
     444              :         // include physical changes from later commits that will be marked
     445              :         // as aborted, and will need to be vacuumed away.
     446          189 :         let commit_lsn = Lsn((low - 1) * 8);
     447          189 :         match (found_smaller, found_larger) {
     448              :             (false, false) => {
     449              :                 // This can happen if no commit records have been processed yet, e.g.
     450              :                 // just after importing a cluster.
     451           21 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     452              :             }
     453              :             (false, true) => {
     454              :                 // Didn't find any commit timestamps smaller than the request
     455           23 :                 Ok(LsnForTimestamp::Past(min_lsn))
     456              :             }
     457              :             (true, false) => {
     458              :                 // Only found commits with timestamps smaller than the request.
     459              :                 // It's still a valid case for branch creation, return it.
     460              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     461              :                 // case, anyway.
     462           82 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     463              :             }
     464           63 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     465              :         }
     466          189 :     }
     467              : 
      468              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if any transactions
      469              :     /// committed at or after 'search_timestamp', as seen at LSN 'probe_lsn'.
      470              :     ///
      471              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
      472              :     /// with a smaller/larger timestamp.
     473              :     ///
     474         3164 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     475         3164 :         &self,
     476         3164 :         search_timestamp: TimestampTz,
     477         3164 :         probe_lsn: Lsn,
     478         3164 :         found_smaller: &mut bool,
     479         3164 :         found_larger: &mut bool,
     480         3164 :         ctx: &RequestContext,
     481         3164 :     ) -> Result<bool, PageReconstructError> {
     482         3164 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     483         2943 :             if timestamp >= search_timestamp {
     484          889 :                 *found_larger = true;
     485          889 :                 return ControlFlow::Break(true);
     486         2054 :             } else {
     487         2054 :                 *found_smaller = true;
     488         2054 :             }
     489         2054 :             ControlFlow::Continue(())
     490         3164 :         })
     491       265761 :         .await
     492         3163 :     }
     493              : 
      494              :     /// Obtain the latest commit timestamp known at the given lsn.
      495              :     ///
      496              :     /// If the lsn has no timestamps, returns None. Otherwise returns the largest timestamp found.
     497           12 :     pub(crate) async fn get_timestamp_for_lsn(
     498           12 :         &self,
     499           12 :         probe_lsn: Lsn,
     500           12 :         ctx: &RequestContext,
     501           12 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     502           12 :         let mut max: Option<TimestampTz> = None;
     503           12 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     504           10 :             if let Some(max_prev) = max {
     505            0 :                 max = Some(max_prev.max(timestamp));
     506           10 :             } else {
     507           10 :                 max = Some(timestamp);
     508           10 :             }
     509           10 :             ControlFlow::Continue(())
     510           12 :         })
     511           81 :         .await?;
     512              : 
     513           10 :         Ok(max)
     514           12 :     }
     515              : 
     516              :     /// Runs the given function on all the timestamps for a given lsn
     517              :     ///
     518              :     /// The return value is either given by the closure, or set to the `Default`
     519              :     /// impl's output.
     520         3176 :     async fn map_all_timestamps<T: Default>(
     521         3176 :         &self,
     522         3176 :         probe_lsn: Lsn,
     523         3176 :         ctx: &RequestContext,
     524         3176 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     525         3176 :     ) -> Result<T, PageReconstructError> {
     526         3176 :         for segno in self
     527         3176 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     528         1509 :             .await?
     529              :         {
     530         3174 :             let nblocks = self
     531         3174 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     532         1457 :                 .await?;
     533         3174 :             for blknum in (0..nblocks).rev() {
     534         3174 :                 let clog_page = self
     535         3174 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     536       262876 :                     .await?;
     537              : 
     538         3173 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     539         2953 :                     let mut timestamp_bytes = [0u8; 8];
     540         2953 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     541         2953 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     542         2953 : 
     543         2953 :                     match f(timestamp) {
     544          889 :                         ControlFlow::Break(b) => return Ok(b),
     545         2064 :                         ControlFlow::Continue(()) => (),
     546              :                     }
     547          220 :                 }
     548              :             }
     549              :         }
     550         2284 :         Ok(Default::default())
     551         3175 :     }
     552              : 
     553          609 :     pub(crate) async fn get_slru_keyspace(
     554          609 :         &self,
     555          609 :         version: Version<'_>,
     556          609 :         ctx: &RequestContext,
     557          609 :     ) -> Result<KeySpace, PageReconstructError> {
     558          609 :         let mut accum = KeySpaceAccum::new();
     559              : 
     560         2418 :         for kind in SlruKind::iter() {
     561         1815 :             let mut segments: Vec<u32> = self
     562         1815 :                 .list_slru_segments(kind, version, ctx)
     563          185 :                 .await?
     564         1809 :                 .into_iter()
     565         1809 :                 .collect();
     566         1809 :             segments.sort_unstable();
     567              : 
     568         3654 :             for seg in segments {
     569         1845 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     570              : 
     571         1845 :                 accum.add_range(
     572         1845 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     573         1845 :                 );
     574              :             }
     575              :         }
     576              : 
     577          603 :         Ok(accum.to_keyspace())
     578          609 :     }
     579              : 
     580              :     /// Get a list of SLRU segments
     581         4993 :     pub(crate) async fn list_slru_segments(
     582         4993 :         &self,
     583         4993 :         kind: SlruKind,
     584         4993 :         version: Version<'_>,
     585         4993 :         ctx: &RequestContext,
     586         4993 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     587         4993 :         // fetch directory entry
     588         4993 :         let key = slru_dir_to_key(kind);
     589              : 
     590         4993 :         let buf = version.get(self, key, ctx).await?;
     591         4985 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     592         4985 :             Ok(dir) => Ok(dir.segments),
     593            0 :             Err(e) => Err(PageReconstructError::from(e)),
     594              :         }
     595         4993 :     }
     596              : 
     597         2446 :     pub(crate) async fn get_relmap_file(
     598         2446 :         &self,
     599         2446 :         spcnode: Oid,
     600         2446 :         dbnode: Oid,
     601         2446 :         version: Version<'_>,
     602         2446 :         ctx: &RequestContext,
     603         2446 :     ) -> Result<Bytes, PageReconstructError> {
     604         2446 :         let key = relmap_file_key(spcnode, dbnode);
     605              : 
     606         2446 :         let buf = version.get(self, key, ctx).await?;
     607         2446 :         Ok(buf)
     608         2446 :     }
     609              : 
     610          603 :     pub(crate) async fn list_dbdirs(
     611          603 :         &self,
     612          603 :         lsn: Lsn,
     613          603 :         ctx: &RequestContext,
     614          603 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     615              :         // fetch directory entry
     616          603 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     617              : 
     618          603 :         match DbDirectory::des(&buf).context("deserialization failure") {
     619          603 :             Ok(dir) => Ok(dir.dbdirs),
     620            0 :             Err(e) => Err(PageReconstructError::from(e)),
     621              :         }
     622          603 :     }
     623              : 
     624            2 :     pub(crate) async fn get_twophase_file(
     625            2 :         &self,
     626            2 :         xid: TransactionId,
     627            2 :         lsn: Lsn,
     628            2 :         ctx: &RequestContext,
     629            2 :     ) -> Result<Bytes, PageReconstructError> {
     630            2 :         let key = twophase_file_key(xid);
     631            2 :         let buf = self.get(key, lsn, ctx).await?;
     632            2 :         Ok(buf)
     633            2 :     }
     634              : 
     635          603 :     pub(crate) async fn list_twophase_files(
     636          603 :         &self,
     637          603 :         lsn: Lsn,
     638          603 :         ctx: &RequestContext,
     639          603 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     640              :         // fetch directory entry
     641          603 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     642              : 
     643          603 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     644          603 :             Ok(dir) => Ok(dir.xids),
     645            0 :             Err(e) => Err(PageReconstructError::from(e)),
     646              :         }
     647          603 :     }
     648              : 
     649          597 :     pub(crate) async fn get_control_file(
     650          597 :         &self,
     651          597 :         lsn: Lsn,
     652          597 :         ctx: &RequestContext,
     653          597 :     ) -> Result<Bytes, PageReconstructError> {
     654          597 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     655          597 :     }
     656              : 
     657         1943 :     pub(crate) async fn get_checkpoint(
     658         1943 :         &self,
     659         1943 :         lsn: Lsn,
     660         1943 :         ctx: &RequestContext,
     661         1943 :     ) -> Result<Bytes, PageReconstructError> {
     662         1943 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     663         1943 :     }
     664              : 
     665         2424 :     pub(crate) async fn list_aux_files(
     666         2424 :         &self,
     667         2424 :         lsn: Lsn,
     668         2424 :         ctx: &RequestContext,
     669         2424 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     670         2424 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     671         1436 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     672         1436 :                 Ok(dir) => Ok(dir.files),
     673            0 :                 Err(e) => Err(PageReconstructError::from(e)),
     674              :             },
     675          988 :             Err(e) => {
     676              :                 // This is expected: historical databases do not have the key.
     677            0 :                 debug!("Failed to get info about AUX files: {}", e);
     678          988 :                 Ok(HashMap::new())
     679              :             }
     680              :         }
     681         2424 :     }
     682              : 
     683              :     /// Does the same as get_current_logical_size but counted on demand.
     684              :     /// Used to initialize the logical size tracking on startup.
     685              :     ///
     686              :     /// Only relation blocks are counted currently. That excludes metadata,
     687              :     /// SLRUs, twophase files etc.
     688              :     ///
     689              :     /// # Cancel-Safety
     690              :     ///
     691              :     /// This method is cancellation-safe.
     692          695 :     pub async fn get_current_logical_size_non_incremental(
     693          695 :         &self,
     694          695 :         lsn: Lsn,
     695          695 :         ctx: &RequestContext,
     696          695 :     ) -> Result<u64, CalculateLogicalSizeError> {
     697          695 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     698              : 
     699              :         // Fetch list of database dirs and iterate them
     700          943 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     701          687 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     702              : 
     703          687 :         let mut total_size: u64 = 0;
     704         2721 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     705       630374 :             for rel in self
     706         2721 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     707          600 :                 .await?
     708              :             {
     709       630374 :                 if self.cancel.is_cancelled() {
     710            2 :                     return Err(CalculateLogicalSizeError::Cancelled);
     711       630372 :                 }
     712       630372 :                 let relsize_key = rel_size_to_key(rel);
     713       630372 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     714       630340 :                 let relsize = buf.get_u32_le();
     715       630340 : 
     716       630340 :                 total_size += relsize as u64;
     717              :             }
     718              :         }
     719          652 :         Ok(total_size * BLCKSZ as u64)
     720          685 :     }
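// Worked example of the calculation above (hypothetical numbers): three relations of
// 10, 20 and 5 blocks give total_size = 35, so with BLCKSZ = 8192 the reported logical
// size is 35 * 8192 = 286720 bytes. SLRUs, twophase files and other metadata are not counted.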
     721              : 
     722              :     ///
     723              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
      724              :     /// Anything that's not listed may be removed from the underlying storage (from
     725              :     /// that LSN forwards).
     726          893 :     pub(crate) async fn collect_keyspace(
     727          893 :         &self,
     728          893 :         lsn: Lsn,
     729          893 :         ctx: &RequestContext,
     730          893 :     ) -> Result<KeySpace, CollectKeySpaceError> {
     731          893 :         // Iterate through key ranges, greedily packing them into partitions
     732          893 :         let mut result = KeySpaceAccum::new();
     733          893 : 
     734          893 :         // The dbdir metadata always exists
     735          893 :         result.add_key(DBDIR_KEY);
     736              : 
     737              :         // Fetch list of database dirs and iterate them
     738         2300 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     739          891 :         let dbdir = DbDirectory::des(&buf)?;
     740              : 
     741          891 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     742          891 :         dbs.sort_unstable();
     743         3852 :         for (spcnode, dbnode) in dbs {
     744         2963 :             result.add_key(relmap_file_key(spcnode, dbnode));
     745         2963 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     746              : 
     747         2963 :             let mut rels: Vec<RelTag> = self
     748         2963 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     749          785 :                 .await?
     750         2963 :                 .into_iter()
     751         2963 :                 .collect();
     752         2963 :             rels.sort_unstable();
     753       715757 :             for rel in rels {
     754       712796 :                 let relsize_key = rel_size_to_key(rel);
     755       712796 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     756       712794 :                 let relsize = buf.get_u32_le();
     757       712794 : 
     758       712794 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     759       712794 :                 result.add_key(relsize_key);
     760              :             }
     761              :         }
     762              : 
     763              :         // Iterate SLRUs next
     764         2667 :         for kind in [
     765          889 :             SlruKind::Clog,
     766          889 :             SlruKind::MultiXactMembers,
     767          889 :             SlruKind::MultiXactOffsets,
     768              :         ] {
     769         2667 :             let slrudir_key = slru_dir_to_key(kind);
     770         2667 :             result.add_key(slrudir_key);
     771         8753 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     772         2667 :             let dir = SlruSegmentDirectory::des(&buf)?;
     773         2667 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     774         2667 :             segments.sort_unstable();
     775         4847 :             for segno in segments {
     776         2180 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     777         2180 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     778         2180 :                 let segsize = buf.get_u32_le();
     779         2180 : 
     780         2180 :                 result.add_range(
     781         2180 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     782         2180 :                 );
     783         2180 :                 result.add_key(segsize_key);
     784              :             }
     785              :         }
     786              : 
     787              :         // Then pg_twophase
     788          889 :         result.add_key(TWOPHASEDIR_KEY);
     789         3256 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     790          889 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     791          889 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     792          889 :         xids.sort_unstable();
     793          889 :         for xid in xids {
     794            0 :             result.add_key(twophase_file_key(xid));
     795            0 :         }
     796              : 
     797          889 :         result.add_key(CONTROLFILE_KEY);
     798          889 :         result.add_key(CHECKPOINT_KEY);
     799          889 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     800          621 :             result.add_key(AUX_FILES_KEY);
     801          621 :         }
     802          889 :         Ok(result.to_keyspace())
     803          892 :     }
     804              : 
      805              :     /// Get the cached size of a relation, if it was not updated after the specified LSN
     806     63432140 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     807     63432140 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     808     63432140 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     809     63075245 :             if lsn >= *cached_lsn {
     810     62770501 :                 return Some(*nblocks);
     811       304744 :             }
     812       356895 :         }
     813       661639 :         None
     814     63432140 :     }
     815              : 
     816              :     /// Update cached relation size if there is no more recent update
     817       155970 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     818       155970 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     819       155970 :         match rel_size_cache.entry(tag) {
     820       130012 :             hash_map::Entry::Occupied(mut entry) => {
     821       130012 :                 let cached_lsn = entry.get_mut();
     822       130012 :                 if lsn >= cached_lsn.0 {
     823           19 :                     *cached_lsn = (lsn, nblocks);
     824       129993 :                 }
     825              :             }
     826        25958 :             hash_map::Entry::Vacant(entry) => {
     827        25958 :                 entry.insert((lsn, nblocks));
     828        25958 :             }
     829              :         }
     830       155970 :     }
     831              : 
     832              :     /// Store cached relation size
     833      1962230 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     834      1962230 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     835      1962230 :         rel_size_cache.insert(tag, (lsn, nblocks));
     836      1962230 :     }
     837              : 
     838              :     /// Remove cached relation size
     839        67233 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     840        67233 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     841        67233 :         rel_size_cache.remove(tag);
     842        67233 :     }
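// A short sketch of the cache semantics implemented above (hypothetical values): after
// set_cached_rel_size(tag, Lsn(0x20), 8), a call to get_cached_rel_size(&tag, Lsn(0x30))
// returns Some(8) because the requested LSN is not older than the cached one, while
// get_cached_rel_size(&tag, Lsn(0x10)) returns None because the cached entry may not be
// valid at that older LSN. update_cached_rel_size() only overwrites an existing entry
// when the new LSN is greater than or equal to the cached one.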
     843              : }
     844              : 
      845              : /// DatadirModification represents an operation to ingest an atomic set of
      846              : /// updates to the repository. It is created by the 'begin_modification'
      847              : /// function. It is used for each WAL record, so that all the modifications
      848              : /// made by one WAL record appear atomic.
     849              : pub struct DatadirModification<'a> {
     850              :     /// The timeline this modification applies to. You can access this to
     851              :     /// read the state, but note that any pending updates are *not* reflected
     852              :     /// in the state in 'tline' yet.
     853              :     pub tline: &'a Timeline,
     854              : 
     855              :     /// Current LSN of the modification
     856              :     lsn: Lsn,
     857              : 
     858              :     // The modifications are not applied directly to the underlying key-value store.
     859              :     // The put-functions add the modifications here, and they are flushed to the
      860              :     // underlying key-value store by the 'commit' function.
     861              :     pending_lsns: Vec<Lsn>,
     862              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     863              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     864              :     pending_nblocks: i64,
     865              : }
     866              : 
     867              : impl<'a> DatadirModification<'a> {
     868              :     /// Get the current lsn
     869     57214349 :     pub(crate) fn get_lsn(&self) -> Lsn {
     870     57214349 :         self.lsn
     871     57214349 :     }
     872              : 
     873              :     /// Set the current lsn
     874     73215430 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     875     73215430 :         ensure!(
     876     73215430 :             lsn >= self.lsn,
     877            0 :             "setting an older lsn {} than {} is not allowed",
     878              :             lsn,
     879              :             self.lsn
     880              :         );
     881     73215430 :         if lsn > self.lsn {
     882     73215430 :             self.pending_lsns.push(self.lsn);
     883     73215430 :             self.lsn = lsn;
     884     73215430 :         }
     885     73215430 :         Ok(())
     886     73215430 :     }
     887              : 
     888              :     /// Initialize a completely new repository.
     889              :     ///
     890              :     /// This inserts the directory metadata entries that are assumed to
     891              :     /// always exist.
     892          675 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     893          675 :         let buf = DbDirectory::ser(&DbDirectory {
     894          675 :             dbdirs: HashMap::new(),
     895          675 :         })?;
     896          675 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     897          675 : 
     898          675 :         // Create AuxFilesDirectory
     899          675 :         self.init_aux_dir()?;
     900              : 
     901          675 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     902          675 :             xids: HashSet::new(),
     903          675 :         })?;
     904          675 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     905              : 
     906          675 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     907          675 :         let empty_dir = Value::Image(buf);
     908          675 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     909          675 :         self.put(
     910          675 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     911          675 :             empty_dir.clone(),
     912          675 :         );
     913          675 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     914          675 : 
     915          675 :         Ok(())
     916          675 :     }
     917              : 
     918              :     #[cfg(test)]
     919           70 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     920           70 :         self.init_empty()?;
     921           70 :         self.put_control_file(bytes::Bytes::from_static(
     922           70 :             b"control_file contents do not matter",
     923           70 :         ))
     924           70 :         .context("put_control_file")?;
     925           70 :         self.put_checkpoint(bytes::Bytes::from_static(
     926           70 :             b"checkpoint_file contents do not matter",
     927           70 :         ))
     928           70 :         .context("put_checkpoint_file")?;
     929           70 :         Ok(())
     930           70 :     }
     931              : 
     932              :     /// Put a new page version that can be constructed from a WAL record
     933              :     ///
     934              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     935              :     /// current end-of-file. It's up to the caller to check that the relation size
     936              :     /// matches the blocks inserted!
     937     53273693 :     pub fn put_rel_wal_record(
     938     53273693 :         &mut self,
     939     53273693 :         rel: RelTag,
     940     53273693 :         blknum: BlockNumber,
     941     53273693 :         rec: NeonWalRecord,
     942     53273693 :     ) -> anyhow::Result<()> {
     943     53273693 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     944     53273693 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     945     53273693 :         Ok(())
     946     53273693 :     }
     947              : 
     948              :     // Same, but for an SLRU.
     949      6163782 :     pub fn put_slru_wal_record(
     950      6163782 :         &mut self,
     951      6163782 :         kind: SlruKind,
     952      6163782 :         segno: u32,
     953      6163782 :         blknum: BlockNumber,
     954      6163782 :         rec: NeonWalRecord,
     955      6163782 :     ) -> anyhow::Result<()> {
     956      6163782 :         self.put(
     957      6163782 :             slru_block_to_key(kind, segno, blknum),
     958      6163782 :             Value::WalRecord(rec),
     959      6163782 :         );
     960      6163782 :         Ok(())
     961      6163782 :     }
     962              : 
     963              :     /// Like put_wal_record, but with ready-made image of the page.
     964      2579137 :     pub fn put_rel_page_image(
     965      2579137 :         &mut self,
     966      2579137 :         rel: RelTag,
     967      2579137 :         blknum: BlockNumber,
     968      2579137 :         img: Bytes,
     969      2579137 :     ) -> anyhow::Result<()> {
     970      2579137 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     971      2579137 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     972      2579137 :         Ok(())
     973      2579137 :     }
     974              : 
     975         3160 :     pub fn put_slru_page_image(
     976         3160 :         &mut self,
     977         3160 :         kind: SlruKind,
     978         3160 :         segno: u32,
     979         3160 :         blknum: BlockNumber,
     980         3160 :         img: Bytes,
     981         3160 :     ) -> anyhow::Result<()> {
     982         3160 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
     983         3160 :         Ok(())
     984         3160 :     }
     985              : 
     986              :     /// Store a relmapper file (pg_filenode.map) in the repository
     987         2510 :     pub async fn put_relmap_file(
     988         2510 :         &mut self,
     989         2510 :         spcnode: Oid,
     990         2510 :         dbnode: Oid,
     991         2510 :         img: Bytes,
     992         2510 :         ctx: &RequestContext,
     993         2510 :     ) -> anyhow::Result<()> {
     994              :         // Add it to the directory (if it doesn't exist already)
     995         2510 :         let buf = self.get(DBDIR_KEY, ctx).await?;
     996         2510 :         let mut dbdir = DbDirectory::des(&buf)?;
     997              : 
     998         2510 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
     999         2510 :         if r.is_none() || r == Some(false) {
    1000              :             // The dbdir entry didn't exist, or it contained a
    1001              :             // 'false'. The 'insert' call already updated it with
    1002              :             // 'true', now write the updated 'dbdirs' map back.
    1003         2448 :             let buf = DbDirectory::ser(&dbdir)?;
    1004         2448 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1005         2448 : 
    1006         2448 :             // Create AuxFilesDirectory as well
    1007         2448 :             self.init_aux_dir()?;
    1008           62 :         }
    1009         2510 :         if r.is_none() {
    1010           36 :             // Create RelDirectory
    1011           36 :             let buf = RelDirectory::ser(&RelDirectory {
    1012           36 :                 rels: HashSet::new(),
    1013           36 :             })?;
    1014           36 :             self.put(
    1015           36 :                 rel_dir_to_key(spcnode, dbnode),
    1016           36 :                 Value::Image(Bytes::from(buf)),
    1017           36 :             );
    1018         2474 :         }
    1019              : 
    1020         2510 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1021         2510 :         Ok(())
    1022         2510 :     }
    1023              : 
    1024            4 :     pub async fn put_twophase_file(
    1025            4 :         &mut self,
    1026            4 :         xid: TransactionId,
    1027            4 :         img: Bytes,
    1028            4 :         ctx: &RequestContext,
    1029            4 :     ) -> anyhow::Result<()> {
    1030              :         // Add it to the directory entry
    1031            4 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1032            4 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1033            4 :         if !dir.xids.insert(xid) {
    1034            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1035            4 :         }
    1036            4 :         self.put(
    1037            4 :             TWOPHASEDIR_KEY,
    1038            4 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1039              :         );
    1040              : 
    1041            4 :         self.put(twophase_file_key(xid), Value::Image(img));
    1042            4 :         Ok(())
    1043            4 :     }
    1044              : 
    1045          673 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1046          673 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1047          673 :         Ok(())
    1048          673 :     }
    1049              : 
    1050        34046 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1051        34046 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1052        34046 :         Ok(())
    1053        34046 :     }
    1054              : 
    1055            3 :     pub async fn drop_dbdir(
    1056            3 :         &mut self,
    1057            3 :         spcnode: Oid,
    1058            3 :         dbnode: Oid,
    1059            3 :         ctx: &RequestContext,
    1060            3 :     ) -> anyhow::Result<()> {
    1061            3 :         let total_blocks = self
    1062            3 :             .tline
    1063            3 :             .get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx)
    1064            0 :             .await?;
    1065              : 
    1066              :         // Remove entry from dbdir
    1067            3 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1068            3 :         let mut dir = DbDirectory::des(&buf)?;
    1069            3 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1070            3 :             let buf = DbDirectory::ser(&dir)?;
    1071            3 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1072              :         } else {
    1073            0 :             warn!(
    1074            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1075            0 :                 spcnode, dbnode
    1076            0 :             );
    1077              :         }
    1078              : 
    1079              :         // Update logical database size.
    1080            3 :         self.pending_nblocks -= total_blocks as i64;
    1081            3 : 
     1082            3 :         // Delete all relations and metadata files for the spcnode/dbnode
    1083            3 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1084            3 :         Ok(())
    1085            3 :     }
    1086              : 
    1087              :     /// Create a relation fork.
    1088              :     ///
    1089              :     /// 'nblocks' is the initial size.
    1090       648391 :     pub async fn put_rel_creation(
    1091       648391 :         &mut self,
    1092       648391 :         rel: RelTag,
    1093       648391 :         nblocks: BlockNumber,
    1094       648391 :         ctx: &RequestContext,
    1095       648391 :     ) -> Result<(), RelationError> {
    1096       648391 :         if rel.relnode == 0 {
    1097            0 :             return Err(RelationError::InvalidRelnode);
    1098       648391 :         }
    1099              :         // It's possible that this is the first rel for this db in this
    1100              :         // tablespace.  Create the reldir entry for it if so.
    1101       648391 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1102       648391 :             .context("deserialize db")?;
    1103       648391 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1104       648391 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
    1105              :             // Didn't exist. Update dbdir
    1106         2417 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
    1107         2417 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1108         2417 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1109         2417 : 
    1110         2417 :             // and create the RelDirectory
    1111         2417 :             RelDirectory::default()
    1112              :         } else {
    1113              :             // reldir already exists, fetch it
    1114       645974 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1115       645974 :                 .context("deserialize db")?
    1116              :         };
    1117              : 
    1118              :         // Add the new relation to the rel directory entry, and write it back
    1119       648391 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1120            0 :             return Err(RelationError::AlreadyExists);
    1121       648391 :         }
    1122       648391 :         self.put(
    1123       648391 :             rel_dir_key,
    1124       648391 :             Value::Image(Bytes::from(
    1125       648391 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1126              :             )),
    1127              :         );
    1128              : 
    1129              :         // Put size
    1130       648391 :         let size_key = rel_size_to_key(rel);
    1131       648391 :         let buf = nblocks.to_le_bytes();
    1132       648391 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1133       648391 : 
    1134       648391 :         self.pending_nblocks += nblocks as i64;
    1135       648391 : 
    1136       648391 :         // Update relation size cache
    1137       648391 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1138       648391 : 
    1139       648391 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1140       648391 :         // caller.
    1141       648391 :         Ok(())
    1142       648391 :     }
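                       : 
                       :     // Hedged, illustrative sketch: `put_rel_creation` records only the size, so a
                       :     // caller that wants real contents must put the blocks itself. The helper name
                       :     // and the zero-fill policy are assumptions made for illustration only.
                       :     pub async fn put_rel_creation_zeroed(
                       :         &mut self,
                       :         rel: RelTag,
                       :         nblocks: BlockNumber,
                       :         ctx: &RequestContext,
                       :     ) -> anyhow::Result<()> {
                       :         self.put_rel_creation(rel, nblocks, ctx)
                       :             .await
                       :             .map_err(|e| anyhow::anyhow!("relation creation failed: {e}"))?;
                       :         // Back the declared size with actual (all-zeros) page images.
                       :         for blknum in 0..nblocks {
                       :             self.put_rel_page_image(rel, blknum, ZERO_PAGE.clone())?;
                       :         }
                       :         Ok(())
                       :     }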
    1143              : 
    1144              :     /// Truncate relation
    1145         6322 :     pub async fn put_rel_truncation(
    1146         6322 :         &mut self,
    1147         6322 :         rel: RelTag,
    1148         6322 :         nblocks: BlockNumber,
    1149         6322 :         ctx: &RequestContext,
    1150         6322 :     ) -> anyhow::Result<()> {
    1151         6322 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1152         6322 :         if self
    1153         6322 :             .tline
    1154         6322 :             .get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx)
    1155            0 :             .await?
    1156              :         {
    1157         6322 :             let size_key = rel_size_to_key(rel);
    1158              :             // Fetch the old size first
    1159         6322 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1160         6322 : 
    1161         6322 :             // Update the entry with the new size.
    1162         6322 :             let buf = nblocks.to_le_bytes();
    1163         6322 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1164         6322 : 
    1165         6322 :             // Update relation size cache
    1166         6322 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1167         6322 : 
    1168         6322 :             // Update relation size cache
    1169         6322 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1170         6322 : 
    1171         6322 :             // Update logical database size.
    1172         6322 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1173            0 :         }
    1174         6322 :         Ok(())
    1175         6322 :     }
    1176              : 
    1177              :     /// Extend relation
    1178              :     /// If new size is smaller, do nothing.
    1179      1863650 :     pub async fn put_rel_extend(
    1180      1863650 :         &mut self,
    1181      1863650 :         rel: RelTag,
    1182      1863650 :         nblocks: BlockNumber,
    1183      1863650 :         ctx: &RequestContext,
    1184      1863650 :     ) -> anyhow::Result<()> {
    1185      1863650 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1186              : 
    1187              :         // Put size
    1188      1863650 :         let size_key = rel_size_to_key(rel);
    1189      1863650 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1190      1863647 : 
    1191      1863647 :         // only extend relation here. never decrease the size
    1192      1863647 :         if nblocks > old_size {
    1193      1301195 :             let buf = nblocks.to_le_bytes();
    1194      1301195 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1195      1301195 : 
    1196      1301195 :             // Update relation size cache
    1197      1301195 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1198      1301195 : 
    1199      1301195 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1200      1301195 :         }
    1201      1863647 :         Ok(())
    1202      1863650 :     }
    1203              : 
    1204              :     /// Drop a relation.
    1205        67233 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1206        67233 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1207              : 
    1208              :         // Remove it from the directory entry
    1209        67233 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1210        67233 :         let buf = self.get(dir_key, ctx).await?;
    1211        67233 :         let mut dir = RelDirectory::des(&buf)?;
    1212              : 
    1213        67233 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1214        67233 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1215              :         } else {
    1216            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1217              :         }
    1218              : 
    1219              :         // update logical size
    1220        67233 :         let size_key = rel_size_to_key(rel);
    1221        67233 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1222        67233 :         self.pending_nblocks -= old_size as i64;
    1223        67233 : 
     1224        67233 :         // Remove entry from relation size cache
    1225        67233 :         self.tline.remove_cached_rel_size(&rel);
    1226        67233 : 
    1227        67233 :         // Delete size entry, as well as all blocks
    1228        67233 :         self.delete(rel_key_range(rel));
    1229        67233 : 
    1230        67233 :         Ok(())
    1231        67233 :     }
    1232              : 
    1233         1848 :     pub async fn put_slru_segment_creation(
    1234         1848 :         &mut self,
    1235         1848 :         kind: SlruKind,
    1236         1848 :         segno: u32,
    1237         1848 :         nblocks: BlockNumber,
    1238         1848 :         ctx: &RequestContext,
    1239         1848 :     ) -> anyhow::Result<()> {
    1240         1848 :         // Add it to the directory entry
    1241         1848 :         let dir_key = slru_dir_to_key(kind);
    1242         1848 :         let buf = self.get(dir_key, ctx).await?;
    1243         1848 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1244              : 
    1245         1848 :         if !dir.segments.insert(segno) {
    1246            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1247         1848 :         }
    1248         1848 :         self.put(
    1249         1848 :             dir_key,
    1250         1848 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1251              :         );
    1252              : 
    1253              :         // Put size
    1254         1848 :         let size_key = slru_segment_size_to_key(kind, segno);
    1255         1848 :         let buf = nblocks.to_le_bytes();
    1256         1848 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1257         1848 : 
    1258         1848 :         // even if nblocks > 0, we don't insert any actual blocks here
    1259         1848 : 
    1260         1848 :         Ok(())
    1261         1848 :     }
    1262              : 
    1263              :     /// Extend SLRU segment
    1264         1315 :     pub fn put_slru_extend(
    1265         1315 :         &mut self,
    1266         1315 :         kind: SlruKind,
    1267         1315 :         segno: u32,
    1268         1315 :         nblocks: BlockNumber,
    1269         1315 :     ) -> anyhow::Result<()> {
    1270         1315 :         // Put size
    1271         1315 :         let size_key = slru_segment_size_to_key(kind, segno);
    1272         1315 :         let buf = nblocks.to_le_bytes();
    1273         1315 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1274         1315 :         Ok(())
    1275         1315 :     }
    1276              : 
    1277              :     /// This method is used for marking truncated SLRU files
    1278           10 :     pub async fn drop_slru_segment(
    1279           10 :         &mut self,
    1280           10 :         kind: SlruKind,
    1281           10 :         segno: u32,
    1282           10 :         ctx: &RequestContext,
    1283           10 :     ) -> anyhow::Result<()> {
    1284           10 :         // Remove it from the directory entry
    1285           10 :         let dir_key = slru_dir_to_key(kind);
    1286           10 :         let buf = self.get(dir_key, ctx).await?;
    1287           10 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1288              : 
    1289           10 :         if !dir.segments.remove(&segno) {
    1290            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1291           10 :         }
    1292           10 :         self.put(
    1293           10 :             dir_key,
    1294           10 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1295              :         );
    1296              : 
    1297              :         // Delete size entry, as well as all blocks
    1298           10 :         self.delete(slru_segment_key_range(kind, segno));
    1299           10 : 
    1300           10 :         Ok(())
    1301           10 :     }
    1302              : 
    1303              :     /// Drop a relmapper file (pg_filenode.map)
    1304            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1305            0 :         // TODO
    1306            0 :         Ok(())
    1307            0 :     }
    1308              : 
     1309              :     /// Drop a two-phase state file for the given transaction.
    1310            2 :     pub async fn drop_twophase_file(
    1311            2 :         &mut self,
    1312            2 :         xid: TransactionId,
    1313            2 :         ctx: &RequestContext,
    1314            2 :     ) -> anyhow::Result<()> {
    1315              :         // Remove it from the directory entry
    1316            2 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1317            2 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1318              : 
    1319            2 :         if !dir.xids.remove(&xid) {
    1320            0 :             warn!("twophase file for xid {} does not exist", xid);
    1321            2 :         }
    1322            2 :         self.put(
    1323            2 :             TWOPHASEDIR_KEY,
    1324            2 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1325              :         );
    1326              : 
    1327              :         // Delete it
    1328            2 :         self.delete(twophase_key_range(xid));
    1329            2 : 
    1330            2 :         Ok(())
    1331            2 :     }
    1332              : 
    1333         3123 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1334         3123 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1335         3123 :             files: HashMap::new(),
    1336         3123 :         })?;
    1337         3123 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1338         3123 :         Ok(())
    1339         3123 :     }
    1340              : 
    1341          117 :     pub async fn put_file(
    1342          117 :         &mut self,
    1343          117 :         path: &str,
    1344          117 :         content: &[u8],
    1345          117 :         ctx: &RequestContext,
    1346          117 :     ) -> anyhow::Result<()> {
    1347          117 :         let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
    1348          114 :             Ok(buf) => AuxFilesDirectory::des(&buf)?,
    1349            3 :             Err(e) => {
    1350              :                 // This is expected: historical databases do not have the key.
    1351            0 :                 debug!("Failed to get info about AUX files: {}", e);
    1352            3 :                 AuxFilesDirectory {
    1353            3 :                     files: HashMap::new(),
    1354            3 :                 }
    1355              :             }
    1356              :         };
    1357          117 :         let path = path.to_string();
    1358          117 :         if content.is_empty() {
    1359            5 :             dir.files.remove(&path);
    1360          112 :         } else {
    1361          112 :             dir.files.insert(path, Bytes::copy_from_slice(content));
    1362          112 :         }
    1363          117 :         self.put(
    1364          117 :             AUX_FILES_KEY,
    1365          117 :             Value::Image(Bytes::from(
    1366          117 :                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1367              :             )),
    1368              :         );
    1369          117 :         Ok(())
    1370          117 :     }
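                       : 
                       :     // Hedged sketch (assumed helper, not an existing API): because `put_file`
                       :     // interprets empty content as a deletion, removing an aux file can simply
                       :     // delegate to it.
                       :     pub async fn delete_file(&mut self, path: &str, ctx: &RequestContext) -> anyhow::Result<()> {
                       :         // An empty slice drops the entry from AuxFilesDirectory.
                       :         self.put_file(path, &[], ctx).await
                       :     }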
    1371              : 
    1372              :     ///
    1373              :     /// Flush changes accumulated so far to the underlying repository.
    1374              :     ///
    1375              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1376              :     /// you to flush them to the underlying repository before the final `commit`.
     1377              :     /// That frees up the memory used to hold the pending changes.
    1378              :     ///
    1379              :     /// Currently only used during bulk import of a data directory. In that
    1380              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1381              :     /// whole import fails and the timeline will be deleted anyway.
    1382              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1383              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1384              :     ///
    1385              :     /// Note: A consequence of flushing the pending operations is that they
    1386              :     /// won't be visible to subsequent operations until `commit`. The function
    1387              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1388              :     /// for bulk import, where you are just loading data pages and won't try to
    1389              :     /// modify the same pages twice.
    1390       573331 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1391       573331 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1392       573331 :         // to scan through the pending_updates list.
    1393       573331 :         let pending_nblocks = self.pending_nblocks;
    1394       573331 :         if pending_nblocks < 10000 {
    1395       573331 :             return Ok(());
    1396            0 :         }
    1397              : 
    1398            0 :         let writer = self.tline.writer().await;
    1399              : 
     1400              :         // Flush relation and SLRU data blocks, keep metadata.
    1401            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1402            0 :         for (key, values) in self.pending_updates.drain() {
    1403            0 :             for (lsn, value) in values {
    1404            0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1405              :                     // This bails out on first error without modifying pending_updates.
    1406              :                     // That's Ok, cf this function's doc comment.
    1407            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1408            0 :                 } else {
    1409            0 :                     retained_pending_updates
    1410            0 :                         .entry(key)
    1411            0 :                         .or_default()
    1412            0 :                         .push((lsn, value));
    1413            0 :                 }
    1414              :             }
    1415              :         }
    1416              : 
    1417            0 :         self.pending_updates = retained_pending_updates;
    1418            0 : 
    1419            0 :         if pending_nblocks != 0 {
    1420            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1421            0 :             self.pending_nblocks = 0;
    1422            0 :         }
    1423              : 
    1424            0 :         Ok(())
    1425       573331 :     }
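                       : 
                       :     // Hedged, illustrative sketch of the bulk-import pattern described above:
                       :     // interleave page puts with `flush` to bound memory use, then `commit` once
                       :     // at the end. The helper name and the slice-of-pages input are assumptions.
                       :     pub async fn import_rel_pages(
                       :         &mut self,
                       :         rel: RelTag,
                       :         pages: &[Bytes],
                       :         ctx: &RequestContext,
                       :     ) -> anyhow::Result<()> {
                       :         for (blknum, img) in pages.iter().enumerate() {
                       :             self.put_rel_page_image(rel, blknum as BlockNumber, img.clone())?;
                       :             // Cheap when little has accumulated: data pages are only written out
                       :             // once pending_nblocks crosses the threshold checked above.
                       :             self.flush(ctx).await?;
                       :         }
                       :         // Record the final size; put_rel_extend never shrinks a relation.
                       :         self.put_rel_extend(rel, pages.len() as BlockNumber, ctx).await?;
                       :         self.commit(ctx).await
                       :     }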
    1426              : 
    1427              :     ///
    1428              :     /// Finish this atomic update, writing all the updated keys to the
    1429              :     /// underlying timeline.
    1430              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1431              :     ///
    1432      2141052 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1433      2141052 :         let writer = self.tline.writer().await;
    1434              : 
    1435      2141052 :         let pending_nblocks = self.pending_nblocks;
    1436      2141052 :         self.pending_nblocks = 0;
    1437      2141052 : 
    1438      2141052 :         if !self.pending_updates.is_empty() {
    1439      1718651 :             writer.put_batch(&self.pending_updates, ctx).await?;
    1440      1718650 :             self.pending_updates.clear();
    1441       422401 :         }
    1442              : 
    1443      2141051 :         if !self.pending_deletions.is_empty() {
    1444        19216 :             writer.delete_batch(&self.pending_deletions).await?;
    1445        19216 :             self.pending_deletions.clear();
    1446      2121835 :         }
    1447              : 
    1448      2141051 :         self.pending_lsns.push(self.lsn);
    1449     75356216 :         for pending_lsn in self.pending_lsns.drain(..) {
    1450     75356216 :             // Ideally, we should be able to call writer.finish_write() only once
    1451     75356216 :             // with the highest LSN. However, the last_record_lsn variable in the
    1452     75356216 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1453     75356216 :             // so we need to record every LSN to not leave a gap between them.
    1454     75356216 :             writer.finish_write(pending_lsn);
    1455     75356216 :         }
    1456              : 
    1457      2141051 :         if pending_nblocks != 0 {
    1458       642421 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1459      1498630 :         }
    1460              : 
    1461      2141051 :         Ok(())
    1462      2141051 :     }
    1463              : 
    1464    146430857 :     pub(crate) fn len(&self) -> usize {
    1465    146430857 :         self.pending_updates.len() + self.pending_deletions.len()
    1466    146430857 :     }
    1467              : 
    1468              :     // Internal helper functions to batch the modifications
    1469              : 
    1470      3527034 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    1471              :         // Have we already updated the same key? Read the latest pending updated
    1472              :         // version in that case.
    1473              :         //
    1474              :         // Note: we don't check pending_deletions. It is an error to request a
    1475              :         // value that has been removed, deletion only avoids leaking storage.
    1476      3527034 :         if let Some(values) = self.pending_updates.get(&key) {
    1477      2651066 :             if let Some((_, value)) = values.last() {
    1478      2651066 :                 return if let Value::Image(img) = value {
    1479      2651066 :                     Ok(img.clone())
    1480              :                 } else {
    1481              :                     // Currently, we never need to read back a WAL record that we
    1482              :                     // inserted in the same "transaction". All the metadata updates
    1483              :                     // work directly with Images, and we never need to read actual
    1484              :                     // data pages. We could handle this if we had to, by calling
    1485              :                     // the walredo manager, but let's keep it simple for now.
    1486            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1487            0 :                         "unexpected pending WAL record"
    1488            0 :                     )))
    1489              :                 };
    1490            0 :             }
    1491       875968 :         }
    1492       875968 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1493       875968 :         self.tline.get(key, lsn, ctx).await
    1494      3527034 :     }
    1495              : 
    1496     64745083 :     fn put(&mut self, key: Key, val: Value) {
    1497     64745083 :         let values = self.pending_updates.entry(key).or_default();
    1498              :         // Replace the previous value if it exists at the same lsn
    1499     64745083 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1500     54114977 :             if *last_lsn == self.lsn {
    1501       644317 :                 *last_value = val;
    1502       644317 :                 return;
    1503     53470660 :             }
    1504     10630106 :         }
    1505     64100766 :         values.push((self.lsn, val));
    1506     64745083 :     }
    1507              : 
    1508        67248 :     fn delete(&mut self, key_range: Range<Key>) {
    1509        67248 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1510        67248 :         self.pending_deletions.push((key_range, self.lsn));
    1511        67248 :     }
    1512              : }
    1513              : 
    1514              : /// This struct facilitates accessing either a committed key from the timeline at a
    1515              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1516              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1517              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1518              : /// need to look up the keys in the modification first before looking them up in the
    1519              : /// timeline to not miss the latest updates.
    1520            0 : #[derive(Clone, Copy)]
    1521              : pub enum Version<'a> {
    1522              :     Lsn(Lsn),
    1523              :     Modified(&'a DatadirModification<'a>),
    1524              : }
    1525              : 
    1526              : impl<'a> Version<'a> {
    1527      6173459 :     async fn get(
    1528      6173459 :         &self,
    1529      6173459 :         timeline: &Timeline,
    1530      6173459 :         key: Key,
    1531      6173459 :         ctx: &RequestContext,
    1532      6173459 :     ) -> Result<Bytes, PageReconstructError> {
    1533      6173459 :         match self {
    1534      5949722 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1535       223737 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1536              :         }
    1537      6173459 :     }
    1538              : 
    1539      6373761 :     fn get_lsn(&self) -> Lsn {
    1540      6373761 :         match self {
    1541      5864439 :             Version::Lsn(lsn) => *lsn,
    1542       509322 :             Version::Modified(modification) => modification.lsn,
    1543              :         }
    1544      6373761 :     }
    1545              : }
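                       : 
                       : // Hedged, illustrative sketch (assumed free function; it relies on same-module
                       : // access to Version::get): the two ways a WAL-ingest style caller can construct
                       : // a `Version`, depending on whether it needs committed data at a fixed LSN or
                       : // the pending state of an in-progress modification.
                       : #[allow(dead_code)]
                       : async fn version_get_example<'a>(
                       :     timeline: &Timeline,
                       :     modification: &'a DatadirModification<'a>,
                       :     key: Key,
                       :     lsn: Lsn,
                       :     ctx: &RequestContext,
                       : ) -> Result<(Bytes, Bytes), PageReconstructError> {
                       :     // Committed value as of `lsn`, read straight from the timeline.
                       :     let committed = Version::Lsn(lsn).get(timeline, key, ctx).await?;
                       :     // Latest value, preferring any not-yet-committed update in `modification`.
                       :     let pending = Version::Modified(modification).get(timeline, key, ctx).await?;
                       :     Ok((committed, pending))
                       : }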
    1546              : 
    1547              : //--- Metadata structs stored in key-value pairs in the repository.
    1548              : 
    1549       653085 : #[derive(Debug, Serialize, Deserialize)]
    1550              : struct DbDirectory {
    1551              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1552              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1553              : }
    1554              : 
    1555         1498 : #[derive(Debug, Serialize, Deserialize)]
    1556              : struct TwoPhaseDirectory {
    1557              :     xids: HashSet<TransactionId>,
    1558              : }
    1559              : 
    1560      1431320 : #[derive(Debug, Serialize, Deserialize, Default)]
    1561              : struct RelDirectory {
    1562              :     // Set of relations that exist. (relfilenode, forknum)
    1563              :     //
    1564              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1565              :     // key-value pairs, if you have a lot of relations
    1566              :     rels: HashSet<(Oid, u8)>,
    1567              : }
    1568              : 
    1569         6480 : #[derive(Debug, Serialize, Deserialize, Default)]
    1570              : struct AuxFilesDirectory {
    1571              :     files: HashMap<String, Bytes>,
    1572              : }
    1573              : 
    1574            0 : #[derive(Debug, Serialize, Deserialize)]
    1575              : struct RelSizeEntry {
    1576              :     nblocks: u32,
    1577              : }
    1578              : 
    1579        10849 : #[derive(Debug, Serialize, Deserialize, Default)]
    1580              : struct SlruSegmentDirectory {
    1581              :     // Set of SLRU segments that exist.
    1582              :     segments: HashSet<u32>,
    1583              : }
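                       : 
                       : // Hedged, illustrative sketch: each directory struct above is stored as a single
                       : // serialized value under a fixed key (e.g. DbDirectory under DBDIR_KEY), using
                       : // the BeSer helpers the same way the put_* methods do. The function name and the
                       : // example OIDs below are assumptions for illustration.
                       : #[allow(dead_code)]
                       : fn dbdir_roundtrip_example() -> anyhow::Result<()> {
                       :     let dir = DbDirectory {
                       :         dbdirs: HashMap::from([((1663u32, 13010u32), true)]),
                       :     };
                       :     let buf = DbDirectory::ser(&dir)?;
                       :     let restored = DbDirectory::des(&buf)?;
                       :     anyhow::ensure!(restored.dbdirs.get(&(1663, 13010)) == Some(&true));
                       :     Ok(())
                       : }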
    1584              : 
    1585              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1586              : 
    1587              : #[allow(clippy::bool_assert_comparison)]
    1588              : #[cfg(test)]
    1589              : mod tests {
    1590              :     //use super::repo_harness::*;
    1591              :     //use super::*;
    1592              : 
    1593              :     /*
    1594              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1595              :             let incremental = timeline.get_current_logical_size();
    1596              :             let non_incremental = timeline
    1597              :                 .get_current_logical_size_non_incremental(lsn)
    1598              :                 .unwrap();
    1599              :             assert_eq!(incremental, non_incremental);
    1600              :         }
    1601              :     */
    1602              : 
    1603              :     /*
    1604              :     ///
    1605              :     /// Test list_rels() function, with branches and dropped relations
    1606              :     ///
    1607              :     #[test]
    1608              :     fn test_list_rels_drop() -> Result<()> {
    1609              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1610              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1611              :         const TESTDB: u32 = 111;
    1612              : 
    1613              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1614              :         // after branching fails below
    1615              :         let mut writer = tline.begin_record(Lsn(0x10));
    1616              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1617              :         writer.finish()?;
    1618              : 
    1619              :         // Create a relation on the timeline
    1620              :         let mut writer = tline.begin_record(Lsn(0x20));
    1621              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1622              :         writer.finish()?;
    1623              : 
    1624              :         let writer = tline.begin_record(Lsn(0x00));
    1625              :         writer.finish()?;
    1626              : 
    1627              :         // Check that list_rels() lists it after LSN 2, but no before it
    1628              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1629              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1630              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1631              : 
    1632              :         // Create a branch, check that the relation is visible there
    1633              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1634              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1635              :             Some(timeline) => timeline,
    1636              :             None => panic!("Should have a local timeline"),
    1637              :         };
    1638              :         let newtline = DatadirTimelineImpl::new(newtline);
    1639              :         assert!(newtline
    1640              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1641              :             .contains(&TESTREL_A));
    1642              : 
    1643              :         // Drop it on the branch
    1644              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1645              :         new_writer.drop_relation(TESTREL_A)?;
    1646              :         new_writer.finish()?;
    1647              : 
    1648              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1649              :         assert!(newtline
    1650              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1651              :             .contains(&TESTREL_A));
    1652              :         assert!(!newtline
    1653              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1654              :             .contains(&TESTREL_A));
    1655              : 
    1656              :         // Run checkpoint and garbage collection and check that it's still not visible
    1657              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1658              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1659              : 
    1660              :         assert!(!newtline
    1661              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1662              :             .contains(&TESTREL_A));
    1663              : 
    1664              :         Ok(())
    1665              :     }
    1666              :      */
    1667              : 
    1668              :     /*
    1669              :     #[test]
    1670              :     fn test_read_beyond_eof() -> Result<()> {
    1671              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1672              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1673              : 
    1674              :         make_some_layers(&tline, Lsn(0x20))?;
    1675              :         let mut writer = tline.begin_record(Lsn(0x60));
    1676              :         walingest.put_rel_page_image(
    1677              :             &mut writer,
    1678              :             TESTREL_A,
    1679              :             0,
    1680              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1681              :         )?;
    1682              :         writer.finish()?;
    1683              : 
    1684              :         // Test read before rel creation. Should error out.
    1685              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1686              : 
    1687              :         // Read block beyond end of relation at different points in time.
    1688              :         // These reads should fall into different delta, image, and in-memory layers.
    1689              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1690              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1691              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1692              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1693              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1694              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1695              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1696              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1697              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1698              : 
    1699              :         // Test on an in-memory layer with no preceding layer
    1700              :         let mut writer = tline.begin_record(Lsn(0x70));
    1701              :         walingest.put_rel_page_image(
    1702              :             &mut writer,
    1703              :             TESTREL_B,
    1704              :             0,
    1705              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1706              :         )?;
    1707              :         writer.finish()?;
    1708              : 
     1709              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    1710              : 
    1711              :         Ok(())
    1712              :     }
    1713              :      */
    1714              : }
        

Generated by: LCOV version 2.1-beta