LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 90.8 % 1036 941
Test Date: 2024-02-12 20:26:03 Functions: 61.5 % 208 128

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::repository::*;
      13              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use pageserver_api::key::{
      18              :     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
      19              :     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      20              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      21              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      22              : };
      23              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      24              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      25              : use postgres_ffi::BLCKSZ;
      26              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      27              : use serde::{Deserialize, Serialize};
      28              : use std::collections::{hash_map, HashMap, HashSet};
      29              : use std::ops::ControlFlow;
      30              : use std::ops::Range;
      31              : use strum::IntoEnumIterator;
      32              : use tokio_util::sync::CancellationToken;
      33              : use tracing::{debug, trace, warn};
      34              : use utils::bin_ser::DeserializeError;
      35              : use utils::{bin_ser::BeSer, lsn::Lsn};
      36              : 
      37            0 : #[derive(Debug)]
      38              : pub enum LsnForTimestamp {
      39              :     /// Found commits both before and after the given timestamp
      40              :     Present(Lsn),
      41              : 
      42              :     /// Found no commits after the given timestamp, this means
      43              :     /// that the newest data in the branch is older than the given
      44              :     /// timestamp.
      45              :     ///
      46              :     /// All commits <= LSN happened before the given timestamp
      47              :     Future(Lsn),
      48              : 
      49              :     /// The queried timestamp is older than the horizon we can look back to (PITR)
      50              :     ///
      51              :     /// All commits > LSN happened after the given timestamp,
      52              :     /// but any commits < LSN might have happened before or after
      53              :     /// the given timestamp. We don't know because no data before
      54              :     /// the given lsn is available.
      55              :     Past(Lsn),
      56              : 
      57              :     /// We have found no commit with a timestamp,
      58              :     /// so we can't return anything meaningful.
      59              :     ///
      60              :     /// The associated LSN is the lower bound value we can safely
      61              :     /// create branches on, but no statement is made if it is
      62              :     /// older or newer than the timestamp.
      63              :     ///
      64              :     /// This variant can e.g. be returned right after a
      65              :     /// cluster import.
      66              :     NoData(Lsn),
      67              : }
      68              : 
      69            0 : #[derive(Debug, thiserror::Error)]
      70              : pub enum CalculateLogicalSizeError {
      71              :     #[error("cancelled")]
      72              :     Cancelled,
      73              :     #[error(transparent)]
      74              :     Other(#[from] anyhow::Error),
      75              : }
      76              : 
      77            2 : #[derive(Debug, thiserror::Error)]
      78              : pub(crate) enum CollectKeySpaceError {
      79              :     #[error(transparent)]
      80              :     Decode(#[from] DeserializeError),
      81              :     #[error(transparent)]
      82              :     PageRead(PageReconstructError),
      83              :     #[error("cancelled")]
      84              :     Cancelled,
      85              : }
      86              : 
      87              : impl From<PageReconstructError> for CollectKeySpaceError {
      88            2 :     fn from(err: PageReconstructError) -> Self {
      89            2 :         match err {
      90            1 :             PageReconstructError::Cancelled => Self::Cancelled,
      91            1 :             err => Self::PageRead(err),
      92              :         }
      93            2 :     }
      94              : }
      95              : 
      96              : impl From<PageReconstructError> for CalculateLogicalSizeError {
      97           28 :     fn from(pre: PageReconstructError) -> Self {
      98           28 :         match pre {
      99              :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
     100           26 :                 Self::Cancelled
     101              :             }
     102            2 :             _ => Self::Other(pre.into()),
     103              :         }
     104           28 :     }
     105              : }
     106              : 
     107            0 : #[derive(Debug, thiserror::Error)]
     108              : pub enum RelationError {
     109              :     #[error("Relation Already Exists")]
     110              :     AlreadyExists,
     111              :     #[error("invalid relnode")]
     112              :     InvalidRelnode,
     113              :     #[error(transparent)]
     114              :     Other(#[from] anyhow::Error),
     115              : }
     116              : 
     117              : ///
     118              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     119              : /// and other special kinds of files, in a versioned key-value store. The
     120              : /// Timeline struct provides the key-value store.
     121              : ///
     122              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
     123              : /// implementation, and might be moved into a separate struct later.
     124              : impl Timeline {
     125              :     /// Start ingesting a WAL record, or other atomic modification of
     126              :     /// the timeline.
     127              :     ///
     128              :     /// This provides a transaction-like interface to perform a bunch
     129              :     /// of modifications atomically.
     130              :     ///
     131              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     132              :     /// DatadirModification object. Use the functions in the object to
     133              :     /// modify the repository state, updating all the pages and metadata
     134              :     /// that the WAL record affects. When you're done, call commit() to
     135              :     /// commit the changes.
     136              :     ///
     137              :     /// Lsn stored in modification is advanced by `ingest_record` and
     138              :     /// is used by `commit()` to update `last_record_lsn`.
     139              :     ///
     140              :     /// Calling commit() will flush all the changes and reset the state,
     141              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     142              :     ///
     143              :     /// Note that any pending modifications you make through the
     144              :     /// modification object won't be visible to calls to the 'get' and list
     145              :     /// functions of the timeline until you finish! And if you update the
     146              :     /// same page twice, the last update wins.
     147              :     ///
     148      1059058 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     149      1059058 :     where
     150      1059058 :         Self: Sized,
     151      1059058 :     {
     152      1059058 :         DatadirModification {
     153      1059058 :             tline: self,
     154      1059058 :             pending_lsns: Vec::new(),
     155      1059058 :             pending_updates: HashMap::new(),
     156      1059058 :             pending_deletions: Vec::new(),
     157      1059058 :             pending_nblocks: 0,
     158      1059058 :             lsn,
     159      1059058 :         }
     160      1059058 :     }
     161              : 
     162              :     //------------------------------------------------------------------------------
     163              :     // Public GET functions
     164              :     //------------------------------------------------------------------------------
     165              : 
     166              :     /// Look up given page version.
     167      4512456 :     pub(crate) async fn get_rel_page_at_lsn(
     168      4512456 :         &self,
     169      4512456 :         tag: RelTag,
     170      4512456 :         blknum: BlockNumber,
     171      4512456 :         version: Version<'_>,
     172      4512456 :         latest: bool,
     173      4512456 :         ctx: &RequestContext,
     174      4512456 :     ) -> Result<Bytes, PageReconstructError> {
     175      4512456 :         if tag.relnode == 0 {
     176            0 :             return Err(PageReconstructError::Other(
     177            0 :                 RelationError::InvalidRelnode.into(),
     178            0 :             ));
     179      4512456 :         }
     180              : 
     181      4512456 :         let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
     182      4512456 :         if blknum >= nblocks {
     183            0 :             debug!(
     184            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     185            0 :                 tag,
     186            0 :                 blknum,
     187            0 :                 version.get_lsn(),
     188            0 :                 nblocks
     189            0 :             );
     190       123469 :             return Ok(ZERO_PAGE.clone());
     191      4388987 :         }
     192      4388987 : 
     193      4388987 :         let key = rel_block_to_key(tag, blknum);
     194      4388987 :         version.get(self, key, ctx).await
     195      4512456 :     }
     196              : 
     197              :     // Get size of a database in blocks
     198            8 :     pub(crate) async fn get_db_size(
     199            8 :         &self,
     200            8 :         spcnode: Oid,
     201            8 :         dbnode: Oid,
     202            8 :         version: Version<'_>,
     203            8 :         latest: bool,
     204            8 :         ctx: &RequestContext,
     205            8 :     ) -> Result<usize, PageReconstructError> {
     206            8 :         let mut total_blocks = 0;
     207              : 
     208            8 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     209              : 
     210         2351 :         for rel in rels {
     211         2343 :             let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
     212         2343 :             total_blocks += n_blocks as usize;
     213              :         }
     214            8 :         Ok(total_blocks)
     215            8 :     }
     216              : 
     217              :     /// Get size of a relation file
     218      4655845 :     pub(crate) async fn get_rel_size(
     219      4655845 :         &self,
     220      4655845 :         tag: RelTag,
     221      4655845 :         version: Version<'_>,
     222      4655845 :         latest: bool,
     223      4655845 :         ctx: &RequestContext,
     224      4655845 :     ) -> Result<BlockNumber, PageReconstructError> {
     225      4655845 :         if tag.relnode == 0 {
     226            0 :             return Err(PageReconstructError::Other(
     227            0 :                 RelationError::InvalidRelnode.into(),
     228            0 :             ));
     229      4655845 :         }
     230              : 
     231      4655845 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     232      4347101 :             return Ok(nblocks);
     233       308744 :         }
     234       308744 : 
     235       308744 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     236         7608 :             && !self.get_rel_exists(tag, version, latest, ctx).await?
     237              :         {
     238              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     239              :             // FSM, and smgrnblocks() on it immediately afterwards,
     240              :             // without extending it.  Tolerate that by claiming that
     241              :             // any non-existent FSM fork has size 0.
     242           11 :             return Ok(0);
     243       308733 :         }
     244       308733 : 
     245       308733 :         let key = rel_size_to_key(tag);
     246       308733 :         let mut buf = version.get(self, key, ctx).await?;
     247       308729 :         let nblocks = buf.get_u32_le();
     248       308729 : 
     249       308729 :         if latest {
     250       157378 :             // Update the relation size cache only if the "latest" flag is set.
     251       157378 :             // Compute sets this flag when it is working with the most recent version of the relation.
     252       157378 :             // Typically the master compute node always sets latest=true.
     253       157378 :             // Note that even if a compute node "by mistake" specifies an old LSN but sets
     254       157378 :             // latest=true, it cannot cause cache corruption, because with latest=true the
     255       157378 :             // pageserver chooses max(request_lsn, last_written_lsn), so the cached value will be
     256       157378 :             // associated with the most recent value of LSN.
     257       157378 :             self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     258       157378 :         }
     259       308729 :         Ok(nblocks)
     260      4655845 :     }
     261              : 
     262              :     /// Does relation exist?
     263       432499 :     pub(crate) async fn get_rel_exists(
     264       432499 :         &self,
     265       432499 :         tag: RelTag,
     266       432499 :         version: Version<'_>,
     267       432499 :         _latest: bool,
     268       432499 :         ctx: &RequestContext,
     269       432499 :     ) -> Result<bool, PageReconstructError> {
     270       432499 :         if tag.relnode == 0 {
     271            0 :             return Err(PageReconstructError::Other(
     272            0 :                 RelationError::InvalidRelnode.into(),
     273            0 :             ));
     274       432499 :         }
     275              : 
     276              :         // first try to lookup relation in cache
     277       432499 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     278       169724 :             return Ok(true);
     279       262775 :         }
     280       262775 :         // fetch directory listing
     281       262775 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     282       262775 :         let buf = version.get(self, key, ctx).await?;
     283              : 
     284       262775 :         match RelDirectory::des(&buf).context("deserialization failure") {
     285       262775 :             Ok(dir) => {
     286       262775 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     287       262775 :                 Ok(exists)
     288              :             }
     289            0 :             Err(e) => Err(PageReconstructError::from(e)),
     290              :         }
     291       432499 :     }
     292              : 
     293              :     /// Get a list of all existing relations in given tablespace and database.
     294              :     ///
     295              :     /// # Cancel-Safety
     296              :     ///
     297              :     /// This method is cancellation-safe.
     298         8125 :     pub(crate) async fn list_rels(
     299         8125 :         &self,
     300         8125 :         spcnode: Oid,
     301         8125 :         dbnode: Oid,
     302         8125 :         version: Version<'_>,
     303         8125 :         ctx: &RequestContext,
     304         8125 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     305         8125 :         // fetch directory listing
     306         8125 :         let key = rel_dir_to_key(spcnode, dbnode);
     307         8125 :         let buf = version.get(self, key, ctx).await?;
     308              : 
     309         8125 :         match RelDirectory::des(&buf).context("deserialization failure") {
     310         8125 :             Ok(dir) => {
     311         8125 :                 let rels: HashSet<RelTag> =
     312      1923108 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     313      1923108 :                         spcnode,
     314      1923108 :                         dbnode,
     315      1923108 :                         relnode: *relnode,
     316      1923108 :                         forknum: *forknum,
     317      1923108 :                     }));
     318         8125 : 
     319         8125 :                 Ok(rels)
     320              :             }
     321            0 :             Err(e) => Err(PageReconstructError::from(e)),
     322              :         }
     323         8125 :     }
     324              : 
     325              :     /// Get the whole SLRU segment
     326            0 :     pub(crate) async fn get_slru_segment(
     327            0 :         &self,
     328            0 :         kind: SlruKind,
     329            0 :         segno: u32,
     330            0 :         lsn: Lsn,
     331            0 :         ctx: &RequestContext,
     332            0 :     ) -> Result<Bytes, PageReconstructError> {
     333            0 :         let n_blocks = self
     334            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     335            0 :             .await?;
     336            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     337            0 :         for blkno in 0..n_blocks {
     338            0 :             let block = self
     339            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     340            0 :                 .await?;
     341            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     342              :         }
     343            0 :         Ok(segment.freeze())
     344            0 :     }
     345              : 
     346              :     /// Look up given SLRU page version.
     347         2550 :     pub(crate) async fn get_slru_page_at_lsn(
     348         2550 :         &self,
     349         2550 :         kind: SlruKind,
     350         2550 :         segno: u32,
     351         2550 :         blknum: BlockNumber,
     352         2550 :         lsn: Lsn,
     353         2550 :         ctx: &RequestContext,
     354         2550 :     ) -> Result<Bytes, PageReconstructError> {
     355         2550 :         let key = slru_block_to_key(kind, segno, blknum);
     356       264452 :         self.get(key, lsn, ctx).await
     357         2548 :     }
     358              : 
     359              :     /// Get size of an SLRU segment
     360         5991 :     pub(crate) async fn get_slru_segment_size(
     361         5991 :         &self,
     362         5991 :         kind: SlruKind,
     363         5991 :         segno: u32,
     364         5991 :         version: Version<'_>,
     365         5991 :         ctx: &RequestContext,
     366         5991 :     ) -> Result<BlockNumber, PageReconstructError> {
     367         5991 :         let key = slru_segment_size_to_key(kind, segno);
     368         5991 :         let mut buf = version.get(self, key, ctx).await?;
     369         5991 :         Ok(buf.get_u32_le())
     370         5991 :     }
     371              : 
     372              :     /// Does the SLRU segment exist?
     373         1646 :     pub(crate) async fn get_slru_segment_exists(
     374         1646 :         &self,
     375         1646 :         kind: SlruKind,
     376         1646 :         segno: u32,
     377         1646 :         version: Version<'_>,
     378         1646 :         ctx: &RequestContext,
     379         1646 :     ) -> Result<bool, PageReconstructError> {
     380         1646 :         // fetch directory listing
     381         1646 :         let key = slru_dir_to_key(kind);
     382         1646 :         let buf = version.get(self, key, ctx).await?;
     383              : 
     384         1646 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     385         1646 :             Ok(dir) => {
     386         1646 :                 let exists = dir.segments.get(&segno).is_some();
     387         1646 :                 Ok(exists)
     388              :             }
     389            0 :             Err(e) => Err(PageReconstructError::from(e)),
     390              :         }
     391         1646 :     }
     392              : 
     393              :     /// Locate LSN, such that all transactions that committed before
     394              :     /// 'search_timestamp' are visible, but nothing newer is.
     395              :     ///
     396              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     397              :     /// so it's not well defined which LSN you get if there were multiple commits
     398              :     /// "in flight" at that point in time.
     399              :     ///
     400          159 :     pub(crate) async fn find_lsn_for_timestamp(
     401          159 :         &self,
     402          159 :         search_timestamp: TimestampTz,
     403          159 :         cancel: &CancellationToken,
     404          159 :         ctx: &RequestContext,
     405          159 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     406          159 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     407          159 :         // We use this method to figure out the branching LSN for the new branch, but the
     408          159 :         // GC cutoff could be before the branching point and we cannot create a new branch
     409          159 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     410          159 :         // on the safe side.
     411          159 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     412          159 :         let max_lsn = self.get_last_record_lsn();
     413          159 : 
     414          159 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     415          159 :         // LSN divided by 8.
     416          159 :         let mut low = min_lsn.0 / 8;
     417          159 :         let mut high = max_lsn.0 / 8 + 1;
     418          159 : 
     419          159 :         let mut found_smaller = false;
     420          159 :         let mut found_larger = false;
     421         2697 :         while low < high {
     422         2540 :             if cancel.is_cancelled() {
     423            0 :                 return Err(PageReconstructError::Cancelled);
     424         2540 :             }
     425         2540 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     426         2540 :             let mid = (high + low) / 2;
     427              : 
     428         2540 :             let cmp = self
     429         2540 :                 .is_latest_commit_timestamp_ge_than(
     430         2540 :                     search_timestamp,
     431         2540 :                     Lsn(mid * 8),
     432         2540 :                     &mut found_smaller,
     433         2540 :                     &mut found_larger,
     434         2540 :                     ctx,
     435         2540 :                 )
     436       266255 :                 .await?;
     437              : 
     438         2538 :             if cmp {
     439          708 :                 high = mid;
     440         1830 :             } else {
     441         1830 :                 low = mid + 1;
     442         1830 :             }
     443              :         }
     444              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     445              :         // i.e. the LSN of the last commit record before or at `search_timestamp`.
     446              :         // Remove one from `low` to get `t`.
     447              :         //
     448              :         // FIXME: it would be better to get the LSN of the previous commit.
     449              :         // Otherwise, if you restore to the returned LSN, the database will
     450              :         // include physical changes from later commits that will be marked
     451              :         // as aborted, and will need to be vacuumed away.
     452          157 :         let commit_lsn = Lsn((low - 1) * 8);
     453          157 :         match (found_smaller, found_larger) {
     454              :             (false, false) => {
     455              :                 // This can happen if no commit records have been processed yet, e.g.
     456              :                 // just after importing a cluster.
     457           21 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     458              :             }
     459              :             (false, true) => {
     460              :                 // Didn't find any commit timestamps smaller than the request
     461           18 :                 Ok(LsnForTimestamp::Past(min_lsn))
     462              :             }
     463              :             (true, false) => {
     464              :                 // Only found commits with timestamps smaller than the request.
     465              :                 // It's still a valid case for branch creation, return it.
     466              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     467              :                 // case, anyway.
     468           62 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     469              :             }
     470           56 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     471              :         }
     472          157 :     }
     473              : 
     474              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if, at LSN 'probe_lsn',
     475              :     /// there are any commits that committed at or after 'search_timestamp'.
     476              :     ///
     477              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
     478              :     /// with a smaller/larger-or-equal timestamp.
     479              :     ///
     480         2540 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     481         2540 :         &self,
     482         2540 :         search_timestamp: TimestampTz,
     483         2540 :         probe_lsn: Lsn,
     484         2540 :         found_smaller: &mut bool,
     485         2540 :         found_larger: &mut bool,
     486         2540 :         ctx: &RequestContext,
     487         2540 :     ) -> Result<bool, PageReconstructError> {
     488         2540 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     489         2331 :             if timestamp >= search_timestamp {
     490          708 :                 *found_larger = true;
     491          708 :                 return ControlFlow::Break(true);
     492         1623 :             } else {
     493         1623 :                 *found_smaller = true;
     494         1623 :             }
     495         1623 :             ControlFlow::Continue(())
     496         2540 :         })
     497       266255 :         .await
     498         2538 :     }
     499              : 
     500              :     /// Obtain the latest timestamp recorded for the given lsn.
     501              :     ///
     502              :     /// If the lsn has no timestamps, returns `None`. Otherwise returns the maximum of the timestamps found.
     503           12 :     pub(crate) async fn get_timestamp_for_lsn(
     504           12 :         &self,
     505           12 :         probe_lsn: Lsn,
     506           12 :         ctx: &RequestContext,
     507           12 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     508           12 :         let mut max: Option<TimestampTz> = None;
     509           12 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     510           10 :             if let Some(max_prev) = max {
     511            0 :                 max = Some(max_prev.max(timestamp));
     512           10 :             } else {
     513           10 :                 max = Some(timestamp);
     514           10 :             }
     515           10 :             ControlFlow::Continue(())
     516           12 :         })
     517           80 :         .await?;
     518              : 
     519           10 :         Ok(max)
     520           12 :     }
     521              : 
     522              :     /// Runs the given function on all the timestamps for a given lsn
     523              :     ///
     524              :     /// The return value is either given by the closure, or set to the `Default`
     525              :     /// impl's output.
     526         2552 :     async fn map_all_timestamps<T: Default>(
     527         2552 :         &self,
     528         2552 :         probe_lsn: Lsn,
     529         2552 :         ctx: &RequestContext,
     530         2552 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     531         2552 :     ) -> Result<T, PageReconstructError> {
     532         2552 :         for segno in self
     533         2552 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     534          923 :             .await?
     535              :         {
     536         2550 :             let nblocks = self
     537         2550 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     538          960 :                 .await?;
     539         2550 :             for blknum in (0..nblocks).rev() {
     540         2550 :                 let clog_page = self
     541         2550 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     542       264452 :                     .await?;
     543              : 
     544         2548 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     545         2341 :                     let mut timestamp_bytes = [0u8; 8];
     546         2341 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     547         2341 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     548         2341 : 
     549         2341 :                     match f(timestamp) {
     550          708 :                         ControlFlow::Break(b) => return Ok(b),
     551         1633 :                         ControlFlow::Continue(()) => (),
     552              :                     }
     553          207 :                 }
     554              :             }
     555              :         }
     556         1840 :         Ok(Default::default())
     557         2550 :     }
     558              : 
     559          607 :     pub(crate) async fn get_slru_keyspace(
     560          607 :         &self,
     561          607 :         version: Version<'_>,
     562          607 :         ctx: &RequestContext,
     563          607 :     ) -> Result<KeySpace, PageReconstructError> {
     564          607 :         let mut accum = KeySpaceAccum::new();
     565              : 
     566         2410 :         for kind in SlruKind::iter() {
     567         1809 :             let mut segments: Vec<u32> = self
     568         1809 :                 .list_slru_segments(kind, version, ctx)
     569          162 :                 .await?
     570         1803 :                 .into_iter()
     571         1803 :                 .collect();
     572         1803 :             segments.sort_unstable();
     573              : 
     574         3642 :             for seg in segments {
     575         1839 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     576              : 
     577         1839 :                 accum.add_range(
     578         1839 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     579         1839 :                 );
     580              :             }
     581              :         }
     582              : 
     583          601 :         Ok(accum.to_keyspace())
     584          607 :     }
     585              : 
     586              :     /// Get a list of SLRU segments
     587         4364 :     pub(crate) async fn list_slru_segments(
     588         4364 :         &self,
     589         4364 :         kind: SlruKind,
     590         4364 :         version: Version<'_>,
     591         4364 :         ctx: &RequestContext,
     592         4364 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     593         4364 :         // fetch directory entry
     594         4364 :         let key = slru_dir_to_key(kind);
     595              : 
     596         4364 :         let buf = version.get(self, key, ctx).await?;
     597         4356 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     598         4356 :             Ok(dir) => Ok(dir.segments),
     599            0 :             Err(e) => Err(PageReconstructError::from(e)),
     600              :         }
     601         4364 :     }
     602              : 
     603         2438 :     pub(crate) async fn get_relmap_file(
     604         2438 :         &self,
     605         2438 :         spcnode: Oid,
     606         2438 :         dbnode: Oid,
     607         2438 :         version: Version<'_>,
     608         2438 :         ctx: &RequestContext,
     609         2438 :     ) -> Result<Bytes, PageReconstructError> {
     610         2438 :         let key = relmap_file_key(spcnode, dbnode);
     611              : 
     612         2438 :         let buf = version.get(self, key, ctx).await?;
     613         2438 :         Ok(buf)
     614         2438 :     }
     615              : 
     616          601 :     pub(crate) async fn list_dbdirs(
     617          601 :         &self,
     618          601 :         lsn: Lsn,
     619          601 :         ctx: &RequestContext,
     620          601 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     621              :         // fetch directory entry
     622          601 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     623              : 
     624          601 :         match DbDirectory::des(&buf).context("deserialization failure") {
     625          601 :             Ok(dir) => Ok(dir.dbdirs),
     626            0 :             Err(e) => Err(PageReconstructError::from(e)),
     627              :         }
     628          601 :     }
     629              : 
     630            2 :     pub(crate) async fn get_twophase_file(
     631            2 :         &self,
     632            2 :         xid: TransactionId,
     633            2 :         lsn: Lsn,
     634            2 :         ctx: &RequestContext,
     635            2 :     ) -> Result<Bytes, PageReconstructError> {
     636            2 :         let key = twophase_file_key(xid);
     637            2 :         let buf = self.get(key, lsn, ctx).await?;
     638            2 :         Ok(buf)
     639            2 :     }
     640              : 
     641          601 :     pub(crate) async fn list_twophase_files(
     642          601 :         &self,
     643          601 :         lsn: Lsn,
     644          601 :         ctx: &RequestContext,
     645          601 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     646              :         // fetch directory entry
     647          601 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     648              : 
     649          601 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     650          601 :             Ok(dir) => Ok(dir.xids),
     651            0 :             Err(e) => Err(PageReconstructError::from(e)),
     652              :         }
     653          601 :     }
     654              : 
     655          595 :     pub(crate) async fn get_control_file(
     656          595 :         &self,
     657          595 :         lsn: Lsn,
     658          595 :         ctx: &RequestContext,
     659          595 :     ) -> Result<Bytes, PageReconstructError> {
     660          595 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     661          595 :     }
     662              : 
     663         1933 :     pub(crate) async fn get_checkpoint(
     664         1933 :         &self,
     665         1933 :         lsn: Lsn,
     666         1933 :         ctx: &RequestContext,
     667         1933 :     ) -> Result<Bytes, PageReconstructError> {
     668         1933 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     669         1933 :     }
     670              : 
     671         2416 :     pub(crate) async fn list_aux_files(
     672         2416 :         &self,
     673         2416 :         lsn: Lsn,
     674         2416 :         ctx: &RequestContext,
     675         2416 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     676         2416 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     677         1420 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     678         1420 :                 Ok(dir) => Ok(dir.files),
     679            0 :                 Err(e) => Err(PageReconstructError::from(e)),
     680              :             },
     681          996 :             Err(e) => {
     682              :                 // This is expected: historical databases do not have the key.
     683            0 :                 debug!("Failed to get info about AUX files: {}", e);
     684          996 :                 Ok(HashMap::new())
     685              :             }
     686              :         }
     687         2416 :     }
     688              : 
     689              :     /// Does the same as get_current_logical_size but counted on demand.
     690              :     /// Used to initialize the logical size tracking on startup.
     691              :     ///
     692              :     /// Only relation blocks are counted currently. That excludes metadata,
     693              :     /// SLRUs, twophase files etc.
     694              :     ///
     695              :     /// # Cancel-Safety
     696              :     ///
     697              :     /// This method is cancellation-safe.
     698          694 :     pub async fn get_current_logical_size_non_incremental(
     699          694 :         &self,
     700          694 :         lsn: Lsn,
     701          694 :         ctx: &RequestContext,
     702          694 :     ) -> Result<u64, CalculateLogicalSizeError> {
     703          694 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     704              : 
     705              :         // Fetch list of database dirs and iterate them
     706          959 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     707          689 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     708              : 
     709          689 :         let mut total_size: u64 = 0;
     710         2725 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     711       631853 :             for rel in self
     712         2725 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     713          445 :                 .await?
     714              :             {
     715       631853 :                 if self.cancel.is_cancelled() {
     716            3 :                     return Err(CalculateLogicalSizeError::Cancelled);
     717       631850 :                 }
     718       631850 :                 let relsize_key = rel_size_to_key(rel);
     719       631850 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     720       631823 :                 let relsize = buf.get_u32_le();
     721       631823 : 
     722       631823 :                 total_size += relsize as u64;
     723              :             }
     724              :         }
     725          659 :         Ok(total_size * BLCKSZ as u64)
     726          690 :     }
     727              : 
     728              :     ///
     729              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     730              :     /// Anything that's not listed may be removed from the underlying storage (from
     731              :     /// that LSN forwards).
     732          891 :     pub(crate) async fn collect_keyspace(
     733          891 :         &self,
     734          891 :         lsn: Lsn,
     735          891 :         ctx: &RequestContext,
     736          891 :     ) -> Result<KeySpace, CollectKeySpaceError> {
     737          891 :         // Iterate through key ranges, greedily packing them into partitions
     738          891 :         let mut result = KeySpaceAccum::new();
     739          891 : 
     740          891 :         // The dbdir metadata always exists
     741          891 :         result.add_key(DBDIR_KEY);
     742              : 
     743              :         // Fetch list of database dirs and iterate them
     744         2274 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     745          890 :         let dbdir = DbDirectory::des(&buf)?;
     746              : 
     747          890 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     748          890 :         dbs.sort_unstable();
     749         3837 :         for (spcnode, dbnode) in dbs {
     750         2950 :             result.add_key(relmap_file_key(spcnode, dbnode));
     751         2950 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     752              : 
     753         2950 :             let mut rels: Vec<RelTag> = self
     754         2950 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     755          645 :                 .await?
     756         2950 :                 .into_iter()
     757         2950 :                 .collect();
     758         2950 :             rels.sort_unstable();
     759       711838 :             for rel in rels {
     760       708891 :                 let relsize_key = rel_size_to_key(rel);
     761       708891 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     762       708888 :                 let relsize = buf.get_u32_le();
     763       708888 : 
     764       708888 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     765       708888 :                 result.add_key(relsize_key);
     766              :             }
     767              :         }
     768              : 
     769              :         // Iterate SLRUs next
     770         2661 :         for kind in [
     771          887 :             SlruKind::Clog,
     772          887 :             SlruKind::MultiXactMembers,
     773          887 :             SlruKind::MultiXactOffsets,
     774              :         ] {
     775         2661 :             let slrudir_key = slru_dir_to_key(kind);
     776         2661 :             result.add_key(slrudir_key);
     777         8605 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     778         2661 :             let dir = SlruSegmentDirectory::des(&buf)?;
     779         2661 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     780         2661 :             segments.sort_unstable();
     781         4821 :             for segno in segments {
     782         2160 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     783         2160 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     784         2160 :                 let segsize = buf.get_u32_le();
     785         2160 : 
     786         2160 :                 result.add_range(
     787         2160 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     788         2160 :                 );
     789         2160 :                 result.add_key(segsize_key);
     790              :             }
     791              :         }
     792              : 
     793              :         // Then pg_twophase
     794          887 :         result.add_key(TWOPHASEDIR_KEY);
     795         3183 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     796          887 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     797          887 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     798          887 :         xids.sort_unstable();
     799          887 :         for xid in xids {
     800            0 :             result.add_key(twophase_file_key(xid));
     801            0 :         }
     802              : 
     803          887 :         result.add_key(CONTROLFILE_KEY);
     804          887 :         result.add_key(CHECKPOINT_KEY);
     805          887 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     806          622 :             result.add_key(AUX_FILES_KEY);
     807          622 :         }
     808          887 :         Ok(result.to_keyspace())
     809          889 :     }
     810              : 
     811              :     /// Get the cached size of a relation, if it was not updated after the specified LSN
     812     62055854 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     813     62055854 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     814     62055854 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     815     61697050 :             if lsn >= *cached_lsn {
     816     61479836 :                 return Some(*nblocks);
     817       217213 :             }
     818       358804 :         }
     819       576017 :         None
     820     62055853 :     }
     821              : 
     822              :     /// Update cached relation size if there is no more recent update
     823       157378 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     824       157378 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     825       157378 :         match rel_size_cache.entry(tag) {
     826       130459 :             hash_map::Entry::Occupied(mut entry) => {
     827       130459 :                 let cached_lsn = entry.get_mut();
     828       130459 :                 if lsn >= cached_lsn.0 {
     829           18 :                     *cached_lsn = (lsn, nblocks);
     830       130441 :                 }
     831              :             }
     832        26919 :             hash_map::Entry::Vacant(entry) => {
     833        26919 :                 entry.insert((lsn, nblocks));
     834        26919 :             }
     835              :         }
     836       157378 :     }
     837              : 
     838              :     /// Store cached relation size
     839      1928423 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     840      1928423 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     841      1928423 :         rel_size_cache.insert(tag, (lsn, nblocks));
     842      1928423 :     }
     843              : 
     844              :     /// Remove cached relation size
     845        67300 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     846        67300 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     847        67300 :         rel_size_cache.remove(tag);
     848        67300 :     }
     849              : }
     850              : 
     851              : /// DatadirModification represents an operation to ingest an atomic set of
     852              : /// updates to the repository. It is created by the 'begin_record'
     853              : /// function. It is called for each WAL record, so that all the modifications
     854              : /// by a single WAL record appear atomic.
     855              : pub struct DatadirModification<'a> {
     856              :     /// The timeline this modification applies to. You can access this to
     857              :     /// read the state, but note that any pending updates are *not* reflected
     858              :     /// in the state in 'tline' yet.
     859              :     pub tline: &'a Timeline,
     860              : 
     861              :     /// Current LSN of the modification
     862              :     lsn: Lsn,
     863              : 
     864              :     // The modifications are not applied directly to the underlying key-value store.
     865              :     // The put-functions add the modifications here, and they are flushed to the
     866              :     // underlying key-value store by the 'finish' function.
     867              :     pending_lsns: Vec<Lsn>,
     868              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     869              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     870              :     pending_nblocks: i64,
     871              : }
     872              : 
     873              : impl<'a> DatadirModification<'a> {
     874              :     /// Get the current lsn
     875     56967530 :     pub(crate) fn get_lsn(&self) -> Lsn {
     876     56967530 :         self.lsn
     877     56967530 :     }
     878              : 
     879              :     /// Set the current lsn
     880     73047349 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     881     73047349 :         ensure!(
     882     73047349 :             lsn >= self.lsn,
     883            0 :             "setting an older lsn {} than {} is not allowed",
     884              :             lsn,
     885              :             self.lsn
     886              :         );
     887     73047349 :         if lsn > self.lsn {
     888     73047349 :             self.pending_lsns.push(self.lsn);
     889     73047349 :             self.lsn = lsn;
     890     73047349 :         }
     891     73047349 :         Ok(())
     892     73047349 :     }
     893              : 
     894              :     /// Initialize a completely new repository.
     895              :     ///
     896              :     /// This inserts the directory metadata entries that are assumed to
     897              :     /// always exist.
     898          678 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     899          678 :         let buf = DbDirectory::ser(&DbDirectory {
     900          678 :             dbdirs: HashMap::new(),
     901          678 :         })?;
     902          678 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     903          678 : 
     904          678 :         // Create AuxFilesDirectory
     905          678 :         self.init_aux_dir()?;
     906              : 
     907          678 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     908          678 :             xids: HashSet::new(),
     909          678 :         })?;
     910          678 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     911              : 
     912          678 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     913          678 :         let empty_dir = Value::Image(buf);
     914          678 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     915          678 :         self.put(
     916          678 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     917          678 :             empty_dir.clone(),
     918          678 :         );
     919          678 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     920          678 : 
     921          678 :         Ok(())
     922          678 :     }
     923              : 
     924              :     #[cfg(test)]
     925           70 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     926           70 :         self.init_empty()?;
     927           70 :         self.put_control_file(bytes::Bytes::from_static(
     928           70 :             b"control_file contents do not matter",
     929           70 :         ))
     930           70 :         .context("put_control_file")?;
     931           70 :         self.put_checkpoint(bytes::Bytes::from_static(
     932           70 :             b"checkpoint_file contents do not matter",
     933           70 :         ))
     934           70 :         .context("put_checkpoint_file")?;
     935           70 :         Ok(())
     936           70 :     }
     937              : 
     938              :     /// Put a new page version that can be constructed from a WAL record
     939              :     ///
     940              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     941              :     /// current end-of-file. It's up to the caller to check that the relation size
     942              :     /// matches the blocks inserted!
     943     53007413 :     pub fn put_rel_wal_record(
     944     53007413 :         &mut self,
     945     53007413 :         rel: RelTag,
     946     53007413 :         blknum: BlockNumber,
     947     53007413 :         rec: NeonWalRecord,
     948     53007413 :     ) -> anyhow::Result<()> {
     949     53007413 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     950     53007413 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     951     53007413 :         Ok(())
     952     53007413 :     }
     953              : 
     954              :     // Same, but for an SLRU.
     955      6182727 :     pub fn put_slru_wal_record(
     956      6182727 :         &mut self,
     957      6182727 :         kind: SlruKind,
     958      6182727 :         segno: u32,
     959      6182727 :         blknum: BlockNumber,
     960      6182727 :         rec: NeonWalRecord,
     961      6182727 :     ) -> anyhow::Result<()> {
     962      6182727 :         self.put(
     963      6182727 :             slru_block_to_key(kind, segno, blknum),
     964      6182727 :             Value::WalRecord(rec),
     965      6182727 :         );
     966      6182727 :         Ok(())
     967      6182727 :     }
     968              : 
     969              :     /// Like put_rel_wal_record, but with a ready-made image of the page.
     970      2579711 :     pub fn put_rel_page_image(
     971      2579711 :         &mut self,
     972      2579711 :         rel: RelTag,
     973      2579711 :         blknum: BlockNumber,
     974      2579711 :         img: Bytes,
     975      2579711 :     ) -> anyhow::Result<()> {
     976      2579711 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     977      2579711 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     978      2579711 :         Ok(())
     979      2579711 :     }
     980              : 
     981         3476 :     pub fn put_slru_page_image(
     982         3476 :         &mut self,
     983         3476 :         kind: SlruKind,
     984         3476 :         segno: u32,
     985         3476 :         blknum: BlockNumber,
     986         3476 :         img: Bytes,
     987         3476 :     ) -> anyhow::Result<()> {
     988         3476 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
     989         3476 :         Ok(())
     990         3476 :     }
     991              : 
     992              :     /// Store a relmapper file (pg_filenode.map) in the repository
     993         2522 :     pub async fn put_relmap_file(
     994         2522 :         &mut self,
     995         2522 :         spcnode: Oid,
     996         2522 :         dbnode: Oid,
     997         2522 :         img: Bytes,
     998         2522 :         ctx: &RequestContext,
     999         2522 :     ) -> anyhow::Result<()> {
    1000              :         // Add it to the directory (if it doesn't exist already)
    1001         2522 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1002         2522 :         let mut dbdir = DbDirectory::des(&buf)?;
    1003              : 
    1004         2522 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1005         2522 :         if r.is_none() || r == Some(false) {
    1006              :             // The dbdir entry didn't exist, or it contained a
    1007              :             // 'false'. The 'insert' call already updated it with
    1008              :             // 'true', now write the updated 'dbdirs' map back.
    1009         2460 :             let buf = DbDirectory::ser(&dbdir)?;
    1010         2460 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1011         2460 : 
    1012         2460 :             // Create AuxFilesDirectory as well
    1013         2460 :             self.init_aux_dir()?;
    1014           62 :         }
    1015         2522 :         if r.is_none() {
    1016           36 :             // Create RelDirectory
    1017           36 :             let buf = RelDirectory::ser(&RelDirectory {
    1018           36 :                 rels: HashSet::new(),
    1019           36 :             })?;
    1020           36 :             self.put(
    1021           36 :                 rel_dir_to_key(spcnode, dbnode),
    1022           36 :                 Value::Image(Bytes::from(buf)),
    1023           36 :             );
    1024         2486 :         }
    1025              : 
    1026         2522 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1027         2522 :         Ok(())
    1028         2522 :     }
    1029              : 
    1030            4 :     pub async fn put_twophase_file(
    1031            4 :         &mut self,
    1032            4 :         xid: TransactionId,
    1033            4 :         img: Bytes,
    1034            4 :         ctx: &RequestContext,
    1035            4 :     ) -> anyhow::Result<()> {
    1036              :         // Add it to the directory entry
    1037            4 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1038            4 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1039            4 :         if !dir.xids.insert(xid) {
    1040            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1041            4 :         }
    1042            4 :         self.put(
    1043            4 :             TWOPHASEDIR_KEY,
    1044            4 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1045              :         );
    1046              : 
    1047            4 :         self.put(twophase_file_key(xid), Value::Image(img));
    1048            4 :         Ok(())
    1049            4 :     }
    1050              : 
    1051          676 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1052          676 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1053          676 :         Ok(())
    1054          676 :     }
    1055              : 
    1056        35060 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1057        35060 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1058        35060 :         Ok(())
    1059        35060 :     }
    1060              : 
    1061            3 :     pub async fn drop_dbdir(
    1062            3 :         &mut self,
    1063            3 :         spcnode: Oid,
    1064            3 :         dbnode: Oid,
    1065            3 :         ctx: &RequestContext,
    1066            3 :     ) -> anyhow::Result<()> {
    1067            3 :         let total_blocks = self
    1068            3 :             .tline
    1069            3 :             .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
    1070            0 :             .await?;
    1071              : 
    1072              :         // Remove entry from dbdir
    1073            3 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1074            3 :         let mut dir = DbDirectory::des(&buf)?;
    1075            3 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1076            3 :             let buf = DbDirectory::ser(&dir)?;
    1077            3 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1078              :         } else {
    1079            0 :             warn!(
    1080            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1081            0 :                 spcnode, dbnode
    1082            0 :             );
    1083              :         }
    1084              : 
    1085              :         // Update logical database size.
    1086            3 :         self.pending_nblocks -= total_blocks as i64;
    1087            3 : 
    1088            3 :         // Delete all relations and metadata files for the spcnode/dnode
    1089            3 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1090            3 :         Ok(())
    1091            3 :     }
    1092              : 
    1093              :     /// Create a relation fork.
    1094              :     ///
    1095              :     /// 'nblocks' is the initial size.
    1096       651289 :     pub async fn put_rel_creation(
    1097       651289 :         &mut self,
    1098       651289 :         rel: RelTag,
    1099       651289 :         nblocks: BlockNumber,
    1100       651289 :         ctx: &RequestContext,
    1101       651289 :     ) -> Result<(), RelationError> {
    1102       651289 :         if rel.relnode == 0 {
    1103            0 :             return Err(RelationError::InvalidRelnode);
    1104       651289 :         }
    1105              :         // It's possible that this is the first rel for this db in this
    1106              :         // tablespace.  Create the reldir entry for it if so.
    1107       651289 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1108       651289 :             .context("deserialize db")?;
    1109       651289 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1110       651289 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
    1111              :             // Didn't exist. Update dbdir
    1112         2429 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
    1113         2429 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1114         2429 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1115         2429 : 
    1116         2429 :             // and create the RelDirectory
    1117         2429 :             RelDirectory::default()
    1118              :         } else {
    1119              :             // reldir already exists, fetch it
    1120       648860 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1121       648860 :                 .context("deserialize db")?
    1122              :         };
    1123              : 
    1124              :         // Add the new relation to the rel directory entry, and write it back
    1125       651289 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1126            0 :             return Err(RelationError::AlreadyExists);
    1127       651289 :         }
    1128       651289 :         self.put(
    1129       651289 :             rel_dir_key,
    1130       651289 :             Value::Image(Bytes::from(
    1131       651289 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1132              :             )),
    1133              :         );
    1134              : 
    1135              :         // Put size
    1136       651289 :         let size_key = rel_size_to_key(rel);
    1137       651289 :         let buf = nblocks.to_le_bytes();
    1138       651289 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1139       651289 : 
    1140       651289 :         self.pending_nblocks += nblocks as i64;
    1141       651289 : 
    1142       651289 :         // Update relation size cache
    1143       651289 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1144       651289 : 
    1145       651289 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1146       651289 :         // caller.
    1147       651289 :         Ok(())
    1148       651289 :     }
    1149              : 
    1150              :     /// Truncate relation
    1151         6359 :     pub async fn put_rel_truncation(
    1152         6359 :         &mut self,
    1153         6359 :         rel: RelTag,
    1154         6359 :         nblocks: BlockNumber,
    1155         6359 :         ctx: &RequestContext,
    1156         6359 :     ) -> anyhow::Result<()> {
    1157         6359 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1158         6359 :         if self
    1159         6359 :             .tline
    1160         6359 :             .get_rel_exists(rel, Version::Modified(self), true, ctx)
    1161            0 :             .await?
    1162              :         {
    1163         6359 :             let size_key = rel_size_to_key(rel);
    1164              :             // Fetch the old size first
    1165         6359 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1166         6359 : 
    1167         6359 :             // Update the entry with the new size.
    1168         6359 :             let buf = nblocks.to_le_bytes();
    1169         6359 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1170         6359 : 
    1171         6359 :             // Update relation size cache
    1172         6359 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1173         6359 : 
    1174         6359 :             // Update relation size cache
    1175         6359 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1176         6359 : 
    1177         6359 :             // Update logical database size.
    1178         6359 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1179            0 :         }
    1180         6359 :         Ok(())
    1181         6359 :     }
    1182              : 
    1183              :     /// Extend relation
    1184              :     /// If new size is smaller, do nothing.
    1185      1829664 :     pub async fn put_rel_extend(
    1186      1829664 :         &mut self,
    1187      1829664 :         rel: RelTag,
    1188      1829664 :         nblocks: BlockNumber,
    1189      1829664 :         ctx: &RequestContext,
    1190      1829664 :     ) -> anyhow::Result<()> {
    1191      1829664 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1192              : 
    1193              :         // Put size
    1194      1829664 :         let size_key = rel_size_to_key(rel);
    1195      1829664 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1196      1829661 : 
    1197      1829661 :         // only extend relation here. never decrease the size
    1198      1829661 :         if nblocks > old_size {
    1199      1264416 :             let buf = nblocks.to_le_bytes();
    1200      1264416 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1201      1264416 : 
    1202      1264416 :             // Update relation size cache
    1203      1264416 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1204      1264416 : 
    1205      1264416 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1206      1264416 :         }
    1207      1829661 :         Ok(())
    1208      1829664 :     }
    1209              : 
    1210              :     /// Drop a relation.
    1211        67300 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1212        67300 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1213              : 
    1214              :         // Remove it from the directory entry
    1215        67300 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1216        67300 :         let buf = self.get(dir_key, ctx).await?;
    1217        67300 :         let mut dir = RelDirectory::des(&buf)?;
    1218              : 
    1219        67300 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1220        67300 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1221              :         } else {
    1222            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1223              :         }
    1224              : 
    1225              :         // update logical size
    1226        67300 :         let size_key = rel_size_to_key(rel);
    1227        67300 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1228        67300 :         self.pending_nblocks -= old_size as i64;
    1229        67300 : 
    1230        67300 :         // Remove enty from relation size cache
    1231        67300 :         self.tline.remove_cached_rel_size(&rel);
    1232        67300 : 
    1233        67300 :         // Delete size entry, as well as all blocks
    1234        67300 :         self.delete(rel_key_range(rel));
    1235        67300 : 
    1236        67300 :         Ok(())
    1237        67300 :     }
    1238              : 
    1239         1866 :     pub async fn put_slru_segment_creation(
    1240         1866 :         &mut self,
    1241         1866 :         kind: SlruKind,
    1242         1866 :         segno: u32,
    1243         1866 :         nblocks: BlockNumber,
    1244         1866 :         ctx: &RequestContext,
    1245         1866 :     ) -> anyhow::Result<()> {
    1246         1866 :         // Add it to the directory entry
    1247         1866 :         let dir_key = slru_dir_to_key(kind);
    1248         1866 :         let buf = self.get(dir_key, ctx).await?;
    1249         1866 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1250              : 
    1251         1866 :         if !dir.segments.insert(segno) {
    1252            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1253         1866 :         }
    1254         1866 :         self.put(
    1255         1866 :             dir_key,
    1256         1866 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1257              :         );
    1258              : 
    1259              :         // Put size
    1260         1866 :         let size_key = slru_segment_size_to_key(kind, segno);
    1261         1866 :         let buf = nblocks.to_le_bytes();
    1262         1866 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1263         1866 : 
    1264         1866 :         // even if nblocks > 0, we don't insert any actual blocks here
    1265         1866 : 
    1266         1866 :         Ok(())
    1267         1866 :     }
    1268              : 
    1269              :     /// Extend SLRU segment
    1270         1622 :     pub fn put_slru_extend(
    1271         1622 :         &mut self,
    1272         1622 :         kind: SlruKind,
    1273         1622 :         segno: u32,
    1274         1622 :         nblocks: BlockNumber,
    1275         1622 :     ) -> anyhow::Result<()> {
    1276         1622 :         // Put size
    1277         1622 :         let size_key = slru_segment_size_to_key(kind, segno);
    1278         1622 :         let buf = nblocks.to_le_bytes();
    1279         1622 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1280         1622 :         Ok(())
    1281         1622 :     }
    1282              : 
    1283              :     /// This method is used for marking truncated SLRU files
    1284           18 :     pub async fn drop_slru_segment(
    1285           18 :         &mut self,
    1286           18 :         kind: SlruKind,
    1287           18 :         segno: u32,
    1288           18 :         ctx: &RequestContext,
    1289           18 :     ) -> anyhow::Result<()> {
    1290           18 :         // Remove it from the directory entry
    1291           18 :         let dir_key = slru_dir_to_key(kind);
    1292           18 :         let buf = self.get(dir_key, ctx).await?;
    1293           18 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1294              : 
    1295           18 :         if !dir.segments.remove(&segno) {
    1296            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1297           18 :         }
    1298           18 :         self.put(
    1299           18 :             dir_key,
    1300           18 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1301              :         );
    1302              : 
    1303              :         // Delete size entry, as well as all blocks
    1304           18 :         self.delete(slru_segment_key_range(kind, segno));
    1305           18 : 
    1306           18 :         Ok(())
    1307           18 :     }
    1308              : 
    1309              :     /// Drop a relmapper file (pg_filenode.map)
    1310            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1311            0 :         // TODO
    1312            0 :         Ok(())
    1313            0 :     }
    1314              : 
    1315              :     /// This method is used for marking truncated SLRU files
    1316            2 :     pub async fn drop_twophase_file(
    1317            2 :         &mut self,
    1318            2 :         xid: TransactionId,
    1319            2 :         ctx: &RequestContext,
    1320            2 :     ) -> anyhow::Result<()> {
    1321              :         // Remove it from the directory entry
    1322            2 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1323            2 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1324              : 
    1325            2 :         if !dir.xids.remove(&xid) {
    1326            0 :             warn!("twophase file for xid {} does not exist", xid);
    1327            2 :         }
    1328            2 :         self.put(
    1329            2 :             TWOPHASEDIR_KEY,
    1330            2 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1331              :         );
    1332              : 
    1333              :         // Delete it
    1334            2 :         self.delete(twophase_key_range(xid));
    1335            2 : 
    1336            2 :         Ok(())
    1337            2 :     }
    1338              : 
    1339         3138 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1340         3138 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1341         3138 :             files: HashMap::new(),
    1342         3138 :         })?;
    1343         3138 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1344         3138 :         Ok(())
    1345         3138 :     }
    1346              : 
    1347          118 :     pub async fn put_file(
    1348          118 :         &mut self,
    1349          118 :         path: &str,
    1350          118 :         content: &[u8],
    1351          118 :         ctx: &RequestContext,
    1352          118 :     ) -> anyhow::Result<()> {
    1353          118 :         let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
    1354          115 :             Ok(buf) => AuxFilesDirectory::des(&buf)?,
    1355            3 :             Err(e) => {
    1356              :                 // This is expected: historical databases do not have the key.
    1357            0 :                 debug!("Failed to get info about AUX files: {}", e);
    1358            3 :                 AuxFilesDirectory {
    1359            3 :                     files: HashMap::new(),
    1360            3 :                 }
    1361              :             }
    1362              :         };
    1363          118 :         let path = path.to_string();
    1364          118 :         if content.is_empty() {
    1365            5 :             dir.files.remove(&path);
    1366          113 :         } else {
    1367          113 :             dir.files.insert(path, Bytes::copy_from_slice(content));
    1368          113 :         }
    1369          118 :         self.put(
    1370          118 :             AUX_FILES_KEY,
    1371          118 :             Value::Image(Bytes::from(
    1372          118 :                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1373              :             )),
    1374              :         );
    1375          118 :         Ok(())
    1376          118 :     }
    1377              : 
    1378              :     ///
    1379              :     /// Flush changes accumulated so far to the underlying repository.
    1380              :     ///
    1381              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1382              :     /// you to flush them to the underlying repository before the final `commit`.
    1383              :     /// That allows to free up the memory used to hold the pending changes.
    1384              :     ///
    1385              :     /// Currently only used during bulk import of a data directory. In that
    1386              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1387              :     /// whole import fails and the timeline will be deleted anyway.
    1388              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1389              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1390              :     ///
    1391              :     /// Note: A consequence of flushing the pending operations is that they
    1392              :     /// won't be visible to subsequent operations until `commit`. The function
    1393              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1394              :     /// for bulk import, where you are just loading data pages and won't try to
    1395              :     /// modify the same pages twice.
    1396       576178 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1397       576178 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1398       576178 :         // to scan through the pending_updates list.
    1399       576178 :         let pending_nblocks = self.pending_nblocks;
    1400       576178 :         if pending_nblocks < 10000 {
    1401       576178 :             return Ok(());
    1402            0 :         }
    1403              : 
    1404            0 :         let writer = self.tline.writer().await;
    1405              : 
    1406              :         // Flush relation and  SLRU data blocks, keep metadata.
    1407            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1408            0 :         for (key, values) in self.pending_updates.drain() {
    1409            0 :             for (lsn, value) in values {
    1410            0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1411              :                     // This bails out on first error without modifying pending_updates.
    1412              :                     // That's Ok, cf this function's doc comment.
    1413            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1414            0 :                 } else {
    1415            0 :                     retained_pending_updates
    1416            0 :                         .entry(key)
    1417            0 :                         .or_default()
    1418            0 :                         .push((lsn, value));
    1419            0 :                 }
    1420              :             }
    1421              :         }
    1422              : 
    1423            0 :         self.pending_updates = retained_pending_updates;
    1424            0 : 
    1425            0 :         if pending_nblocks != 0 {
    1426            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1427            0 :             self.pending_nblocks = 0;
    1428            0 :         }
    1429              : 
    1430            0 :         Ok(())
    1431       576178 :     }
    1432              : 
    1433              :     ///
    1434              :     /// Finish this atomic update, writing all the updated keys to the
    1435              :     /// underlying timeline.
    1436              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1437              :     ///
    1438      2136802 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1439      2136801 :         let writer = self.tline.writer().await;
    1440              : 
    1441      2136801 :         let pending_nblocks = self.pending_nblocks;
    1442      2136801 :         self.pending_nblocks = 0;
    1443      2136801 : 
    1444      2136801 :         if !self.pending_updates.is_empty() {
    1445      1713238 :             writer.put_batch(&self.pending_updates, ctx).await?;
    1446      1713238 :             self.pending_updates.clear();
    1447       423563 :         }
    1448              : 
    1449      2136801 :         if !self.pending_deletions.is_empty() {
    1450        19251 :             writer.delete_batch(&self.pending_deletions).await?;
    1451        19251 :             self.pending_deletions.clear();
    1452      2117550 :         }
    1453              : 
    1454      2136801 :         self.pending_lsns.push(self.lsn);
    1455     75183994 :         for pending_lsn in self.pending_lsns.drain(..) {
    1456     75183994 :             // Ideally, we should be able to call writer.finish_write() only once
    1457     75183994 :             // with the highest LSN. However, the last_record_lsn variable in the
    1458     75183994 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1459     75183994 :             // so we need to record every LSN to not leave a gap between them.
    1460     75183994 :             writer.finish_write(pending_lsn);
    1461     75183994 :         }
    1462              : 
    1463      2136801 :         if pending_nblocks != 0 {
    1464       638850 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1465      1497951 :         }
    1466              : 
    1467      2136801 :         Ok(())
    1468      2136801 :     }
    1469              : 
    1470    146094695 :     pub(crate) fn len(&self) -> usize {
    1471    146094695 :         self.pending_updates.len() + self.pending_deletions.len()
    1472    146094695 :     }
    1473              : 
    1474              :     // Internal helper functions to batch the modifications
    1475              : 
    1476      3500003 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    1477              :         // Have we already updated the same key? Read the latest pending updated
    1478              :         // version in that case.
    1479              :         //
    1480              :         // Note: we don't check pending_deletions. It is an error to request a
    1481              :         // value that has been removed, deletion only avoids leaking storage.
    1482      3500003 :         if let Some(values) = self.pending_updates.get(&key) {
    1483      2626972 :             if let Some((_, value)) = values.last() {
    1484      2626972 :                 return if let Value::Image(img) = value {
    1485      2626972 :                     Ok(img.clone())
    1486              :                 } else {
    1487              :                     // Currently, we never need to read back a WAL record that we
    1488              :                     // inserted in the same "transaction". All the metadata updates
    1489              :                     // work directly with Images, and we never need to read actual
    1490              :                     // data pages. We could handle this if we had to, by calling
    1491              :                     // the walredo manager, but let's keep it simple for now.
    1492            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1493            0 :                         "unexpected pending WAL record"
    1494            0 :                     )))
    1495              :                 };
    1496            0 :             }
    1497       873031 :         }
    1498       873031 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1499       873031 :         self.tline.get(key, lsn, ctx).await
    1500      3500003 :     }
    1501              : 
    1502     64469194 :     fn put(&mut self, key: Key, val: Value) {
    1503     64469194 :         let values = self.pending_updates.entry(key).or_default();
    1504              :         // Replace the previous value if it exists at the same lsn
    1505     64469194 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1506     53943461 :             if *last_lsn == self.lsn {
    1507       647242 :                 *last_value = val;
    1508       647242 :                 return;
    1509     53296219 :             }
    1510     10525733 :         }
    1511     63821952 :         values.push((self.lsn, val));
    1512     64469194 :     }
    1513              : 
    1514        67323 :     fn delete(&mut self, key_range: Range<Key>) {
    1515        67323 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1516        67323 :         self.pending_deletions.push((key_range, self.lsn));
    1517        67323 :     }
    1518              : }
    1519              : 
    1520              : /// This struct facilitates accessing either a committed key from the timeline at a
    1521              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1522              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1523              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1524              : /// need to look up the keys in the modification first before looking them up in the
    1525              : /// timeline to not miss the latest updates.
    1526            0 : #[derive(Clone, Copy)]
    1527              : pub enum Version<'a> {
    1528              :     Lsn(Lsn),
    1529              :     Modified(&'a DatadirModification<'a>),
    1530              : }
    1531              : 
    1532              : impl<'a> Version<'a> {
    1533      4983060 :     async fn get(
    1534      4983060 :         &self,
    1535      4983060 :         timeline: &Timeline,
    1536      4983060 :         key: Key,
    1537      4983060 :         ctx: &RequestContext,
    1538      4983060 :     ) -> Result<Bytes, PageReconstructError> {
    1539      4983059 :         match self {
    1540      4758361 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1541       224698 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1542              :         }
    1543      4983059 :     }
    1544              : 
    1545      5245722 :     fn get_lsn(&self) -> Lsn {
    1546      5245722 :         match self {
    1547      4756830 :             Version::Lsn(lsn) => *lsn,
    1548       488892 :             Version::Modified(modification) => modification.lsn,
    1549              :         }
    1550      5245722 :     }
    1551              : }
    1552              : 
    1553              : //--- Metadata structs stored in key-value pairs in the repository.
    1554              : 
    1555       655994 : #[derive(Debug, Serialize, Deserialize)]
    1556              : struct DbDirectory {
    1557              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1558              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1559              : }
    1560              : 
    1561         1494 : #[derive(Debug, Serialize, Deserialize)]
    1562              : struct TwoPhaseDirectory {
    1563              :     xids: HashSet<TransactionId>,
    1564              : }
    1565              : 
    1566      1437250 : #[derive(Debug, Serialize, Deserialize, Default)]
    1567              : struct RelDirectory {
    1568              :     // Set of relations that exist. (relfilenode, forknum)
    1569              :     //
    1570              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1571              :     // key-value pairs, if you have a lot of relations
    1572              :     rels: HashSet<(Oid, u8)>,
    1573              : }
    1574              : 
    1575         6512 : #[derive(Debug, Serialize, Deserialize, Default)]
    1576              : struct AuxFilesDirectory {
    1577              :     files: HashMap<String, Bytes>,
    1578              : }
    1579              : 
    1580            0 : #[derive(Debug, Serialize, Deserialize)]
    1581              : struct RelSizeEntry {
    1582              :     nblocks: u32,
    1583              : }
    1584              : 
    1585        10547 : #[derive(Debug, Serialize, Deserialize, Default)]
    1586              : struct SlruSegmentDirectory {
    1587              :     // Set of SLRU segments that exist.
    1588              :     segments: HashSet<u32>,
    1589              : }
    1590              : 
    1591              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1592              : 
    1593              : #[allow(clippy::bool_assert_comparison)]
    1594              : #[cfg(test)]
    1595              : mod tests {
    1596              :     //use super::repo_harness::*;
    1597              :     //use super::*;
    1598              :     // NOTE(review): every test in this module is commented out, so the module currently contains no tests.
    1599              :     /*
    1600              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1601              :             let incremental = timeline.get_current_logical_size();
    1602              :             let non_incremental = timeline
    1603              :                 .get_current_logical_size_non_incremental(lsn)
    1604              :                 .unwrap();
    1605              :             assert_eq!(incremental, non_incremental);
    1606              :         }
    1607              :     */
    1608              : 
    1609              :     /*
    1610              :     ///
    1611              :     /// Test list_rels() function, with branches and dropped relations
    1612              :     ///
    1613              :     #[test]
    1614              :     fn test_list_rels_drop() -> Result<()> {
    1615              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1616              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1617              :         const TESTDB: u32 = 111;
    1618              : 
    1619              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1620              :         // after branching fails below
    1621              :         let mut writer = tline.begin_record(Lsn(0x10));
    1622              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1623              :         writer.finish()?;
    1624              : 
    1625              :         // Create a relation on the timeline
    1626              :         let mut writer = tline.begin_record(Lsn(0x20));
    1627              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1628              :         writer.finish()?;
    1629              : 
    1630              :         let writer = tline.begin_record(Lsn(0x00));
    1631              :         writer.finish()?;
    1632              : 
    1633              :         // Check that list_rels() lists it after LSN 2, but not before it
    1634              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1635              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1636              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1637              : 
    1638              :         // Create a branch, check that the relation is visible there
    1639              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1640              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1641              :             Some(timeline) => timeline,
    1642              :             None => panic!("Should have a local timeline"),
    1643              :         };
    1644              :         let newtline = DatadirTimelineImpl::new(newtline);
    1645              :         assert!(newtline
    1646              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1647              :             .contains(&TESTREL_A));
    1648              : 
    1649              :         // Drop it on the branch
    1650              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1651              :         new_writer.drop_relation(TESTREL_A)?;
    1652              :         new_writer.finish()?;
    1653              : 
    1654              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1655              :         assert!(newtline
    1656              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1657              :             .contains(&TESTREL_A));
    1658              :         assert!(!newtline
    1659              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1660              :             .contains(&TESTREL_A));
    1661              : 
    1662              :         // Run checkpoint and garbage collection and check that it's still not visible
    1663              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1664              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1665              : 
    1666              :         assert!(!newtline
    1667              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1668              :             .contains(&TESTREL_A));
    1669              : 
    1670              :         Ok(())
    1671              :     }
    1672              :      */
    1673              :     // NOTE(review): the disabled test below calls `walingest` without defining it; fix that before re-enabling.
    1674              :     /*
    1675              :     #[test]
    1676              :     fn test_read_beyond_eof() -> Result<()> {
    1677              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1678              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1679              : 
    1680              :         make_some_layers(&tline, Lsn(0x20))?;
    1681              :         let mut writer = tline.begin_record(Lsn(0x60));
    1682              :         walingest.put_rel_page_image(
    1683              :             &mut writer,
    1684              :             TESTREL_A,
    1685              :             0,
    1686              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1687              :         )?;
    1688              :         writer.finish()?;
    1689              : 
    1690              :         // Test read before rel creation. Should error out.
    1691              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1692              : 
    1693              :         // Read block beyond end of relation at different points in time.
    1694              :         // These reads should fall into different delta, image, and in-memory layers.
    1695              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1696              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1697              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1698              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1699              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1700              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1701              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1702              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1703              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1704              : 
    1705              :         // Test on an in-memory layer with no preceding layer
    1706              :         let mut writer = tline.begin_record(Lsn(0x70));
    1707              :         walingest.put_rel_page_image(
    1708              :             &mut writer,
    1709              :             TESTREL_B,
    1710              :             0,
    1711              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1712              :         )?;
    1713              :         writer.finish()?;
    1714              : 
    1715              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    1716              : 
    1717              :         Ok(())
    1718              :     }
    1719              :      */
    1720              : }
        

Generated by: LCOV version 2.1-beta