LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info | Test Date: 2024-02-23 13:21:27
Coverage: Lines: 54.2 % (633 of 1167) | Functions: 38.2 % (84 of 220)

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::repository::*;
      13              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use enum_map::Enum;
      18              : use itertools::Itertools;
      19              : use pageserver_api::key::{
      20              :     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
      21              :     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      22              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      23              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      24              : };
      25              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      26              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      27              : use postgres_ffi::BLCKSZ;
      28              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      29              : use serde::{Deserialize, Serialize};
      30              : use std::collections::{hash_map, HashMap, HashSet};
      31              : use std::ops::ControlFlow;
      32              : use std::ops::Range;
      33              : use strum::IntoEnumIterator;
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::{debug, trace, warn};
      36              : use utils::bin_ser::DeserializeError;
      37              : use utils::{bin_ser::BeSer, lsn::Lsn};
      38              : 
      39            0 : #[derive(Debug)]
      40              : pub enum LsnForTimestamp {
      41              :     /// Found commits both before and after the given timestamp
      42              :     Present(Lsn),
      43              : 
       44              :     /// Found no commits after the given timestamp; this means
       45              :     /// that the newest data in the branch is older than the given
       46              :     /// timestamp.
      47              :     ///
      48              :     /// All commits <= LSN happened before the given timestamp
      49              :     Future(Lsn),
      50              : 
       51              :     /// The queried timestamp is past the horizon we can look back to (PITR)
       52              :     ///
       53              :     /// All commits > LSN happened after the given timestamp,
       54              :     /// but any commits < LSN might have happened before or after
       55              :     /// the given timestamp. We don't know because no data before
       56              :     /// the given LSN is available.
      57              :     Past(Lsn),
      58              : 
      59              :     /// We have found no commit with a timestamp,
      60              :     /// so we can't return anything meaningful.
      61              :     ///
       62              :     /// The associated LSN is the lower bound value we can safely
       63              :     /// create branches on, but no statement is made about whether it
       64              :     /// is older or newer than the timestamp.
      65              :     ///
      66              :     /// This variant can e.g. be returned right after a
      67              :     /// cluster import.
      68              :     NoData(Lsn),
      69              : }
      70              : 
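A caller has to handle all four outcomes above. A minimal sketch of such a match; the `describe` helper is hypothetical, only the variants come from this file:

fn describe(result: &LsnForTimestamp) -> String {
    match result {
        // Commits exist on both sides of the timestamp: an exact answer.
        LsnForTimestamp::Present(lsn) => format!("resolved to {lsn}"),
        // The timestamp is newer than all commits; everything <= lsn precedes it.
        LsnForTimestamp::Future(lsn) => format!("timestamp is in the future of {lsn}"),
        // The timestamp predates the PITR horizon; lsn is the earliest safe point.
        LsnForTimestamp::Past(lsn) => format!("timestamp is before the horizon at {lsn}"),
        // No commit records were found at all, e.g. right after a cluster import.
        LsnForTimestamp::NoData(lsn) => format!("no commit data; {lsn} is a safe branch point"),
    }
}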
      71            0 : #[derive(Debug, thiserror::Error)]
      72              : pub enum CalculateLogicalSizeError {
      73              :     #[error("cancelled")]
      74              :     Cancelled,
      75              :     #[error(transparent)]
      76              :     Other(#[from] anyhow::Error),
      77              : }
      78              : 
      79            0 : #[derive(Debug, thiserror::Error)]
      80              : pub(crate) enum CollectKeySpaceError {
      81              :     #[error(transparent)]
      82              :     Decode(#[from] DeserializeError),
      83              :     #[error(transparent)]
      84              :     PageRead(PageReconstructError),
      85              :     #[error("cancelled")]
      86              :     Cancelled,
      87              : }
      88              : 
      89              : impl From<PageReconstructError> for CollectKeySpaceError {
      90            0 :     fn from(err: PageReconstructError) -> Self {
      91            0 :         match err {
      92            0 :             PageReconstructError::Cancelled => Self::Cancelled,
      93            0 :             err => Self::PageRead(err),
      94              :         }
      95            0 :     }
      96              : }
      97              : 
      98              : impl From<PageReconstructError> for CalculateLogicalSizeError {
      99            0 :     fn from(pre: PageReconstructError) -> Self {
     100            0 :         match pre {
     101              :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
     102            0 :                 Self::Cancelled
     103              :             }
     104            0 :             _ => Self::Other(pre.into()),
     105              :         }
     106            0 :     }
     107              : }
     108              : 
     109            0 : #[derive(Debug, thiserror::Error)]
     110              : pub enum RelationError {
     111              :     #[error("Relation Already Exists")]
     112              :     AlreadyExists,
     113              :     #[error("invalid relnode")]
     114              :     InvalidRelnode,
     115              :     #[error(transparent)]
     116              :     Other(#[from] anyhow::Error),
     117              : }
     118              : 
     119              : ///
     120              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     121              : /// and other special kinds of files, in a versioned key-value store. The
     122              : /// Timeline struct provides the key-value store.
     123              : ///
      124              : /// This is a separate impl so that we can easily include all these functions in the Timeline
      125              : /// implementation; it might be moved into a separate struct later.
     126              : impl Timeline {
     127              :     /// Start ingesting a WAL record, or other atomic modification of
     128              :     /// the timeline.
     129              :     ///
     130              :     /// This provides a transaction-like interface to perform a bunch
     131              :     /// of modifications atomically.
     132              :     ///
     133              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     134              :     /// DatadirModification object. Use the functions in the object to
     135              :     /// modify the repository state, updating all the pages and metadata
     136              :     /// that the WAL record affects. When you're done, call commit() to
     137              :     /// commit the changes.
     138              :     ///
      139              :     /// The LSN stored in the modification is advanced by `ingest_record` and
      140              :     /// is used by `commit()` to update `last_record_lsn`.
     141              :     ///
     142              :     /// Calling commit() will flush all the changes and reset the state,
     143              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     144              :     ///
     145              :     /// Note that any pending modifications you make through the
     146              :     /// modification object won't be visible to calls to the 'get' and list
      147              :     /// functions of the timeline until you commit! And if you update the
     148              :     /// same page twice, the last update wins.
     149              :     ///
     150       268268 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     151       268268 :     where
     152       268268 :         Self: Sized,
     153       268268 :     {
     154       268268 :         DatadirModification {
     155       268268 :             tline: self,
     156       268268 :             pending_lsns: Vec::new(),
     157       268268 :             pending_updates: HashMap::new(),
     158       268268 :             pending_deletions: Vec::new(),
     159       268268 :             pending_nblocks: 0,
     160       268268 :             pending_aux_files: None,
     161       268268 :             pending_directory_entries: Vec::new(),
     162       268268 :             lsn,
     163       268268 :         }
     164       268268 :     }
     165              : 
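A sketch of that workflow from the caller's side. Only begin_modification and put_rel_page_image appear in this file; the commit() call and its exact signature are assumed here for illustration:

async fn ingest_one_page(
    tline: &Timeline,
    lsn: Lsn,
    rel: RelTag,
    blknum: BlockNumber,
    img: Bytes,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut modification = tline.begin_modification(lsn);
    // Stage every change belonging to this WAL record; nothing is visible
    // to readers of the timeline yet.
    modification.put_rel_page_image(rel, blknum, img)?;
    // Flush the staged changes atomically and advance last_record_lsn.
    modification.commit(ctx).await?;
    Ok(())
}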
     166              :     //------------------------------------------------------------------------------
     167              :     // Public GET functions
     168              :     //------------------------------------------------------------------------------
     169              : 
     170              :     /// Look up given page version.
     171        18384 :     pub(crate) async fn get_rel_page_at_lsn(
     172        18384 :         &self,
     173        18384 :         tag: RelTag,
     174        18384 :         blknum: BlockNumber,
     175        18384 :         version: Version<'_>,
     176        18384 :         latest: bool,
     177        18384 :         ctx: &RequestContext,
     178        18384 :     ) -> Result<Bytes, PageReconstructError> {
     179        18384 :         if tag.relnode == 0 {
     180            0 :             return Err(PageReconstructError::Other(
     181            0 :                 RelationError::InvalidRelnode.into(),
     182            0 :             ));
     183        18384 :         }
     184              : 
     185        18384 :         let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
     186        18384 :         if blknum >= nblocks {
     187            0 :             debug!(
     188            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     189            0 :                 tag,
     190            0 :                 blknum,
     191            0 :                 version.get_lsn(),
     192            0 :                 nblocks
     193            0 :             );
     194            0 :             return Ok(ZERO_PAGE.clone());
     195        18384 :         }
     196        18384 : 
     197        18384 :         let key = rel_block_to_key(tag, blknum);
     198        18384 :         version.get(self, key, ctx).await
     199        18384 :     }
     200              : 
     201              :     // Get size of a database in blocks
     202            0 :     pub(crate) async fn get_db_size(
     203            0 :         &self,
     204            0 :         spcnode: Oid,
     205            0 :         dbnode: Oid,
     206            0 :         version: Version<'_>,
     207            0 :         latest: bool,
     208            0 :         ctx: &RequestContext,
     209            0 :     ) -> Result<usize, PageReconstructError> {
     210            0 :         let mut total_blocks = 0;
     211              : 
     212            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     213              : 
     214            0 :         for rel in rels {
     215            0 :             let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
     216            0 :             total_blocks += n_blocks as usize;
     217              :         }
     218            0 :         Ok(total_blocks)
     219            0 :     }
     220              : 
     221              :     /// Get size of a relation file
     222        24434 :     pub(crate) async fn get_rel_size(
     223        24434 :         &self,
     224        24434 :         tag: RelTag,
     225        24434 :         version: Version<'_>,
     226        24434 :         latest: bool,
     227        24434 :         ctx: &RequestContext,
     228        24434 :     ) -> Result<BlockNumber, PageReconstructError> {
     229        24434 :         if tag.relnode == 0 {
     230            0 :             return Err(PageReconstructError::Other(
     231            0 :                 RelationError::InvalidRelnode.into(),
     232            0 :             ));
     233        24434 :         }
     234              : 
     235        24434 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     236        19294 :             return Ok(nblocks);
     237         5140 :         }
     238         5140 : 
     239         5140 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     240            0 :             && !self.get_rel_exists(tag, version, latest, ctx).await?
     241              :         {
     242              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     243              :             // FSM, and smgrnblocks() on it immediately afterwards,
     244              :             // without extending it.  Tolerate that by claiming that
     245              :             // any non-existent FSM fork has size 0.
     246            0 :             return Ok(0);
     247         5140 :         }
     248         5140 : 
     249         5140 :         let key = rel_size_to_key(tag);
     250         5140 :         let mut buf = version.get(self, key, ctx).await?;
     251         5136 :         let nblocks = buf.get_u32_le();
     252         5136 : 
     253         5136 :         if latest {
      254            0 :             // Update the relation size cache only if the "latest" flag is set.
      255            0 :             // This flag is set by the compute node when it is working with the most recent version of the relation.
      256            0 :             // Typically, the primary compute node always sets latest=true.
      257            0 :             // Note that even if the compute node "by mistake" specifies an old LSN but sets
      258            0 :             // latest=true, it cannot cause cache corruption, because with latest=true the
      259            0 :             // pageserver chooses max(request_lsn, last_written_lsn), so the cached value
      260            0 :             // will be associated with the most recent LSN.
     261            0 :             self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     262         5136 :         }
     263         5136 :         Ok(nblocks)
     264        24434 :     }
     265              : 
     266              :     /// Does relation exist?
     267         6050 :     pub(crate) async fn get_rel_exists(
     268         6050 :         &self,
     269         6050 :         tag: RelTag,
     270         6050 :         version: Version<'_>,
     271         6050 :         _latest: bool,
     272         6050 :         ctx: &RequestContext,
     273         6050 :     ) -> Result<bool, PageReconstructError> {
     274         6050 :         if tag.relnode == 0 {
     275            0 :             return Err(PageReconstructError::Other(
     276            0 :                 RelationError::InvalidRelnode.into(),
     277            0 :             ));
     278         6050 :         }
     279              : 
     280              :         // first try to lookup relation in cache
     281         6050 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     282         6032 :             return Ok(true);
     283           18 :         }
     284           18 :         // fetch directory listing
     285           18 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     286           18 :         let buf = version.get(self, key, ctx).await?;
     287              : 
     288           18 :         match RelDirectory::des(&buf).context("deserialization failure") {
     289           18 :             Ok(dir) => {
     290           18 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     291           18 :                 Ok(exists)
     292              :             }
     293            0 :             Err(e) => Err(PageReconstructError::from(e)),
     294              :         }
     295         6050 :     }
     296              : 
      297              :     /// Get a list of all existing relations in the given tablespace and database.
     298              :     ///
     299              :     /// # Cancel-Safety
     300              :     ///
     301              :     /// This method is cancellation-safe.
     302            0 :     pub(crate) async fn list_rels(
     303            0 :         &self,
     304            0 :         spcnode: Oid,
     305            0 :         dbnode: Oid,
     306            0 :         version: Version<'_>,
     307            0 :         ctx: &RequestContext,
     308            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     309            0 :         // fetch directory listing
     310            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     311            0 :         let buf = version.get(self, key, ctx).await?;
     312              : 
     313            0 :         match RelDirectory::des(&buf).context("deserialization failure") {
     314            0 :             Ok(dir) => {
     315            0 :                 let rels: HashSet<RelTag> =
     316            0 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     317            0 :                         spcnode,
     318            0 :                         dbnode,
     319            0 :                         relnode: *relnode,
     320            0 :                         forknum: *forknum,
     321            0 :                     }));
     322            0 : 
     323            0 :                 Ok(rels)
     324              :             }
     325            0 :             Err(e) => Err(PageReconstructError::from(e)),
     326              :         }
     327            0 :     }
     328              : 
     329              :     /// Get the whole SLRU segment
     330            0 :     pub(crate) async fn get_slru_segment(
     331            0 :         &self,
     332            0 :         kind: SlruKind,
     333            0 :         segno: u32,
     334            0 :         lsn: Lsn,
     335            0 :         ctx: &RequestContext,
     336            0 :     ) -> Result<Bytes, PageReconstructError> {
     337            0 :         let n_blocks = self
     338            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     339            0 :             .await?;
     340            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     341            0 :         for blkno in 0..n_blocks {
     342            0 :             let block = self
     343            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     344            0 :                 .await?;
     345            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     346              :         }
     347            0 :         Ok(segment.freeze())
     348            0 :     }
     349              : 
     350              :     /// Look up given SLRU page version.
     351            0 :     pub(crate) async fn get_slru_page_at_lsn(
     352            0 :         &self,
     353            0 :         kind: SlruKind,
     354            0 :         segno: u32,
     355            0 :         blknum: BlockNumber,
     356            0 :         lsn: Lsn,
     357            0 :         ctx: &RequestContext,
     358            0 :     ) -> Result<Bytes, PageReconstructError> {
     359            0 :         let key = slru_block_to_key(kind, segno, blknum);
     360            0 :         self.get(key, lsn, ctx).await
     361            0 :     }
     362              : 
     363              :     /// Get size of an SLRU segment
     364            0 :     pub(crate) async fn get_slru_segment_size(
     365            0 :         &self,
     366            0 :         kind: SlruKind,
     367            0 :         segno: u32,
     368            0 :         version: Version<'_>,
     369            0 :         ctx: &RequestContext,
     370            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     371            0 :         let key = slru_segment_size_to_key(kind, segno);
     372            0 :         let mut buf = version.get(self, key, ctx).await?;
     373            0 :         Ok(buf.get_u32_le())
     374            0 :     }
     375              : 
      376              :     /// Does the SLRU segment exist?
     377            0 :     pub(crate) async fn get_slru_segment_exists(
     378            0 :         &self,
     379            0 :         kind: SlruKind,
     380            0 :         segno: u32,
     381            0 :         version: Version<'_>,
     382            0 :         ctx: &RequestContext,
     383            0 :     ) -> Result<bool, PageReconstructError> {
     384            0 :         // fetch directory listing
     385            0 :         let key = slru_dir_to_key(kind);
     386            0 :         let buf = version.get(self, key, ctx).await?;
     387              : 
     388            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     389            0 :             Ok(dir) => {
     390            0 :                 let exists = dir.segments.get(&segno).is_some();
     391            0 :                 Ok(exists)
     392              :             }
     393            0 :             Err(e) => Err(PageReconstructError::from(e)),
     394              :         }
     395            0 :     }
     396              : 
      397              :     /// Locate the LSN such that all transactions that committed before
     398              :     /// 'search_timestamp' are visible, but nothing newer is.
     399              :     ///
     400              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     401              :     /// so it's not well defined which LSN you get if there were multiple commits
     402              :     /// "in flight" at that point in time.
     403              :     ///
     404            0 :     pub(crate) async fn find_lsn_for_timestamp(
     405            0 :         &self,
     406            0 :         search_timestamp: TimestampTz,
     407            0 :         cancel: &CancellationToken,
     408            0 :         ctx: &RequestContext,
     409            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     410            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     411            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     412            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     413            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     414            0 :         // on the safe side.
     415            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     416            0 :         let max_lsn = self.get_last_record_lsn();
     417            0 : 
     418            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     419            0 :         // LSN divided by 8.
     420            0 :         let mut low = min_lsn.0 / 8;
     421            0 :         let mut high = max_lsn.0 / 8 + 1;
     422            0 : 
     423            0 :         let mut found_smaller = false;
     424            0 :         let mut found_larger = false;
     425            0 :         while low < high {
     426            0 :             if cancel.is_cancelled() {
     427            0 :                 return Err(PageReconstructError::Cancelled);
     428            0 :             }
     429            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     430            0 :             let mid = (high + low) / 2;
     431              : 
     432            0 :             let cmp = self
     433            0 :                 .is_latest_commit_timestamp_ge_than(
     434            0 :                     search_timestamp,
     435            0 :                     Lsn(mid * 8),
     436            0 :                     &mut found_smaller,
     437            0 :                     &mut found_larger,
     438            0 :                     ctx,
     439            0 :                 )
     440            0 :                 .await?;
     441              : 
     442            0 :             if cmp {
     443            0 :                 high = mid;
     444            0 :             } else {
     445            0 :                 low = mid + 1;
     446            0 :             }
     447              :         }
      448              :         // If `found_smaller == true`, `low` ends up at `t + 1`, where `t` is the position
      449              :         // (LSN divided by 8) of the last commit record before or at `search_timestamp`.
      450              :         // Subtract one from `low` to get `t`.
     451              :         //
     452              :         // FIXME: it would be better to get the LSN of the previous commit.
     453              :         // Otherwise, if you restore to the returned LSN, the database will
     454              :         // include physical changes from later commits that will be marked
     455              :         // as aborted, and will need to be vacuumed away.
     456            0 :         let commit_lsn = Lsn((low - 1) * 8);
     457            0 :         match (found_smaller, found_larger) {
     458              :             (false, false) => {
     459              :                 // This can happen if no commit records have been processed yet, e.g.
     460              :                 // just after importing a cluster.
     461            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     462              :             }
     463              :             (false, true) => {
     464              :                 // Didn't find any commit timestamps smaller than the request
     465            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     466              :             }
     467              :             (true, false) => {
     468              :                 // Only found commits with timestamps smaller than the request.
     469              :                 // It's still a valid case for branch creation, return it.
     470              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     471              :                 // case, anyway.
     472            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     473              :             }
     474            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     475              :         }
     476            0 :     }
     477              : 
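The loop above is a plain bisection over LSN/8 positions: everything below `low` failed the predicate, everything at or above `high` passed it. A self-contained sketch of the same arithmetic, with a generic monotone predicate standing in for is_latest_commit_timestamp_ge_than:

// `commits_ge` must be monotone in the LSN: false for a prefix of the
// range, then true for the rest. LSNs are 8-byte aligned, so the search
// runs over lsn / 8. Callers guard the boundary cases with the
// found_smaller/found_larger flags, as in the function above.
fn bisect_commit_lsn(min_lsn: u64, max_lsn: u64, commits_ge: impl Fn(u64) -> bool) -> u64 {
    let mut low = min_lsn / 8;
    let mut high = max_lsn / 8 + 1;
    while low < high {
        // Cannot overflow: both bounds are far below u64::MAX / 2.
        let mid = (high + low) / 2;
        if commits_ge(mid * 8) {
            high = mid; // the boundary is at or below mid
        } else {
            low = mid + 1; // the boundary is above mid
        }
    }
    // `low` is one past the last position where the predicate was false.
    (low - 1) * 8
}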
      478              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
      479              :     /// commits that happened after 'search_timestamp', as seen at LSN 'probe_lsn'.
      480              :     ///
      481              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
      482              :     /// with a smaller/larger timestamp.
     483              :     ///
     484            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     485            0 :         &self,
     486            0 :         search_timestamp: TimestampTz,
     487            0 :         probe_lsn: Lsn,
     488            0 :         found_smaller: &mut bool,
     489            0 :         found_larger: &mut bool,
     490            0 :         ctx: &RequestContext,
     491            0 :     ) -> Result<bool, PageReconstructError> {
     492            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     493            0 :             if timestamp >= search_timestamp {
     494            0 :                 *found_larger = true;
     495            0 :                 return ControlFlow::Break(true);
     496            0 :             } else {
     497            0 :                 *found_smaller = true;
     498            0 :             }
     499            0 :             ControlFlow::Continue(())
     500            0 :         })
     501            0 :         .await
     502            0 :     }
     503              : 
      504              :     /// Obtain the latest commit timestamp visible at the given lsn.
      505              :     ///
      506              :     /// If the lsn has no timestamps, returns None; otherwise returns the maximum timestamp found.
     507            0 :     pub(crate) async fn get_timestamp_for_lsn(
     508            0 :         &self,
     509            0 :         probe_lsn: Lsn,
     510            0 :         ctx: &RequestContext,
     511            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     512            0 :         let mut max: Option<TimestampTz> = None;
     513            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     514            0 :             if let Some(max_prev) = max {
     515            0 :                 max = Some(max_prev.max(timestamp));
     516            0 :             } else {
     517            0 :                 max = Some(timestamp);
     518            0 :             }
     519            0 :             ControlFlow::Continue(())
     520            0 :         })
     521            0 :         .await?;
     522              : 
     523            0 :         Ok(max)
     524            0 :     }
     525              : 
     526              :     /// Runs the given function on all the timestamps for a given lsn
     527              :     ///
      528              :     /// The return value is either produced by the closure (via `ControlFlow::Break`),
      529              :     /// or set to the `Default` impl's output if the closure never breaks.
     530            0 :     async fn map_all_timestamps<T: Default>(
     531            0 :         &self,
     532            0 :         probe_lsn: Lsn,
     533            0 :         ctx: &RequestContext,
     534            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     535            0 :     ) -> Result<T, PageReconstructError> {
     536            0 :         for segno in self
     537            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     538            0 :             .await?
     539              :         {
     540            0 :             let nblocks = self
     541            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     542            0 :                 .await?;
     543            0 :             for blknum in (0..nblocks).rev() {
     544            0 :                 let clog_page = self
     545            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     546            0 :                     .await?;
     547              : 
     548            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     549            0 :                     let mut timestamp_bytes = [0u8; 8];
     550            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     551            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     552            0 : 
     553            0 :                     match f(timestamp) {
     554            0 :                         ControlFlow::Break(b) => return Ok(b),
     555            0 :                         ControlFlow::Continue(()) => (),
     556              :                     }
     557            0 :                 }
     558              :             }
     559              :         }
     560            0 :         Ok(Default::default())
     561            0 :     }
     562              : 
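The closure contract used here is the standard ControlFlow short-circuit: Break(v) ends the scan and becomes the return value, Continue(()) keeps scanning, and the caller falls back to T::default(). A minimal sketch over plain integers (hypothetical data, not real CLOG pages):

use std::ops::ControlFlow;

// Same contract as map_all_timestamps, over a slice instead of CLOG pages.
fn scan_timestamps<T: Default>(
    timestamps: &[i64],
    mut f: impl FnMut(i64) -> ControlFlow<T>,
) -> T {
    for &ts in timestamps {
        if let ControlFlow::Break(v) = f(ts) {
            return v;
        }
    }
    T::default()
}

// Mirrors is_latest_commit_timestamp_ge_than: break with `true` at the
// first timestamp at or after the searched-for one; default to false.
fn any_commit_at_or_after(timestamps: &[i64], search: i64) -> bool {
    scan_timestamps(timestamps, |ts| {
        if ts >= search {
            ControlFlow::Break(true)
        } else {
            ControlFlow::Continue(())
        }
    })
}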
     563            0 :     pub(crate) async fn get_slru_keyspace(
     564            0 :         &self,
     565            0 :         version: Version<'_>,
     566            0 :         ctx: &RequestContext,
     567            0 :     ) -> Result<KeySpace, PageReconstructError> {
     568            0 :         let mut accum = KeySpaceAccum::new();
     569              : 
     570            0 :         for kind in SlruKind::iter() {
     571            0 :             let mut segments: Vec<u32> = self
     572            0 :                 .list_slru_segments(kind, version, ctx)
     573            0 :                 .await?
     574            0 :                 .into_iter()
     575            0 :                 .collect();
     576            0 :             segments.sort_unstable();
     577              : 
     578            0 :             for seg in segments {
     579            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     580              : 
     581            0 :                 accum.add_range(
     582            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     583            0 :                 );
     584              :             }
     585              :         }
     586              : 
     587            0 :         Ok(accum.to_keyspace())
     588            0 :     }
     589              : 
     590              :     /// Get a list of SLRU segments
     591            0 :     pub(crate) async fn list_slru_segments(
     592            0 :         &self,
     593            0 :         kind: SlruKind,
     594            0 :         version: Version<'_>,
     595            0 :         ctx: &RequestContext,
     596            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     597            0 :         // fetch directory entry
     598            0 :         let key = slru_dir_to_key(kind);
     599              : 
     600            0 :         let buf = version.get(self, key, ctx).await?;
     601            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     602            0 :             Ok(dir) => Ok(dir.segments),
     603            0 :             Err(e) => Err(PageReconstructError::from(e)),
     604              :         }
     605            0 :     }
     606              : 
     607            0 :     pub(crate) async fn get_relmap_file(
     608            0 :         &self,
     609            0 :         spcnode: Oid,
     610            0 :         dbnode: Oid,
     611            0 :         version: Version<'_>,
     612            0 :         ctx: &RequestContext,
     613            0 :     ) -> Result<Bytes, PageReconstructError> {
     614            0 :         let key = relmap_file_key(spcnode, dbnode);
     615              : 
     616            0 :         let buf = version.get(self, key, ctx).await?;
     617            0 :         Ok(buf)
     618            0 :     }
     619              : 
     620            0 :     pub(crate) async fn list_dbdirs(
     621            0 :         &self,
     622            0 :         lsn: Lsn,
     623            0 :         ctx: &RequestContext,
     624            0 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     625              :         // fetch directory entry
     626            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     627              : 
     628            0 :         match DbDirectory::des(&buf).context("deserialization failure") {
     629            0 :             Ok(dir) => Ok(dir.dbdirs),
     630            0 :             Err(e) => Err(PageReconstructError::from(e)),
     631              :         }
     632            0 :     }
     633              : 
     634            0 :     pub(crate) async fn get_twophase_file(
     635            0 :         &self,
     636            0 :         xid: TransactionId,
     637            0 :         lsn: Lsn,
     638            0 :         ctx: &RequestContext,
     639            0 :     ) -> Result<Bytes, PageReconstructError> {
     640            0 :         let key = twophase_file_key(xid);
     641            0 :         let buf = self.get(key, lsn, ctx).await?;
     642            0 :         Ok(buf)
     643            0 :     }
     644              : 
     645            0 :     pub(crate) async fn list_twophase_files(
     646            0 :         &self,
     647            0 :         lsn: Lsn,
     648            0 :         ctx: &RequestContext,
     649            0 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     650              :         // fetch directory entry
     651            0 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     652              : 
     653            0 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     654            0 :             Ok(dir) => Ok(dir.xids),
     655            0 :             Err(e) => Err(PageReconstructError::from(e)),
     656              :         }
     657            0 :     }
     658              : 
     659            0 :     pub(crate) async fn get_control_file(
     660            0 :         &self,
     661            0 :         lsn: Lsn,
     662            0 :         ctx: &RequestContext,
     663            0 :     ) -> Result<Bytes, PageReconstructError> {
     664            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     665            0 :     }
     666              : 
     667           12 :     pub(crate) async fn get_checkpoint(
     668           12 :         &self,
     669           12 :         lsn: Lsn,
     670           12 :         ctx: &RequestContext,
     671           12 :     ) -> Result<Bytes, PageReconstructError> {
     672           12 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     673           12 :     }
     674              : 
     675            6 :     pub(crate) async fn list_aux_files(
     676            6 :         &self,
     677            6 :         lsn: Lsn,
     678            6 :         ctx: &RequestContext,
     679            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     680            6 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     681            6 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     682            6 :                 Ok(dir) => Ok(dir.files),
     683            0 :                 Err(e) => Err(PageReconstructError::from(e)),
     684              :             },
     685            0 :             Err(e) => {
     686              :                 // This is expected: historical databases do not have the key.
     687            0 :                 debug!("Failed to get info about AUX files: {}", e);
     688            0 :                 Ok(HashMap::new())
     689              :             }
     690              :         }
     691            6 :     }
     692              : 
      693              :     /// Does the same as get_current_logical_size, but computes the size on demand.
     694              :     /// Used to initialize the logical size tracking on startup.
     695              :     ///
     696              :     /// Only relation blocks are counted currently. That excludes metadata,
     697              :     /// SLRUs, twophase files etc.
     698              :     ///
     699              :     /// # Cancel-Safety
     700              :     ///
     701              :     /// This method is cancellation-safe.
     702            0 :     pub async fn get_current_logical_size_non_incremental(
     703            0 :         &self,
     704            0 :         lsn: Lsn,
     705            0 :         ctx: &RequestContext,
     706            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     707            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     708              : 
     709              :         // Fetch list of database dirs and iterate them
     710            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     711            0 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     712              : 
     713            0 :         let mut total_size: u64 = 0;
     714            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     715            0 :             for rel in self
     716            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     717            0 :                 .await?
     718              :             {
     719            0 :                 if self.cancel.is_cancelled() {
     720            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     721            0 :                 }
     722            0 :                 let relsize_key = rel_size_to_key(rel);
     723            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     724            0 :                 let relsize = buf.get_u32_le();
     725            0 : 
     726            0 :                 total_size += relsize as u64;
     727              :             }
     728              :         }
     729            0 :         Ok(total_size * BLCKSZ as u64)
     730            0 :     }
     731              : 
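For a sense of the arithmetic: with Postgres's BLCKSZ of 8192 bytes, a timeline whose relations total 1,280 blocks reports 1280 * 8192 = 10,485,760 bytes (10 MiB). SLRUs, twophase files and other metadata contribute nothing to this figure.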
     732              :     ///
     733              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
      734              :     /// Anything that's not listed may be removed from the underlying storage (from
     735              :     /// that LSN forwards).
     736          170 :     pub(crate) async fn collect_keyspace(
     737          170 :         &self,
     738          170 :         lsn: Lsn,
     739          170 :         ctx: &RequestContext,
     740          170 :     ) -> Result<KeySpace, CollectKeySpaceError> {
     741          170 :         // Iterate through key ranges, greedily packing them into partitions
     742          170 :         let mut result = KeySpaceAccum::new();
     743          170 : 
     744          170 :         // The dbdir metadata always exists
     745          170 :         result.add_key(DBDIR_KEY);
     746              : 
     747              :         // Fetch list of database dirs and iterate them
     748         2160 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     749          170 :         let dbdir = DbDirectory::des(&buf)?;
     750              : 
     751          170 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     752          170 :         dbs.sort_unstable();
     753          170 :         for (spcnode, dbnode) in dbs {
     754            0 :             result.add_key(relmap_file_key(spcnode, dbnode));
     755            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     756              : 
     757            0 :             let mut rels: Vec<RelTag> = self
     758            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     759            0 :                 .await?
     760            0 :                 .into_iter()
     761            0 :                 .collect();
     762            0 :             rels.sort_unstable();
     763            0 :             for rel in rels {
     764            0 :                 let relsize_key = rel_size_to_key(rel);
     765            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     766            0 :                 let relsize = buf.get_u32_le();
     767            0 : 
     768            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     769            0 :                 result.add_key(relsize_key);
     770              :             }
     771              :         }
     772              : 
     773              :         // Iterate SLRUs next
     774          510 :         for kind in [
     775          170 :             SlruKind::Clog,
     776          170 :             SlruKind::MultiXactMembers,
     777          170 :             SlruKind::MultiXactOffsets,
     778              :         ] {
     779          510 :             let slrudir_key = slru_dir_to_key(kind);
     780          510 :             result.add_key(slrudir_key);
     781         7958 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     782          510 :             let dir = SlruSegmentDirectory::des(&buf)?;
     783          510 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     784          510 :             segments.sort_unstable();
     785          510 :             for segno in segments {
     786            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     787            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     788            0 :                 let segsize = buf.get_u32_le();
     789            0 : 
     790            0 :                 result.add_range(
     791            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     792            0 :                 );
     793            0 :                 result.add_key(segsize_key);
     794              :             }
     795              :         }
     796              : 
     797              :         // Then pg_twophase
     798          170 :         result.add_key(TWOPHASEDIR_KEY);
     799         3054 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     800          170 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     801          170 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     802          170 :         xids.sort_unstable();
     803          170 :         for xid in xids {
     804            0 :             result.add_key(twophase_file_key(xid));
     805            0 :         }
     806              : 
     807          170 :         result.add_key(CONTROLFILE_KEY);
     808          170 :         result.add_key(CHECKPOINT_KEY);
     809          170 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     810           70 :             result.add_key(AUX_FILES_KEY);
     811          100 :         }
     812          170 :         Ok(result.to_keyspace())
     813          170 :     }
     814              : 
      815              :     /// Get the cached size of a relation, if it was not updated after the specified LSN
     816       448540 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     817       448540 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     818       448540 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     819       448518 :             if lsn >= *cached_lsn {
     820       443372 :                 return Some(*nblocks);
     821         5146 :             }
     822           22 :         }
     823         5168 :         None
     824       448540 :     }
     825              : 
     826              :     /// Update cached relation size if there is no more recent update
     827            0 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     828            0 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     829            0 :         match rel_size_cache.entry(tag) {
     830            0 :             hash_map::Entry::Occupied(mut entry) => {
     831            0 :                 let cached_lsn = entry.get_mut();
     832            0 :                 if lsn >= cached_lsn.0 {
     833            0 :                     *cached_lsn = (lsn, nblocks);
     834            0 :                 }
     835              :             }
     836            0 :             hash_map::Entry::Vacant(entry) => {
     837            0 :                 entry.insert((lsn, nblocks));
     838            0 :             }
     839              :         }
     840            0 :     }
     841              : 
     842              :     /// Store cached relation size
     843       288732 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     844       288732 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     845       288732 :         rel_size_cache.insert(tag, (lsn, nblocks));
     846       288732 :     }
     847              : 
     848              :     /// Remove cached relation size
     849            2 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     850            2 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     851            2 :         rel_size_cache.remove(tag);
     852            2 :     }
     853              : }
     854              : 
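Taken together, the three cache helpers above maintain one invariant: an entry (cached_lsn, nblocks) may answer any lookup at lsn >= cached_lsn, and writers never move cached_lsn backwards. A standalone model of that invariant, with plain integers standing in for RelTag and Lsn purely for illustration:

use std::collections::HashMap;

#[derive(Default)]
struct RelSizeCache(HashMap<u32, (u64, u32)>);

impl RelSizeCache {
    // Mirrors get_cached_rel_size: answer only if the cached entry is
    // not newer than the requested LSN.
    fn get(&self, rel: u32, lsn: u64) -> Option<u32> {
        match self.0.get(&rel) {
            Some(&(cached_lsn, nblocks)) if lsn >= cached_lsn => Some(nblocks),
            _ => None,
        }
    }

    // Mirrors update_cached_rel_size: never overwrite with older data.
    fn update(&mut self, rel: u32, lsn: u64, nblocks: u32) {
        let entry = self.0.entry(rel).or_insert((lsn, nblocks));
        if lsn >= entry.0 {
            *entry = (lsn, nblocks);
        }
    }
}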
     855              : /// DatadirModification represents an operation to ingest an atomic set of
      856              : /// updates to the repository. It is created by the 'begin_modification'
      857              : /// function. It is created for each WAL record, so that all the modifications
      858              : /// made by one WAL record appear atomic.
     859              : pub struct DatadirModification<'a> {
     860              :     /// The timeline this modification applies to. You can access this to
     861              :     /// read the state, but note that any pending updates are *not* reflected
     862              :     /// in the state in 'tline' yet.
     863              :     pub tline: &'a Timeline,
     864              : 
     865              :     /// Current LSN of the modification
     866              :     lsn: Lsn,
     867              : 
     868              :     // The modifications are not applied directly to the underlying key-value store.
     869              :     // The put-functions add the modifications here, and they are flushed to the
      870              :     // underlying key-value store by the 'commit' function.
     871              :     pending_lsns: Vec<Lsn>,
     872              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     873              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     874              :     pending_nblocks: i64,
     875              : 
     876              :     // If we already wrote any aux file changes in this modification, stash the latest dir.  If set,
     877              :     // [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
     878              :     // if AUX_FILES_KEY is already set.
     879              :     pending_aux_files: Option<AuxFilesDirectory>,
     880              : 
     881              :     /// For special "directory" keys that store key-value maps, track the size of the map
     882              :     /// if it was updated in this modification.
     883              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
     884              : }
     885              : 
     886              : impl<'a> DatadirModification<'a> {
     887              :     /// Get the current lsn
     888       418056 :     pub(crate) fn get_lsn(&self) -> Lsn {
     889       418056 :         self.lsn
     890       418056 :     }
     891              : 
     892              :     /// Set the current lsn
     893       145856 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     894       145856 :         ensure!(
     895       145856 :             lsn >= self.lsn,
     896            0 :             "setting an older lsn {} than {} is not allowed",
     897              :             lsn,
     898              :             self.lsn
     899              :         );
     900       145856 :         if lsn > self.lsn {
     901       145856 :             self.pending_lsns.push(self.lsn);
     902       145856 :             self.lsn = lsn;
     903       145856 :         }
     904       145856 :         Ok(())
     905       145856 :     }
     906              : 
     907              :     /// Initialize a completely new repository.
     908              :     ///
     909              :     /// This inserts the directory metadata entries that are assumed to
     910              :     /// always exist.
     911           72 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     912           72 :         let buf = DbDirectory::ser(&DbDirectory {
     913           72 :             dbdirs: HashMap::new(),
     914           72 :         })?;
     915           72 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
     916           72 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     917           72 : 
     918           72 :         // Create AuxFilesDirectory
     919           72 :         self.init_aux_dir()?;
     920              : 
     921           72 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     922           72 :             xids: HashSet::new(),
     923           72 :         })?;
     924           72 :         self.pending_directory_entries
     925           72 :             .push((DirectoryKind::TwoPhase, 0));
     926           72 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     927              : 
     928           72 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     929           72 :         let empty_dir = Value::Image(buf);
     930           72 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     931           72 :         self.pending_directory_entries
     932           72 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
     933           72 :         self.put(
     934           72 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     935           72 :             empty_dir.clone(),
     936           72 :         );
     937           72 :         self.pending_directory_entries
      938           72 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactMembers), 0));
     939           72 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     940           72 :         self.pending_directory_entries
     941           72 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
     942           72 : 
     943           72 :         Ok(())
     944           72 :     }
     945              : 
     946              :     #[cfg(test)]
     947           70 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     948           70 :         self.init_empty()?;
     949           70 :         self.put_control_file(bytes::Bytes::from_static(
     950           70 :             b"control_file contents do not matter",
     951           70 :         ))
     952           70 :         .context("put_control_file")?;
     953           70 :         self.put_checkpoint(bytes::Bytes::from_static(
     954           70 :             b"checkpoint_file contents do not matter",
     955           70 :         ))
     956           70 :         .context("put_checkpoint_file")?;
     957           70 :         Ok(())
     958           70 :     }
     959              : 
     960              :     /// Put a new page version that can be constructed from a WAL record
     961              :     ///
     962              :     /// NOTE: this will *not* implicitly extend the relation if the page is beyond the
     963              :     /// current end-of-file. It's up to the caller to check that the relation size
     964              :     /// matches the blocks inserted!
     965       145630 :     pub fn put_rel_wal_record(
     966       145630 :         &mut self,
     967       145630 :         rel: RelTag,
     968       145630 :         blknum: BlockNumber,
     969       145630 :         rec: NeonWalRecord,
     970       145630 :     ) -> anyhow::Result<()> {
     971       145630 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     972       145630 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     973       145630 :         Ok(())
     974       145630 :     }
     975              : 
     976              :     // Same, but for an SLRU.
     977            8 :     pub fn put_slru_wal_record(
     978            8 :         &mut self,
     979            8 :         kind: SlruKind,
     980            8 :         segno: u32,
     981            8 :         blknum: BlockNumber,
     982            8 :         rec: NeonWalRecord,
     983            8 :     ) -> anyhow::Result<()> {
     984            8 :         self.put(
     985            8 :             slru_block_to_key(kind, segno, blknum),
     986            8 :             Value::WalRecord(rec),
     987            8 :         );
     988            8 :         Ok(())
     989            8 :     }
     990              : 
     991              :     /// Like put_rel_wal_record, but with a ready-made image of the page.
     992       280864 :     pub fn put_rel_page_image(
     993       280864 :         &mut self,
     994       280864 :         rel: RelTag,
     995       280864 :         blknum: BlockNumber,
     996       280864 :         img: Bytes,
     997       280864 :     ) -> anyhow::Result<()> {
     998       280864 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     999       280864 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1000       280864 :         Ok(())
    1001       280864 :     }
    1002              : 
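                       :     // Caller-side sketch of the contract in the NOTE above: writing a page
                       :     // past the current end-of-file must be paired with an explicit extension
                       :     // (`rel`, `blknum`, `img` and `ctx` assumed in scope):
                       :     //
                       :     //     modification.put_rel_page_image(rel, blknum, img)?;
                       :     //     // no-op if the relation already has at least blknum + 1 blocks
                       :     //     modification.put_rel_extend(rel, blknum + 1, ctx).await?;
                       : 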
    1003            6 :     pub fn put_slru_page_image(
    1004            6 :         &mut self,
    1005            6 :         kind: SlruKind,
    1006            6 :         segno: u32,
    1007            6 :         blknum: BlockNumber,
    1008            6 :         img: Bytes,
    1009            6 :     ) -> anyhow::Result<()> {
    1010            6 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
    1011            6 :         Ok(())
    1012            6 :     }
    1013              : 
    1014              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1015           16 :     pub async fn put_relmap_file(
    1016           16 :         &mut self,
    1017           16 :         spcnode: Oid,
    1018           16 :         dbnode: Oid,
    1019           16 :         img: Bytes,
    1020           16 :         ctx: &RequestContext,
    1021           16 :     ) -> anyhow::Result<()> {
    1022              :         // Add it to the directory (if it doesn't exist already)
    1023           16 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1024           16 :         let mut dbdir = DbDirectory::des(&buf)?;
    1025              : 
    1026           16 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1027           16 :         if r.is_none() || r == Some(false) {
    1028              :             // The dbdir entry didn't exist, or it contained a
    1029              :             // 'false'. The 'insert' call already updated it with
    1030              :             // 'true', now write the updated 'dbdirs' map back.
    1031           16 :             let buf = DbDirectory::ser(&dbdir)?;
    1032           16 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1033           16 : 
    1034           16 :             // Create AuxFilesDirectory as well
    1035           16 :             self.init_aux_dir()?;
    1036            0 :         }
    1037           16 :         if r.is_none() {
    1038            8 :             // Create RelDirectory
    1039            8 :             let buf = RelDirectory::ser(&RelDirectory {
    1040            8 :                 rels: HashSet::new(),
    1041            8 :             })?;
    1042            8 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1043            8 :             self.put(
    1044            8 :                 rel_dir_to_key(spcnode, dbnode),
    1045            8 :                 Value::Image(Bytes::from(buf)),
    1046            8 :             );
    1047            8 :         }
    1048              : 
    1049           16 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1050           16 :         Ok(())
    1051           16 :     }
    1052              : 
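                       :     // For reference, the three states of the dbdir entry handled above:
                       :     //   None        -> first write for (spcnode, dbnode): rewrite the dbdir
                       :     //                  and create an empty RelDirectory for the database
                       :     //   Some(false) -> the db existed (e.g. via put_rel_creation) but had no
                       :     //                  relmapper file yet: rewrite the dbdir only
                       :     //   Some(true)  -> nothing to update besides the relmap image itself
                       : 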
    1053            0 :     pub async fn put_twophase_file(
    1054            0 :         &mut self,
    1055            0 :         xid: TransactionId,
    1056            0 :         img: Bytes,
    1057            0 :         ctx: &RequestContext,
    1058            0 :     ) -> anyhow::Result<()> {
    1059              :         // Add it to the directory entry
    1060            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1061            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1062            0 :         if !dir.xids.insert(xid) {
    1063            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1064            0 :         }
    1065            0 :         self.pending_directory_entries
    1066            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1067            0 :         self.put(
    1068            0 :             TWOPHASEDIR_KEY,
    1069            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1070              :         );
    1071              : 
    1072            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1073            0 :         Ok(())
    1074            0 :     }
    1075              : 
    1076           72 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1077           72 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1078           72 :         Ok(())
    1079           72 :     }
    1080              : 
    1081           86 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1082           86 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1083           86 :         Ok(())
    1084           86 :     }
    1085              : 
    1086            0 :     pub async fn drop_dbdir(
    1087            0 :         &mut self,
    1088            0 :         spcnode: Oid,
    1089            0 :         dbnode: Oid,
    1090            0 :         ctx: &RequestContext,
    1091            0 :     ) -> anyhow::Result<()> {
    1092            0 :         let total_blocks = self
    1093            0 :             .tline
    1094            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
    1095            0 :             .await?;
    1096              : 
    1097              :         // Remove entry from dbdir
    1098            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1099            0 :         let mut dir = DbDirectory::des(&buf)?;
    1100            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1101            0 :             let buf = DbDirectory::ser(&dir)?;
    1102            0 :             self.pending_directory_entries
    1103            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1104            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1105              :         } else {
    1106            0 :             warn!(
    1107            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1108            0 :                 spcnode, dbnode
    1109            0 :             );
    1110              :         }
    1111              : 
    1112              :         // Update logical database size.
    1113            0 :         self.pending_nblocks -= total_blocks as i64;
    1114            0 : 
     1115            0 :         // Delete all relations and metadata files for the spcnode/dbnode
    1116            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1117            0 :         Ok(())
    1118            0 :     }
    1119              : 
    1120              :     /// Create a relation fork.
    1121              :     ///
    1122              :     /// 'nblocks' is the initial size.
    1123         1920 :     pub async fn put_rel_creation(
    1124         1920 :         &mut self,
    1125         1920 :         rel: RelTag,
    1126         1920 :         nblocks: BlockNumber,
    1127         1920 :         ctx: &RequestContext,
    1128         1920 :     ) -> Result<(), RelationError> {
    1129         1920 :         if rel.relnode == 0 {
    1130            0 :             return Err(RelationError::InvalidRelnode);
    1131         1920 :         }
    1132              :         // It's possible that this is the first rel for this db in this
    1133              :         // tablespace.  Create the reldir entry for it if so.
    1134         1920 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1135         1920 :             .context("deserialize db")?;
    1136         1920 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1137         1920 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
    1138              :             // Didn't exist. Update dbdir
    1139            8 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
    1140            8 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1141            8 :             self.pending_directory_entries
    1142            8 :                 .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1143            8 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1144            8 : 
    1145            8 :             // and create the RelDirectory
    1146            8 :             RelDirectory::default()
    1147              :         } else {
    1148              :             // reldir already exists, fetch it
    1149         1912 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1150         1912 :                 .context("deserialize db")?
    1151              :         };
    1152              : 
    1153              :         // Add the new relation to the rel directory entry, and write it back
    1154         1920 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1155            0 :             return Err(RelationError::AlreadyExists);
    1156         1920 :         }
    1157         1920 : 
    1158         1920 :         self.pending_directory_entries
    1159         1920 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1160         1920 : 
    1161         1920 :         self.put(
    1162         1920 :             rel_dir_key,
    1163         1920 :             Value::Image(Bytes::from(
    1164         1920 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1165              :             )),
    1166              :         );
    1167              : 
    1168              :         // Put size
    1169         1920 :         let size_key = rel_size_to_key(rel);
    1170         1920 :         let buf = nblocks.to_le_bytes();
    1171         1920 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1172         1920 : 
    1173         1920 :         self.pending_nblocks += nblocks as i64;
    1174         1920 : 
    1175         1920 :         // Update relation size cache
    1176         1920 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1177         1920 : 
    1178         1920 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1179         1920 :         // caller.
    1180         1920 :         Ok(())
    1181         1920 :     }
    1182              : 
    1183              :     /// Truncate relation
    1184         6012 :     pub async fn put_rel_truncation(
    1185         6012 :         &mut self,
    1186         6012 :         rel: RelTag,
    1187         6012 :         nblocks: BlockNumber,
    1188         6012 :         ctx: &RequestContext,
    1189         6012 :     ) -> anyhow::Result<()> {
    1190         6012 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1191         6012 :         if self
    1192         6012 :             .tline
    1193         6012 :             .get_rel_exists(rel, Version::Modified(self), true, ctx)
    1194            0 :             .await?
    1195              :         {
    1196         6012 :             let size_key = rel_size_to_key(rel);
    1197              :             // Fetch the old size first
    1198         6012 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1199         6012 : 
    1200         6012 :             // Update the entry with the new size.
    1201         6012 :             let buf = nblocks.to_le_bytes();
    1202         6012 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1203         6012 : 
    1204         6012 :             // Update relation size cache
    1205         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1209         6012 : 
    1210         6012 :             // Update logical database size.
    1211         6012 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1212            0 :         }
    1213         6012 :         Ok(())
    1214         6012 :     }
    1215              : 
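                       :     // Worked example for the bookkeeping above: truncating a 100-block
                       :     // relation to nblocks = 40 subtracts 100 - 40 = 60 from pending_nblocks,
                       :     // so the logical size shrinks by 60 * BLCKSZ bytes at commit time.
                       : 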
    1216              :     /// Extend relation
    1217              :     /// If new size is smaller, do nothing.
    1218       276680 :     pub async fn put_rel_extend(
    1219       276680 :         &mut self,
    1220       276680 :         rel: RelTag,
    1221       276680 :         nblocks: BlockNumber,
    1222       276680 :         ctx: &RequestContext,
    1223       276680 :     ) -> anyhow::Result<()> {
    1224       276680 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1225              : 
    1226              :         // Put size
    1227       276680 :         let size_key = rel_size_to_key(rel);
    1228       276680 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1229       276680 : 
    1230       276680 :         // only extend relation here. never decrease the size
    1231       276680 :         if nblocks > old_size {
    1232       274788 :             let buf = nblocks.to_le_bytes();
    1233       274788 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1234       274788 : 
    1235       274788 :             // Update relation size cache
    1236       274788 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1237       274788 : 
    1238       274788 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1239       274788 :         }
    1240       276680 :         Ok(())
    1241       276680 :     }
    1242              : 
    1243              :     /// Drop a relation.
    1244            2 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1245            2 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1246              : 
    1247              :         // Remove it from the directory entry
    1248            2 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1249            2 :         let buf = self.get(dir_key, ctx).await?;
    1250            2 :         let mut dir = RelDirectory::des(&buf)?;
    1251              : 
    1252            2 :         self.pending_directory_entries
    1253            2 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1254            2 : 
    1255            2 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1256            2 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1257              :         } else {
    1258            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1259              :         }
    1260              : 
    1261              :         // update logical size
    1262            2 :         let size_key = rel_size_to_key(rel);
    1263            2 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1264            2 :         self.pending_nblocks -= old_size as i64;
    1265            2 : 
     1266            2 :         // Remove entry from relation size cache
    1267            2 :         self.tline.remove_cached_rel_size(&rel);
    1268            2 : 
    1269            2 :         // Delete size entry, as well as all blocks
    1270            2 :         self.delete(rel_key_range(rel));
    1271            2 : 
    1272            2 :         Ok(())
    1273            2 :     }
    1274              : 
    1275            6 :     pub async fn put_slru_segment_creation(
    1276            6 :         &mut self,
    1277            6 :         kind: SlruKind,
    1278            6 :         segno: u32,
    1279            6 :         nblocks: BlockNumber,
    1280            6 :         ctx: &RequestContext,
    1281            6 :     ) -> anyhow::Result<()> {
    1282            6 :         // Add it to the directory entry
    1283            6 :         let dir_key = slru_dir_to_key(kind);
    1284            6 :         let buf = self.get(dir_key, ctx).await?;
    1285            6 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1286              : 
    1287            6 :         if !dir.segments.insert(segno) {
    1288            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1289            6 :         }
    1290            6 :         self.pending_directory_entries
    1291            6 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1292            6 :         self.put(
    1293            6 :             dir_key,
    1294            6 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1295              :         );
    1296              : 
    1297              :         // Put size
    1298            6 :         let size_key = slru_segment_size_to_key(kind, segno);
    1299            6 :         let buf = nblocks.to_le_bytes();
    1300            6 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1301            6 : 
    1302            6 :         // even if nblocks > 0, we don't insert any actual blocks here
    1303            6 : 
    1304            6 :         Ok(())
    1305            6 :     }
    1306              : 
    1307              :     /// Extend SLRU segment
    1308            0 :     pub fn put_slru_extend(
    1309            0 :         &mut self,
    1310            0 :         kind: SlruKind,
    1311            0 :         segno: u32,
    1312            0 :         nblocks: BlockNumber,
    1313            0 :     ) -> anyhow::Result<()> {
    1314            0 :         // Put size
    1315            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1316            0 :         let buf = nblocks.to_le_bytes();
    1317            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1318            0 :         Ok(())
    1319            0 :     }
    1320              : 
    1321              :     /// This method is used for marking truncated SLRU files
    1322            0 :     pub async fn drop_slru_segment(
    1323            0 :         &mut self,
    1324            0 :         kind: SlruKind,
    1325            0 :         segno: u32,
    1326            0 :         ctx: &RequestContext,
    1327            0 :     ) -> anyhow::Result<()> {
    1328            0 :         // Remove it from the directory entry
    1329            0 :         let dir_key = slru_dir_to_key(kind);
    1330            0 :         let buf = self.get(dir_key, ctx).await?;
    1331            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1332              : 
    1333            0 :         if !dir.segments.remove(&segno) {
    1334            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1335            0 :         }
    1336            0 :         self.pending_directory_entries
    1337            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1338            0 :         self.put(
    1339            0 :             dir_key,
    1340            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1341              :         );
    1342              : 
    1343              :         // Delete size entry, as well as all blocks
    1344            0 :         self.delete(slru_segment_key_range(kind, segno));
    1345            0 : 
    1346            0 :         Ok(())
    1347            0 :     }
    1348              : 
    1349              :     /// Drop a relmapper file (pg_filenode.map)
    1350            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1351            0 :         // TODO
    1352            0 :         Ok(())
    1353            0 :     }
    1354              : 
     1355              :     /// Drop a twophase file
    1356            0 :     pub async fn drop_twophase_file(
    1357            0 :         &mut self,
    1358            0 :         xid: TransactionId,
    1359            0 :         ctx: &RequestContext,
    1360            0 :     ) -> anyhow::Result<()> {
    1361              :         // Remove it from the directory entry
    1362            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1363            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1364              : 
    1365            0 :         if !dir.xids.remove(&xid) {
    1366            0 :             warn!("twophase file for xid {} does not exist", xid);
    1367            0 :         }
    1368            0 :         self.pending_directory_entries
    1369            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1370            0 :         self.put(
    1371            0 :             TWOPHASEDIR_KEY,
    1372            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1373              :         );
    1374              : 
    1375              :         // Delete it
    1376            0 :         self.delete(twophase_key_range(xid));
    1377            0 : 
    1378            0 :         Ok(())
    1379            0 :     }
    1380              : 
    1381           88 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1382           88 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1383           88 :             files: HashMap::new(),
    1384           88 :         })?;
    1385           88 :         self.pending_directory_entries
    1386           88 :             .push((DirectoryKind::AuxFiles, 0));
    1387           88 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1388           88 :         Ok(())
    1389           88 :     }
    1390              : 
    1391            8 :     pub async fn put_file(
    1392            8 :         &mut self,
    1393            8 :         path: &str,
    1394            8 :         content: &[u8],
    1395            8 :         ctx: &RequestContext,
    1396            8 :     ) -> anyhow::Result<()> {
    1397            8 :         let file_path = path.to_string();
    1398            8 :         let content = if content.is_empty() {
    1399            2 :             None
    1400              :         } else {
    1401            6 :             Some(Bytes::copy_from_slice(content))
    1402              :         };
    1403              : 
    1404            8 :         let dir = if let Some(mut dir) = self.pending_aux_files.take() {
    1405              :             // We already updated aux files in `self`: emit a delta and update our latest value
    1406              : 
    1407            4 :             self.put(
    1408            4 :                 AUX_FILES_KEY,
    1409            4 :                 Value::WalRecord(NeonWalRecord::AuxFile {
    1410            4 :                     file_path: file_path.clone(),
    1411            4 :                     content: content.clone(),
    1412            4 :                 }),
    1413            4 :             );
    1414            4 : 
    1415            4 :             dir.upsert(file_path, content);
    1416            4 :             dir
    1417              :         } else {
    1418              :             // Check if the AUX_FILES_KEY is initialized
    1419            4 :             match self.get(AUX_FILES_KEY, ctx).await {
    1420            2 :                 Ok(dir_bytes) => {
    1421            2 :                     let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
    1422              :                     // Key is already set, we may append a delta
    1423            2 :                     self.put(
    1424            2 :                         AUX_FILES_KEY,
    1425            2 :                         Value::WalRecord(NeonWalRecord::AuxFile {
    1426            2 :                             file_path: file_path.clone(),
    1427            2 :                             content: content.clone(),
    1428            2 :                         }),
    1429            2 :                     );
    1430            2 :                     dir.upsert(file_path, content);
    1431            2 :                     dir
    1432              :                 }
    1433              :                 Err(
    1434            0 :                     e @ (PageReconstructError::AncestorStopping(_)
    1435              :                     | PageReconstructError::Cancelled
    1436              :                     | PageReconstructError::AncestorLsnTimeout(_)),
    1437              :                 ) => {
    1438              :                     // Important that we do not interpret a shutdown error as "not found" and thereby
    1439              :                     // reset the map.
    1440            0 :                     return Err(e.into());
    1441              :                 }
    1442              :                 // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
     1443              :                 // we are assuming that all _other_ possible errors represent a missing key.  If some
    1444              :                 // other error occurs, we may incorrectly reset the map of aux files.
    1445              :                 Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
    1446              :                     // Key is missing, we must insert an image as the basis for subsequent deltas.
    1447              : 
    1448            2 :                     let mut dir = AuxFilesDirectory {
    1449            2 :                         files: HashMap::new(),
    1450            2 :                     };
    1451            2 :                     dir.upsert(file_path, content);
    1452            2 :                     self.put(
    1453            2 :                         AUX_FILES_KEY,
    1454            2 :                         Value::Image(Bytes::from(
    1455            2 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1456              :                         )),
    1457              :                     );
    1458            2 :                     dir
    1459              :                 }
    1460              :             }
    1461              :         };
    1462              : 
    1463            8 :         self.pending_directory_entries
    1464            8 :             .push((DirectoryKind::AuxFiles, dir.files.len()));
    1465            8 :         self.pending_aux_files = Some(dir);
    1466            8 : 
    1467            8 :         Ok(())
    1468            8 :     }
    1469              : 
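                       :     // Summary of the encoding chosen above: the first aux-file write
                       :     // materializes a full `Value::Image` of the AuxFilesDirectory; later
                       :     // writes append `NeonWalRecord::AuxFile` deltas that are folded back
                       :     // into the map on read. See the `aux_files_round_trip` test at the
                       :     // bottom of this file for an end-to-end usage example.
                       : 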
    1470              :     ///
    1471              :     /// Flush changes accumulated so far to the underlying repository.
    1472              :     ///
    1473              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1474              :     /// you to flush them to the underlying repository before the final `commit`.
    1475              :     /// That allows to free up the memory used to hold the pending changes.
     1476              :     /// That frees up the memory used to hold the pending changes.
    1477              :     /// Currently only used during bulk import of a data directory. In that
    1478              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1479              :     /// whole import fails and the timeline will be deleted anyway.
    1480              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1481              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1482              :     ///
    1483              :     /// Note: A consequence of flushing the pending operations is that they
    1484              :     /// won't be visible to subsequent operations until `commit`. The function
    1485              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1486              :     /// for bulk import, where you are just loading data pages and won't try to
    1487              :     /// modify the same pages twice.
    1488         1930 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1489         1930 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1490         1930 :         // to scan through the pending_updates list.
    1491         1930 :         let pending_nblocks = self.pending_nblocks;
    1492         1930 :         if pending_nblocks < 10000 {
    1493         1930 :             return Ok(());
    1494            0 :         }
    1495              : 
    1496            0 :         let mut writer = self.tline.writer().await;
    1497              : 
     1498              :         // Flush relation and SLRU data blocks, keep metadata.
    1499            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1500            0 :         for (key, values) in self.pending_updates.drain() {
    1501            0 :             for (lsn, value) in values {
    1502            0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1503              :                     // This bails out on first error without modifying pending_updates.
    1504              :                     // That's Ok, cf this function's doc comment.
    1505            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1506            0 :                 } else {
    1507            0 :                     retained_pending_updates
    1508            0 :                         .entry(key)
    1509            0 :                         .or_default()
    1510            0 :                         .push((lsn, value));
    1511            0 :                 }
    1512              :             }
    1513              :         }
    1514              : 
    1515            0 :         self.pending_updates = retained_pending_updates;
    1516            0 : 
    1517            0 :         if pending_nblocks != 0 {
    1518            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1519            0 :             self.pending_nblocks = 0;
    1520            0 :         }
    1521              : 
    1522            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1523            0 :             writer.update_directory_entries_count(kind, count as u64);
    1524            0 :         }
    1525              : 
    1526            0 :         Ok(())
    1527         1930 :     }
    1528              : 
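                       :     // Note the early return above: a modification that has accumulated fewer
                       :     // than 10000 new blocks (roughly 80 MB at an 8 KB BLCKSZ) stays in memory
                       :     // and is written out by `commit` instead; only bulk-import-sized batches
                       :     // take the flush path.
                       : 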
    1529              :     ///
    1530              :     /// Finish this atomic update, writing all the updated keys to the
    1531              :     /// underlying timeline.
    1532              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1533              :     ///
    1534       742952 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1535       742952 :         let mut writer = self.tline.writer().await;
    1536              : 
    1537       742952 :         let pending_nblocks = self.pending_nblocks;
    1538       742952 :         self.pending_nblocks = 0;
    1539       742952 : 
    1540       742952 :         if !self.pending_updates.is_empty() {
    1541       413936 :             let prev_pending_updates = std::mem::take(&mut self.pending_updates);
    1542       413936 : 
     1543       413936 :             // The put_batch call below expects the inputs to be sorted by Lsn,
    1544       413936 :             // so we do that first.
    1545       413936 :             let lsn_ordered_batch: Vec<(Key, Lsn, Value)> = prev_pending_updates
    1546       413936 :                 .into_iter()
    1547       699664 :                 .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (key, lsn, val)))
    1548       413936 :                 .kmerge_by(|lhs, rhs| lhs.1 .0 < rhs.1 .0)
    1549       413936 :                 .collect();
    1550       413936 : 
    1551       413936 :             writer.put_batch(lsn_ordered_batch, ctx).await?;
    1552       413936 :             self.pending_updates.clear();
    1553       329016 :         }
    1554              : 
    1555       742952 :         if !self.pending_deletions.is_empty() {
    1556            2 :             writer.delete_batch(&self.pending_deletions).await?;
    1557            2 :             self.pending_deletions.clear();
    1558       742950 :         }
    1559              : 
    1560       742952 :         self.pending_lsns.push(self.lsn);
    1561       888808 :         for pending_lsn in self.pending_lsns.drain(..) {
    1562       888808 :             // Ideally, we should be able to call writer.finish_write() only once
    1563       888808 :             // with the highest LSN. However, the last_record_lsn variable in the
    1564       888808 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1565       888808 :             // so we need to record every LSN to not leave a gap between them.
    1566       888808 :             writer.finish_write(pending_lsn);
    1567       888808 :         }
    1568              : 
    1569       742952 :         if pending_nblocks != 0 {
    1570       270570 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1571       472382 :         }
    1572              : 
    1573       742952 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1574         2400 :             writer.update_directory_entries_count(kind, count as u64);
    1575         2400 :         }
    1576              : 
    1577       742952 :         Ok(())
    1578       742952 :     }
    1579              : 
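                       :     // Why `kmerge_by` is sufficient in `commit` above: each per-key
                       :     // Vec<(Lsn, Value)> is already LSN-ordered (`put` only appends at
                       :     // self.lsn, and `set_lsn` never moves backwards), so a k-way merge of
                       :     // those runs yields a globally LSN-sorted batch without a full sort.
                       :     // For example, merging
                       :     //     key A: [(0x10, ..), (0x30, ..)]
                       :     //     key B: [(0x20, ..)]
                       :     // produces [(A, 0x10), (B, 0x20), (A, 0x30)].
                       : 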
    1580       291704 :     pub(crate) fn len(&self) -> usize {
    1581       291704 :         self.pending_updates.len() + self.pending_deletions.len()
    1582       291704 :     }
    1583              : 
    1584              :     // Internal helper functions to batch the modifications
    1585              : 
    1586       286564 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
     1587              :         // Have we already updated the same key? If so, read back the latest
     1588              :         // pending version.
    1589              :         //
    1590              :         // Note: we don't check pending_deletions. It is an error to request a
     1591              :         // value that has been removed; deletion only avoids leaking storage.
    1592       286564 :         if let Some(values) = self.pending_updates.get(&key) {
    1593        15928 :             if let Some((_, value)) = values.last() {
    1594        15928 :                 return if let Value::Image(img) = value {
    1595        15928 :                     Ok(img.clone())
    1596              :                 } else {
    1597              :                     // Currently, we never need to read back a WAL record that we
    1598              :                     // inserted in the same "transaction". All the metadata updates
    1599              :                     // work directly with Images, and we never need to read actual
    1600              :                     // data pages. We could handle this if we had to, by calling
    1601              :                     // the walredo manager, but let's keep it simple for now.
    1602            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1603            0 :                         "unexpected pending WAL record"
    1604            0 :                     )))
    1605              :                 };
    1606            0 :             }
    1607       270636 :         }
    1608       270636 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1609       270636 :         self.tline.get(key, lsn, ctx).await
    1610       286564 :     }
    1611              : 
    1612       711824 :     fn put(&mut self, key: Key, val: Value) {
    1613       711824 :         let values = self.pending_updates.entry(key).or_default();
    1614              :         // Replace the previous value if it exists at the same lsn
    1615       711824 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1616        12164 :             if *last_lsn == self.lsn {
    1617        12160 :                 *last_value = val;
    1618        12160 :                 return;
    1619            4 :             }
    1620       699660 :         }
    1621       699664 :         values.push((self.lsn, val));
    1622       711824 :     }
    1623              : 
    1624            2 :     fn delete(&mut self, key_range: Range<Key>) {
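                       :     // Behavior sketch for `put`: a second write to the same key at the same
                       :     // LSN replaces the previous value in place, while a write at a higher
                       :     // LSN appends a new entry (`next_lsn > lsn` assumed):
                       :     //
                       :     //     m.put(k, v1);         // values == [(lsn, v1)]
                       :     //     m.put(k, v2);         // values == [(lsn, v2)]
                       :     //     m.set_lsn(next_lsn)?;
                       :     //     m.put(k, v3);         // values == [(lsn, v2), (next_lsn, v3)]
                       : 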
    1625            2 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1626            2 :         self.pending_deletions.push((key_range, self.lsn));
    1627            2 :     }
    1628              : }
    1629              : 
    1630              : /// This struct facilitates accessing either a committed key from the timeline at a
    1631              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1632              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1633              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1634              : /// need to look up the keys in the modification first before looking them up in the
    1635              : /// timeline to not miss the latest updates.
    1636            0 : #[derive(Clone, Copy)]
    1637              : pub enum Version<'a> {
    1638              :     Lsn(Lsn),
    1639              :     Modified(&'a DatadirModification<'a>),
    1640              : }
    1641              : 
    1642              : impl<'a> Version<'a> {
    1643        23542 :     async fn get(
    1644        23542 :         &self,
    1645        23542 :         timeline: &Timeline,
    1646        23542 :         key: Key,
    1647        23542 :         ctx: &RequestContext,
    1648        23542 :     ) -> Result<Bytes, PageReconstructError> {
    1649        23542 :         match self {
    1650        23532 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1651           10 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1652              :         }
    1653        23542 :     }
    1654              : 
    1655        30484 :     fn get_lsn(&self) -> Lsn {
    1656        30484 :         match self {
    1657        24438 :             Version::Lsn(lsn) => *lsn,
    1658         6046 :             Version::Modified(modification) => modification.lsn,
    1659              :         }
    1660        30484 :     }
    1661              : }
    1662              : 
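                       : // Usage sketch: WAL ingestion reads through the pending modification first,
                       : // while ordinary reads pin a committed LSN:
                       : //
                       : //     let v = Version::Modified(&modification); // sees uncommitted puts too
                       : //     let v = Version::Lsn(request_lsn);        // committed data only
                       : 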
    1663              : //--- Metadata structs stored in key-value pairs in the repository.
    1664              : 
    1665         2106 : #[derive(Debug, Serialize, Deserialize)]
    1666              : struct DbDirectory {
    1667              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1668              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1669              : }
    1670              : 
    1671          170 : #[derive(Debug, Serialize, Deserialize)]
    1672              : struct TwoPhaseDirectory {
    1673              :     xids: HashSet<TransactionId>,
    1674              : }
    1675              : 
    1676         3860 : #[derive(Debug, Serialize, Deserialize, Default)]
    1677              : struct RelDirectory {
    1678              :     // Set of relations that exist. (relfilenode, forknum)
    1679              :     //
    1680              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1681              :     // key-value pairs, if you have a lot of relations
    1682              :     rels: HashSet<(Oid, u8)>,
    1683              : }
    1684              : 
    1685          202 : #[derive(Debug, Serialize, Deserialize, Default)]
    1686              : pub(crate) struct AuxFilesDirectory {
    1687              :     pub(crate) files: HashMap<String, Bytes>,
    1688              : }
    1689              : 
    1690              : impl AuxFilesDirectory {
    1691           26 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    1692           26 :         if let Some(value) = value {
    1693           20 :             self.files.insert(key, value);
    1694           20 :         } else {
    1695            6 :             self.files.remove(&key);
    1696            6 :         }
    1697           26 :     }
    1698              : }
    1699              : 
    1700            0 : #[derive(Debug, Serialize, Deserialize)]
    1701              : struct RelSizeEntry {
    1702              :     nblocks: u32,
    1703              : }
    1704              : 
    1705          516 : #[derive(Debug, Serialize, Deserialize, Default)]
    1706              : struct SlruSegmentDirectory {
    1707              :     // Set of SLRU segments that exist.
    1708              :     segments: HashSet<u32>,
    1709              : }
    1710              : 
    1711         4800 : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    1712              : #[repr(u8)]
    1713              : pub(crate) enum DirectoryKind {
    1714              :     Db,
    1715              :     TwoPhase,
    1716              :     Rel,
    1717              :     AuxFiles,
    1718              :     SlruSegment(SlruKind),
    1719              : }
    1720              : 
    1721              : impl DirectoryKind {
    1722              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    1723         4800 :     pub(crate) fn offset(&self) -> usize {
    1724         4800 :         self.into_usize()
    1725         4800 :     }
    1726              : }
    1727              : 
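                       : // `offset` maps each variant to a dense index via `enum_map`, so per-kind
                       : // directory-entry counts can be kept in a fixed-size array. A sketch, with
                       : // a hypothetical `count` value:
                       : //
                       : //     let mut counts = [0u64; DirectoryKind::KINDS_NUM];
                       : //     counts[DirectoryKind::Db.offset()] = count;
                       : 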
    1728              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1729              : 
    1730              : #[allow(clippy::bool_assert_comparison)]
    1731              : #[cfg(test)]
    1732              : mod tests {
    1733              :     use hex_literal::hex;
    1734              :     use utils::id::TimelineId;
    1735              : 
    1736              :     use super::*;
    1737              : 
    1738              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    1739              : 
    1740              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    1741            2 :     #[tokio::test]
    1742            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    1743            2 :         let name = "aux_files_round_trip";
    1744            2 :         let harness = TenantHarness::create(name)?;
    1745            2 : 
    1746            2 :         pub const TIMELINE_ID: TimelineId =
    1747            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    1748            2 : 
    1749            2 :         let (tenant, ctx) = harness.load().await;
    1750            2 :         let tline = tenant
    1751            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    1752            3 :             .await?;
    1753            2 :         let tline = tline.raw_timeline().unwrap();
    1754            2 : 
    1755            2 :         // First modification: insert two keys
    1756            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    1757            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    1758            2 :         modification.set_lsn(Lsn(0x1008))?;
    1759            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    1760            2 :         modification.commit(&ctx).await?;
    1761            2 :         let expect_1008 = HashMap::from([
    1762            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    1763            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    1764            2 :         ]);
    1765            2 : 
    1766            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1767            2 :         assert_eq!(readback, expect_1008);
    1768            2 : 
    1769            2 :         // Second modification: update one key, remove the other
    1770            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    1771            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    1772            2 :         modification.set_lsn(Lsn(0x2008))?;
    1773            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    1774            2 :         modification.commit(&ctx).await?;
    1775            2 :         let expect_2008 =
    1776            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    1777            2 : 
    1778            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    1779            2 :         assert_eq!(readback, expect_2008);
    1780            2 : 
    1781            2 :         // Reading back in time works
    1782            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1783            2 :         assert_eq!(readback, expect_1008);
    1784            2 : 
    1785            2 :         Ok(())
    1786            2 :     }
    1787              : 
    1788              :     /*
    1789              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1790              :             let incremental = timeline.get_current_logical_size();
    1791              :             let non_incremental = timeline
    1792              :                 .get_current_logical_size_non_incremental(lsn)
    1793              :                 .unwrap();
    1794              :             assert_eq!(incremental, non_incremental);
    1795              :         }
    1796              :     */
    1797              : 
    1798              :     /*
    1799              :     ///
    1800              :     /// Test list_rels() function, with branches and dropped relations
    1801              :     ///
    1802              :     #[test]
    1803              :     fn test_list_rels_drop() -> Result<()> {
    1804              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1805              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1806              :         const TESTDB: u32 = 111;
    1807              : 
    1808              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1809              :         // after branching fails below
    1810              :         let mut writer = tline.begin_record(Lsn(0x10));
    1811              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1812              :         writer.finish()?;
    1813              : 
    1814              :         // Create a relation on the timeline
    1815              :         let mut writer = tline.begin_record(Lsn(0x20));
    1816              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1817              :         writer.finish()?;
    1818              : 
    1819              :         let writer = tline.begin_record(Lsn(0x00));
    1820              :         writer.finish()?;
    1821              : 
    1822              :         // Check that list_rels() lists it after LSN 2, but no before it
    1823              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1824              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1825              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1826              : 
    1827              :         // Create a branch, check that the relation is visible there
    1828              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1829              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1830              :             Some(timeline) => timeline,
    1831              :             None => panic!("Should have a local timeline"),
    1832              :         };
    1833              :         let newtline = DatadirTimelineImpl::new(newtline);
    1834              :         assert!(newtline
    1835              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1836              :             .contains(&TESTREL_A));
    1837              : 
    1838              :         // Drop it on the branch
    1839              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1840              :         new_writer.drop_relation(TESTREL_A)?;
    1841              :         new_writer.finish()?;
    1842              : 
    1843              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1844              :         assert!(newtline
    1845              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1846              :             .contains(&TESTREL_A));
    1847              :         assert!(!newtline
    1848              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1849              :             .contains(&TESTREL_A));
    1850              : 
    1851              :         // Run checkpoint and garbage collection and check that it's still not visible
    1852              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1853              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1854              : 
    1855              :         assert!(!newtline
    1856              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1857              :             .contains(&TESTREL_A));
    1858              : 
    1859              :         Ok(())
    1860              :     }
    1861              :      */
    1862              : 
    1863              :     /*
    1864              :     #[test]
    1865              :     fn test_read_beyond_eof() -> Result<()> {
    1866              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1867              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1868              : 
    1869              :         make_some_layers(&tline, Lsn(0x20))?;
    1870              :         let mut writer = tline.begin_record(Lsn(0x60));
    1871              :         walingest.put_rel_page_image(
    1872              :             &mut writer,
    1873              :             TESTREL_A,
    1874              :             0,
    1875              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1876              :         )?;
    1877              :         writer.finish()?;
    1878              : 
    1879              :         // Test read before rel creation. Should error out.
    1880              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1881              : 
    1882              :         // Read block beyond end of relation at different points in time.
    1883              :         // These reads should fall into different delta, image, and in-memory layers.
    1884              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1885              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1886              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1887              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1888              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1889              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1890              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1891              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1892              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1893              : 
    1894              :         // Test on an in-memory layer with no preceding layer
    1895              :         let mut writer = tline.begin_record(Lsn(0x70));
    1896              :         walingest.put_rel_page_image(
    1897              :             &mut writer,
    1898              :             TESTREL_B,
    1899              :             0,
    1900              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1901              :         )?;
    1902              :         writer.finish()?;
    1903              : 
     1904              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    1905              : 
    1906              :         Ok(())
    1907              :     }
    1908              :      */
    1909              : }
        

Generated by: LCOV version 2.1-beta