LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test:      322b88762cba8ea666f63cda880cccab6936bf37.info
Test Date: 2024-02-29 11:57:12
Coverage:              Hit     Total    Coverage
  Lines:               613     1164     52.7 %
  Functions:            81      217     37.3 %

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::repository::*;
      13              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use enum_map::Enum;
      18              : use pageserver_api::key::{
      19              :     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
      20              :     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      21              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      22              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      23              : };
      24              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      25              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      26              : use postgres_ffi::BLCKSZ;
      27              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      28              : use serde::{Deserialize, Serialize};
      29              : use std::collections::{hash_map, HashMap, HashSet};
      30              : use std::ops::ControlFlow;
      31              : use std::ops::Range;
      32              : use strum::IntoEnumIterator;
      33              : use tokio_util::sync::CancellationToken;
      34              : use tracing::{debug, trace, warn};
      35              : use utils::bin_ser::DeserializeError;
      36              : use utils::{bin_ser::BeSer, lsn::Lsn};
      37              : 
      38              : const MAX_AUX_FILE_DELTAS: usize = 1024;
      39              : 
      40            0 : #[derive(Debug)]
      41              : pub enum LsnForTimestamp {
      42              :     /// Found commits both before and after the given timestamp
      43              :     Present(Lsn),
      44              : 
       45              :     /// Found no commits after the given timestamp; this means
       46              :     /// that the newest data in the branch is older than the given
       47              :     /// timestamp.
      48              :     ///
      49              :     /// All commits <= LSN happened before the given timestamp
      50              :     Future(Lsn),
      51              : 
       52              :     /// The queried timestamp is past the horizon we can look back to (the PITR window)
       53              :     ///
       54              :     /// All commits > LSN happened after the given timestamp,
       55              :     /// but any commits < LSN might have happened before or after
       56              :     /// the given timestamp. We don't know, because no data before
       57              :     /// the given LSN is available.
      58              :     Past(Lsn),
      59              : 
      60              :     /// We have found no commit with a timestamp,
      61              :     /// so we can't return anything meaningful.
      62              :     ///
      63              :     /// The associated LSN is the lower bound value we can safely
       64              :     /// create branches on, but no statement is made about whether it
       65              :     /// is older or newer than the timestamp.
      66              :     ///
      67              :     /// This variant can e.g. be returned right after a
      68              :     /// cluster import.
      69              :     NoData(Lsn),
      70              : }
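                       : // Editor's sketch (added in editing; not part of this file or its
                       : // coverage data): how a caller might consume these variants, assuming
                       : // hypothetical `timeline`, `ts`, `cancel` and `ctx` values:
                       : //
                       : //     match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
                       : //         LsnForTimestamp::Present(lsn) => { /* safe to branch at `lsn` */ }
                       : //         LsnForTimestamp::Future(lsn) => { /* all data is older than `ts` */ }
                       : //         LsnForTimestamp::Past(_lsn) => { /* `ts` predates the PITR horizon */ }
                       : //         LsnForTimestamp::NoData(lsn) => { /* no commits; `lsn` is a safe lower bound */ }
                       : //     }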
      71              : 
      72            0 : #[derive(Debug, thiserror::Error)]
      73              : pub enum CalculateLogicalSizeError {
      74              :     #[error("cancelled")]
      75              :     Cancelled,
      76              :     #[error(transparent)]
      77              :     Other(#[from] anyhow::Error),
      78              : }
      79              : 
      80            0 : #[derive(Debug, thiserror::Error)]
      81              : pub(crate) enum CollectKeySpaceError {
      82              :     #[error(transparent)]
      83              :     Decode(#[from] DeserializeError),
      84              :     #[error(transparent)]
      85              :     PageRead(PageReconstructError),
      86              :     #[error("cancelled")]
      87              :     Cancelled,
      88              : }
      89              : 
      90              : impl From<PageReconstructError> for CollectKeySpaceError {
      91            0 :     fn from(err: PageReconstructError) -> Self {
      92            0 :         match err {
      93            0 :             PageReconstructError::Cancelled => Self::Cancelled,
      94            0 :             err => Self::PageRead(err),
      95              :         }
      96            0 :     }
      97              : }
      98              : 
      99              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     100            0 :     fn from(pre: PageReconstructError) -> Self {
     101            0 :         match pre {
     102              :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
     103            0 :                 Self::Cancelled
     104              :             }
     105            0 :             _ => Self::Other(pre.into()),
     106              :         }
     107            0 :     }
     108              : }
     109              : 
     110            0 : #[derive(Debug, thiserror::Error)]
     111              : pub enum RelationError {
     112              :     #[error("Relation Already Exists")]
     113              :     AlreadyExists,
     114              :     #[error("invalid relnode")]
     115              :     InvalidRelnode,
     116              :     #[error(transparent)]
     117              :     Other(#[from] anyhow::Error),
     118              : }
     119              : 
     120              : ///
     121              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     122              : /// and other special kinds of files, in a versioned key-value store. The
     123              : /// Timeline struct provides the key-value store.
     124              : ///
      125              : /// This is a separate impl block so that we can easily include all these functions in a
      126              : /// Timeline implementation; it might be moved into a separate struct later.
     127              : impl Timeline {
     128              :     /// Start ingesting a WAL record, or other atomic modification of
     129              :     /// the timeline.
     130              :     ///
     131              :     /// This provides a transaction-like interface to perform a bunch
     132              :     /// of modifications atomically.
     133              :     ///
     134              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     135              :     /// DatadirModification object. Use the functions in the object to
     136              :     /// modify the repository state, updating all the pages and metadata
     137              :     /// that the WAL record affects. When you're done, call commit() to
     138              :     /// commit the changes.
     139              :     ///
      140              :     /// The LSN stored in the modification is advanced by `ingest_record` and
      141              :     /// is used by `commit()` to update `last_record_lsn`.
     142              :     ///
     143              :     /// Calling commit() will flush all the changes and reset the state,
     144              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     145              :     ///
     146              :     /// Note that any pending modifications you make through the
     147              :     /// modification object won't be visible to calls to the 'get' and list
     148              :     /// functions of the timeline until you finish! And if you update the
     149              :     /// same page twice, the last update wins.
     150              :     ///
     151       268272 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     152       268272 :     where
     153       268272 :         Self: Sized,
     154       268272 :     {
     155       268272 :         DatadirModification {
     156       268272 :             tline: self,
     157       268272 :             pending_lsns: Vec::new(),
     158       268272 :             pending_updates: HashMap::new(),
     159       268272 :             pending_deletions: Vec::new(),
     160       268272 :             pending_nblocks: 0,
     161       268272 :             pending_directory_entries: Vec::new(),
     162       268272 :             lsn,
     163       268272 :         }
     164       268272 :     }
     165              : 
     166              :     //------------------------------------------------------------------------------
     167              :     // Public GET functions
     168              :     //------------------------------------------------------------------------------
     169              : 
      170              :     /// Look up the given page version.
     171        18384 :     pub(crate) async fn get_rel_page_at_lsn(
     172        18384 :         &self,
     173        18384 :         tag: RelTag,
     174        18384 :         blknum: BlockNumber,
     175        18384 :         version: Version<'_>,
     176        18384 :         latest: bool,
     177        18384 :         ctx: &RequestContext,
     178        18384 :     ) -> Result<Bytes, PageReconstructError> {
     179        18384 :         if tag.relnode == 0 {
     180            0 :             return Err(PageReconstructError::Other(
     181            0 :                 RelationError::InvalidRelnode.into(),
     182            0 :             ));
     183        18384 :         }
     184              : 
     185        18384 :         let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
     186        18384 :         if blknum >= nblocks {
     187            0 :             debug!(
     188            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     189            0 :                 tag,
     190            0 :                 blknum,
     191            0 :                 version.get_lsn(),
     192            0 :                 nblocks
     193            0 :             );
     194            0 :             return Ok(ZERO_PAGE.clone());
     195        18384 :         }
     196        18384 : 
     197        18384 :         let key = rel_block_to_key(tag, blknum);
     198        18384 :         version.get(self, key, ctx).await
     199        18384 :     }
     200              : 
     201              :     // Get size of a database in blocks
     202            0 :     pub(crate) async fn get_db_size(
     203            0 :         &self,
     204            0 :         spcnode: Oid,
     205            0 :         dbnode: Oid,
     206            0 :         version: Version<'_>,
     207            0 :         latest: bool,
     208            0 :         ctx: &RequestContext,
     209            0 :     ) -> Result<usize, PageReconstructError> {
     210            0 :         let mut total_blocks = 0;
     211              : 
     212            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     213              : 
     214            0 :         for rel in rels {
     215            0 :             let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
     216            0 :             total_blocks += n_blocks as usize;
     217              :         }
     218            0 :         Ok(total_blocks)
     219            0 :     }
     220              : 
     221              :     /// Get size of a relation file
     222        24434 :     pub(crate) async fn get_rel_size(
     223        24434 :         &self,
     224        24434 :         tag: RelTag,
     225        24434 :         version: Version<'_>,
     226        24434 :         latest: bool,
     227        24434 :         ctx: &RequestContext,
     228        24434 :     ) -> Result<BlockNumber, PageReconstructError> {
     229        24434 :         if tag.relnode == 0 {
     230            0 :             return Err(PageReconstructError::Other(
     231            0 :                 RelationError::InvalidRelnode.into(),
     232            0 :             ));
     233        24434 :         }
     234              : 
     235        24434 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     236        19294 :             return Ok(nblocks);
     237         5140 :         }
     238         5140 : 
     239         5140 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     240            0 :             && !self.get_rel_exists(tag, version, latest, ctx).await?
     241              :         {
     242              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     243              :             // FSM, and smgrnblocks() on it immediately afterwards,
     244              :             // without extending it.  Tolerate that by claiming that
     245              :             // any non-existent FSM fork has size 0.
     246            0 :             return Ok(0);
     247         5140 :         }
     248         5140 : 
     249         5140 :         let key = rel_size_to_key(tag);
     250         5140 :         let mut buf = version.get(self, key, ctx).await?;
     251         5136 :         let nblocks = buf.get_u32_le();
     252         5136 : 
     253         5136 :         if latest {
      254            0 :             // Update the relation size cache only if the "latest" flag is set.
      255            0 :             // This flag is set by the compute node when it is working with the most recent
      256            0 :             // version of the relation. Typically, the primary compute node always sets latest=true.
      257            0 :             // Note that even if the compute node "by mistake" specifies an old LSN but sets
      258            0 :             // latest=true, this cannot cause cache corruption: with latest=true the
      259            0 :             // pageserver chooses max(request_lsn, last_written_lsn), so the cached value will be
      260            0 :             // associated with the most recent LSN.
     261            0 :             self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     262         5136 :         }
     263         5136 :         Ok(nblocks)
     264        24434 :     }
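                       : // Editor's sketch (added in editing; not part of this file or its
                       : // coverage data): reading a relation size at a historic LSN versus the
                       : // latest one. With `latest == true` the result also populates the
                       : // relation size cache, per the comment above. `timeline`, `tag`, `lsn`,
                       : // `last_lsn` and `ctx` are hypothetical:
                       : //
                       : //     let historic = timeline.get_rel_size(tag, Version::Lsn(lsn), false, &ctx).await?;
                       : //     let newest = timeline.get_rel_size(tag, Version::Lsn(last_lsn), true, &ctx).await?;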
     265              : 
     266              :     /// Does relation exist?
     267         6050 :     pub(crate) async fn get_rel_exists(
     268         6050 :         &self,
     269         6050 :         tag: RelTag,
     270         6050 :         version: Version<'_>,
     271         6050 :         _latest: bool,
     272         6050 :         ctx: &RequestContext,
     273         6050 :     ) -> Result<bool, PageReconstructError> {
     274         6050 :         if tag.relnode == 0 {
     275            0 :             return Err(PageReconstructError::Other(
     276            0 :                 RelationError::InvalidRelnode.into(),
     277            0 :             ));
     278         6050 :         }
     279              : 
      280              :         // first try to look up the relation in the cache
     281         6050 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     282         6032 :             return Ok(true);
     283           18 :         }
     284           18 :         // fetch directory listing
     285           18 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     286           18 :         let buf = version.get(self, key, ctx).await?;
     287              : 
     288           18 :         match RelDirectory::des(&buf).context("deserialization failure") {
     289           18 :             Ok(dir) => {
     290           18 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     291           18 :                 Ok(exists)
     292              :             }
     293            0 :             Err(e) => Err(PageReconstructError::from(e)),
     294              :         }
     295         6050 :     }
     296              : 
      297              :     /// Get a list of all existing relations in the given tablespace and database.
     298              :     ///
     299              :     /// # Cancel-Safety
     300              :     ///
     301              :     /// This method is cancellation-safe.
     302            0 :     pub(crate) async fn list_rels(
     303            0 :         &self,
     304            0 :         spcnode: Oid,
     305            0 :         dbnode: Oid,
     306            0 :         version: Version<'_>,
     307            0 :         ctx: &RequestContext,
     308            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     309            0 :         // fetch directory listing
     310            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     311            0 :         let buf = version.get(self, key, ctx).await?;
     312              : 
     313            0 :         match RelDirectory::des(&buf).context("deserialization failure") {
     314            0 :             Ok(dir) => {
     315            0 :                 let rels: HashSet<RelTag> =
     316            0 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     317            0 :                         spcnode,
     318            0 :                         dbnode,
     319            0 :                         relnode: *relnode,
     320            0 :                         forknum: *forknum,
     321            0 :                     }));
     322            0 : 
     323            0 :                 Ok(rels)
     324              :             }
     325            0 :             Err(e) => Err(PageReconstructError::from(e)),
     326              :         }
     327            0 :     }
     328              : 
     329              :     /// Get the whole SLRU segment
     330            0 :     pub(crate) async fn get_slru_segment(
     331            0 :         &self,
     332            0 :         kind: SlruKind,
     333            0 :         segno: u32,
     334            0 :         lsn: Lsn,
     335            0 :         ctx: &RequestContext,
     336            0 :     ) -> Result<Bytes, PageReconstructError> {
     337            0 :         let n_blocks = self
     338            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     339            0 :             .await?;
     340            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     341            0 :         for blkno in 0..n_blocks {
     342            0 :             let block = self
     343            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     344            0 :                 .await?;
     345            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     346              :         }
     347            0 :         Ok(segment.freeze())
     348            0 :     }
     349              : 
      350              :     /// Look up the given SLRU page version.
     351            0 :     pub(crate) async fn get_slru_page_at_lsn(
     352            0 :         &self,
     353            0 :         kind: SlruKind,
     354            0 :         segno: u32,
     355            0 :         blknum: BlockNumber,
     356            0 :         lsn: Lsn,
     357            0 :         ctx: &RequestContext,
     358            0 :     ) -> Result<Bytes, PageReconstructError> {
     359            0 :         let key = slru_block_to_key(kind, segno, blknum);
     360            0 :         self.get(key, lsn, ctx).await
     361            0 :     }
     362              : 
     363              :     /// Get size of an SLRU segment
     364            0 :     pub(crate) async fn get_slru_segment_size(
     365            0 :         &self,
     366            0 :         kind: SlruKind,
     367            0 :         segno: u32,
     368            0 :         version: Version<'_>,
     369            0 :         ctx: &RequestContext,
     370            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     371            0 :         let key = slru_segment_size_to_key(kind, segno);
     372            0 :         let mut buf = version.get(self, key, ctx).await?;
     373            0 :         Ok(buf.get_u32_le())
     374            0 :     }
     375              : 
      376              :     /// Does the SLRU segment exist?
     377            0 :     pub(crate) async fn get_slru_segment_exists(
     378            0 :         &self,
     379            0 :         kind: SlruKind,
     380            0 :         segno: u32,
     381            0 :         version: Version<'_>,
     382            0 :         ctx: &RequestContext,
     383            0 :     ) -> Result<bool, PageReconstructError> {
     384            0 :         // fetch directory listing
     385            0 :         let key = slru_dir_to_key(kind);
     386            0 :         let buf = version.get(self, key, ctx).await?;
     387              : 
     388            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     389            0 :             Ok(dir) => {
     390            0 :                 let exists = dir.segments.get(&segno).is_some();
     391            0 :                 Ok(exists)
     392              :             }
     393            0 :             Err(e) => Err(PageReconstructError::from(e)),
     394              :         }
     395            0 :     }
     396              : 
      397              :     /// Locate the LSN such that all transactions that committed before
      398              :     /// 'search_timestamp' are visible, but nothing newer is.
     399              :     ///
     400              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     401              :     /// so it's not well defined which LSN you get if there were multiple commits
     402              :     /// "in flight" at that point in time.
     403              :     ///
     404            0 :     pub(crate) async fn find_lsn_for_timestamp(
     405            0 :         &self,
     406            0 :         search_timestamp: TimestampTz,
     407            0 :         cancel: &CancellationToken,
     408            0 :         ctx: &RequestContext,
     409            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     410            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     411            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     412            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     413            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     414            0 :         // on the safe side.
     415            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     416            0 :         let max_lsn = self.get_last_record_lsn();
     417            0 : 
     418            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     419            0 :         // LSN divided by 8.
     420            0 :         let mut low = min_lsn.0 / 8;
     421            0 :         let mut high = max_lsn.0 / 8 + 1;
     422            0 : 
     423            0 :         let mut found_smaller = false;
     424            0 :         let mut found_larger = false;
     425            0 :         while low < high {
     426            0 :             if cancel.is_cancelled() {
     427            0 :                 return Err(PageReconstructError::Cancelled);
     428            0 :             }
     429            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     430            0 :             let mid = (high + low) / 2;
     431              : 
     432            0 :             let cmp = self
     433            0 :                 .is_latest_commit_timestamp_ge_than(
     434            0 :                     search_timestamp,
     435            0 :                     Lsn(mid * 8),
     436            0 :                     &mut found_smaller,
     437            0 :                     &mut found_larger,
     438            0 :                     ctx,
     439            0 :                 )
     440            0 :                 .await?;
     441              : 
     442            0 :             if cmp {
     443            0 :                 high = mid;
     444            0 :             } else {
     445            0 :                 low = mid + 1;
     446            0 :             }
     447              :         }
      448              :         // If `found_smaller == true`, then `low == t + 1`, where `t` is the target LSN:
      449              :         // the LSN of the last commit record before or at `search_timestamp`.
     450              :         // Remove one from `low` to get `t`.
     451              :         //
     452              :         // FIXME: it would be better to get the LSN of the previous commit.
     453              :         // Otherwise, if you restore to the returned LSN, the database will
     454              :         // include physical changes from later commits that will be marked
     455              :         // as aborted, and will need to be vacuumed away.
     456            0 :         let commit_lsn = Lsn((low - 1) * 8);
     457            0 :         match (found_smaller, found_larger) {
     458              :             (false, false) => {
     459              :                 // This can happen if no commit records have been processed yet, e.g.
     460              :                 // just after importing a cluster.
     461            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     462              :             }
     463              :             (false, true) => {
     464              :                 // Didn't find any commit timestamps smaller than the request
     465            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     466              :             }
     467              :             (true, false) => {
     468              :                 // Only found commits with timestamps smaller than the request.
     469              :                 // It's still a valid case for branch creation, return it.
     470              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     471              :                 // case, anyway.
     472            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     473              :             }
     474            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     475              :         }
     476            0 :     }
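                       : // Editor's sketch (added in editing; not part of this file or its
                       : // coverage data): stripped of the timestamp plumbing, the loop above is a
                       : // standard partition-point binary search over a monotone predicate
                       : // (`is_ge` stands in for is_latest_commit_timestamp_ge_than). The search
                       : // runs over LSN / 8, since LSNs are 8-byte aligned:
                       : //
                       : //     fn partition_point(mut low: u64, mut high: u64, is_ge: impl Fn(u64) -> bool) -> u64 {
                       : //         while low < high {
                       : //             let mid = (high + low) / 2; // no overflow: both are < u64::MAX / 2
                       : //             if is_ge(mid) {
                       : //                 high = mid; // predicate holds at mid: answer is mid or below
                       : //             } else {
                       : //                 low = mid + 1; // predicate fails at mid: answer is above mid
                       : //             }
                       : //         }
                       : //         low // smallest value for which `is_ge` holds
                       : //     }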
     477              : 
      478              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
      479              :     /// commits that committed after 'search_timestamp', as of LSN 'probe_lsn'.
      480              :     ///
      481              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
      482              :     /// with a smaller/larger timestamp.
     483              :     ///
     484            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     485            0 :         &self,
     486            0 :         search_timestamp: TimestampTz,
     487            0 :         probe_lsn: Lsn,
     488            0 :         found_smaller: &mut bool,
     489            0 :         found_larger: &mut bool,
     490            0 :         ctx: &RequestContext,
     491            0 :     ) -> Result<bool, PageReconstructError> {
     492            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     493            0 :             if timestamp >= search_timestamp {
     494            0 :                 *found_larger = true;
     495            0 :                 return ControlFlow::Break(true);
     496            0 :             } else {
     497            0 :                 *found_smaller = true;
     498            0 :             }
     499            0 :             ControlFlow::Continue(())
     500            0 :         })
     501            0 :         .await
     502            0 :     }
     503              : 
      504              :     /// Obtain the latest commit timestamp at or before the given lsn.
      505              :     ///
      506              :     /// Returns None if the lsn has no commit timestamps; otherwise returns the largest one found.
     507            0 :     pub(crate) async fn get_timestamp_for_lsn(
     508            0 :         &self,
     509            0 :         probe_lsn: Lsn,
     510            0 :         ctx: &RequestContext,
     511            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     512            0 :         let mut max: Option<TimestampTz> = None;
     513            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     514            0 :             if let Some(max_prev) = max {
     515            0 :                 max = Some(max_prev.max(timestamp));
     516            0 :             } else {
     517            0 :                 max = Some(timestamp);
     518            0 :             }
     519            0 :             ControlFlow::Continue(())
     520            0 :         })
     521            0 :         .await?;
     522              : 
     523            0 :         Ok(max)
     524            0 :     }
     525              : 
     526              :     /// Runs the given function on all the timestamps for a given lsn
     527              :     ///
     528              :     /// The return value is either given by the closure, or set to the `Default`
     529              :     /// impl's output.
     530            0 :     async fn map_all_timestamps<T: Default>(
     531            0 :         &self,
     532            0 :         probe_lsn: Lsn,
     533            0 :         ctx: &RequestContext,
     534            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     535            0 :     ) -> Result<T, PageReconstructError> {
     536            0 :         for segno in self
     537            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     538            0 :             .await?
     539              :         {
     540            0 :             let nblocks = self
     541            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     542            0 :                 .await?;
     543            0 :             for blknum in (0..nblocks).rev() {
     544            0 :                 let clog_page = self
     545            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     546            0 :                     .await?;
     547              : 
     548            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     549            0 :                     let mut timestamp_bytes = [0u8; 8];
     550            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     551            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     552            0 : 
     553            0 :                     match f(timestamp) {
     554            0 :                         ControlFlow::Break(b) => return Ok(b),
     555            0 :                         ControlFlow::Continue(()) => (),
     556              :                     }
     557            0 :                 }
     558              :             }
     559              :         }
     560            0 :         Ok(Default::default())
     561            0 :     }
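                       : // Editor's sketch (added in editing; not part of this file or its
                       : // coverage data): the `BLCKSZ as usize + 8` check above relies on the
                       : // commit timestamp being carried as an 8-byte big-endian suffix after the
                       : // CLOG page proper. Parsing that suffix in isolation would look like:
                       : //
                       : //     let (_page, suffix) = clog_page.split_at(BLCKSZ as usize);
                       : //     let timestamp = TimestampTz::from_be_bytes(suffix.try_into().unwrap());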
     562              : 
     563            0 :     pub(crate) async fn get_slru_keyspace(
     564            0 :         &self,
     565            0 :         version: Version<'_>,
     566            0 :         ctx: &RequestContext,
     567            0 :     ) -> Result<KeySpace, PageReconstructError> {
     568            0 :         let mut accum = KeySpaceAccum::new();
     569              : 
     570            0 :         for kind in SlruKind::iter() {
     571            0 :             let mut segments: Vec<u32> = self
     572            0 :                 .list_slru_segments(kind, version, ctx)
     573            0 :                 .await?
     574            0 :                 .into_iter()
     575            0 :                 .collect();
     576            0 :             segments.sort_unstable();
     577              : 
     578            0 :             for seg in segments {
     579            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     580              : 
     581            0 :                 accum.add_range(
     582            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     583            0 :                 );
     584              :             }
     585              :         }
     586              : 
     587            0 :         Ok(accum.to_keyspace())
     588            0 :     }
     589              : 
     590              :     /// Get a list of SLRU segments
     591            0 :     pub(crate) async fn list_slru_segments(
     592            0 :         &self,
     593            0 :         kind: SlruKind,
     594            0 :         version: Version<'_>,
     595            0 :         ctx: &RequestContext,
     596            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     597            0 :         // fetch directory entry
     598            0 :         let key = slru_dir_to_key(kind);
     599              : 
     600            0 :         let buf = version.get(self, key, ctx).await?;
     601            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     602            0 :             Ok(dir) => Ok(dir.segments),
     603            0 :             Err(e) => Err(PageReconstructError::from(e)),
     604              :         }
     605            0 :     }
     606              : 
     607            0 :     pub(crate) async fn get_relmap_file(
     608            0 :         &self,
     609            0 :         spcnode: Oid,
     610            0 :         dbnode: Oid,
     611            0 :         version: Version<'_>,
     612            0 :         ctx: &RequestContext,
     613            0 :     ) -> Result<Bytes, PageReconstructError> {
     614            0 :         let key = relmap_file_key(spcnode, dbnode);
     615              : 
     616            0 :         let buf = version.get(self, key, ctx).await?;
     617            0 :         Ok(buf)
     618            0 :     }
     619              : 
     620            0 :     pub(crate) async fn list_dbdirs(
     621            0 :         &self,
     622            0 :         lsn: Lsn,
     623            0 :         ctx: &RequestContext,
     624            0 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     625              :         // fetch directory entry
     626            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     627              : 
     628            0 :         match DbDirectory::des(&buf).context("deserialization failure") {
     629            0 :             Ok(dir) => Ok(dir.dbdirs),
     630            0 :             Err(e) => Err(PageReconstructError::from(e)),
     631              :         }
     632            0 :     }
     633              : 
     634            0 :     pub(crate) async fn get_twophase_file(
     635            0 :         &self,
     636            0 :         xid: TransactionId,
     637            0 :         lsn: Lsn,
     638            0 :         ctx: &RequestContext,
     639            0 :     ) -> Result<Bytes, PageReconstructError> {
     640            0 :         let key = twophase_file_key(xid);
     641            0 :         let buf = self.get(key, lsn, ctx).await?;
     642            0 :         Ok(buf)
     643            0 :     }
     644              : 
     645            0 :     pub(crate) async fn list_twophase_files(
     646            0 :         &self,
     647            0 :         lsn: Lsn,
     648            0 :         ctx: &RequestContext,
     649            0 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     650              :         // fetch directory entry
     651            0 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     652              : 
     653            0 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     654            0 :             Ok(dir) => Ok(dir.xids),
     655            0 :             Err(e) => Err(PageReconstructError::from(e)),
     656              :         }
     657            0 :     }
     658              : 
     659            0 :     pub(crate) async fn get_control_file(
     660            0 :         &self,
     661            0 :         lsn: Lsn,
     662            0 :         ctx: &RequestContext,
     663            0 :     ) -> Result<Bytes, PageReconstructError> {
     664            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     665            0 :     }
     666              : 
     667           12 :     pub(crate) async fn get_checkpoint(
     668           12 :         &self,
     669           12 :         lsn: Lsn,
     670           12 :         ctx: &RequestContext,
     671           12 :     ) -> Result<Bytes, PageReconstructError> {
     672           12 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     673           12 :     }
     674              : 
     675            6 :     pub(crate) async fn list_aux_files(
     676            6 :         &self,
     677            6 :         lsn: Lsn,
     678            6 :         ctx: &RequestContext,
     679            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     680            6 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     681            6 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     682            6 :                 Ok(dir) => Ok(dir.files),
     683            0 :                 Err(e) => Err(PageReconstructError::from(e)),
     684              :             },
     685            0 :             Err(e) => {
     686              :                 // This is expected: historical databases do not have the key.
     687            0 :                 debug!("Failed to get info about AUX files: {}", e);
     688            0 :                 Ok(HashMap::new())
     689              :             }
     690              :         }
     691            6 :     }
     692              : 
      693              :     /// Does the same as get_current_logical_size, but computes the size on demand.
     694              :     /// Used to initialize the logical size tracking on startup.
     695              :     ///
     696              :     /// Only relation blocks are counted currently. That excludes metadata,
     697              :     /// SLRUs, twophase files etc.
     698              :     ///
     699              :     /// # Cancel-Safety
     700              :     ///
     701              :     /// This method is cancellation-safe.
     702            0 :     pub async fn get_current_logical_size_non_incremental(
     703            0 :         &self,
     704            0 :         lsn: Lsn,
     705            0 :         ctx: &RequestContext,
     706            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     707            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     708              : 
     709              :         // Fetch list of database dirs and iterate them
     710            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     711            0 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     712              : 
     713            0 :         let mut total_size: u64 = 0;
     714            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     715            0 :             for rel in self
     716            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     717            0 :                 .await?
     718              :             {
     719            0 :                 if self.cancel.is_cancelled() {
     720            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     721            0 :                 }
     722            0 :                 let relsize_key = rel_size_to_key(rel);
     723            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     724            0 :                 let relsize = buf.get_u32_le();
     725            0 : 
     726            0 :                 total_size += relsize as u64;
     727              :             }
     728              :         }
     729            0 :         Ok(total_size * BLCKSZ as u64)
     730            0 :     }
     731              : 
     732              :     ///
     733              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
      734              :     /// Anything that's not listed may be removed from the underlying storage (from
      735              :     /// that LSN onwards).
     736          174 :     pub(crate) async fn collect_keyspace(
     737          174 :         &self,
     738          174 :         lsn: Lsn,
     739          174 :         ctx: &RequestContext,
     740          174 :     ) -> Result<KeySpace, CollectKeySpaceError> {
     741          174 :         // Iterate through key ranges, greedily packing them into partitions
     742          174 :         let mut result = KeySpaceAccum::new();
     743          174 : 
     744          174 :         // The dbdir metadata always exists
     745          174 :         result.add_key(DBDIR_KEY);
     746              : 
     747              :         // Fetch list of database dirs and iterate them
     748         2205 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     749          174 :         let dbdir = DbDirectory::des(&buf)?;
     750              : 
     751          174 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     752          174 :         dbs.sort_unstable();
     753          174 :         for (spcnode, dbnode) in dbs {
     754            0 :             result.add_key(relmap_file_key(spcnode, dbnode));
     755            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     756              : 
     757            0 :             let mut rels: Vec<RelTag> = self
     758            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     759            0 :                 .await?
     760            0 :                 .into_iter()
     761            0 :                 .collect();
     762            0 :             rels.sort_unstable();
     763            0 :             for rel in rels {
     764            0 :                 let relsize_key = rel_size_to_key(rel);
     765            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     766            0 :                 let relsize = buf.get_u32_le();
     767            0 : 
     768            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     769            0 :                 result.add_key(relsize_key);
     770              :             }
     771              :         }
     772              : 
     773              :         // Iterate SLRUs next
     774          522 :         for kind in [
     775          174 :             SlruKind::Clog,
     776          174 :             SlruKind::MultiXactMembers,
     777          174 :             SlruKind::MultiXactOffsets,
     778              :         ] {
     779          522 :             let slrudir_key = slru_dir_to_key(kind);
     780          522 :             result.add_key(slrudir_key);
     781         7978 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     782          522 :             let dir = SlruSegmentDirectory::des(&buf)?;
     783          522 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     784          522 :             segments.sort_unstable();
     785          522 :             for segno in segments {
     786            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     787            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     788            0 :                 let segsize = buf.get_u32_le();
     789            0 : 
     790            0 :                 result.add_range(
     791            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     792            0 :                 );
     793            0 :                 result.add_key(segsize_key);
     794              :             }
     795              :         }
     796              : 
     797              :         // Then pg_twophase
     798          174 :         result.add_key(TWOPHASEDIR_KEY);
     799         3050 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     800          174 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     801          174 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     802          174 :         xids.sort_unstable();
     803          174 :         for xid in xids {
     804            0 :             result.add_key(twophase_file_key(xid));
     805            0 :         }
     806              : 
     807          174 :         result.add_key(CONTROLFILE_KEY);
     808          174 :         result.add_key(CHECKPOINT_KEY);
     809          174 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     810           74 :             result.add_key(AUX_FILES_KEY);
     811          100 :         }
     812          174 :         Ok(result.to_keyspace())
     813          174 :     }
     814              : 
      815              :     /// Get the cached size of a relation, if it has not been updated after the specified LSN
     816       448540 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     817       448540 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     818       448540 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     819       448518 :             if lsn >= *cached_lsn {
     820       443372 :                 return Some(*nblocks);
     821         5146 :             }
     822           22 :         }
     823         5168 :         None
     824       448540 :     }
     825              : 
     826              :     /// Update cached relation size if there is no more recent update
     827            0 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     828            0 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     829            0 :         match rel_size_cache.entry(tag) {
     830            0 :             hash_map::Entry::Occupied(mut entry) => {
     831            0 :                 let cached_lsn = entry.get_mut();
     832            0 :                 if lsn >= cached_lsn.0 {
     833            0 :                     *cached_lsn = (lsn, nblocks);
     834            0 :                 }
     835              :             }
     836            0 :             hash_map::Entry::Vacant(entry) => {
     837            0 :                 entry.insert((lsn, nblocks));
     838            0 :             }
     839              :         }
     840            0 :     }
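                       : // Editor's sketch (added in editing; not part of this file or its
                       : // coverage data): the update rule above keeps one (lsn, nblocks) pair per
                       : // relation and only moves it forward. Hypothetical sequence, assuming
                       : // `timeline` and `tag`:
                       : //
                       : //     timeline.update_cached_rel_size(tag, Lsn(0x10), 8);
                       : //     timeline.update_cached_rel_size(tag, Lsn(0x08), 5); // ignored: older LSN
                       : //     assert_eq!(timeline.get_cached_rel_size(&tag, Lsn(0x20)), Some(8));
                       : //     assert_eq!(timeline.get_cached_rel_size(&tag, Lsn(0x04)), None); // LSN predates cached value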
     841              : 
     842              :     /// Store cached relation size
     843       288732 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     844       288732 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     845       288732 :         rel_size_cache.insert(tag, (lsn, nblocks));
     846       288732 :     }
     847              : 
     848              :     /// Remove cached relation size
     849            2 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     850            2 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     851            2 :         rel_size_cache.remove(tag);
     852            2 :     }
     853              : }
     854              : 
     855              : /// DatadirModification represents an operation to ingest an atomic set of
      856              : /// updates to the repository. It is created by the 'begin_modification'
      857              : /// function. It is used for each WAL record, so that all the modifications
      858              : /// by one WAL record appear atomic.
     859              : pub struct DatadirModification<'a> {
     860              :     /// The timeline this modification applies to. You can access this to
     861              :     /// read the state, but note that any pending updates are *not* reflected
     862              :     /// in the state in 'tline' yet.
     863              :     pub tline: &'a Timeline,
     864              : 
     865              :     /// Current LSN of the modification
     866              :     lsn: Lsn,
     867              : 
     868              :     // The modifications are not applied directly to the underlying key-value store.
     869              :     // The put-functions add the modifications here, and they are flushed to the
     870              :     // underlying key-value store by the 'finish' function.
      871              :     // underlying key-value store by the 'commit' function.
     872              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     873              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     874              :     pending_nblocks: i64,
     875              : 
     876              :     /// For special "directory" keys that store key-value maps, track the size of the map
     877              :     /// if it was updated in this modification.
     878              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
     879              : }
     880              : 
     881              : impl<'a> DatadirModification<'a> {
     882              :     /// Get the current lsn
     883       418056 :     pub(crate) fn get_lsn(&self) -> Lsn {
     884       418056 :         self.lsn
     885       418056 :     }
     886              : 
     887              :     /// Set the current lsn
     888       145856 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     889       145856 :         ensure!(
     890       145856 :             lsn >= self.lsn,
     891            0 :             "setting an older lsn {} than {} is not allowed",
     892              :             lsn,
     893              :             self.lsn
     894              :         );
     895       145856 :         if lsn > self.lsn {
     896       145856 :             self.pending_lsns.push(self.lsn);
     897       145856 :             self.lsn = lsn;
     898       145856 :         }
     899       145856 :         Ok(())
     900       145856 :     }
     901              : 
     902              :     /// Initialize a completely new repository.
     903              :     ///
     904              :     /// This inserts the directory metadata entries that are assumed to
     905              :     /// always exist.
     906           76 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     907           76 :         let buf = DbDirectory::ser(&DbDirectory {
     908           76 :             dbdirs: HashMap::new(),
     909           76 :         })?;
     910           76 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
     911           76 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     912           76 : 
     913           76 :         // Create AuxFilesDirectory
     914           76 :         self.init_aux_dir()?;
     915              : 
     916           76 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     917           76 :             xids: HashSet::new(),
     918           76 :         })?;
     919           76 :         self.pending_directory_entries
     920           76 :             .push((DirectoryKind::TwoPhase, 0));
     921           76 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     922              : 
     923           76 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     924           76 :         let empty_dir = Value::Image(buf);
     925           76 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     926           76 :         self.pending_directory_entries
     927           76 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
     928           76 :         self.put(
     929           76 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     930           76 :             empty_dir.clone(),
     931           76 :         );
     932           76 :         self.pending_directory_entries
      933           76 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactMembers), 0));
     934           76 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     935           76 :         self.pending_directory_entries
     936           76 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
     937           76 : 
     938           76 :         Ok(())
     939           76 :     }
     940              : 
     941              :     #[cfg(test)]
     942           74 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     943           74 :         self.init_empty()?;
     944           74 :         self.put_control_file(bytes::Bytes::from_static(
     945           74 :             b"control_file contents do not matter",
     946           74 :         ))
     947           74 :         .context("put_control_file")?;
     948           74 :         self.put_checkpoint(bytes::Bytes::from_static(
     949           74 :             b"checkpoint_file contents do not matter",
     950           74 :         ))
     951           74 :         .context("put_checkpoint_file")?;
     952           74 :         Ok(())
     953           74 :     }
     954              : 
     955              :     /// Put a new page version that can be constructed from a WAL record
     956              :     ///
     957              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     958              :     /// current end-of-file. It's up to the caller to check that the relation size
     959              :     /// matches the blocks inserted!
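                       :     ///
                       :     /// A hedged sketch of the expected call pattern (the caller, e.g. the
                       :     /// WAL ingest code, is responsible for the extend):
                       :     ///
                       :     /// ```ignore
                       :     /// modification.put_rel_extend(rel, blknum + 1, ctx).await?; // grow EOF if needed
                       :     /// modification.put_rel_wal_record(rel, blknum, rec)?;
                       :     /// ```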
     960       145630 :     pub fn put_rel_wal_record(
     961       145630 :         &mut self,
     962       145630 :         rel: RelTag,
     963       145630 :         blknum: BlockNumber,
     964       145630 :         rec: NeonWalRecord,
     965       145630 :     ) -> anyhow::Result<()> {
     966       145630 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     967       145630 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     968       145630 :         Ok(())
     969       145630 :     }
     970              : 
     971              :     // Same, but for an SLRU.
     972            8 :     pub fn put_slru_wal_record(
     973            8 :         &mut self,
     974            8 :         kind: SlruKind,
     975            8 :         segno: u32,
     976            8 :         blknum: BlockNumber,
     977            8 :         rec: NeonWalRecord,
     978            8 :     ) -> anyhow::Result<()> {
     979            8 :         self.put(
     980            8 :             slru_block_to_key(kind, segno, blknum),
     981            8 :             Value::WalRecord(rec),
     982            8 :         );
     983            8 :         Ok(())
     984            8 :     }
     985              : 
      986              :     /// Like put_rel_wal_record, but with a ready-made image of the page.
     987       280864 :     pub fn put_rel_page_image(
     988       280864 :         &mut self,
     989       280864 :         rel: RelTag,
     990       280864 :         blknum: BlockNumber,
     991       280864 :         img: Bytes,
     992       280864 :     ) -> anyhow::Result<()> {
     993       280864 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     994       280864 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     995       280864 :         Ok(())
     996       280864 :     }
     997              : 
     998            6 :     pub fn put_slru_page_image(
     999            6 :         &mut self,
    1000            6 :         kind: SlruKind,
    1001            6 :         segno: u32,
    1002            6 :         blknum: BlockNumber,
    1003            6 :         img: Bytes,
    1004            6 :     ) -> anyhow::Result<()> {
    1005            6 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
    1006            6 :         Ok(())
    1007            6 :     }
    1008              : 
    1009              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1010           16 :     pub async fn put_relmap_file(
    1011           16 :         &mut self,
    1012           16 :         spcnode: Oid,
    1013           16 :         dbnode: Oid,
    1014           16 :         img: Bytes,
    1015           16 :         ctx: &RequestContext,
    1016           16 :     ) -> anyhow::Result<()> {
    1017              :         // Add it to the directory (if it doesn't exist already)
    1018           16 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1019           16 :         let mut dbdir = DbDirectory::des(&buf)?;
    1020              : 
    1021           16 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1022           16 :         if r.is_none() || r == Some(false) {
    1023              :             // The dbdir entry didn't exist, or it contained a
    1024              :             // 'false'. The 'insert' call already updated it with
    1025              :             // 'true', now write the updated 'dbdirs' map back.
    1026           16 :             let buf = DbDirectory::ser(&dbdir)?;
    1027           16 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1028           16 : 
    1029           16 :             // Create AuxFilesDirectory as well
    1030           16 :             self.init_aux_dir()?;
    1031            0 :         }
    1032           16 :         if r.is_none() {
    1033            8 :             // Create RelDirectory
    1034            8 :             let buf = RelDirectory::ser(&RelDirectory {
    1035            8 :                 rels: HashSet::new(),
    1036            8 :             })?;
    1037            8 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1038            8 :             self.put(
    1039            8 :                 rel_dir_to_key(spcnode, dbnode),
    1040            8 :                 Value::Image(Bytes::from(buf)),
    1041            8 :             );
    1042            8 :         }
    1043              : 
    1044           16 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1045           16 :         Ok(())
    1046           16 :     }
    1047              : 
    1048            0 :     pub async fn put_twophase_file(
    1049            0 :         &mut self,
    1050            0 :         xid: TransactionId,
    1051            0 :         img: Bytes,
    1052            0 :         ctx: &RequestContext,
    1053            0 :     ) -> anyhow::Result<()> {
    1054              :         // Add it to the directory entry
    1055            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1056            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1057            0 :         if !dir.xids.insert(xid) {
    1058            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1059            0 :         }
    1060            0 :         self.pending_directory_entries
    1061            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1062            0 :         self.put(
    1063            0 :             TWOPHASEDIR_KEY,
    1064            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1065              :         );
    1066              : 
    1067            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1068            0 :         Ok(())
    1069            0 :     }
    1070              : 
    1071           76 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1072           76 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1073           76 :         Ok(())
    1074           76 :     }
    1075              : 
    1076           90 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1077           90 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1078           90 :         Ok(())
    1079           90 :     }
    1080              : 
    1081            0 :     pub async fn drop_dbdir(
    1082            0 :         &mut self,
    1083            0 :         spcnode: Oid,
    1084            0 :         dbnode: Oid,
    1085            0 :         ctx: &RequestContext,
    1086            0 :     ) -> anyhow::Result<()> {
    1087            0 :         let total_blocks = self
    1088            0 :             .tline
    1089            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
    1090            0 :             .await?;
    1091              : 
    1092              :         // Remove entry from dbdir
    1093            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1094            0 :         let mut dir = DbDirectory::des(&buf)?;
    1095            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1096            0 :             let buf = DbDirectory::ser(&dir)?;
    1097            0 :             self.pending_directory_entries
    1098            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1099            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1100              :         } else {
    1101            0 :             warn!(
    1102            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1103            0 :                 spcnode, dbnode
    1104            0 :             );
    1105              :         }
    1106              : 
    1107              :         // Update logical database size.
    1108            0 :         self.pending_nblocks -= total_blocks as i64;
    1109            0 : 
    1110            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1111            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1112            0 :         Ok(())
    1113            0 :     }
    1114              : 
    1115              :     /// Create a relation fork.
    1116              :     ///
    1117              :     /// 'nblocks' is the initial size.
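                       :     ///
                       :     /// No block contents are stored here, even if 'nblocks' > 0; a hedged
                       :     /// sketch of how a caller might follow up:
                       :     ///
                       :     /// ```ignore
                       :     /// modification.put_rel_creation(rel, nblocks, ctx).await?;
                       :     /// for blknum in 0..nblocks {
                       :     ///     modification.put_rel_page_image(rel, blknum, ZERO_PAGE.clone())?;
                       :     /// }
                       :     /// ```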
    1118         1920 :     pub async fn put_rel_creation(
    1119         1920 :         &mut self,
    1120         1920 :         rel: RelTag,
    1121         1920 :         nblocks: BlockNumber,
    1122         1920 :         ctx: &RequestContext,
    1123         1920 :     ) -> Result<(), RelationError> {
    1124         1920 :         if rel.relnode == 0 {
    1125            0 :             return Err(RelationError::InvalidRelnode);
    1126         1920 :         }
    1127              :         // It's possible that this is the first rel for this db in this
    1128              :         // tablespace.  Create the reldir entry for it if so.
    1129         1920 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1130         1920 :             .context("deserialize db")?;
    1131         1920 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1132         1920 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
    1133              :             // Didn't exist. Update dbdir
    1134            8 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
    1135            8 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1136            8 :             self.pending_directory_entries
    1137            8 :                 .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1138            8 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1139            8 : 
    1140            8 :             // and create the RelDirectory
    1141            8 :             RelDirectory::default()
    1142              :         } else {
    1143              :             // reldir already exists, fetch it
    1144         1912 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1145         1912 :                 .context("deserialize db")?
    1146              :         };
    1147              : 
    1148              :         // Add the new relation to the rel directory entry, and write it back
    1149         1920 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1150            0 :             return Err(RelationError::AlreadyExists);
    1151         1920 :         }
    1152         1920 : 
    1153         1920 :         self.pending_directory_entries
    1154         1920 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1155         1920 : 
    1156         1920 :         self.put(
    1157         1920 :             rel_dir_key,
    1158         1920 :             Value::Image(Bytes::from(
    1159         1920 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1160              :             )),
    1161              :         );
    1162              : 
    1163              :         // Put size
    1164         1920 :         let size_key = rel_size_to_key(rel);
    1165         1920 :         let buf = nblocks.to_le_bytes();
    1166         1920 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1167         1920 : 
    1168         1920 :         self.pending_nblocks += nblocks as i64;
    1169         1920 : 
    1170         1920 :         // Update relation size cache
    1171         1920 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1172         1920 : 
    1173         1920 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1174         1920 :         // caller.
    1175         1920 :         Ok(())
    1176         1920 :     }
    1177              : 
     1178              :     /// Truncate a relation to 'nblocks' blocks.
    1179         6012 :     pub async fn put_rel_truncation(
    1180         6012 :         &mut self,
    1181         6012 :         rel: RelTag,
    1182         6012 :         nblocks: BlockNumber,
    1183         6012 :         ctx: &RequestContext,
    1184         6012 :     ) -> anyhow::Result<()> {
    1185         6012 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1186         6012 :         if self
    1187         6012 :             .tline
    1188         6012 :             .get_rel_exists(rel, Version::Modified(self), true, ctx)
    1189            0 :             .await?
    1190              :         {
    1191         6012 :             let size_key = rel_size_to_key(rel);
    1192              :             // Fetch the old size first
    1193         6012 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1194         6012 : 
    1195         6012 :             // Update the entry with the new size.
    1196         6012 :             let buf = nblocks.to_le_bytes();
    1197         6012 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1198         6012 : 
     1199         6012 :             // Update relation size cache
     1200         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1204         6012 : 
    1205         6012 :             // Update logical database size.
    1206         6012 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1207            0 :         }
    1208         6012 :         Ok(())
    1209         6012 :     }
    1210              : 
     1211              :     /// Extend a relation.
     1212              :     /// If the new size is smaller than the current size, do nothing.
    1213       276680 :     pub async fn put_rel_extend(
    1214       276680 :         &mut self,
    1215       276680 :         rel: RelTag,
    1216       276680 :         nblocks: BlockNumber,
    1217       276680 :         ctx: &RequestContext,
    1218       276680 :     ) -> anyhow::Result<()> {
    1219       276680 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1220              : 
    1221              :         // Put size
    1222       276680 :         let size_key = rel_size_to_key(rel);
    1223       276680 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1224       276680 : 
    1225       276680 :         // only extend relation here. never decrease the size
    1226       276680 :         if nblocks > old_size {
    1227       274788 :             let buf = nblocks.to_le_bytes();
    1228       274788 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1229       274788 : 
    1230       274788 :             // Update relation size cache
    1231       274788 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1232       274788 : 
    1233       274788 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1234       274788 :         }
    1235       276680 :         Ok(())
    1236       276680 :     }
    1237              : 
    1238              :     /// Drop a relation.
    1239            2 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1240            2 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1241              : 
    1242              :         // Remove it from the directory entry
    1243            2 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1244            2 :         let buf = self.get(dir_key, ctx).await?;
    1245            2 :         let mut dir = RelDirectory::des(&buf)?;
    1246              : 
    1247            2 :         self.pending_directory_entries
    1248            2 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1249            2 : 
    1250            2 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1251            2 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1252              :         } else {
    1253            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1254              :         }
    1255              : 
    1256              :         // update logical size
    1257            2 :         let size_key = rel_size_to_key(rel);
    1258            2 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1259            2 :         self.pending_nblocks -= old_size as i64;
    1260            2 : 
     1261            2 :             // Remove entry from relation size cache
    1262            2 :         self.tline.remove_cached_rel_size(&rel);
    1263            2 : 
    1264            2 :         // Delete size entry, as well as all blocks
    1265            2 :         self.delete(rel_key_range(rel));
    1266            2 : 
    1267            2 :         Ok(())
    1268            2 :     }
    1269              : 
    1270            6 :     pub async fn put_slru_segment_creation(
    1271            6 :         &mut self,
    1272            6 :         kind: SlruKind,
    1273            6 :         segno: u32,
    1274            6 :         nblocks: BlockNumber,
    1275            6 :         ctx: &RequestContext,
    1276            6 :     ) -> anyhow::Result<()> {
    1277            6 :         // Add it to the directory entry
    1278            6 :         let dir_key = slru_dir_to_key(kind);
    1279            6 :         let buf = self.get(dir_key, ctx).await?;
    1280            6 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1281              : 
    1282            6 :         if !dir.segments.insert(segno) {
    1283            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1284            6 :         }
    1285            6 :         self.pending_directory_entries
    1286            6 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1287            6 :         self.put(
    1288            6 :             dir_key,
    1289            6 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1290              :         );
    1291              : 
    1292              :         // Put size
    1293            6 :         let size_key = slru_segment_size_to_key(kind, segno);
    1294            6 :         let buf = nblocks.to_le_bytes();
    1295            6 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1296            6 : 
    1297            6 :         // even if nblocks > 0, we don't insert any actual blocks here
    1298            6 : 
    1299            6 :         Ok(())
    1300            6 :     }
    1301              : 
    1302              :     /// Extend SLRU segment
    1303            0 :     pub fn put_slru_extend(
    1304            0 :         &mut self,
    1305            0 :         kind: SlruKind,
    1306            0 :         segno: u32,
    1307            0 :         nblocks: BlockNumber,
    1308            0 :     ) -> anyhow::Result<()> {
    1309            0 :         // Put size
    1310            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1311            0 :         let buf = nblocks.to_le_bytes();
    1312            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1313            0 :         Ok(())
    1314            0 :     }
    1315              : 
     1316              :     /// Drop an SLRU segment: used when SLRU files are truncated away.
    1317            0 :     pub async fn drop_slru_segment(
    1318            0 :         &mut self,
    1319            0 :         kind: SlruKind,
    1320            0 :         segno: u32,
    1321            0 :         ctx: &RequestContext,
    1322            0 :     ) -> anyhow::Result<()> {
    1323            0 :         // Remove it from the directory entry
    1324            0 :         let dir_key = slru_dir_to_key(kind);
    1325            0 :         let buf = self.get(dir_key, ctx).await?;
    1326            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1327              : 
    1328            0 :         if !dir.segments.remove(&segno) {
    1329            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1330            0 :         }
    1331            0 :         self.pending_directory_entries
    1332            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1333            0 :         self.put(
    1334            0 :             dir_key,
    1335            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1336              :         );
    1337              : 
    1338              :         // Delete size entry, as well as all blocks
    1339            0 :         self.delete(slru_segment_key_range(kind, segno));
    1340            0 : 
    1341            0 :         Ok(())
    1342            0 :     }
    1343              : 
    1344              :     /// Drop a relmapper file (pg_filenode.map)
    1345            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1346            0 :         // TODO
    1347            0 :         Ok(())
    1348            0 :     }
    1349              : 
     1350              :     /// Drop the two-phase state file for the given transaction.
    1351            0 :     pub async fn drop_twophase_file(
    1352            0 :         &mut self,
    1353            0 :         xid: TransactionId,
    1354            0 :         ctx: &RequestContext,
    1355            0 :     ) -> anyhow::Result<()> {
    1356              :         // Remove it from the directory entry
    1357            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1358            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1359              : 
    1360            0 :         if !dir.xids.remove(&xid) {
    1361            0 :             warn!("twophase file for xid {} does not exist", xid);
    1362            0 :         }
    1363            0 :         self.pending_directory_entries
    1364            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1365            0 :         self.put(
    1366            0 :             TWOPHASEDIR_KEY,
    1367            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1368              :         );
    1369              : 
    1370              :         // Delete it
    1371            0 :         self.delete(twophase_key_range(xid));
    1372            0 : 
    1373            0 :         Ok(())
    1374            0 :     }
    1375              : 
    1376           92 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1377           92 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1378           92 :             files: HashMap::new(),
    1379           92 :         })?;
    1380           92 :         self.pending_directory_entries
    1381           92 :             .push((DirectoryKind::AuxFiles, 0));
    1382           92 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1383           92 :         Ok(())
    1384           92 :     }
    1385              : 
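                       :     /// Store an auxiliary file in the repository. An empty 'content' deletes
                       :     /// the file. Updates are normally encoded as NeonWalRecord::AuxFile deltas
                       :     /// against AUX_FILES_KEY; after MAX_AUX_FILE_DELTAS consecutive deltas, a
                       :     /// full AuxFilesDirectory image is written instead, which bounds the delta
                       :     /// chain that a read has to replay.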
    1386            8 :     pub async fn put_file(
    1387            8 :         &mut self,
    1388            8 :         path: &str,
    1389            8 :         content: &[u8],
    1390            8 :         ctx: &RequestContext,
    1391            8 :     ) -> anyhow::Result<()> {
    1392            8 :         let file_path = path.to_string();
    1393            8 :         let content = if content.is_empty() {
    1394            2 :             None
    1395              :         } else {
    1396            6 :             Some(Bytes::copy_from_slice(content))
    1397              :         };
    1398              : 
    1399              :         let n_files;
    1400            8 :         let mut aux_files = self.tline.aux_files.lock().await;
    1401            8 :         if let Some(mut dir) = aux_files.dir.take() {
    1402              :             // We already updated aux files in `self`: emit a delta and update our latest value
    1403            6 :             dir.upsert(file_path.clone(), content.clone());
    1404            6 :             n_files = dir.files.len();
    1405            6 :             if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
    1406            0 :                 self.put(
    1407            0 :                     AUX_FILES_KEY,
    1408            0 :                     Value::Image(Bytes::from(
    1409            0 :                         AuxFilesDirectory::ser(&dir).context("serialize")?,
    1410              :                     )),
    1411              :                 );
    1412            0 :                 aux_files.n_deltas = 0;
    1413            6 :             } else {
    1414            6 :                 self.put(
    1415            6 :                     AUX_FILES_KEY,
    1416            6 :                     Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
    1417            6 :                 );
    1418            6 :                 aux_files.n_deltas += 1;
    1419            6 :             }
    1420            6 :             aux_files.dir = Some(dir);
    1421              :         } else {
    1422              :             // Check if the AUX_FILES_KEY is initialized
    1423            2 :             match self.get(AUX_FILES_KEY, ctx).await {
    1424            0 :                 Ok(dir_bytes) => {
    1425            0 :                     let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
    1426              :                     // Key is already set, we may append a delta
    1427            0 :                     self.put(
    1428            0 :                         AUX_FILES_KEY,
    1429            0 :                         Value::WalRecord(NeonWalRecord::AuxFile {
    1430            0 :                             file_path: file_path.clone(),
    1431            0 :                             content: content.clone(),
    1432            0 :                         }),
    1433            0 :                     );
    1434            0 :                     dir.upsert(file_path, content);
    1435            0 :                     n_files = dir.files.len();
    1436            0 :                     aux_files.dir = Some(dir);
    1437              :                 }
    1438              :                 Err(
    1439            0 :                     e @ (PageReconstructError::AncestorStopping(_)
    1440              :                     | PageReconstructError::Cancelled
    1441              :                     | PageReconstructError::AncestorLsnTimeout(_)),
    1442              :                 ) => {
    1443              :                     // Important that we do not interpret a shutdown error as "not found" and thereby
    1444              :                     // reset the map.
    1445            0 :                     return Err(e.into());
    1446              :                 }
    1447              :                 // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
     1448              :                 // we are assuming that all _other_ possible errors represent a missing key. If some
    1449              :                 // other error occurs, we may incorrectly reset the map of aux files.
    1450              :                 Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
    1451              :                     // Key is missing, we must insert an image as the basis for subsequent deltas.
    1452              : 
    1453            2 :                     let mut dir = AuxFilesDirectory {
    1454            2 :                         files: HashMap::new(),
    1455            2 :                     };
    1456            2 :                     dir.upsert(file_path, content);
    1457            2 :                     self.put(
    1458            2 :                         AUX_FILES_KEY,
    1459            2 :                         Value::Image(Bytes::from(
    1460            2 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1461              :                         )),
    1462              :                     );
    1463            2 :                     n_files = 1;
    1464            2 :                     aux_files.dir = Some(dir);
    1465              :                 }
    1466              :             }
    1467              :         }
    1468              : 
    1469            8 :         self.pending_directory_entries
    1470            8 :             .push((DirectoryKind::AuxFiles, n_files));
    1471            8 : 
    1472            8 :         Ok(())
    1473            8 :     }
    1474              : 
    1475              :     ///
    1476              :     /// Flush changes accumulated so far to the underlying repository.
    1477              :     ///
    1478              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1479              :     /// you to flush them to the underlying repository before the final `commit`.
     1480              :     /// That frees up the memory used to hold the pending changes.
    1481              :     ///
    1482              :     /// Currently only used during bulk import of a data directory. In that
    1483              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1484              :     /// whole import fails and the timeline will be deleted anyway.
    1485              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1486              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1487              :     ///
    1488              :     /// Note: A consequence of flushing the pending operations is that they
    1489              :     /// won't be visible to subsequent operations until `commit`. The function
    1490              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1491              :     /// for bulk import, where you are just loading data pages and won't try to
    1492              :     /// modify the same pages twice.
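                       :     ///
                       :     /// A hedged usage sketch (the bulk-import loop and 'ingest_chunk' are
                       :     /// hypothetical):
                       :     ///
                       :     /// ```ignore
                       :     /// for chunk in import_chunks {
                       :     ///     ingest_chunk(&mut modification, chunk)?;
                       :     ///     modification.flush(&ctx).await?; // cap memory held by pending pages
                       :     /// }
                       :     /// modification.commit(&ctx).await?;
                       :     /// ```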
    1493         1930 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1494         1930 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1495         1930 :         // to scan through the pending_updates list.
    1496         1930 :         let pending_nblocks = self.pending_nblocks;
    1497         1930 :         if pending_nblocks < 10000 {
    1498         1930 :             return Ok(());
    1499            0 :         }
    1500              : 
    1501            0 :         let writer = self.tline.writer().await;
    1502              : 
     1503              :         // Flush relation and SLRU data blocks; keep metadata.
    1504            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1505            0 :         for (key, values) in self.pending_updates.drain() {
    1506            0 :             for (lsn, value) in values {
    1507            0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1508              :                     // This bails out on first error without modifying pending_updates.
    1509              :                     // That's Ok, cf this function's doc comment.
    1510            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1511            0 :                 } else {
    1512            0 :                     retained_pending_updates
    1513            0 :                         .entry(key)
    1514            0 :                         .or_default()
    1515            0 :                         .push((lsn, value));
    1516            0 :                 }
    1517              :             }
    1518              :         }
    1519              : 
    1520            0 :         self.pending_updates = retained_pending_updates;
    1521            0 : 
    1522            0 :         if pending_nblocks != 0 {
    1523            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1524            0 :             self.pending_nblocks = 0;
    1525            0 :         }
    1526              : 
    1527            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1528            0 :             writer.update_directory_entries_count(kind, count as u64);
    1529            0 :         }
    1530              : 
    1531            0 :         Ok(())
    1532         1930 :     }
    1533              : 
    1534              :     ///
    1535              :     /// Finish this atomic update, writing all the updated keys to the
    1536              :     /// underlying timeline.
     1537              :     /// Each modification is stamped with the LSN that was current (see 'set_lsn') when it was made.
    1538              :     ///
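                       :     /// A sketch of the batching pattern, as exercised by the
                       :     /// 'aux_files_round_trip' test below: several LSNs can be stamped within
                       :     /// one modification via 'set_lsn' before a single 'commit'.
                       :     ///
                       :     /// ```ignore
                       :     /// let mut m = tline.begin_modification(Lsn(0x1000));
                       :     /// m.put_file("foo/bar1", b"content1", &ctx).await?;
                       :     /// m.set_lsn(Lsn(0x1008))?;
                       :     /// m.put_file("foo/bar2", b"content2", &ctx).await?;
                       :     /// m.commit(&ctx).await?;
                       :     /// ```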
    1539       742956 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1540       742956 :         let writer = self.tline.writer().await;
    1541              : 
    1542       742956 :         let pending_nblocks = self.pending_nblocks;
    1543       742956 :         self.pending_nblocks = 0;
    1544       742956 : 
    1545       742956 :         if !self.pending_updates.is_empty() {
    1546       413940 :             writer.put_batch(&self.pending_updates, ctx).await?;
    1547       413940 :             self.pending_updates.clear();
    1548       329016 :         }
    1549              : 
    1550       742956 :         if !self.pending_deletions.is_empty() {
    1551            2 :             writer.delete_batch(&self.pending_deletions).await?;
    1552            2 :             self.pending_deletions.clear();
    1553       742954 :         }
    1554              : 
    1555       742956 :         self.pending_lsns.push(self.lsn);
    1556       888812 :         for pending_lsn in self.pending_lsns.drain(..) {
    1557       888812 :             // Ideally, we should be able to call writer.finish_write() only once
    1558       888812 :             // with the highest LSN. However, the last_record_lsn variable in the
    1559       888812 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1560       888812 :             // so we need to record every LSN to not leave a gap between them.
    1561       888812 :             writer.finish_write(pending_lsn);
    1562       888812 :         }
    1563              : 
    1564       742956 :         if pending_nblocks != 0 {
    1565       270570 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1566       472386 :         }
    1567              : 
    1568       742956 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1569         2424 :             writer.update_directory_entries_count(kind, count as u64);
    1570         2424 :         }
    1571              : 
    1572       742956 :         Ok(())
    1573       742956 :     }
    1574              : 
    1575       291704 :     pub(crate) fn len(&self) -> usize {
    1576       291704 :         self.pending_updates.len() + self.pending_deletions.len()
    1577       291704 :     }
    1578              : 
    1579              :     // Internal helper functions to batch the modifications
    1580              : 
    1581       286562 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
     1582              :         // Have we already updated the same key? If so, read the latest
     1583              :         // pending version.
    1584              :         //
    1585              :         // Note: we don't check pending_deletions. It is an error to request a
     1586              :         // value that has been removed; deletion only avoids leaking storage.
    1587       286562 :         if let Some(values) = self.pending_updates.get(&key) {
    1588        15928 :             if let Some((_, value)) = values.last() {
    1589        15928 :                 return if let Value::Image(img) = value {
    1590        15928 :                     Ok(img.clone())
    1591              :                 } else {
    1592              :                     // Currently, we never need to read back a WAL record that we
    1593              :                     // inserted in the same "transaction". All the metadata updates
    1594              :                     // work directly with Images, and we never need to read actual
    1595              :                     // data pages. We could handle this if we had to, by calling
    1596              :                     // the walredo manager, but let's keep it simple for now.
    1597            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1598            0 :                         "unexpected pending WAL record"
    1599            0 :                     )))
    1600              :                 };
    1601            0 :             }
    1602       270634 :         }
    1603       270634 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1604       270634 :         self.tline.get(key, lsn, ctx).await
    1605       286562 :     }
    1606              : 
    1607       711856 :     fn put(&mut self, key: Key, val: Value) {
    1608       711856 :         let values = self.pending_updates.entry(key).or_default();
    1609              :         // Replace the previous value if it exists at the same lsn
    1610       711856 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1611        12164 :             if *last_lsn == self.lsn {
    1612        12160 :                 *last_value = val;
    1613        12160 :                 return;
    1614            4 :             }
    1615       699692 :         }
    1616       699696 :         values.push((self.lsn, val));
    1617       711856 :     }
    1618              : 
    1619            2 :     fn delete(&mut self, key_range: Range<Key>) {
    1620            2 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1621            2 :         self.pending_deletions.push((key_range, self.lsn));
    1622            2 :     }
    1623              : }
    1624              : 
    1625              : /// This struct facilitates accessing either a committed key from the timeline at a
    1626              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1627              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1628              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1629              : /// need to look up the keys in the modification first before looking them up in the
    1630              : /// timeline to not miss the latest updates.
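                       : ///
                       : /// A hedged usage sketch (the 'Modified' form appears in this file, e.g. in
                       : /// 'put_rel_truncation'; the bool flag is an elided detail here):
                       : ///
                       : /// ```ignore
                       : /// // Committed value at a specific LSN:
                       : /// let exists = tline.get_rel_exists(rel, Version::Lsn(lsn), latest, ctx).await?;
                       : /// // Latest uncommitted value, through a pending modification:
                       : /// let exists = tline.get_rel_exists(rel, Version::Modified(&modification), true, ctx).await?;
                       : /// ```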
    1631            0 : #[derive(Clone, Copy)]
    1632              : pub enum Version<'a> {
    1633              :     Lsn(Lsn),
    1634              :     Modified(&'a DatadirModification<'a>),
    1635              : }
    1636              : 
    1637              : impl<'a> Version<'a> {
    1638        23542 :     async fn get(
    1639        23542 :         &self,
    1640        23542 :         timeline: &Timeline,
    1641        23542 :         key: Key,
    1642        23542 :         ctx: &RequestContext,
    1643        23542 :     ) -> Result<Bytes, PageReconstructError> {
    1644        23542 :         match self {
    1645        23532 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1646           10 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1647              :         }
    1648        23542 :     }
    1649              : 
    1650        30484 :     fn get_lsn(&self) -> Lsn {
    1651        30484 :         match self {
    1652        24438 :             Version::Lsn(lsn) => *lsn,
    1653         6046 :             Version::Modified(modification) => modification.lsn,
    1654              :         }
    1655        30484 :     }
    1656              : }
    1657              : 
    1658              : //--- Metadata structs stored in key-value pairs in the repository.
    1659              : 
    1660         2110 : #[derive(Debug, Serialize, Deserialize)]
    1661              : struct DbDirectory {
    1662              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1663              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1664              : }
    1665              : 
    1666          174 : #[derive(Debug, Serialize, Deserialize)]
    1667              : struct TwoPhaseDirectory {
    1668              :     xids: HashSet<TransactionId>,
    1669              : }
    1670              : 
    1671         3860 : #[derive(Debug, Serialize, Deserialize, Default)]
    1672              : struct RelDirectory {
    1673              :     // Set of relations that exist. (relfilenode, forknum)
    1674              :     //
    1675              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1676              :     // key-value pairs, if you have a lot of relations
    1677              :     rels: HashSet<(Oid, u8)>,
    1678              : }
    1679              : 
    1680          208 : #[derive(Debug, Serialize, Deserialize, Default)]
    1681              : pub(crate) struct AuxFilesDirectory {
    1682              :     pub(crate) files: HashMap<String, Bytes>,
    1683              : }
    1684              : 
    1685              : impl AuxFilesDirectory {
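                       :     /// Insert or remove a file entry: Some(value) upserts the content,
                       :     /// None removes it. This mirrors put_file, which encodes an empty
                       :     /// 'content' slice as None, i.e. a deletion.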
    1686           24 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    1687           24 :         if let Some(value) = value {
    1688           18 :             self.files.insert(key, value);
    1689           18 :         } else {
    1690            6 :             self.files.remove(&key);
    1691            6 :         }
    1692           24 :     }
    1693              : }
    1694              : 
    1695            0 : #[derive(Debug, Serialize, Deserialize)]
    1696              : struct RelSizeEntry {
    1697              :     nblocks: u32,
    1698              : }
    1699              : 
    1700          528 : #[derive(Debug, Serialize, Deserialize, Default)]
    1701              : struct SlruSegmentDirectory {
    1702              :     // Set of SLRU segments that exist.
    1703              :     segments: HashSet<u32>,
    1704              : }
    1705              : 
    1706         4848 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    1707              : #[repr(u8)]
    1708              : pub(crate) enum DirectoryKind {
    1709              :     Db,
    1710              :     TwoPhase,
    1711              :     Rel,
    1712              :     AuxFiles,
    1713              :     SlruSegment(SlruKind),
    1714              : }
    1715              : 
    1716              : impl DirectoryKind {
    1717              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    1718         4848 :     pub(crate) fn offset(&self) -> usize {
    1719         4848 :         self.into_usize()
    1720         4848 :     }
    1721              : }
    1722              : 
    1723              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1724              : 
    1725              : #[allow(clippy::bool_assert_comparison)]
    1726              : #[cfg(test)]
    1727              : mod tests {
    1728              :     use hex_literal::hex;
    1729              :     use utils::id::TimelineId;
    1730              : 
    1731              :     use super::*;
    1732              : 
    1733              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    1734              : 
    1735              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    1736            2 :     #[tokio::test]
    1737            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    1738            2 :         let name = "aux_files_round_trip";
    1739            2 :         let harness = TenantHarness::create(name)?;
    1740            2 : 
    1741            2 :         pub const TIMELINE_ID: TimelineId =
    1742            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    1743            2 : 
    1744            2 :         let (tenant, ctx) = harness.load().await;
    1745            2 :         let tline = tenant
    1746            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    1747            2 :             .await?;
    1748            2 :         let tline = tline.raw_timeline().unwrap();
    1749            2 : 
    1750            2 :         // First modification: insert two keys
    1751            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    1752            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    1753            2 :         modification.set_lsn(Lsn(0x1008))?;
    1754            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    1755            2 :         modification.commit(&ctx).await?;
    1756            2 :         let expect_1008 = HashMap::from([
    1757            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    1758            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    1759            2 :         ]);
    1760            2 : 
    1761            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1762            2 :         assert_eq!(readback, expect_1008);
    1763            2 : 
    1764            2 :         // Second modification: update one key, remove the other
    1765            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    1766            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    1767            2 :         modification.set_lsn(Lsn(0x2008))?;
    1768            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    1769            2 :         modification.commit(&ctx).await?;
    1770            2 :         let expect_2008 =
    1771            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    1772            2 : 
    1773            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    1774            2 :         assert_eq!(readback, expect_2008);
    1775            2 : 
    1776            2 :         // Reading back in time works
    1777            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1778            2 :         assert_eq!(readback, expect_1008);
    1779            2 : 
    1780            2 :         Ok(())
    1781            2 :     }
    1782              : 
    1783              :     /*
    1784              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1785              :             let incremental = timeline.get_current_logical_size();
    1786              :             let non_incremental = timeline
    1787              :                 .get_current_logical_size_non_incremental(lsn)
    1788              :                 .unwrap();
    1789              :             assert_eq!(incremental, non_incremental);
    1790              :         }
    1791              :     */
    1792              : 
    1793              :     /*
    1794              :     ///
    1795              :     /// Test list_rels() function, with branches and dropped relations
    1796              :     ///
    1797              :     #[test]
    1798              :     fn test_list_rels_drop() -> Result<()> {
    1799              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1800              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1801              :         const TESTDB: u32 = 111;
    1802              : 
    1803              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1804              :         // after branching fails below
    1805              :         let mut writer = tline.begin_record(Lsn(0x10));
    1806              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1807              :         writer.finish()?;
    1808              : 
    1809              :         // Create a relation on the timeline
    1810              :         let mut writer = tline.begin_record(Lsn(0x20));
    1811              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1812              :         writer.finish()?;
    1813              : 
    1814              :         let writer = tline.begin_record(Lsn(0x00));
    1815              :         writer.finish()?;
    1816              : 
     1817              :         // Check that list_rels() lists it at LSN 0x20 and later, but not before it
    1818              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1819              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1820              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1821              : 
    1822              :         // Create a branch, check that the relation is visible there
    1823              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1824              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1825              :             Some(timeline) => timeline,
    1826              :             None => panic!("Should have a local timeline"),
    1827              :         };
    1828              :         let newtline = DatadirTimelineImpl::new(newtline);
    1829              :         assert!(newtline
    1830              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1831              :             .contains(&TESTREL_A));
    1832              : 
    1833              :         // Drop it on the branch
    1834              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1835              :         new_writer.drop_relation(TESTREL_A)?;
    1836              :         new_writer.finish()?;
    1837              : 
    1838              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1839              :         assert!(newtline
    1840              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1841              :             .contains(&TESTREL_A));
    1842              :         assert!(!newtline
    1843              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1844              :             .contains(&TESTREL_A));
    1845              : 
    1846              :         // Run checkpoint and garbage collection and check that it's still not visible
    1847              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1848              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1849              : 
    1850              :         assert!(!newtline
    1851              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1852              :             .contains(&TESTREL_A));
    1853              : 
    1854              :         Ok(())
    1855              :     }
    1856              :      */
    1857              : 
    1858              :     /*
    1859              :     #[test]
    1860              :     fn test_read_beyond_eof() -> Result<()> {
    1861              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1862              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1863              : 
    1864              :         make_some_layers(&tline, Lsn(0x20))?;
    1865              :         let mut writer = tline.begin_record(Lsn(0x60));
    1866              :         walingest.put_rel_page_image(
    1867              :             &mut writer,
    1868              :             TESTREL_A,
    1869              :             0,
    1870              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1871              :         )?;
    1872              :         writer.finish()?;
    1873              : 
    1874              :         // Test read before rel creation. Should error out.
    1875              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1876              : 
    1877              :         // Read block beyond end of relation at different points in time.
    1878              :         // These reads should fall into different delta, image, and in-memory layers.
    1879              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1880              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1881              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1882              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1883              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1884              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1885              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1886              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1887              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1888              : 
    1889              :         // Test on an in-memory layer with no preceding layer
    1890              :         let mut writer = tline.begin_record(Lsn(0x70));
    1891              :         walingest.put_rel_page_image(
    1892              :             &mut writer,
    1893              :             TESTREL_B,
    1894              :             0,
    1895              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1896              :         )?;
    1897              :         writer.finish()?;
    1898              : 
     1899              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    1900              : 
    1901              :         Ok(())
    1902              :     }
    1903              :      */
    1904              : }
        

Generated by: LCOV version 2.1-beta