LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date: 2024-02-14 18:05:35

            Coverage    Total    Hit
Lines:      91.4 %      1157     1057
Functions:  61.8 %      217      134

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::repository::*;
      13              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use enum_map::Enum;
      18              : use pageserver_api::key::{
      19              :     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
      20              :     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      21              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      22              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      23              : };
      24              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      25              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      26              : use postgres_ffi::BLCKSZ;
      27              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      28              : use serde::{Deserialize, Serialize};
      29              : use std::collections::{hash_map, HashMap, HashSet};
      30              : use std::ops::ControlFlow;
      31              : use std::ops::Range;
      32              : use strum::IntoEnumIterator;
      33              : use tokio_util::sync::CancellationToken;
      34              : use tracing::{debug, trace, warn};
      35              : use utils::bin_ser::DeserializeError;
      36              : use utils::{bin_ser::BeSer, lsn::Lsn};
      37              : 
      38            0 : #[derive(Debug)]
      39              : pub enum LsnForTimestamp {
      40              :     /// Found commits both before and after the given timestamp
      41              :     Present(Lsn),
      42              : 
       43              :     /// Found no commits after the given timestamp; this means
      44              :     /// that the newest data in the branch is older than the given
      45              :     /// timestamp.
      46              :     ///
      47              :     /// All commits <= LSN happened before the given timestamp
      48              :     Future(Lsn),
      49              : 
       50              :     /// The queried timestamp is past the horizon we can look back to (PITR)
      51              :     ///
      52              :     /// All commits > LSN happened after the given timestamp,
      53              :     /// but any commits < LSN might have happened before or after
      54              :     /// the given timestamp. We don't know because no data before
      55              :     /// the given lsn is available.
      56              :     Past(Lsn),
      57              : 
      58              :     /// We have found no commit with a timestamp,
      59              :     /// so we can't return anything meaningful.
      60              :     ///
      61              :     /// The associated LSN is the lower bound value we can safely
       62              :     /// create branches on, but no statement is made about whether it is
      63              :     /// older or newer than the timestamp.
      64              :     ///
      65              :     /// This variant can e.g. be returned right after a
      66              :     /// cluster import.
      67              :     NoData(Lsn),
      68              : }
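
As a caller-side sketch (hypothetical helper name and error policy; not an API of this module), the four variants are typically consumed like this:

    use utils::lsn::Lsn;

    // Hypothetical consumer of LsnForTimestamp; illustrative only.
    fn resolve_branch_point(result: LsnForTimestamp) -> Result<Lsn, String> {
        match result {
            // Commits on both sides of the timestamp: use the LSN directly.
            LsnForTimestamp::Present(lsn) => Ok(lsn),
            // Everything in the branch is older than the timestamp: branching
            // at this LSN still captures all data up to the timestamp.
            LsnForTimestamp::Future(lsn) => Ok(lsn),
            // The timestamp predates the PITR horizon; it cannot be resolved.
            LsnForTimestamp::Past(min_lsn) => Err(format!(
                "timestamp is before the PITR horizon; earliest usable LSN is {min_lsn}"
            )),
            // No commit timestamps at all (e.g. right after a cluster import):
            // fall back to the lower bound that is safe for branch creation.
            LsnForTimestamp::NoData(lsn) => Ok(lsn),
        }
    }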
      69              : 
      70            0 : #[derive(Debug, thiserror::Error)]
      71              : pub enum CalculateLogicalSizeError {
      72              :     #[error("cancelled")]
      73              :     Cancelled,
      74              :     #[error(transparent)]
      75              :     Other(#[from] anyhow::Error),
      76              : }
      77              : 
      78            2 : #[derive(Debug, thiserror::Error)]
      79              : pub(crate) enum CollectKeySpaceError {
      80              :     #[error(transparent)]
      81              :     Decode(#[from] DeserializeError),
      82              :     #[error(transparent)]
      83              :     PageRead(PageReconstructError),
      84              :     #[error("cancelled")]
      85              :     Cancelled,
      86              : }
      87              : 
      88              : impl From<PageReconstructError> for CollectKeySpaceError {
      89            1 :     fn from(err: PageReconstructError) -> Self {
      90            1 :         match err {
      91            0 :             PageReconstructError::Cancelled => Self::Cancelled,
      92            1 :             err => Self::PageRead(err),
      93              :         }
      94            1 :     }
      95              : }
      96              : 
      97              : impl From<PageReconstructError> for CalculateLogicalSizeError {
      98           29 :     fn from(pre: PageReconstructError) -> Self {
      99           29 :         match pre {
     100              :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
     101           27 :                 Self::Cancelled
     102              :             }
     103            2 :             _ => Self::Other(pre.into()),
     104              :         }
     105           29 :     }
     106              : }
     107              : 
     108            0 : #[derive(Debug, thiserror::Error)]
     109              : pub enum RelationError {
     110              :     #[error("Relation Already Exists")]
     111              :     AlreadyExists,
     112              :     #[error("invalid relnode")]
     113              :     InvalidRelnode,
     114              :     #[error(transparent)]
     115              :     Other(#[from] anyhow::Error),
     116              : }
     117              : 
     118              : ///
     119              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     120              : /// and other special kinds of files, in a versioned key-value store. The
     121              : /// Timeline struct provides the key-value store.
     122              : ///
     123              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
      124              : /// implementation; it might be moved into a separate struct later.
     125              : impl Timeline {
     126              :     /// Start ingesting a WAL record, or other atomic modification of
     127              :     /// the timeline.
     128              :     ///
     129              :     /// This provides a transaction-like interface to perform a bunch
     130              :     /// of modifications atomically.
     131              :     ///
     132              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     133              :     /// DatadirModification object. Use the functions in the object to
     134              :     /// modify the repository state, updating all the pages and metadata
     135              :     /// that the WAL record affects. When you're done, call commit() to
     136              :     /// commit the changes.
     137              :     ///
      138              :     /// The LSN stored in the modification is advanced by `ingest_record` and
     139              :     /// is used by `commit()` to update `last_record_lsn`.
     140              :     ///
     141              :     /// Calling commit() will flush all the changes and reset the state,
     142              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     143              :     ///
     144              :     /// Note that any pending modifications you make through the
     145              :     /// modification object won't be visible to calls to the 'get' and list
      146              :     /// functions of the timeline until you commit! And if you update the
     147              :     /// same page twice, the last update wins.
     148              :     ///
     149      1018095 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     150      1018095 :     where
     151      1018095 :         Self: Sized,
     152      1018095 :     {
     153      1018095 :         DatadirModification {
     154      1018095 :             tline: self,
     155      1018095 :             pending_lsns: Vec::new(),
     156      1018095 :             pending_updates: HashMap::new(),
     157      1018095 :             pending_deletions: Vec::new(),
     158      1018095 :             pending_nblocks: 0,
     159      1018095 :             pending_aux_files: None,
     160      1018095 :             pending_directory_entries: Vec::new(),
     161      1018095 :             lsn,
     162      1018095 :         }
     163      1018095 :     }
     164              : 
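A self-contained toy sketch of the buffering pattern described above (plain std types only; `ToyStore` and `ToyModification` are illustrative stand-ins, not this crate's API). Updates accumulate in memory keyed by page, and nothing becomes visible to readers until an explicit commit flushes the batch:

    use std::collections::HashMap;

    type Key = u64;       // stand-in for the crate's Key
    type Lsn = u64;       // stand-in for utils::lsn::Lsn
    type Value = Vec<u8>; // stand-in for a page image / WAL record

    #[derive(Default)]
    struct ToyStore {
        committed: HashMap<Key, (Lsn, Value)>,
    }

    #[derive(Default)]
    struct ToyModification {
        lsn: Lsn,
        pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
    }

    impl ToyModification {
        // Stage an update; like put(), it is not visible to readers yet.
        fn put(&mut self, key: Key, value: Value) {
            self.pending_updates
                .entry(key)
                .or_default()
                .push((self.lsn, value));
        }

        // Flush all staged updates atomically; for reads at the same LSN,
        // the last update to a page wins, as the doc comment above notes.
        fn commit(self, store: &mut ToyStore) {
            for (key, versions) in self.pending_updates {
                if let Some(last) = versions.into_iter().last() {
                    store.committed.insert(key, last);
                }
            }
        }
    }

    // let mut store = ToyStore::default();
    // let mut m = ToyModification { lsn: 0x10, ..Default::default() };
    // m.put(1, b"new page image".to_vec()); // staged, invisible to readers
    // m.commit(&mut store);                 // now visible
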
     165              :     //------------------------------------------------------------------------------
     166              :     // Public GET functions
     167              :     //------------------------------------------------------------------------------
     168              : 
     169              :     /// Look up given page version.
     170      4470425 :     pub(crate) async fn get_rel_page_at_lsn(
     171      4470425 :         &self,
     172      4470425 :         tag: RelTag,
     173      4470425 :         blknum: BlockNumber,
     174      4470425 :         version: Version<'_>,
     175      4470425 :         latest: bool,
     176      4470425 :         ctx: &RequestContext,
     177      4470425 :     ) -> Result<Bytes, PageReconstructError> {
     178      4470425 :         if tag.relnode == 0 {
     179            0 :             return Err(PageReconstructError::Other(
     180            0 :                 RelationError::InvalidRelnode.into(),
     181            0 :             ));
     182      4470425 :         }
     183              : 
     184      4470425 :         let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
     185      4470425 :         if blknum >= nblocks {
     186            0 :             debug!(
     187            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     188            0 :                 tag,
     189            0 :                 blknum,
     190            0 :                 version.get_lsn(),
     191            0 :                 nblocks
     192            0 :             );
     193       124652 :             return Ok(ZERO_PAGE.clone());
     194      4345773 :         }
     195      4345773 : 
     196      4345773 :         let key = rel_block_to_key(tag, blknum);
     197      4345773 :         version.get(self, key, ctx).await
     198      4470424 :     }
     199              : 
     200              :     // Get size of a database in blocks
     201            8 :     pub(crate) async fn get_db_size(
     202            8 :         &self,
     203            8 :         spcnode: Oid,
     204            8 :         dbnode: Oid,
     205            8 :         version: Version<'_>,
     206            8 :         latest: bool,
     207            8 :         ctx: &RequestContext,
     208            8 :     ) -> Result<usize, PageReconstructError> {
     209            8 :         let mut total_blocks = 0;
     210              : 
     211            8 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     212              : 
     213         2351 :         for rel in rels {
     214         2343 :             let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
     215         2343 :             total_blocks += n_blocks as usize;
     216              :         }
     217            8 :         Ok(total_blocks)
     218            8 :     }
     219              : 
     220              :     /// Get size of a relation file
     221      4612051 :     pub(crate) async fn get_rel_size(
     222      4612051 :         &self,
     223      4612051 :         tag: RelTag,
     224      4612051 :         version: Version<'_>,
     225      4612051 :         latest: bool,
     226      4612051 :         ctx: &RequestContext,
     227      4612051 :     ) -> Result<BlockNumber, PageReconstructError> {
     228      4612051 :         if tag.relnode == 0 {
     229            0 :             return Err(PageReconstructError::Other(
     230            0 :                 RelationError::InvalidRelnode.into(),
     231            0 :             ));
     232      4612051 :         }
     233              : 
     234      4612051 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     235      4304878 :             return Ok(nblocks);
     236       307173 :         }
     237       307173 : 
     238       307173 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     239         7667 :             && !self.get_rel_exists(tag, version, latest, ctx).await?
     240              :         {
     241              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     242              :             // FSM, and smgrnblocks() on it immediately afterwards,
     243              :             // without extending it.  Tolerate that by claiming that
     244              :             // any non-existent FSM fork has size 0.
     245           11 :             return Ok(0);
     246       307162 :         }
     247       307162 : 
     248       307162 :         let key = rel_size_to_key(tag);
     249       307162 :         let mut buf = version.get(self, key, ctx).await?;
     250       307158 :         let nblocks = buf.get_u32_le();
     251       307158 : 
     252       307158 :         if latest {
     253       155937 :             // Update relation size cache only if "latest" flag is set.
     254       155937 :             // This flag is set by compute when it is working with most recent version of relation.
     255       155937 :             // Typically master compute node always set latest=true.
     256       155937 :             // Please notice, that even if compute node "by mistake" specifies old LSN but set
     257       155937 :             // latest=true, then it can not cause cache corruption, because with latest=true
     258       155937 :             // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
     259       155937 :             // associated with most recent value of LSN.
     260       155937 :             self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     261       155937 :         }
     262       307158 :         Ok(nblocks)
     263      4612051 :     }
     264              : 
     265              :     /// Does relation exist?
     266       430565 :     pub(crate) async fn get_rel_exists(
     267       430565 :         &self,
     268       430565 :         tag: RelTag,
     269       430565 :         version: Version<'_>,
     270       430565 :         _latest: bool,
     271       430565 :         ctx: &RequestContext,
     272       430565 :     ) -> Result<bool, PageReconstructError> {
     273       430565 :         if tag.relnode == 0 {
     274            0 :             return Err(PageReconstructError::Other(
     275            0 :                 RelationError::InvalidRelnode.into(),
     276            0 :             ));
     277       430565 :         }
     278              : 
     279              :         // first try to lookup relation in cache
     280       430565 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     281       167901 :             return Ok(true);
     282       262664 :         }
     283       262664 :         // fetch directory listing
     284       262664 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     285       262664 :         let buf = version.get(self, key, ctx).await?;
     286              : 
     287       262664 :         match RelDirectory::des(&buf).context("deserialization failure") {
     288       262664 :             Ok(dir) => {
     289       262664 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     290       262664 :                 Ok(exists)
     291              :             }
     292            0 :             Err(e) => Err(PageReconstructError::from(e)),
     293              :         }
     294       430565 :     }
     295              : 
     296              :     /// Get a list of all existing relations in given tablespace and database.
     297              :     ///
     298              :     /// # Cancel-Safety
     299              :     ///
     300              :     /// This method is cancellation-safe.
     301         8170 :     pub(crate) async fn list_rels(
     302         8170 :         &self,
     303         8170 :         spcnode: Oid,
     304         8170 :         dbnode: Oid,
     305         8170 :         version: Version<'_>,
     306         8170 :         ctx: &RequestContext,
     307         8170 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     308         8170 :         // fetch directory listing
     309         8170 :         let key = rel_dir_to_key(spcnode, dbnode);
     310         8170 :         let buf = version.get(self, key, ctx).await?;
     311              : 
     312         8170 :         match RelDirectory::des(&buf).context("deserialization failure") {
     313         8170 :             Ok(dir) => {
     314         8170 :                 let rels: HashSet<RelTag> =
     315      1937069 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     316      1937069 :                         spcnode,
     317      1937069 :                         dbnode,
     318      1937069 :                         relnode: *relnode,
     319      1937069 :                         forknum: *forknum,
     320      1937069 :                     }));
     321         8170 : 
     322         8170 :                 Ok(rels)
     323              :             }
     324            0 :             Err(e) => Err(PageReconstructError::from(e)),
     325              :         }
     326         8170 :     }
     327              : 
     328              :     /// Get the whole SLRU segment
     329            0 :     pub(crate) async fn get_slru_segment(
     330            0 :         &self,
     331            0 :         kind: SlruKind,
     332            0 :         segno: u32,
     333            0 :         lsn: Lsn,
     334            0 :         ctx: &RequestContext,
     335            0 :     ) -> Result<Bytes, PageReconstructError> {
     336            0 :         let n_blocks = self
     337            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     338            0 :             .await?;
     339            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     340            0 :         for blkno in 0..n_blocks {
     341            0 :             let block = self
     342            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     343            0 :                 .await?;
     344            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     345              :         }
     346            0 :         Ok(segment.freeze())
     347            0 :     }
     348              : 
     349              :     /// Look up given SLRU page version.
     350         2918 :     pub(crate) async fn get_slru_page_at_lsn(
     351         2918 :         &self,
     352         2918 :         kind: SlruKind,
     353         2918 :         segno: u32,
     354         2918 :         blknum: BlockNumber,
     355         2918 :         lsn: Lsn,
     356         2918 :         ctx: &RequestContext,
     357         2918 :     ) -> Result<Bytes, PageReconstructError> {
     358         2918 :         let key = slru_block_to_key(kind, segno, blknum);
     359       274951 :         self.get(key, lsn, ctx).await
     360         2917 :     }
     361              : 
     362              :     /// Get size of an SLRU segment
     363         6355 :     pub(crate) async fn get_slru_segment_size(
     364         6355 :         &self,
     365         6355 :         kind: SlruKind,
     366         6355 :         segno: u32,
     367         6355 :         version: Version<'_>,
     368         6355 :         ctx: &RequestContext,
     369         6355 :     ) -> Result<BlockNumber, PageReconstructError> {
     370         6355 :         let key = slru_segment_size_to_key(kind, segno);
     371         6355 :         let mut buf = version.get(self, key, ctx).await?;
     372         6355 :         Ok(buf.get_u32_le())
     373         6355 :     }
     374              : 
      375              :     /// Does the SLRU segment exist?
     376         1642 :     pub(crate) async fn get_slru_segment_exists(
     377         1642 :         &self,
     378         1642 :         kind: SlruKind,
     379         1642 :         segno: u32,
     380         1642 :         version: Version<'_>,
     381         1642 :         ctx: &RequestContext,
     382         1642 :     ) -> Result<bool, PageReconstructError> {
     383         1642 :         // fetch directory listing
     384         1642 :         let key = slru_dir_to_key(kind);
     385         1642 :         let buf = version.get(self, key, ctx).await?;
     386              : 
     387         1642 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     388         1642 :             Ok(dir) => {
     389         1642 :                 let exists = dir.segments.get(&segno).is_some();
     390         1642 :                 Ok(exists)
     391              :             }
     392            0 :             Err(e) => Err(PageReconstructError::from(e)),
     393              :         }
     394         1642 :     }
     395              : 
      396              :     /// Locate the LSN such that all transactions that committed before
     397              :     /// 'search_timestamp' are visible, but nothing newer is.
     398              :     ///
     399              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     400              :     /// so it's not well defined which LSN you get if there were multiple commits
     401              :     /// "in flight" at that point in time.
     402              :     ///
     403          181 :     pub(crate) async fn find_lsn_for_timestamp(
     404          181 :         &self,
     405          181 :         search_timestamp: TimestampTz,
     406          181 :         cancel: &CancellationToken,
     407          181 :         ctx: &RequestContext,
     408          181 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     409          181 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     410          181 :         // We use this method to figure out the branching LSN for the new branch, but the
     411          181 :         // GC cutoff could be before the branching point and we cannot create a new branch
     412          181 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     413          181 :         // on the safe side.
     414          181 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     415          181 :         let max_lsn = self.get_last_record_lsn();
     416          181 : 
     417          181 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     418          181 :         // LSN divided by 8.
     419          181 :         let mut low = min_lsn.0 / 8;
     420          181 :         let mut high = max_lsn.0 / 8 + 1;
     421          181 : 
     422          181 :         let mut found_smaller = false;
     423          181 :         let mut found_larger = false;
     424         3088 :         while low < high {
     425         2908 :             if cancel.is_cancelled() {
     426            0 :                 return Err(PageReconstructError::Cancelled);
     427         2908 :             }
     428         2908 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     429         2908 :             let mid = (high + low) / 2;
     430              : 
     431         2908 :             let cmp = self
     432         2908 :                 .is_latest_commit_timestamp_ge_than(
     433         2908 :                     search_timestamp,
     434         2908 :                     Lsn(mid * 8),
     435         2908 :                     &mut found_smaller,
     436         2908 :                     &mut found_larger,
     437         2908 :                     ctx,
     438         2908 :                 )
     439       276993 :                 .await?;
     440              : 
     441         2907 :             if cmp {
     442          765 :                 high = mid;
     443         2142 :             } else {
     444         2142 :                 low = mid + 1;
     445         2142 :             }
     446              :         }
     447              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     448              :         // so the LSN of the last commit record before or at `search_timestamp`.
     449              :         // Remove one from `low` to get `t`.
     450              :         //
     451              :         // FIXME: it would be better to get the LSN of the previous commit.
     452              :         // Otherwise, if you restore to the returned LSN, the database will
     453              :         // include physical changes from later commits that will be marked
     454              :         // as aborted, and will need to be vacuumed away.
     455          180 :         let commit_lsn = Lsn((low - 1) * 8);
     456          180 :         match (found_smaller, found_larger) {
     457              :             (false, false) => {
     458              :                 // This can happen if no commit records have been processed yet, e.g.
     459              :                 // just after importing a cluster.
     460           24 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     461              :             }
     462              :             (false, true) => {
     463              :                 // Didn't find any commit timestamps smaller than the request
     464           19 :                 Ok(LsnForTimestamp::Past(min_lsn))
     465              :             }
     466              :             (true, false) => {
     467              :                 // Only found commits with timestamps smaller than the request.
     468              :                 // It's still a valid case for branch creation, return it.
     469              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     470              :                 // case, anyway.
     471           80 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     472              :             }
     473           57 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     474              :         }
     475          180 :     }
     476              : 
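The loop above is a textbook lower-bound binary search over the 8-byte-aligned LSN space. A self-contained sketch with a generic monotone predicate (names are illustrative):

    // Find the smallest 8-byte-aligned LSN in [min_lsn, max_lsn] for which
    // `pred` holds, assuming `pred` is monotone (false..false, true..true).
    // Mirrors the low/high/mid loop in find_lsn_for_timestamp.
    fn lower_bound_lsn(min_lsn: u64, max_lsn: u64, pred: impl Fn(u64) -> bool) -> u64 {
        let mut low = min_lsn / 8;
        let mut high = max_lsn / 8 + 1;
        while low < high {
            // Cannot overflow: both values are far below u64::MAX / 2.
            let mid = (high + low) / 2;
            if pred(mid * 8) {
                high = mid; // predicate holds: answer is at or below mid
            } else {
                low = mid + 1; // predicate fails: answer is above mid
            }
        }
        low * 8
    }

    // With pred(lsn) = "some commit at or before lsn has timestamp >=
    // search_timestamp", (low - 1) * 8 is the LSN of the last commit
    // before the timestamp, matching `commit_lsn` above.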
     477              :     /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
     478              :     /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
     479              :     ///
     480              :     /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
     481              :     /// with a smaller/larger timestamp.
     482              :     ///
     483         2908 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     484         2908 :         &self,
     485         2908 :         search_timestamp: TimestampTz,
     486         2908 :         probe_lsn: Lsn,
     487         2908 :         found_smaller: &mut bool,
     488         2908 :         found_larger: &mut bool,
     489         2908 :         ctx: &RequestContext,
     490         2908 :     ) -> Result<bool, PageReconstructError> {
     491         2908 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     492         2695 :             if timestamp >= search_timestamp {
     493          765 :                 *found_larger = true;
     494          765 :                 return ControlFlow::Break(true);
     495         1930 :             } else {
     496         1930 :                 *found_smaller = true;
     497         1930 :             }
     498         1930 :             ControlFlow::Continue(())
     499         2908 :         })
     500       276993 :         .await
     501         2907 :     }
     502              : 
      503              :     /// Obtain the latest commit timestamp at the given LSN.
      504              :     ///
      505              :     /// If the LSN has no commit timestamps, returns None; otherwise returns the maximum timestamp found.
     506           12 :     pub(crate) async fn get_timestamp_for_lsn(
     507           12 :         &self,
     508           12 :         probe_lsn: Lsn,
     509           12 :         ctx: &RequestContext,
     510           12 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     511           12 :         let mut max: Option<TimestampTz> = None;
     512           12 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     513           10 :             if let Some(max_prev) = max {
     514            0 :                 max = Some(max_prev.max(timestamp));
     515           10 :             } else {
     516           10 :                 max = Some(timestamp);
     517           10 :             }
     518           10 :             ControlFlow::Continue(())
     519           12 :         })
     520           82 :         .await?;
     521              : 
     522           10 :         Ok(max)
     523           12 :     }
     524              : 
     525              :     /// Runs the given function on all the timestamps for a given lsn
     526              :     ///
     527              :     /// The return value is either given by the closure, or set to the `Default`
     528              :     /// impl's output.
     529         2920 :     async fn map_all_timestamps<T: Default>(
     530         2920 :         &self,
     531         2920 :         probe_lsn: Lsn,
     532         2920 :         ctx: &RequestContext,
     533         2920 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     534         2920 :     ) -> Result<T, PageReconstructError> {
     535         2920 :         for segno in self
     536         2920 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     537         1084 :             .await?
     538              :         {
     539         2918 :             let nblocks = self
     540         2918 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     541         1040 :                 .await?;
     542         2918 :             for blknum in (0..nblocks).rev() {
     543         2918 :                 let clog_page = self
     544         2918 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     545       274951 :                     .await?;
     546              : 
     547         2917 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     548         2705 :                     let mut timestamp_bytes = [0u8; 8];
     549         2705 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     550         2705 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     551         2705 : 
     552         2705 :                     match f(timestamp) {
     553          765 :                         ControlFlow::Break(b) => return Ok(b),
     554         1940 :                         ControlFlow::Continue(()) => (),
     555              :                     }
     556          212 :                 }
     557              :             }
     558              :         }
     559         2152 :         Ok(Default::default())
     560         2919 :     }
     561              : 
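The closure protocol used by `map_all_timestamps` is the standard `ControlFlow` early-exit pattern. A self-contained sketch over a plain slice of timestamps (the item source is illustrative; the real function walks CLOG pages newest-first):

    use std::ops::ControlFlow;

    // Visit timestamps newest-first; the closure can short-circuit with
    // Break(value), otherwise T::default() is returned after a full scan.
    fn map_all<T: Default>(
        timestamps: &[i64],
        mut f: impl FnMut(i64) -> ControlFlow<T>,
    ) -> T {
        for &ts in timestamps.iter().rev() {
            match f(ts) {
                ControlFlow::Break(b) => return b,
                ControlFlow::Continue(()) => (),
            }
        }
        T::default()
    }

    // Usage mirroring is_latest_commit_timestamp_ge_than:
    // let found = map_all(&commit_timestamps, |ts| {
    //     if ts >= search_timestamp {
    //         ControlFlow::Break(true)
    //     } else {
    //         ControlFlow::Continue(())
    //     }
    // });
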
     562          607 :     pub(crate) async fn get_slru_keyspace(
     563          607 :         &self,
     564          607 :         version: Version<'_>,
     565          607 :         ctx: &RequestContext,
     566          607 :     ) -> Result<KeySpace, PageReconstructError> {
     567          607 :         let mut accum = KeySpaceAccum::new();
     568              : 
     569         2410 :         for kind in SlruKind::iter() {
     570         1809 :             let mut segments: Vec<u32> = self
     571         1809 :                 .list_slru_segments(kind, version, ctx)
     572          160 :                 .await?
     573         1803 :                 .into_iter()
     574         1803 :                 .collect();
     575         1803 :             segments.sort_unstable();
     576              : 
     577         3642 :             for seg in segments {
     578         1839 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     579              : 
     580         1839 :                 accum.add_range(
     581         1839 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     582         1839 :                 );
     583              :             }
     584              :         }
     585              : 
     586          601 :         Ok(accum.to_keyspace())
     587          607 :     }
     588              : 
     589              :     /// Get a list of SLRU segments
     590         4732 :     pub(crate) async fn list_slru_segments(
     591         4732 :         &self,
     592         4732 :         kind: SlruKind,
     593         4732 :         version: Version<'_>,
     594         4732 :         ctx: &RequestContext,
     595         4732 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     596         4732 :         // fetch directory entry
     597         4732 :         let key = slru_dir_to_key(kind);
     598              : 
     599         4732 :         let buf = version.get(self, key, ctx).await?;
     600         4724 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     601         4724 :             Ok(dir) => Ok(dir.segments),
     602            0 :             Err(e) => Err(PageReconstructError::from(e)),
     603              :         }
     604         4732 :     }
     605              : 
     606         2438 :     pub(crate) async fn get_relmap_file(
     607         2438 :         &self,
     608         2438 :         spcnode: Oid,
     609         2438 :         dbnode: Oid,
     610         2438 :         version: Version<'_>,
     611         2438 :         ctx: &RequestContext,
     612         2438 :     ) -> Result<Bytes, PageReconstructError> {
     613         2438 :         let key = relmap_file_key(spcnode, dbnode);
     614              : 
     615         2438 :         let buf = version.get(self, key, ctx).await?;
     616         2438 :         Ok(buf)
     617         2438 :     }
     618              : 
     619          601 :     pub(crate) async fn list_dbdirs(
     620          601 :         &self,
     621          601 :         lsn: Lsn,
     622          601 :         ctx: &RequestContext,
     623          601 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     624              :         // fetch directory entry
     625          601 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     626              : 
     627          601 :         match DbDirectory::des(&buf).context("deserialization failure") {
     628          601 :             Ok(dir) => Ok(dir.dbdirs),
     629            0 :             Err(e) => Err(PageReconstructError::from(e)),
     630              :         }
     631          601 :     }
     632              : 
     633            2 :     pub(crate) async fn get_twophase_file(
     634            2 :         &self,
     635            2 :         xid: TransactionId,
     636            2 :         lsn: Lsn,
     637            2 :         ctx: &RequestContext,
     638            2 :     ) -> Result<Bytes, PageReconstructError> {
     639            2 :         let key = twophase_file_key(xid);
     640            2 :         let buf = self.get(key, lsn, ctx).await?;
     641            2 :         Ok(buf)
     642            2 :     }
     643              : 
     644          601 :     pub(crate) async fn list_twophase_files(
     645          601 :         &self,
     646          601 :         lsn: Lsn,
     647          601 :         ctx: &RequestContext,
     648          601 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     649              :         // fetch directory entry
     650          601 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     651              : 
     652          601 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     653          601 :             Ok(dir) => Ok(dir.xids),
     654            0 :             Err(e) => Err(PageReconstructError::from(e)),
     655              :         }
     656          601 :     }
     657              : 
     658          595 :     pub(crate) async fn get_control_file(
     659          595 :         &self,
     660          595 :         lsn: Lsn,
     661          595 :         ctx: &RequestContext,
     662          595 :     ) -> Result<Bytes, PageReconstructError> {
     663          595 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     664          595 :     }
     665              : 
     666         1965 :     pub(crate) async fn get_checkpoint(
     667         1965 :         &self,
     668         1965 :         lsn: Lsn,
     669         1965 :         ctx: &RequestContext,
     670         1965 :     ) -> Result<Bytes, PageReconstructError> {
     671         1965 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     672         1965 :     }
     673              : 
     674         2422 :     pub(crate) async fn list_aux_files(
     675         2422 :         &self,
     676         2422 :         lsn: Lsn,
     677         2422 :         ctx: &RequestContext,
     678         2422 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     679         2422 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     680         1430 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     681         1430 :                 Ok(dir) => Ok(dir.files),
     682            0 :                 Err(e) => Err(PageReconstructError::from(e)),
     683              :             },
     684          992 :             Err(e) => {
     685              :                 // This is expected: historical databases do not have the key.
     686            0 :                 debug!("Failed to get info about AUX files: {}", e);
     687          992 :                 Ok(HashMap::new())
     688              :             }
     689              :         }
     690         2422 :     }
     691              : 
     692              :     /// Does the same as get_current_logical_size but counted on demand.
     693              :     /// Used to initialize the logical size tracking on startup.
     694              :     ///
     695              :     /// Only relation blocks are counted currently. That excludes metadata,
     696              :     /// SLRUs, twophase files etc.
     697              :     ///
     698              :     /// # Cancel-Safety
     699              :     ///
     700              :     /// This method is cancellation-safe.
     701          689 :     pub async fn get_current_logical_size_non_incremental(
     702          689 :         &self,
     703          689 :         lsn: Lsn,
     704          689 :         ctx: &RequestContext,
     705          689 :     ) -> Result<u64, CalculateLogicalSizeError> {
     706          689 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     707              : 
     708              :         // Fetch list of database dirs and iterate them
     709          932 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     710          682 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     711              : 
     712          682 :         let mut total_size: u64 = 0;
     713         2698 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     714       625253 :             for rel in self
     715         2698 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     716          363 :                 .await?
     717              :             {
     718       625253 :                 if self.cancel.is_cancelled() {
     719            1 :                     return Err(CalculateLogicalSizeError::Cancelled);
     720       625252 :                 }
     721       625252 :                 let relsize_key = rel_size_to_key(rel);
     722       625252 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     723       625219 :                 let relsize = buf.get_u32_le();
     724       625219 : 
     725       625219 :                 total_size += relsize as u64;
     726              :             }
     727              :         }
     728          648 :         Ok(total_size * BLCKSZ as u64)
     729          678 :     }
     730              : 
     731              :     ///
     732              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
      733              :     /// Anything that's not listed may be removed from the underlying storage (from
     734              :     /// that LSN forwards).
     735          907 :     pub(crate) async fn collect_keyspace(
     736          907 :         &self,
     737          907 :         lsn: Lsn,
     738          907 :         ctx: &RequestContext,
     739          907 :     ) -> Result<KeySpace, CollectKeySpaceError> {
     740          907 :         // Iterate through key ranges, greedily packing them into partitions
     741          907 :         let mut result = KeySpaceAccum::new();
     742          907 : 
     743          907 :         // The dbdir metadata always exists
     744          907 :         result.add_key(DBDIR_KEY);
     745              : 
     746              :         // Fetch list of database dirs and iterate them
     747         2297 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     748          906 :         let dbdir = DbDirectory::des(&buf)?;
     749              : 
     750          906 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     751          906 :         dbs.sort_unstable();
     752         3927 :         for (spcnode, dbnode) in dbs {
     753         3022 :             result.add_key(relmap_file_key(spcnode, dbnode));
     754         3022 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     755              : 
     756         3022 :             let mut rels: Vec<RelTag> = self
     757         3022 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     758          654 :                 .await?
     759         3022 :                 .into_iter()
     760         3022 :                 .collect();
     761         3022 :             rels.sort_unstable();
     762       731883 :             for rel in rels {
     763       728862 :                 let relsize_key = rel_size_to_key(rel);
     764       728862 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     765       728861 :                 let relsize = buf.get_u32_le();
     766       728861 : 
     767       728861 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     768       728861 :                 result.add_key(relsize_key);
     769              :             }
     770              :         }
     771              : 
     772              :         // Iterate SLRUs next
     773         2715 :         for kind in [
     774          905 :             SlruKind::Clog,
     775          905 :             SlruKind::MultiXactMembers,
     776          905 :             SlruKind::MultiXactOffsets,
     777              :         ] {
     778         2715 :             let slrudir_key = slru_dir_to_key(kind);
     779         2715 :             result.add_key(slrudir_key);
     780         8465 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     781         2715 :             let dir = SlruSegmentDirectory::des(&buf)?;
     782         2715 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     783         2715 :             segments.sort_unstable();
     784         4932 :             for segno in segments {
     785         2217 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     786         2217 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     787         2217 :                 let segsize = buf.get_u32_le();
     788         2217 : 
     789         2217 :                 result.add_range(
     790         2217 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     791         2217 :                 );
     792         2217 :                 result.add_key(segsize_key);
     793              :             }
     794              :         }
     795              : 
     796              :         // Then pg_twophase
     797          905 :         result.add_key(TWOPHASEDIR_KEY);
     798         3170 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     799          905 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     800          905 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     801          905 :         xids.sort_unstable();
     802          905 :         for xid in xids {
     803            0 :             result.add_key(twophase_file_key(xid));
     804            0 :         }
     805              : 
     806          905 :         result.add_key(CONTROLFILE_KEY);
     807          905 :         result.add_key(CHECKPOINT_KEY);
     808          905 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     809          632 :             result.add_key(AUX_FILES_KEY);
     810          632 :         }
     811          905 :         Ok(result.to_keyspace())
     812          906 :     }
     813              : 
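A self-contained sketch of the accumulation idea behind `KeySpaceAccum` (toy `u64` keys; the crate's type works over its own `Key`): ranges are appended in ascending key order, and adjacent contiguous ranges are coalesced:

    use std::ops::Range;

    #[derive(Default)]
    struct ToyKeySpaceAccum {
        ranges: Vec<Range<u64>>,
    }

    impl ToyKeySpaceAccum {
        // Add a range; assumes ranges arrive in ascending, non-overlapping
        // order, as in collect_keyspace above (dbs/rels/segments are sorted).
        fn add_range(&mut self, r: Range<u64>) {
            if r.is_empty() {
                return;
            }
            match self.ranges.last_mut() {
                Some(last) if last.end == r.start => last.end = r.end, // coalesce
                _ => self.ranges.push(r),
            }
        }

        // A single key is just a one-element range.
        fn add_key(&mut self, key: u64) {
            self.add_range(key..key + 1);
        }
    }
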
      814              :     /// Get the cached size of a relation, if it was not updated after the specified LSN
     815     61931885 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     816     61931885 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     817     61931885 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     818     61572835 :             if lsn >= *cached_lsn {
     819     61357462 :                 return Some(*nblocks);
     820       215373 :             }
     821       359050 :         }
     822       574423 :         None
     823     61931885 :     }
     824              : 
     825              :     /// Update cached relation size if there is no more recent update
     826       155937 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     827       155937 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     828       155937 :         match rel_size_cache.entry(tag) {
     829       128967 :             hash_map::Entry::Occupied(mut entry) => {
     830       128967 :                 let cached_lsn = entry.get_mut();
     831       128967 :                 if lsn >= cached_lsn.0 {
     832           12 :                     *cached_lsn = (lsn, nblocks);
     833       128955 :                 }
     834              :             }
     835        26970 :             hash_map::Entry::Vacant(entry) => {
     836        26970 :                 entry.insert((lsn, nblocks));
     837        26970 :             }
     838              :         }
     839       155937 :     }
     840              : 
     841              :     /// Store cached relation size
     842      1928349 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     843      1928349 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     844      1928349 :         rel_size_cache.insert(tag, (lsn, nblocks));
     845      1928349 :     }
     846              : 
     847              :     /// Remove cached relation size
     848        67312 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     849        67312 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     850        67312 :         rel_size_cache.remove(tag);
     851        67312 :     }
     852              : }
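
The relation size cache above is a per-relation map of `(last-updated LSN, size)`. A self-contained sketch of its read/update rules (toy types, no locking, illustrative names):

    use std::collections::HashMap;

    type RelId = u32;       // stand-in for RelTag
    type Lsn = u64;         // stand-in for utils::lsn::Lsn
    type BlockNumber = u32;

    #[derive(Default)]
    struct ToyRelSizeCache {
        map: HashMap<RelId, (Lsn, BlockNumber)>,
    }

    impl ToyRelSizeCache {
        // A hit is only valid if the requested LSN is not older than the LSN
        // at which the size was cached (mirrors get_cached_rel_size).
        fn get(&self, rel: RelId, lsn: Lsn) -> Option<BlockNumber> {
            match self.map.get(&rel) {
                Some(&(cached_lsn, nblocks)) if lsn >= cached_lsn => Some(nblocks),
                _ => None,
            }
        }

        // Only overwrite when the update is at least as new as the cached
        // entry (mirrors update_cached_rel_size).
        fn update(&mut self, rel: RelId, lsn: Lsn, nblocks: BlockNumber) {
            let entry = self.map.entry(rel).or_insert((lsn, nblocks));
            if lsn >= entry.0 {
                *entry = (lsn, nblocks);
            }
        }
    }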
     853              : 
     854              : /// DatadirModification represents an operation to ingest an atomic set of
      855              : /// updates to the repository. It is created by the 'begin_modification'
     856              : /// function. It is called for each WAL record, so that all the modifications
      857              : /// by one WAL record appear atomic.
     858              : pub struct DatadirModification<'a> {
     859              :     /// The timeline this modification applies to. You can access this to
     860              :     /// read the state, but note that any pending updates are *not* reflected
     861              :     /// in the state in 'tline' yet.
     862              :     pub tline: &'a Timeline,
     863              : 
     864              :     /// Current LSN of the modification
     865              :     lsn: Lsn,
     866              : 
     867              :     // The modifications are not applied directly to the underlying key-value store.
     868              :     // The put-functions add the modifications here, and they are flushed to the
      869              :     // underlying key-value store by the 'commit' function.
     870              :     pending_lsns: Vec<Lsn>,
     871              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     872              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     873              :     pending_nblocks: i64,
     874              : 
     875              :     // If we already wrote any aux file changes in this modification, stash the latest dir.  If set,
     876              :     // [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
     877              :     // if AUX_FILES_KEY is already set.
     878              :     pending_aux_files: Option<AuxFilesDirectory>,
     879              : 
     880              :     /// For special "directory" keys that store key-value maps, track the size of the map
     881              :     /// if it was updated in this modification.
     882              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
     883              : }
     884              : 
     885              : impl<'a> DatadirModification<'a> {
     886              :     /// Get the current lsn
     887     56889269 :     pub(crate) fn get_lsn(&self) -> Lsn {
     888     56889269 :         self.lsn
     889     56889269 :     }
     890              : 
     891              :     /// Set the current lsn
     892     72693847 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     893     72693847 :         ensure!(
     894     72693847 :             lsn >= self.lsn,
     895            0 :             "setting an older lsn {} than {} is not allowed",
     896              :             lsn,
     897              :             self.lsn
     898              :         );
     899     72693847 :         if lsn > self.lsn {
     900     72693847 :             self.pending_lsns.push(self.lsn);
     901     72693847 :             self.lsn = lsn;
     902     72693847 :         }
     903     72693847 :         Ok(())
     904     72693847 :     }
     905              : 
     906              :     /// Initialize a completely new repository.
     907              :     ///
     908              :     /// This inserts the directory metadata entries that are assumed to
     909              :     /// always exist.
     910          680 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     911          680 :         let buf = DbDirectory::ser(&DbDirectory {
     912          680 :             dbdirs: HashMap::new(),
     913          680 :         })?;
     914          680 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
     915          680 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     916          680 : 
     917          680 :         // Create AuxFilesDirectory
     918          680 :         self.init_aux_dir()?;
     919              : 
     920          680 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     921          680 :             xids: HashSet::new(),
     922          680 :         })?;
     923          680 :         self.pending_directory_entries
     924          680 :             .push((DirectoryKind::TwoPhase, 0));
     925          680 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     926              : 
     927          680 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     928          680 :         let empty_dir = Value::Image(buf);
     929          680 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     930          680 :         self.pending_directory_entries
     931          680 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
     932          680 :         self.put(
     933          680 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     934          680 :             empty_dir.clone(),
     935          680 :         );
     936          680 :         self.pending_directory_entries
     937          680 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
      938          680 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactMembers), 0));
     939          680 :         self.pending_directory_entries
     940          680 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
     941          680 : 
     942          680 :         Ok(())
     943          680 :     }
     944              : 
     945              :     #[cfg(test)]
     946           70 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     947           70 :         self.init_empty()?;
     948           70 :         self.put_control_file(bytes::Bytes::from_static(
     949           70 :             b"control_file contents do not matter",
     950           70 :         ))
     951           70 :         .context("put_control_file")?;
     952           70 :         self.put_checkpoint(bytes::Bytes::from_static(
     953           70 :             b"checkpoint_file contents do not matter",
     954           70 :         ))
     955           70 :         .context("put_checkpoint_file")?;
     956           70 :         Ok(())
     957           70 :     }
     958              : 
     959              :     /// Put a new page version that can be constructed from a WAL record
     960              :     ///
     961              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     962              :     /// current end-of-file. It's up to the caller to check that the relation size
     963              :     /// matches the blocks inserted!
     964     52739517 :     pub fn put_rel_wal_record(
     965     52739517 :         &mut self,
     966     52739517 :         rel: RelTag,
     967     52739517 :         blknum: BlockNumber,
     968     52739517 :         rec: NeonWalRecord,
     969     52739517 :     ) -> anyhow::Result<()> {
     970     52739517 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     971     52739517 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     972     52739517 :         Ok(())
     973     52739517 :     }
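    // Hedged usage sketch: since put_rel_wal_record() never grows the relation,
    // a caller ingesting a record for a block at or past end-of-file must extend
    // the relation first (as walingest.rs does). `current_nblocks` below is a
    // hypothetical stand-in for the caller's view of the relation size.
    //
    //     if blknum >= current_nblocks {
    //         modification.put_rel_extend(rel, blknum + 1, &ctx).await?;
    //     }
    //     modification.put_rel_wal_record(rel, blknum, rec)?;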
     974              : 
     975              :     // Same, but for an SLRU.
     976      6091255 :     pub fn put_slru_wal_record(
     977      6091255 :         &mut self,
     978      6091255 :         kind: SlruKind,
     979      6091255 :         segno: u32,
     980      6091255 :         blknum: BlockNumber,
     981      6091255 :         rec: NeonWalRecord,
     982      6091255 :     ) -> anyhow::Result<()> {
     983      6091255 :         self.put(
     984      6091255 :             slru_block_to_key(kind, segno, blknum),
     985      6091255 :             Value::WalRecord(rec),
     986      6091255 :         );
     987      6091255 :         Ok(())
     988      6091255 :     }
     989              : 
      990              :     /// Like put_rel_wal_record, but with a ready-made image of the page.
     991      2591084 :     pub fn put_rel_page_image(
     992      2591084 :         &mut self,
     993      2591084 :         rel: RelTag,
     994      2591084 :         blknum: BlockNumber,
     995      2591084 :         img: Bytes,
     996      2591084 :     ) -> anyhow::Result<()> {
     997      2591084 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     998      2591084 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     999      2591084 :         Ok(())
    1000      2591084 :     }
    1001              : 
    1002         3478 :     pub fn put_slru_page_image(
    1003         3478 :         &mut self,
    1004         3478 :         kind: SlruKind,
    1005         3478 :         segno: u32,
    1006         3478 :         blknum: BlockNumber,
    1007         3478 :         img: Bytes,
    1008         3478 :     ) -> anyhow::Result<()> {
    1009         3478 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
    1010         3478 :         Ok(())
    1011         3478 :     }
    1012              : 
    1013              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1014         2530 :     pub async fn put_relmap_file(
    1015         2530 :         &mut self,
    1016         2530 :         spcnode: Oid,
    1017         2530 :         dbnode: Oid,
    1018         2530 :         img: Bytes,
    1019         2530 :         ctx: &RequestContext,
    1020         2530 :     ) -> anyhow::Result<()> {
    1021              :         // Add it to the directory (if it doesn't exist already)
    1022         2530 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1023         2530 :         let mut dbdir = DbDirectory::des(&buf)?;
    1024              : 
    1025         2530 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1026         2530 :         if r.is_none() || r == Some(false) {
    1027              :             // The dbdir entry didn't exist, or it contained a
    1028              :             // 'false'. The 'insert' call already updated it with
    1029              :             // 'true', now write the updated 'dbdirs' map back.
    1030         2468 :             let buf = DbDirectory::ser(&dbdir)?;
    1031         2468 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1032         2468 : 
    1033         2468 :             // Create AuxFilesDirectory as well
    1034         2468 :             self.init_aux_dir()?;
    1035           62 :         }
    1036         2530 :         if r.is_none() {
    1037           36 :             // Create RelDirectory
    1038           36 :             let buf = RelDirectory::ser(&RelDirectory {
    1039           36 :                 rels: HashSet::new(),
    1040           36 :             })?;
    1041           36 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1042           36 :             self.put(
    1043           36 :                 rel_dir_to_key(spcnode, dbnode),
    1044           36 :                 Value::Image(Bytes::from(buf)),
    1045           36 :             );
    1046         2494 :         }
    1047              : 
    1048         2530 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1049         2530 :         Ok(())
    1050         2530 :     }
    1051              : 
    1052            4 :     pub async fn put_twophase_file(
    1053            4 :         &mut self,
    1054            4 :         xid: TransactionId,
    1055            4 :         img: Bytes,
    1056            4 :         ctx: &RequestContext,
    1057            4 :     ) -> anyhow::Result<()> {
    1058              :         // Add it to the directory entry
    1059            4 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1060            4 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1061            4 :         if !dir.xids.insert(xid) {
    1062            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1063            4 :         }
    1064            4 :         self.pending_directory_entries
    1065            4 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1066            4 :         self.put(
    1067            4 :             TWOPHASEDIR_KEY,
    1068            4 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1069              :         );
    1070              : 
    1071            4 :         self.put(twophase_file_key(xid), Value::Image(img));
    1072            4 :         Ok(())
    1073            4 :     }
    1074              : 
    1075          678 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1076          678 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1077          678 :         Ok(())
    1078          678 :     }
    1079              : 
    1080        34992 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1081        34992 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1082        34992 :         Ok(())
    1083        34992 :     }
    1084              : 
    1085            3 :     pub async fn drop_dbdir(
    1086            3 :         &mut self,
    1087            3 :         spcnode: Oid,
    1088            3 :         dbnode: Oid,
    1089            3 :         ctx: &RequestContext,
    1090            3 :     ) -> anyhow::Result<()> {
    1091            3 :         let total_blocks = self
    1092            3 :             .tline
    1093            3 :             .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
    1094            0 :             .await?;
    1095              : 
    1096              :         // Remove entry from dbdir
    1097            3 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1098            3 :         let mut dir = DbDirectory::des(&buf)?;
    1099            3 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1100            3 :             let buf = DbDirectory::ser(&dir)?;
    1101            3 :             self.pending_directory_entries
    1102            3 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1103            3 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1104              :         } else {
    1105            0 :             warn!(
    1106            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1107            0 :                 spcnode, dbnode
    1108            0 :             );
    1109              :         }
    1110              : 
    1111              :         // Update logical database size.
    1112            3 :         self.pending_nblocks -= total_blocks as i64;
    1113            3 : 
    1114            3 :         // Delete all relations and metadata files for the spcnode/dnode
    1115            3 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1116            3 :         Ok(())
    1117            3 :     }
    1118              : 
    1119              :     /// Create a relation fork.
    1120              :     ///
    1121              :     /// 'nblocks' is the initial size.
    1122       653184 :     pub async fn put_rel_creation(
    1123       653184 :         &mut self,
    1124       653184 :         rel: RelTag,
    1125       653184 :         nblocks: BlockNumber,
    1126       653184 :         ctx: &RequestContext,
    1127       653184 :     ) -> Result<(), RelationError> {
    1128       653184 :         if rel.relnode == 0 {
    1129            0 :             return Err(RelationError::InvalidRelnode);
    1130       653184 :         }
    1131              :         // It's possible that this is the first rel for this db in this
    1132              :         // tablespace.  Create the reldir entry for it if so.
    1133       653184 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1134       653184 :             .context("deserialize db")?;
    1135       653184 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1136       653184 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
    1137              :             // Didn't exist. Update dbdir
    1138         2437 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
    1139         2437 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1140         2437 :             self.pending_directory_entries
    1141         2437 :                 .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1142         2437 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1143         2437 : 
    1144         2437 :             // and create the RelDirectory
    1145         2437 :             RelDirectory::default()
    1146              :         } else {
    1147              :             // reldir already exists, fetch it
    1148       650747 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1149       650747 :                 .context("deserialize db")?
    1150              :         };
    1151              : 
    1152              :         // Add the new relation to the rel directory entry, and write it back
    1153       653184 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1154            0 :             return Err(RelationError::AlreadyExists);
    1155       653184 :         }
    1156       653184 : 
    1157       653184 :         self.pending_directory_entries
    1158       653184 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1159       653184 : 
    1160       653184 :         self.put(
    1161       653184 :             rel_dir_key,
    1162       653184 :             Value::Image(Bytes::from(
    1163       653184 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1164              :             )),
    1165              :         );
    1166              : 
    1167              :         // Put size
    1168       653184 :         let size_key = rel_size_to_key(rel);
    1169       653184 :         let buf = nblocks.to_le_bytes();
    1170       653184 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1171       653184 : 
    1172       653184 :         self.pending_nblocks += nblocks as i64;
    1173       653184 : 
    1174       653184 :         // Update relation size cache
    1175       653184 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1176       653184 : 
    1177       653184 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1178       653184 :         // caller.
    1179       653184 :         Ok(())
    1180       653184 :     }
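    // Illustrative sketch with a hypothetical RelTag literal: create a relation
    // fork with an initial size, then supply the block contents yourself, since
    // put_rel_creation() only writes the directory and size entries.
    //
    //     let rel = RelTag { spcnode: 1663, dbnode: 5, relnode: 16384, forknum: 0 };
    //     modification.put_rel_creation(rel, 2, &ctx).await?;  // size entry = 2 blocks
    //     modification.put_rel_page_image(rel, 0, img0)?;
    //     modification.put_rel_page_image(rel, 1, img1)?;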
    1181              : 
    1182              :     /// Truncate relation
    1183         6344 :     pub async fn put_rel_truncation(
    1184         6344 :         &mut self,
    1185         6344 :         rel: RelTag,
    1186         6344 :         nblocks: BlockNumber,
    1187         6344 :         ctx: &RequestContext,
    1188         6344 :     ) -> anyhow::Result<()> {
    1189         6344 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1190         6344 :         if self
    1191         6344 :             .tline
    1192         6344 :             .get_rel_exists(rel, Version::Modified(self), true, ctx)
    1193            0 :             .await?
    1194              :         {
    1195         6344 :             let size_key = rel_size_to_key(rel);
    1196              :             // Fetch the old size first
    1197         6344 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1198         6344 : 
    1199         6344 :             // Update the entry with the new size.
    1200         6344 :             let buf = nblocks.to_le_bytes();
    1201         6344 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1202         6344 : 
    1203         6344 :             // Update relation size cache
    1204         6344 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1205         6344 : 
    1206         6344 :             // Update relation size cache
    1207         6344 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1208         6344 : 
    1209         6344 :             // Update logical database size.
    1210         6344 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1211            0 :         }
    1212         6344 :         Ok(())
    1213         6344 :     }
    1214              : 
     1215              :     /// Extend a relation.
     1216              :     /// If the new size is smaller, do nothing.
    1217      1829588 :     pub async fn put_rel_extend(
    1218      1829588 :         &mut self,
    1219      1829588 :         rel: RelTag,
    1220      1829588 :         nblocks: BlockNumber,
    1221      1829588 :         ctx: &RequestContext,
    1222      1829588 :     ) -> anyhow::Result<()> {
    1223      1829588 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1224              : 
    1225              :         // Put size
    1226      1829588 :         let size_key = rel_size_to_key(rel);
    1227      1829588 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1228      1829584 : 
    1229      1829584 :         // only extend relation here. never decrease the size
    1230      1829584 :         if nblocks > old_size {
    1231      1262477 :             let buf = nblocks.to_le_bytes();
    1232      1262477 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1233      1262477 : 
    1234      1262477 :             // Update relation size cache
    1235      1262477 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1236      1262477 : 
    1237      1262477 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1238      1262477 :         }
    1239      1829584 :         Ok(())
    1240      1829588 :     }
    1241              : 
    1242              :     /// Drop a relation.
    1243        67312 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1244        67312 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1245              : 
    1246              :         // Remove it from the directory entry
    1247        67312 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1248        67312 :         let buf = self.get(dir_key, ctx).await?;
    1249        67312 :         let mut dir = RelDirectory::des(&buf)?;
    1250              : 
    1251        67312 :         self.pending_directory_entries
    1252        67312 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1253        67312 : 
    1254        67312 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1255        67312 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1256              :         } else {
    1257            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1258              :         }
    1259              : 
    1260              :         // update logical size
    1261        67312 :         let size_key = rel_size_to_key(rel);
    1262        67312 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1263        67312 :         self.pending_nblocks -= old_size as i64;
    1264        67312 : 
     1265        67312 :         // Remove the entry from the relation size cache
    1266        67312 :         self.tline.remove_cached_rel_size(&rel);
    1267        67312 : 
    1268        67312 :         // Delete size entry, as well as all blocks
    1269        67312 :         self.delete(rel_key_range(rel));
    1270        67312 : 
    1271        67312 :         Ok(())
    1272        67312 :     }
    1273              : 
    1274         1872 :     pub async fn put_slru_segment_creation(
    1275         1872 :         &mut self,
    1276         1872 :         kind: SlruKind,
    1277         1872 :         segno: u32,
    1278         1872 :         nblocks: BlockNumber,
    1279         1872 :         ctx: &RequestContext,
    1280         1872 :     ) -> anyhow::Result<()> {
    1281         1872 :         // Add it to the directory entry
    1282         1872 :         let dir_key = slru_dir_to_key(kind);
    1283         1872 :         let buf = self.get(dir_key, ctx).await?;
    1284         1872 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1285              : 
    1286         1872 :         if !dir.segments.insert(segno) {
    1287            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1288         1872 :         }
    1289         1872 :         self.pending_directory_entries
    1290         1872 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1291         1872 :         self.put(
    1292         1872 :             dir_key,
    1293         1872 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1294              :         );
    1295              : 
    1296              :         // Put size
    1297         1872 :         let size_key = slru_segment_size_to_key(kind, segno);
    1298         1872 :         let buf = nblocks.to_le_bytes();
    1299         1872 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1300         1872 : 
    1301         1872 :         // even if nblocks > 0, we don't insert any actual blocks here
    1302         1872 : 
    1303         1872 :         Ok(())
    1304         1872 :     }
    1305              : 
    1306              :     /// Extend SLRU segment
    1307         1618 :     pub fn put_slru_extend(
    1308         1618 :         &mut self,
    1309         1618 :         kind: SlruKind,
    1310         1618 :         segno: u32,
    1311         1618 :         nblocks: BlockNumber,
    1312         1618 :     ) -> anyhow::Result<()> {
    1313         1618 :         // Put size
    1314         1618 :         let size_key = slru_segment_size_to_key(kind, segno);
    1315         1618 :         let buf = nblocks.to_le_bytes();
    1316         1618 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1317         1618 :         Ok(())
    1318         1618 :     }
    1319              : 
    1320              :     /// This method is used for marking truncated SLRU files
    1321           18 :     pub async fn drop_slru_segment(
    1322           18 :         &mut self,
    1323           18 :         kind: SlruKind,
    1324           18 :         segno: u32,
    1325           18 :         ctx: &RequestContext,
    1326           18 :     ) -> anyhow::Result<()> {
    1327           18 :         // Remove it from the directory entry
    1328           18 :         let dir_key = slru_dir_to_key(kind);
    1329           18 :         let buf = self.get(dir_key, ctx).await?;
    1330           18 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1331              : 
    1332           18 :         if !dir.segments.remove(&segno) {
    1333            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1334           18 :         }
    1335           18 :         self.pending_directory_entries
    1336           18 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1337           18 :         self.put(
    1338           18 :             dir_key,
    1339           18 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1340              :         );
    1341              : 
    1342              :         // Delete size entry, as well as all blocks
    1343           18 :         self.delete(slru_segment_key_range(kind, segno));
    1344           18 : 
    1345           18 :         Ok(())
    1346           18 :     }
    1347              : 
    1348              :     /// Drop a relmapper file (pg_filenode.map)
    1349            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1350            0 :         // TODO
    1351            0 :         Ok(())
    1352            0 :     }
    1353              : 
     1354              :     /// Drop a two-phase state file
    1355            2 :     pub async fn drop_twophase_file(
    1356            2 :         &mut self,
    1357            2 :         xid: TransactionId,
    1358            2 :         ctx: &RequestContext,
    1359            2 :     ) -> anyhow::Result<()> {
    1360              :         // Remove it from the directory entry
    1361            2 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1362            2 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1363              : 
    1364            2 :         if !dir.xids.remove(&xid) {
    1365            0 :             warn!("twophase file for xid {} does not exist", xid);
    1366            2 :         }
    1367            2 :         self.pending_directory_entries
    1368            2 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1369            2 :         self.put(
    1370            2 :             TWOPHASEDIR_KEY,
    1371            2 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1372              :         );
    1373              : 
    1374              :         // Delete it
    1375            2 :         self.delete(twophase_key_range(xid));
    1376            2 : 
    1377            2 :         Ok(())
    1378            2 :     }
    1379              : 
    1380         3148 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1381         3148 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1382         3148 :             files: HashMap::new(),
    1383         3148 :         })?;
    1384         3148 :         self.pending_directory_entries
    1385         3148 :             .push((DirectoryKind::AuxFiles, 0));
    1386         3148 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1387         3148 :         Ok(())
    1388         3148 :     }
    1389              : 
    1390          124 :     pub async fn put_file(
    1391          124 :         &mut self,
    1392          124 :         path: &str,
    1393          124 :         content: &[u8],
    1394          124 :         ctx: &RequestContext,
    1395          124 :     ) -> anyhow::Result<()> {
    1396          124 :         let file_path = path.to_string();
    1397          124 :         let content = if content.is_empty() {
    1398            7 :             None
    1399              :         } else {
    1400          117 :             Some(Bytes::copy_from_slice(content))
    1401              :         };
    1402              : 
    1403          124 :         let dir = if let Some(mut dir) = self.pending_aux_files.take() {
    1404              :             // We already updated aux files in `self`: emit a delta and update our latest value
    1405              : 
    1406           95 :             self.put(
    1407           95 :                 AUX_FILES_KEY,
    1408           95 :                 Value::WalRecord(NeonWalRecord::AuxFile {
    1409           95 :                     file_path: file_path.clone(),
    1410           95 :                     content: content.clone(),
    1411           95 :                 }),
    1412           95 :             );
    1413           95 : 
    1414           95 :             dir.upsert(file_path, content);
    1415           95 :             dir
    1416              :         } else {
    1417              :             // Check if the AUX_FILES_KEY is initialized
    1418           29 :             match self.get(AUX_FILES_KEY, ctx).await {
    1419           24 :                 Ok(dir_bytes) => {
    1420           24 :                     let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
    1421              :                     // Key is already set, we may append a delta
    1422           24 :                     self.put(
    1423           24 :                         AUX_FILES_KEY,
    1424           24 :                         Value::WalRecord(NeonWalRecord::AuxFile {
    1425           24 :                             file_path: file_path.clone(),
    1426           24 :                             content: content.clone(),
    1427           24 :                         }),
    1428           24 :                     );
    1429           24 :                     dir.upsert(file_path, content);
    1430           24 :                     dir
    1431              :                 }
    1432              :                 Err(
    1433            0 :                     e @ (PageReconstructError::AncestorStopping(_)
    1434              :                     | PageReconstructError::Cancelled
    1435              :                     | PageReconstructError::AncestorLsnTimeout(_)),
    1436              :                 ) => {
    1437              :                     // Important that we do not interpret a shutdown error as "not found" and thereby
    1438              :                     // reset the map.
    1439            0 :                     return Err(e.into());
    1440              :                 }
    1441              :                 // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
     1442              :                 // we are assuming that all _other_ possible errors represent a missing key.  If some
    1443              :                 // other error occurs, we may incorrectly reset the map of aux files.
    1444              :                 Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
    1445              :                     // Key is missing, we must insert an image as the basis for subsequent deltas.
    1446              : 
    1447            5 :                     let mut dir = AuxFilesDirectory {
    1448            5 :                         files: HashMap::new(),
    1449            5 :                     };
    1450            5 :                     dir.upsert(file_path, content);
    1451            5 :                     self.put(
    1452            5 :                         AUX_FILES_KEY,
    1453            5 :                         Value::Image(Bytes::from(
    1454            5 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1455              :                         )),
    1456              :                     );
    1457            5 :                     dir
    1458              :                 }
    1459              :             }
    1460              :         };
    1461              : 
    1462          124 :         self.pending_directory_entries
    1463          124 :             .push((DirectoryKind::AuxFiles, dir.files.len()));
    1464          124 :         self.pending_aux_files = Some(dir);
    1465          124 : 
    1466          124 :         Ok(())
    1467          124 :     }
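    // Usage sketch (exercised by the aux_files_round_trip test below): non-empty
    // content upserts an aux file, empty content deletes it. Internally the first
    // write stores a full AuxFilesDirectory image; later writes append
    // NeonWalRecord::AuxFile deltas on top of it. The path here is illustrative.
    //
    //     modification.put_file("pg_logical/some_file", b"data", &ctx).await?; // upsert
    //     modification.put_file("pg_logical/some_file", b"", &ctx).await?;     // delete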
    1468              : 
    1469              :     ///
    1470              :     /// Flush changes accumulated so far to the underlying repository.
    1471              :     ///
    1472              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1473              :     /// you to flush them to the underlying repository before the final `commit`.
    1474              :     /// That allows to free up the memory used to hold the pending changes.
     1475              :     /// That frees up the memory used to hold the pending changes.
    1476              :     /// Currently only used during bulk import of a data directory. In that
    1477              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1478              :     /// whole import fails and the timeline will be deleted anyway.
    1479              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1480              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1481              :     ///
    1482              :     /// Note: A consequence of flushing the pending operations is that they
    1483              :     /// won't be visible to subsequent operations until `commit`. The function
    1484              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1485              :     /// for bulk import, where you are just loading data pages and won't try to
    1486              :     /// modify the same pages twice.
    1487       578076 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1488       578076 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1489       578076 :         // to scan through the pending_updates list.
    1490       578076 :         let pending_nblocks = self.pending_nblocks;
    1491       578076 :         if pending_nblocks < 10000 {
    1492       578076 :             return Ok(());
    1493            0 :         }
    1494              : 
    1495            0 :         let writer = self.tline.writer().await;
    1496              : 
     1497              :         // Flush relation and SLRU data blocks; keep metadata.
    1498            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1499            0 :         for (key, values) in self.pending_updates.drain() {
    1500            0 :             for (lsn, value) in values {
    1501            0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1502              :                     // This bails out on first error without modifying pending_updates.
    1503              :                     // That's Ok, cf this function's doc comment.
    1504            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1505            0 :                 } else {
    1506            0 :                     retained_pending_updates
    1507            0 :                         .entry(key)
    1508            0 :                         .or_default()
    1509            0 :                         .push((lsn, value));
    1510            0 :                 }
    1511              :             }
    1512              :         }
    1513              : 
    1514            0 :         self.pending_updates = retained_pending_updates;
    1515            0 : 
    1516            0 :         if pending_nblocks != 0 {
    1517            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1518            0 :             self.pending_nblocks = 0;
    1519            0 :         }
    1520              : 
    1521            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1522            0 :             writer.update_directory_entries_count(kind, count as u64);
    1523            0 :         }
    1524              : 
    1525            0 :         Ok(())
    1526       578076 :     }
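    // Bulk-import sketch under the caveats above: it is safe to call flush() on
    // every iteration, because it returns immediately until enough net new blocks
    // (the pending_nblocks < 10000 check above) have accumulated. `pages` is a
    // hypothetical iterator of (RelTag, BlockNumber, Bytes) tuples.
    //
    //     for (rel, blknum, img) in pages {
    //         modification.put_rel_page_image(rel, blknum, img)?;
    //         modification.flush(&ctx).await?;
    //     }
    //     modification.commit(&ctx).await?;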
    1527              : 
    1528              :     ///
    1529              :     /// Finish this atomic update, writing all the updated keys to the
    1530              :     /// underlying timeline.
    1531              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1532              :     ///
    1533      2096508 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1534      2096508 :         let writer = self.tline.writer().await;
    1535              : 
    1536      2096508 :         let pending_nblocks = self.pending_nblocks;
    1537      2096508 :         self.pending_nblocks = 0;
    1538      2096508 : 
    1539      2096508 :         if !self.pending_updates.is_empty() {
    1540      1674304 :             writer.put_batch(&self.pending_updates, ctx).await?;
    1541      1674303 :             self.pending_updates.clear();
    1542       422204 :         }
    1543              : 
    1544      2096507 :         if !self.pending_deletions.is_empty() {
    1545        19261 :             writer.delete_batch(&self.pending_deletions).await?;
    1546        19261 :             self.pending_deletions.clear();
    1547      2077246 :         }
    1548              : 
    1549      2096507 :         self.pending_lsns.push(self.lsn);
    1550     74789978 :         for pending_lsn in self.pending_lsns.drain(..) {
    1551     74789978 :             // Ideally, we should be able to call writer.finish_write() only once
    1552     74789978 :             // with the highest LSN. However, the last_record_lsn variable in the
    1553     74789978 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1554     74789978 :             // so we need to record every LSN to not leave a gap between them.
    1555     74789978 :             writer.finish_write(pending_lsn);
    1556     74789978 :         }
    1557              : 
    1558      2096507 :         if pending_nblocks != 0 {
    1559       640015 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1560      1456492 :         }
    1561              : 
    1562      2096507 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1563       730583 :             writer.update_directory_entries_count(kind, count as u64);
    1564       730583 :         }
    1565              : 
    1566      2096507 :         Ok(())
    1567      2096507 :     }
    1568              : 
    1569    145387682 :     pub(crate) fn len(&self) -> usize {
    1570    145387682 :         self.pending_updates.len() + self.pending_deletions.len()
    1571    145387682 :     }
    1572              : 
    1573              :     // Internal helper functions to batch the modifications
    1574              : 
    1575      3503511 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
     1576              :         // Have we already updated the same key? If so, read the latest
     1577              :         // pending version.
    1578              :         //
    1579              :         // Note: we don't check pending_deletions. It is an error to request a
     1580              :         // value that has been removed; deletion only avoids leaking storage.
    1581      3503511 :         if let Some(values) = self.pending_updates.get(&key) {
    1582      2629426 :             if let Some((_, value)) = values.last() {
    1583      2629426 :                 return if let Value::Image(img) = value {
    1584      2629426 :                     Ok(img.clone())
    1585              :                 } else {
    1586              :                     // Currently, we never need to read back a WAL record that we
    1587              :                     // inserted in the same "transaction". All the metadata updates
    1588              :                     // work directly with Images, and we never need to read actual
    1589              :                     // data pages. We could handle this if we had to, by calling
    1590              :                     // the walredo manager, but let's keep it simple for now.
    1591            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1592            0 :                         "unexpected pending WAL record"
    1593            0 :                     )))
    1594              :                 };
    1595            0 :             }
    1596       874085 :         }
    1597       874085 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1598       874085 :         self.tline.get(key, lsn, ctx).await
    1599      3503511 :     }
    1600              : 
    1601     64123041 :     fn put(&mut self, key: Key, val: Value) {
    1602     64123041 :         let values = self.pending_updates.entry(key).or_default();
    1603              :         // Replace the previous value if it exists at the same lsn
    1604     64123041 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1605     53878203 :             if *last_lsn == self.lsn {
    1606       649168 :                 *last_value = val;
    1607       649168 :                 return;
    1608     53229035 :             }
    1609     10244838 :         }
    1610     63473873 :         values.push((self.lsn, val));
    1611     64123041 :     }
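    // Sketch of the coalescing rule above: two puts of the same key at the same
    // LSN keep only the last value, rather than stacking both. `img_a` and
    // `img_b` are placeholder Bytes values.
    //
    //     modification.put_checkpoint(img_a)?;  // buffered at modification.lsn
    //     modification.put_checkpoint(img_b)?;  // replaces img_a at the same LSN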
    1612              : 
    1613        67335 :     fn delete(&mut self, key_range: Range<Key>) {
    1614        67335 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1615        67335 :         self.pending_deletions.push((key_range, self.lsn));
    1616        67335 :     }
    1617              : }
    1618              : 
    1619              : /// This struct facilitates accessing either a committed key from the timeline at a
    1620              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1621              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1622              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1623              : /// need to look up the keys in the modification first before looking them up in the
    1624              : /// timeline to not miss the latest updates.
    1625            0 : #[derive(Clone, Copy)]
    1626              : pub enum Version<'a> {
    1627              :     Lsn(Lsn),
    1628              :     Modified(&'a DatadirModification<'a>),
    1629              : }
    1630              : 
    1631              : impl<'a> Version<'a> {
    1632      4938936 :     async fn get(
    1633      4938936 :         &self,
    1634      4938936 :         timeline: &Timeline,
    1635      4938936 :         key: Key,
    1636      4938936 :         ctx: &RequestContext,
    1637      4938936 :     ) -> Result<Bytes, PageReconstructError> {
    1638      4938936 :         match self {
    1639      4714370 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1640       224566 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1641              :         }
    1642      4938935 :     }
    1643              : 
    1644      5198553 :     fn get_lsn(&self) -> Lsn {
    1645      5198553 :         match self {
    1646      4713389 :             Version::Lsn(lsn) => *lsn,
    1647       485164 :             Version::Modified(modification) => modification.lsn,
    1648              :         }
    1649      5198553 :     }
    1650              : }
    1651              : 
    1652              : //--- Metadata structs stored in key-value pairs in the repository.
    1653              : 
    1654       657906 : #[derive(Debug, Serialize, Deserialize)]
    1655              : struct DbDirectory {
    1656              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1657              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1658              : }
    1659              : 
    1660         1512 : #[derive(Debug, Serialize, Deserialize)]
    1661              : struct TwoPhaseDirectory {
    1662              :     xids: HashSet<TransactionId>,
    1663              : }
    1664              : 
    1665      1441064 : #[derive(Debug, Serialize, Deserialize, Default)]
    1666              : struct RelDirectory {
    1667              :     // Set of relations that exist. (relfilenode, forknum)
    1668              :     //
    1669              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1670              :     // key-value pairs, if you have a lot of relations
    1671              :     rels: HashSet<(Oid, u8)>,
    1672              : }
    1673              : 
    1674         7679 : #[derive(Debug, Serialize, Deserialize, Default)]
    1675              : pub(crate) struct AuxFilesDirectory {
    1676              :     pub(crate) files: HashMap<String, Bytes>,
    1677              : }
    1678              : 
    1679              : impl AuxFilesDirectory {
    1680         1493 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    1681         1493 :         if let Some(value) = value {
    1682         1421 :             self.files.insert(key, value);
    1683         1421 :         } else {
    1684           72 :             self.files.remove(&key);
    1685           72 :         }
    1686         1493 :     }
    1687              : }
    1688              : 
    1689            0 : #[derive(Debug, Serialize, Deserialize)]
    1690              : struct RelSizeEntry {
    1691              :     nblocks: u32,
    1692              : }
    1693              : 
    1694        10971 : #[derive(Debug, Serialize, Deserialize, Default)]
    1695              : struct SlruSegmentDirectory {
    1696              :     // Set of SLRU segments that exist.
    1697              :     segments: HashSet<u32>,
    1698              : }
    1699              : 
    1700      1461166 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    1701              : #[repr(u8)]
    1702              : pub(crate) enum DirectoryKind {
    1703              :     Db,
    1704              :     TwoPhase,
    1705              :     Rel,
    1706              :     AuxFiles,
    1707              :     SlruSegment(SlruKind),
    1708              : }
    1709              : 
    1710              : impl DirectoryKind {
    1711              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    1712      1461166 :     pub(crate) fn offset(&self) -> usize {
    1713      1461166 :         self.into_usize()
    1714      1461166 :     }
    1715              : }
    1716              : 
    1717              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1718              : 
    1719              : #[allow(clippy::bool_assert_comparison)]
    1720              : #[cfg(test)]
    1721              : mod tests {
    1722              :     use hex_literal::hex;
    1723              :     use utils::id::TimelineId;
    1724              : 
    1725              :     use super::*;
    1726              : 
    1727              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    1728              : 
    1729              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    1730            2 :     #[tokio::test]
    1731            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    1732            2 :         let name = "aux_files_round_trip";
    1733            2 :         let harness = TenantHarness::create(name)?;
    1734            2 : 
    1735            2 :         pub const TIMELINE_ID: TimelineId =
    1736            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    1737            2 : 
    1738            2 :         let (tenant, ctx) = harness.load().await;
    1739            2 :         let tline = tenant
    1740            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    1741            2 :             .await?;
    1742            2 :         let tline = tline.raw_timeline().unwrap();
    1743            2 : 
    1744            2 :         // First modification: insert two keys
    1745            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    1746            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    1747            2 :         modification.set_lsn(Lsn(0x1008))?;
    1748            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    1749            2 :         modification.commit(&ctx).await?;
    1750            2 :         let expect_1008 = HashMap::from([
    1751            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    1752            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    1753            2 :         ]);
    1754            2 : 
    1755            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1756            2 :         assert_eq!(readback, expect_1008);
    1757            2 : 
    1758            2 :         // Second modification: update one key, remove the other
    1759            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    1760            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    1761            2 :         modification.set_lsn(Lsn(0x2008))?;
    1762            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    1763            2 :         modification.commit(&ctx).await?;
    1764            2 :         let expect_2008 =
    1765            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    1766            2 : 
    1767            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    1768            2 :         assert_eq!(readback, expect_2008);
    1769            2 : 
    1770            2 :         // Reading back in time works
    1771            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1772            2 :         assert_eq!(readback, expect_1008);
    1773            2 : 
    1774            2 :         Ok(())
    1775            2 :     }
    1776              : 
    1777              :     /*
    1778              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1779              :             let incremental = timeline.get_current_logical_size();
    1780              :             let non_incremental = timeline
    1781              :                 .get_current_logical_size_non_incremental(lsn)
    1782              :                 .unwrap();
    1783              :             assert_eq!(incremental, non_incremental);
    1784              :         }
    1785              :     */
    1786              : 
    1787              :     /*
    1788              :     ///
    1789              :     /// Test list_rels() function, with branches and dropped relations
    1790              :     ///
    1791              :     #[test]
    1792              :     fn test_list_rels_drop() -> Result<()> {
    1793              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1794              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1795              :         const TESTDB: u32 = 111;
    1796              : 
    1797              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1798              :         // after branching fails below
    1799              :         let mut writer = tline.begin_record(Lsn(0x10));
    1800              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1801              :         writer.finish()?;
    1802              : 
    1803              :         // Create a relation on the timeline
    1804              :         let mut writer = tline.begin_record(Lsn(0x20));
    1805              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1806              :         writer.finish()?;
    1807              : 
    1808              :         let writer = tline.begin_record(Lsn(0x00));
    1809              :         writer.finish()?;
    1810              : 
     1811              :         // Check that list_rels() lists it after LSN 2, but not before it
    1812              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1813              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1814              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1815              : 
    1816              :         // Create a branch, check that the relation is visible there
    1817              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1818              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1819              :             Some(timeline) => timeline,
    1820              :             None => panic!("Should have a local timeline"),
    1821              :         };
    1822              :         let newtline = DatadirTimelineImpl::new(newtline);
    1823              :         assert!(newtline
    1824              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1825              :             .contains(&TESTREL_A));
    1826              : 
    1827              :         // Drop it on the branch
    1828              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1829              :         new_writer.drop_relation(TESTREL_A)?;
    1830              :         new_writer.finish()?;
    1831              : 
    1832              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1833              :         assert!(newtline
    1834              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1835              :             .contains(&TESTREL_A));
    1836              :         assert!(!newtline
    1837              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1838              :             .contains(&TESTREL_A));
    1839              : 
    1840              :         // Run checkpoint and garbage collection and check that it's still not visible
    1841              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1842              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1843              : 
    1844              :         assert!(!newtline
    1845              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1846              :             .contains(&TESTREL_A));
    1847              : 
    1848              :         Ok(())
    1849              :     }
    1850              :      */
    1851              : 
    1852              :     /*
    1853              :     #[test]
    1854              :     fn test_read_beyond_eof() -> Result<()> {
    1855              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1856              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1857              : 
    1858              :         make_some_layers(&tline, Lsn(0x20))?;
    1859              :         let mut writer = tline.begin_record(Lsn(0x60));
    1860              :         walingest.put_rel_page_image(
    1861              :             &mut writer,
    1862              :             TESTREL_A,
    1863              :             0,
    1864              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1865              :         )?;
    1866              :         writer.finish()?;
    1867              : 
    1868              :         // Test read before rel creation. Should error out.
    1869              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1870              : 
    1871              :         // Read block beyond end of relation at different points in time.
    1872              :         // These reads should fall into different delta, image, and in-memory layers.
    1873              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1874              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1875              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1876              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1877              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1878              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1879              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1880              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1881              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1882              : 
    1883              :         // Test on an in-memory layer with no preceding layer
    1884              :         let mut writer = tline.begin_record(Lsn(0x70));
    1885              :         walingest.put_rel_page_image(
    1886              :             &mut writer,
    1887              :             TESTREL_B,
    1888              :             0,
    1889              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1890              :         )?;
    1891              :         writer.finish()?;
    1892              : 
     1893              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    1894              : 
    1895              :         Ok(())
    1896              :     }
    1897              :      */
    1898              : }
        

Generated by: LCOV version 2.1-beta