LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test:      b837401fb09d2d9818b70e630fdb67e9799b7b0d.info
Test Date: 2024-04-18 15:32:49
Coverage:  Lines: 52.9 % (618 hit of 1168 total)    Functions: 36.5 % (65 hit of 178 total)

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::repository::*;
      13              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      14              : use crate::walrecord::NeonWalRecord;
      15              : use anyhow::{ensure, Context};
      16              : use bytes::{Buf, Bytes, BytesMut};
      17              : use enum_map::Enum;
      18              : use itertools::Itertools;
      19              : use pageserver_api::key::{
      20              :     dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
      21              :     rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      22              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      23              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      24              : };
      25              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      26              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      27              : use postgres_ffi::BLCKSZ;
      28              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      29              : use serde::{Deserialize, Serialize};
      30              : use std::collections::{hash_map, HashMap, HashSet};
      31              : use std::ops::ControlFlow;
      32              : use std::ops::Range;
      33              : use strum::IntoEnumIterator;
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::{debug, trace, warn};
      36              : use utils::bin_ser::DeserializeError;
      37              : use utils::vec_map::{VecMap, VecMapOrdering};
      38              : use utils::{bin_ser::BeSer, lsn::Lsn};
      39              : 
      40              : const MAX_AUX_FILE_DELTAS: usize = 1024;
      41              : 
      42              : #[derive(Debug)]
      43              : pub enum LsnForTimestamp {
      44              :     /// Found commits both before and after the given timestamp
      45              :     Present(Lsn),
      46              : 
       47              :     /// Found no commits after the given timestamp; this means
       48              :     /// that the newest data in the branch is older than the given
       49              :     /// timestamp.
      50              :     ///
      51              :     /// All commits <= LSN happened before the given timestamp
      52              :     Future(Lsn),
      53              : 
       54              :     /// The queried timestamp is past the horizon we look back at (PITR)
      55              :     ///
      56              :     /// All commits > LSN happened after the given timestamp,
      57              :     /// but any commits < LSN might have happened before or after
      58              :     /// the given timestamp. We don't know because no data before
      59              :     /// the given lsn is available.
      60              :     Past(Lsn),
      61              : 
      62              :     /// We have found no commit with a timestamp,
      63              :     /// so we can't return anything meaningful.
      64              :     ///
       65              :     /// The associated LSN is the lower bound value we can safely
       66              :     /// create branches on, but no statement is made about whether it
       67              :     /// is older or newer than the timestamp.
      68              :     ///
      69              :     /// This variant can e.g. be returned right after a
      70              :     /// cluster import.
      71              :     NoData(Lsn),
      72              : }
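
These variants are produced by find_lsn_for_timestamp() below. A minimal sketch of how a caller might map them to a branch point; the helper and its error handling are hypothetical, not the pageserver's actual branch-creation logic:

```rust
use utils::lsn::Lsn;

// Hypothetical helper: turn a find_lsn_for_timestamp() result into a
// branch-point LSN, or an error message when the timestamp can't be resolved.
fn branch_point(result: LsnForTimestamp) -> Result<Lsn, String> {
    match result {
        // Commits on both sides: lsn is the last commit at or before the timestamp.
        LsnForTimestamp::Present(lsn) => Ok(lsn),
        // Everything in the branch is older than the timestamp, so branching
        // at lsn still captures all data up to the timestamp.
        LsnForTimestamp::Future(lsn) => Ok(lsn),
        // The timestamp predates the PITR horizon; lsn is only a lower bound.
        LsnForTimestamp::Past(lsn) => {
            Err(format!("timestamp predates horizon, minimum usable LSN is {lsn}"))
        }
        // No commit timestamps at all, e.g. right after a cluster import.
        LsnForTimestamp::NoData(lsn) => {
            Err(format!("no commit data, safe lower bound is {lsn}"))
        }
    }
}
```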
      73              : 
      74            0 : #[derive(Debug, thiserror::Error)]
      75              : pub enum CalculateLogicalSizeError {
      76              :     #[error("cancelled")]
      77              :     Cancelled,
      78              :     #[error(transparent)]
      79              :     Other(#[from] anyhow::Error),
      80              : }
      81              : 
      82            0 : #[derive(Debug, thiserror::Error)]
      83              : pub(crate) enum CollectKeySpaceError {
      84              :     #[error(transparent)]
      85              :     Decode(#[from] DeserializeError),
      86              :     #[error(transparent)]
      87              :     PageRead(PageReconstructError),
      88              :     #[error("cancelled")]
      89              :     Cancelled,
      90              : }
      91              : 
      92              : impl From<PageReconstructError> for CollectKeySpaceError {
      93            0 :     fn from(err: PageReconstructError) -> Self {
      94            0 :         match err {
      95            0 :             PageReconstructError::Cancelled => Self::Cancelled,
      96            0 :             err => Self::PageRead(err),
      97              :         }
      98            0 :     }
      99              : }
     100              : 
     101              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     102            0 :     fn from(pre: PageReconstructError) -> Self {
     103            0 :         match pre {
     104              :             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
     105            0 :                 Self::Cancelled
     106              :             }
     107            0 :             _ => Self::Other(pre.into()),
     108              :         }
     109            0 :     }
     110              : }
     111              : 
     112            0 : #[derive(Debug, thiserror::Error)]
     113              : pub enum RelationError {
     114              :     #[error("Relation Already Exists")]
     115              :     AlreadyExists,
     116              :     #[error("invalid relnode")]
     117              :     InvalidRelnode,
     118              :     #[error(transparent)]
     119              :     Other(#[from] anyhow::Error),
     120              : }
     121              : 
     122              : ///
     123              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     124              : /// and other special kinds of files, in a versioned key-value store. The
     125              : /// Timeline struct provides the key-value store.
     126              : ///
      127              : /// This is a separate impl so that we can easily include all these functions in a
      128              : /// Timeline implementation; it might be moved into a separate struct later.
     129              : impl Timeline {
     130              :     /// Start ingesting a WAL record, or other atomic modification of
     131              :     /// the timeline.
     132              :     ///
     133              :     /// This provides a transaction-like interface to perform a bunch
     134              :     /// of modifications atomically.
     135              :     ///
     136              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     137              :     /// DatadirModification object. Use the functions in the object to
     138              :     /// modify the repository state, updating all the pages and metadata
     139              :     /// that the WAL record affects. When you're done, call commit() to
     140              :     /// commit the changes.
     141              :     ///
      142              :     /// The LSN stored in the modification is advanced by `ingest_record` and
      143              :     /// is used by `commit()` to update `last_record_lsn`.
     144              :     ///
     145              :     /// Calling commit() will flush all the changes and reset the state,
     146              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     147              :     ///
     148              :     /// Note that any pending modifications you make through the
      149              :     /// modification object won't be visible to calls to the 'get' and 'list'
      150              :     /// functions of the timeline until you commit! And if you update the
     151              :     /// same page twice, the last update wins.
     152              :     ///
     153       268292 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     154       268292 :     where
     155       268292 :         Self: Sized,
     156       268292 :     {
     157       268292 :         DatadirModification {
     158       268292 :             tline: self,
     159       268292 :             pending_lsns: Vec::new(),
     160       268292 :             pending_updates: HashMap::new(),
     161       268292 :             pending_deletions: Vec::new(),
     162       268292 :             pending_nblocks: 0,
     163       268292 :             pending_directory_entries: Vec::new(),
     164       268292 :             lsn,
     165       268292 :         }
     166       268292 :     }
     167              : 
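As a concrete illustration of the flow described in the doc comment above, a minimal sketch of ingesting one page image; commit()'s exact signature is not shown in this file and is an assumption here:

```rust
use bytes::Bytes;
use pageserver_api::reltag::{BlockNumber, RelTag};
use utils::lsn::Lsn;

// Hedged sketch: stage one page image and flush it atomically at `lsn`.
async fn ingest_page(
    tline: &Timeline,
    lsn: Lsn,
    rel: RelTag,
    blknum: BlockNumber,
    img: Bytes,
) -> anyhow::Result<()> {
    let mut modification = tline.begin_modification(lsn);
    // Pending updates are buffered; timeline readers can't see them yet.
    modification.put_rel_page_image(rel, blknum, img)?;
    // Flush all buffered changes and advance last_record_lsn.
    modification.commit().await?;
    Ok(())
}
```
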
     168              :     //------------------------------------------------------------------------------
     169              :     // Public GET functions
     170              :     //------------------------------------------------------------------------------
     171              : 
     172              :     /// Look up given page version.
     173        18384 :     pub(crate) async fn get_rel_page_at_lsn(
     174        18384 :         &self,
     175        18384 :         tag: RelTag,
     176        18384 :         blknum: BlockNumber,
     177        18384 :         version: Version<'_>,
     178        18384 :         latest: bool,
     179        18384 :         ctx: &RequestContext,
     180        18384 :     ) -> Result<Bytes, PageReconstructError> {
     181        18384 :         if tag.relnode == 0 {
     182            0 :             return Err(PageReconstructError::Other(
     183            0 :                 RelationError::InvalidRelnode.into(),
     184            0 :             ));
     185        18384 :         }
     186              : 
     187        18384 :         let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
     188        18384 :         if blknum >= nblocks {
     189            0 :             debug!(
     190            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     191            0 :                 tag,
     192            0 :                 blknum,
     193            0 :                 version.get_lsn(),
     194            0 :                 nblocks
     195            0 :             );
     196            0 :             return Ok(ZERO_PAGE.clone());
     197        18384 :         }
     198        18384 : 
     199        18384 :         let key = rel_block_to_key(tag, blknum);
     200        18384 :         version.get(self, key, ctx).await
     201        18384 :     }
     202              : 
     203              :     // Get size of a database in blocks
     204            0 :     pub(crate) async fn get_db_size(
     205            0 :         &self,
     206            0 :         spcnode: Oid,
     207            0 :         dbnode: Oid,
     208            0 :         version: Version<'_>,
     209            0 :         latest: bool,
     210            0 :         ctx: &RequestContext,
     211            0 :     ) -> Result<usize, PageReconstructError> {
     212            0 :         let mut total_blocks = 0;
     213              : 
     214            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     215              : 
     216            0 :         for rel in rels {
     217            0 :             let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
     218            0 :             total_blocks += n_blocks as usize;
     219              :         }
     220            0 :         Ok(total_blocks)
     221            0 :     }
     222              : 
     223              :     /// Get size of a relation file
     224        24434 :     pub(crate) async fn get_rel_size(
     225        24434 :         &self,
     226        24434 :         tag: RelTag,
     227        24434 :         version: Version<'_>,
     228        24434 :         latest: bool,
     229        24434 :         ctx: &RequestContext,
     230        24434 :     ) -> Result<BlockNumber, PageReconstructError> {
     231        24434 :         if tag.relnode == 0 {
     232            0 :             return Err(PageReconstructError::Other(
     233            0 :                 RelationError::InvalidRelnode.into(),
     234            0 :             ));
     235        24434 :         }
     236              : 
     237        24434 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     238        19294 :             return Ok(nblocks);
     239         5140 :         }
     240         5140 : 
     241         5140 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     242            0 :             && !self.get_rel_exists(tag, version, latest, ctx).await?
     243              :         {
     244              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     245              :             // FSM, and smgrnblocks() on it immediately afterwards,
     246              :             // without extending it.  Tolerate that by claiming that
     247              :             // any non-existent FSM fork has size 0.
     248            0 :             return Ok(0);
     249         5140 :         }
     250         5140 : 
     251         5140 :         let key = rel_size_to_key(tag);
     252         5140 :         let mut buf = version.get(self, key, ctx).await?;
     253         5136 :         let nblocks = buf.get_u32_le();
     254         5136 : 
     255         5136 :         if latest {
     256            0 :             // Update relation size cache only if "latest" flag is set.
     257            0 :             // This flag is set by compute when it is working with most recent version of relation.
     258            0 :             // Typically master compute node always set latest=true.
     259            0 :             // Please notice, that even if compute node "by mistake" specifies old LSN but set
     260            0 :             // latest=true, then it can not cause cache corruption, because with latest=true
     261            0 :             // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
     262            0 :             // associated with most recent value of LSN.
     263            0 :             self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     264         5136 :         }
     265         5136 :         Ok(nblocks)
     266        24434 :     }
     267              : 
     268              :     /// Does relation exist?
     269         6050 :     pub(crate) async fn get_rel_exists(
     270         6050 :         &self,
     271         6050 :         tag: RelTag,
     272         6050 :         version: Version<'_>,
     273         6050 :         _latest: bool,
     274         6050 :         ctx: &RequestContext,
     275         6050 :     ) -> Result<bool, PageReconstructError> {
     276         6050 :         if tag.relnode == 0 {
     277            0 :             return Err(PageReconstructError::Other(
     278            0 :                 RelationError::InvalidRelnode.into(),
     279            0 :             ));
     280         6050 :         }
     281              : 
     282              :         // first try to lookup relation in cache
     283         6050 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     284         6032 :             return Ok(true);
     285           18 :         }
     286           18 :         // fetch directory listing
     287           18 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     288           18 :         let buf = version.get(self, key, ctx).await?;
     289              : 
     290           18 :         match RelDirectory::des(&buf).context("deserialization failure") {
     291           18 :             Ok(dir) => {
     292           18 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     293           18 :                 Ok(exists)
     294              :             }
     295            0 :             Err(e) => Err(PageReconstructError::from(e)),
     296              :         }
     297         6050 :     }
     298              : 
     299              :     /// Get a list of all existing relations in given tablespace and database.
     300              :     ///
     301              :     /// # Cancel-Safety
     302              :     ///
     303              :     /// This method is cancellation-safe.
     304            0 :     pub(crate) async fn list_rels(
     305            0 :         &self,
     306            0 :         spcnode: Oid,
     307            0 :         dbnode: Oid,
     308            0 :         version: Version<'_>,
     309            0 :         ctx: &RequestContext,
     310            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     311            0 :         // fetch directory listing
     312            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     313            0 :         let buf = version.get(self, key, ctx).await?;
     314              : 
     315            0 :         match RelDirectory::des(&buf).context("deserialization failure") {
     316            0 :             Ok(dir) => {
     317            0 :                 let rels: HashSet<RelTag> =
     318            0 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     319            0 :                         spcnode,
     320            0 :                         dbnode,
     321            0 :                         relnode: *relnode,
     322            0 :                         forknum: *forknum,
     323            0 :                     }));
     324            0 : 
     325            0 :                 Ok(rels)
     326              :             }
     327            0 :             Err(e) => Err(PageReconstructError::from(e)),
     328              :         }
     329            0 :     }
     330              : 
     331              :     /// Get the whole SLRU segment
     332            0 :     pub(crate) async fn get_slru_segment(
     333            0 :         &self,
     334            0 :         kind: SlruKind,
     335            0 :         segno: u32,
     336            0 :         lsn: Lsn,
     337            0 :         ctx: &RequestContext,
     338            0 :     ) -> Result<Bytes, PageReconstructError> {
     339            0 :         let n_blocks = self
     340            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     341            0 :             .await?;
     342            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     343            0 :         for blkno in 0..n_blocks {
     344            0 :             let block = self
     345            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     346            0 :                 .await?;
     347            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     348              :         }
     349            0 :         Ok(segment.freeze())
     350            0 :     }
     351              : 
     352              :     /// Look up given SLRU page version.
     353            0 :     pub(crate) async fn get_slru_page_at_lsn(
     354            0 :         &self,
     355            0 :         kind: SlruKind,
     356            0 :         segno: u32,
     357            0 :         blknum: BlockNumber,
     358            0 :         lsn: Lsn,
     359            0 :         ctx: &RequestContext,
     360            0 :     ) -> Result<Bytes, PageReconstructError> {
     361            0 :         let key = slru_block_to_key(kind, segno, blknum);
     362            0 :         self.get(key, lsn, ctx).await
     363            0 :     }
     364              : 
     365              :     /// Get size of an SLRU segment
     366            0 :     pub(crate) async fn get_slru_segment_size(
     367            0 :         &self,
     368            0 :         kind: SlruKind,
     369            0 :         segno: u32,
     370            0 :         version: Version<'_>,
     371            0 :         ctx: &RequestContext,
     372            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     373            0 :         let key = slru_segment_size_to_key(kind, segno);
     374            0 :         let mut buf = version.get(self, key, ctx).await?;
     375            0 :         Ok(buf.get_u32_le())
     376            0 :     }
     377              : 
      378              :     /// Does the SLRU segment exist?
     379            0 :     pub(crate) async fn get_slru_segment_exists(
     380            0 :         &self,
     381            0 :         kind: SlruKind,
     382            0 :         segno: u32,
     383            0 :         version: Version<'_>,
     384            0 :         ctx: &RequestContext,
     385            0 :     ) -> Result<bool, PageReconstructError> {
     386            0 :         // fetch directory listing
     387            0 :         let key = slru_dir_to_key(kind);
     388            0 :         let buf = version.get(self, key, ctx).await?;
     389              : 
     390            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     391            0 :             Ok(dir) => {
     392            0 :                 let exists = dir.segments.get(&segno).is_some();
     393            0 :                 Ok(exists)
     394              :             }
     395            0 :             Err(e) => Err(PageReconstructError::from(e)),
     396              :         }
     397            0 :     }
     398              : 
      399              :     /// Locate an LSN such that all transactions that committed before
     400              :     /// 'search_timestamp' are visible, but nothing newer is.
     401              :     ///
     402              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     403              :     /// so it's not well defined which LSN you get if there were multiple commits
     404              :     /// "in flight" at that point in time.
     405              :     ///
     406            0 :     pub(crate) async fn find_lsn_for_timestamp(
     407            0 :         &self,
     408            0 :         search_timestamp: TimestampTz,
     409            0 :         cancel: &CancellationToken,
     410            0 :         ctx: &RequestContext,
     411            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     412            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     413            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     414            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     415            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     416            0 :         // on the safe side.
     417            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     418            0 :         let max_lsn = self.get_last_record_lsn();
     419            0 : 
     420            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     421            0 :         // LSN divided by 8.
     422            0 :         let mut low = min_lsn.0 / 8;
     423            0 :         let mut high = max_lsn.0 / 8 + 1;
     424            0 : 
     425            0 :         let mut found_smaller = false;
     426            0 :         let mut found_larger = false;
     427            0 :         while low < high {
     428            0 :             if cancel.is_cancelled() {
     429            0 :                 return Err(PageReconstructError::Cancelled);
     430            0 :             }
     431            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     432            0 :             let mid = (high + low) / 2;
     433              : 
     434            0 :             let cmp = self
     435            0 :                 .is_latest_commit_timestamp_ge_than(
     436            0 :                     search_timestamp,
     437            0 :                     Lsn(mid * 8),
     438            0 :                     &mut found_smaller,
     439            0 :                     &mut found_larger,
     440            0 :                     ctx,
     441            0 :                 )
     442            0 :                 .await?;
     443              : 
     444            0 :             if cmp {
     445            0 :                 high = mid;
     446            0 :             } else {
     447            0 :                 low = mid + 1;
     448            0 :             }
     449              :         }
     450              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
      451              :         // i.e. the LSN of the last commit record before or at `search_timestamp`.
     452              :         // Remove one from `low` to get `t`.
     453              :         //
     454              :         // FIXME: it would be better to get the LSN of the previous commit.
     455              :         // Otherwise, if you restore to the returned LSN, the database will
     456              :         // include physical changes from later commits that will be marked
     457              :         // as aborted, and will need to be vacuumed away.
     458            0 :         let commit_lsn = Lsn((low - 1) * 8);
     459            0 :         match (found_smaller, found_larger) {
     460              :             (false, false) => {
     461              :                 // This can happen if no commit records have been processed yet, e.g.
     462              :                 // just after importing a cluster.
     463            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     464              :             }
     465              :             (false, true) => {
     466              :                 // Didn't find any commit timestamps smaller than the request
     467            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     468              :             }
     469              :             (true, false) => {
     470              :                 // Only found commits with timestamps smaller than the request.
     471              :                 // It's still a valid case for branch creation, return it.
     472              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     473              :                 // case, anyway.
     474            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     475              :             }
     476            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     477              :         }
     478            0 :     }
     479              : 
      480              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if any transaction
      481              :     /// committed after 'search_timestamp', as of LSN 'probe_lsn'.
      482              :     ///
      483              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any
      484              :     /// commits with a smaller/larger timestamp.
     485              :     ///
     486            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     487            0 :         &self,
     488            0 :         search_timestamp: TimestampTz,
     489            0 :         probe_lsn: Lsn,
     490            0 :         found_smaller: &mut bool,
     491            0 :         found_larger: &mut bool,
     492            0 :         ctx: &RequestContext,
     493            0 :     ) -> Result<bool, PageReconstructError> {
     494            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     495            0 :             if timestamp >= search_timestamp {
     496            0 :                 *found_larger = true;
     497            0 :                 return ControlFlow::Break(true);
     498            0 :             } else {
     499            0 :                 *found_smaller = true;
     500            0 :             }
     501            0 :             ControlFlow::Continue(())
     502            0 :         })
     503            0 :         .await
     504            0 :     }
     505              : 
      506              :     /// Obtain the latest commit timestamp known at the given lsn.
      507              :     ///
      508              :     /// Returns None if the lsn has no commit timestamps; otherwise returns the maximum timestamp found.
     509            0 :     pub(crate) async fn get_timestamp_for_lsn(
     510            0 :         &self,
     511            0 :         probe_lsn: Lsn,
     512            0 :         ctx: &RequestContext,
     513            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     514            0 :         let mut max: Option<TimestampTz> = None;
     515            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     516            0 :             if let Some(max_prev) = max {
     517            0 :                 max = Some(max_prev.max(timestamp));
     518            0 :             } else {
     519            0 :                 max = Some(timestamp);
     520            0 :             }
     521            0 :             ControlFlow::Continue(())
     522            0 :         })
     523            0 :         .await?;
     524              : 
     525            0 :         Ok(max)
     526            0 :     }
     527              : 
     528              :     /// Runs the given function on all the timestamps for a given lsn
     529              :     ///
     530              :     /// The return value is either given by the closure, or set to the `Default`
     531              :     /// impl's output.
     532            0 :     async fn map_all_timestamps<T: Default>(
     533            0 :         &self,
     534            0 :         probe_lsn: Lsn,
     535            0 :         ctx: &RequestContext,
     536            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     537            0 :     ) -> Result<T, PageReconstructError> {
     538            0 :         for segno in self
     539            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     540            0 :             .await?
     541              :         {
     542            0 :             let nblocks = self
     543            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     544            0 :                 .await?;
     545            0 :             for blknum in (0..nblocks).rev() {
     546            0 :                 let clog_page = self
     547            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     548            0 :                     .await?;
     549              : 
     550            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     551            0 :                     let mut timestamp_bytes = [0u8; 8];
     552            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     553            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     554            0 : 
     555            0 :                     match f(timestamp) {
     556            0 :                         ControlFlow::Break(b) => return Ok(b),
     557            0 :                         ControlFlow::Continue(()) => (),
     558              :                     }
     559            0 :                 }
     560              :             }
     561              :         }
     562            0 :         Ok(Default::default())
     563            0 :     }
     564              : 
     565            0 :     pub(crate) async fn get_slru_keyspace(
     566            0 :         &self,
     567            0 :         version: Version<'_>,
     568            0 :         ctx: &RequestContext,
     569            0 :     ) -> Result<KeySpace, PageReconstructError> {
     570            0 :         let mut accum = KeySpaceAccum::new();
     571              : 
     572            0 :         for kind in SlruKind::iter() {
     573            0 :             let mut segments: Vec<u32> = self
     574            0 :                 .list_slru_segments(kind, version, ctx)
     575            0 :                 .await?
     576            0 :                 .into_iter()
     577            0 :                 .collect();
     578            0 :             segments.sort_unstable();
     579              : 
     580            0 :             for seg in segments {
     581            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     582              : 
     583            0 :                 accum.add_range(
     584            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     585            0 :                 );
     586              :             }
     587              :         }
     588              : 
     589            0 :         Ok(accum.to_keyspace())
     590            0 :     }
     591              : 
     592              :     /// Get a list of SLRU segments
     593            0 :     pub(crate) async fn list_slru_segments(
     594            0 :         &self,
     595            0 :         kind: SlruKind,
     596            0 :         version: Version<'_>,
     597            0 :         ctx: &RequestContext,
     598            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     599            0 :         // fetch directory entry
     600            0 :         let key = slru_dir_to_key(kind);
     601              : 
     602            0 :         let buf = version.get(self, key, ctx).await?;
     603            0 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     604            0 :             Ok(dir) => Ok(dir.segments),
     605            0 :             Err(e) => Err(PageReconstructError::from(e)),
     606              :         }
     607            0 :     }
     608              : 
     609            0 :     pub(crate) async fn get_relmap_file(
     610            0 :         &self,
     611            0 :         spcnode: Oid,
     612            0 :         dbnode: Oid,
     613            0 :         version: Version<'_>,
     614            0 :         ctx: &RequestContext,
     615            0 :     ) -> Result<Bytes, PageReconstructError> {
     616            0 :         let key = relmap_file_key(spcnode, dbnode);
     617              : 
     618            0 :         let buf = version.get(self, key, ctx).await?;
     619            0 :         Ok(buf)
     620            0 :     }
     621              : 
     622            0 :     pub(crate) async fn list_dbdirs(
     623            0 :         &self,
     624            0 :         lsn: Lsn,
     625            0 :         ctx: &RequestContext,
     626            0 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     627              :         // fetch directory entry
     628            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     629              : 
     630            0 :         match DbDirectory::des(&buf).context("deserialization failure") {
     631            0 :             Ok(dir) => Ok(dir.dbdirs),
     632            0 :             Err(e) => Err(PageReconstructError::from(e)),
     633              :         }
     634            0 :     }
     635              : 
     636            0 :     pub(crate) async fn get_twophase_file(
     637            0 :         &self,
     638            0 :         xid: TransactionId,
     639            0 :         lsn: Lsn,
     640            0 :         ctx: &RequestContext,
     641            0 :     ) -> Result<Bytes, PageReconstructError> {
     642            0 :         let key = twophase_file_key(xid);
     643            0 :         let buf = self.get(key, lsn, ctx).await?;
     644            0 :         Ok(buf)
     645            0 :     }
     646              : 
     647            0 :     pub(crate) async fn list_twophase_files(
     648            0 :         &self,
     649            0 :         lsn: Lsn,
     650            0 :         ctx: &RequestContext,
     651            0 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     652              :         // fetch directory entry
     653            0 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     654              : 
     655            0 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     656            0 :             Ok(dir) => Ok(dir.xids),
     657            0 :             Err(e) => Err(PageReconstructError::from(e)),
     658              :         }
     659            0 :     }
     660              : 
     661            0 :     pub(crate) async fn get_control_file(
     662            0 :         &self,
     663            0 :         lsn: Lsn,
     664            0 :         ctx: &RequestContext,
     665            0 :     ) -> Result<Bytes, PageReconstructError> {
     666            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     667            0 :     }
     668              : 
     669           12 :     pub(crate) async fn get_checkpoint(
     670           12 :         &self,
     671           12 :         lsn: Lsn,
     672           12 :         ctx: &RequestContext,
     673           12 :     ) -> Result<Bytes, PageReconstructError> {
     674           12 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     675           12 :     }
     676              : 
     677            6 :     pub(crate) async fn list_aux_files(
     678            6 :         &self,
     679            6 :         lsn: Lsn,
     680            6 :         ctx: &RequestContext,
     681            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     682            6 :         match self.get(AUX_FILES_KEY, lsn, ctx).await {
     683            6 :             Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
     684            6 :                 Ok(dir) => Ok(dir.files),
     685            0 :                 Err(e) => Err(PageReconstructError::from(e)),
     686              :             },
     687            0 :             Err(e) => {
     688            0 :                 // This is expected: historical databases do not have the key.
     689            0 :                 debug!("Failed to get info about AUX files: {}", e);
     690            0 :                 Ok(HashMap::new())
     691              :             }
     692              :         }
     693            6 :     }
     694              : 
      695              :     /// Does the same as get_current_logical_size, but computes the size on demand.
     696              :     /// Used to initialize the logical size tracking on startup.
     697              :     ///
     698              :     /// Only relation blocks are counted currently. That excludes metadata,
     699              :     /// SLRUs, twophase files etc.
     700              :     ///
     701              :     /// # Cancel-Safety
     702              :     ///
     703              :     /// This method is cancellation-safe.
     704            0 :     pub async fn get_current_logical_size_non_incremental(
     705            0 :         &self,
     706            0 :         lsn: Lsn,
     707            0 :         ctx: &RequestContext,
     708            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     709            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     710              : 
     711              :         // Fetch list of database dirs and iterate them
     712            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     713            0 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     714              : 
     715            0 :         let mut total_size: u64 = 0;
     716            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     717            0 :             for rel in self
     718            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     719            0 :                 .await?
     720              :             {
     721            0 :                 if self.cancel.is_cancelled() {
     722            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     723            0 :                 }
     724            0 :                 let relsize_key = rel_size_to_key(rel);
     725            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     726            0 :                 let relsize = buf.get_u32_le();
     727            0 : 
     728            0 :                 total_size += relsize as u64;
     729              :             }
     730              :         }
     731            0 :         Ok(total_size * BLCKSZ as u64)
     732            0 :     }
     733              : 
     734              :     ///
     735              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
      736              :     /// Anything that's not listed may be removed from the underlying storage (from
     737              :     /// that LSN forwards).
     738          196 :     pub(crate) async fn collect_keyspace(
     739          196 :         &self,
     740          196 :         lsn: Lsn,
     741          196 :         ctx: &RequestContext,
     742          196 :     ) -> Result<KeySpace, CollectKeySpaceError> {
     743          196 :         // Iterate through key ranges, greedily packing them into partitions
     744          196 :         let mut result = KeySpaceAccum::new();
     745          196 : 
     746          196 :         // The dbdir metadata always exists
     747          196 :         result.add_key(DBDIR_KEY);
     748              : 
     749              :         // Fetch list of database dirs and iterate them
     750         2253 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     751          196 :         let dbdir = DbDirectory::des(&buf)?;
     752              : 
     753          196 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     754          196 :         dbs.sort_unstable();
     755          196 :         for (spcnode, dbnode) in dbs {
     756            0 :             result.add_key(relmap_file_key(spcnode, dbnode));
     757            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     758              : 
     759            0 :             let mut rels: Vec<RelTag> = self
     760            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     761            0 :                 .await?
     762            0 :                 .into_iter()
     763            0 :                 .collect();
     764            0 :             rels.sort_unstable();
     765            0 :             for rel in rels {
     766            0 :                 let relsize_key = rel_size_to_key(rel);
     767            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     768            0 :                 let relsize = buf.get_u32_le();
     769            0 : 
     770            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     771            0 :                 result.add_key(relsize_key);
     772              :             }
     773              :         }
     774              : 
     775              :         // Iterate SLRUs next
     776          588 :         for kind in [
     777          196 :             SlruKind::Clog,
     778          196 :             SlruKind::MultiXactMembers,
     779          196 :             SlruKind::MultiXactOffsets,
     780              :         ] {
     781          588 :             let slrudir_key = slru_dir_to_key(kind);
     782          588 :             result.add_key(slrudir_key);
     783         7928 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     784          588 :             let dir = SlruSegmentDirectory::des(&buf)?;
     785          588 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     786          588 :             segments.sort_unstable();
     787          588 :             for segno in segments {
     788            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     789            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     790            0 :                 let segsize = buf.get_u32_le();
     791            0 : 
     792            0 :                 result.add_range(
     793            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     794            0 :                 );
     795            0 :                 result.add_key(segsize_key);
     796              :             }
     797              :         }
     798              : 
     799              :         // Then pg_twophase
     800          196 :         result.add_key(TWOPHASEDIR_KEY);
     801         3039 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     802          196 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     803          196 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     804          196 :         xids.sort_unstable();
     805          196 :         for xid in xids {
     806            0 :             result.add_key(twophase_file_key(xid));
     807            0 :         }
     808              : 
     809          196 :         result.add_key(CONTROLFILE_KEY);
     810          196 :         result.add_key(CHECKPOINT_KEY);
     811          196 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     812           94 :             result.add_key(AUX_FILES_KEY);
     813          102 :         }
     814          196 :         Ok(result.to_keyspace())
     815          196 :     }
     816              : 
      817              :     /// Get the cached size of a relation, if it was not updated after the specified LSN
     818       448540 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     819       448540 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     820       448540 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     821       448518 :             if lsn >= *cached_lsn {
     822       443372 :                 return Some(*nblocks);
     823         5146 :             }
     824           22 :         }
     825         5168 :         None
     826       448540 :     }
     827              : 
     828              :     /// Update cached relation size if there is no more recent update
     829            0 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     830            0 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     831            0 :         match rel_size_cache.entry(tag) {
     832            0 :             hash_map::Entry::Occupied(mut entry) => {
     833            0 :                 let cached_lsn = entry.get_mut();
     834            0 :                 if lsn >= cached_lsn.0 {
     835            0 :                     *cached_lsn = (lsn, nblocks);
     836            0 :                 }
     837              :             }
     838            0 :             hash_map::Entry::Vacant(entry) => {
     839            0 :                 entry.insert((lsn, nblocks));
     840            0 :             }
     841              :         }
     842            0 :     }
     843              : 
     844              :     /// Store cached relation size
     845       288732 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     846       288732 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     847       288732 :         rel_size_cache.insert(tag, (lsn, nblocks));
     848       288732 :     }
     849              : 
     850              :     /// Remove cached relation size
     851            2 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     852            2 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     853            2 :         rel_size_cache.remove(tag);
     854            2 :     }
     855              : }
     856              : 
     857              : /// DatadirModification represents an operation to ingest an atomic set of
      858              : /// updates to the repository. It is created by the 'begin_modification'
      859              : /// function. One is used for each WAL record, so that all the modifications
      860              : /// made by a single WAL record appear atomic.
     861              : pub struct DatadirModification<'a> {
     862              :     /// The timeline this modification applies to. You can access this to
     863              :     /// read the state, but note that any pending updates are *not* reflected
     864              :     /// in the state in 'tline' yet.
     865              :     pub tline: &'a Timeline,
     866              : 
     867              :     /// Current LSN of the modification
     868              :     lsn: Lsn,
     869              : 
     870              :     // The modifications are not applied directly to the underlying key-value store.
     871              :     // The put-functions add the modifications here, and they are flushed to the
      872              :     // underlying key-value store by the 'commit()' function.
     873              :     pending_lsns: Vec<Lsn>,
     874              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     875              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
     876              :     pending_nblocks: i64,
     877              : 
     878              :     /// For special "directory" keys that store key-value maps, track the size of the map
     879              :     /// if it was updated in this modification.
     880              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
     881              : }
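
Several WAL records can be batched into one DatadirModification before flushing, which is what pending_lsns above supports. A hedged sketch of that pattern using set_lsn() and put_rel_wal_record() from the impl below (again, commit()'s signature is an assumption):

```rust
use pageserver_api::reltag::{BlockNumber, RelTag};
use utils::lsn::Lsn;
use crate::walrecord::NeonWalRecord;

// Hedged sketch: apply a batch of decoded WAL records in one modification.
// Each set_lsn() call pushes the previous LSN onto pending_lsns, letting
// commit() advance last_record_lsn through every record in order.
async fn ingest_batch(
    tline: &Timeline,
    records: Vec<(Lsn, RelTag, BlockNumber, NeonWalRecord)>,
) -> anyhow::Result<()> {
    let first_lsn = match records.first() {
        Some((lsn, ..)) => *lsn,
        None => return Ok(()),
    };
    let mut modification = tline.begin_modification(first_lsn);
    for (lsn, rel, blknum, rec) in records {
        // Advance the modification's LSN; set_lsn() rejects moving backwards.
        modification.set_lsn(lsn)?;
        modification.put_rel_wal_record(rel, blknum, rec)?;
    }
    modification.commit().await?;
    Ok(())
}
```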
     882              : 
     883              : impl<'a> DatadirModification<'a> {
     884              :     /// Get the current lsn
     885       418056 :     pub(crate) fn get_lsn(&self) -> Lsn {
     886       418056 :         self.lsn
     887       418056 :     }
     888              : 
     889              :     /// Set the current lsn
     890       145856 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
     891       145856 :         ensure!(
     892       145856 :             lsn >= self.lsn,
     893            0 :             "setting an older lsn {} than {} is not allowed",
     894              :             lsn,
     895              :             self.lsn
     896              :         );
     897       145856 :         if lsn > self.lsn {
     898       145856 :             self.pending_lsns.push(self.lsn);
     899       145856 :             self.lsn = lsn;
     900       145856 :         }
     901       145856 :         Ok(())
     902       145856 :     }
     903              : 
     904              :     /// Initialize a completely new repository.
     905              :     ///
     906              :     /// This inserts the directory metadata entries that are assumed to
     907              :     /// always exist.
     908           96 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     909           96 :         let buf = DbDirectory::ser(&DbDirectory {
     910           96 :             dbdirs: HashMap::new(),
     911           96 :         })?;
     912           96 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
     913           96 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     914           96 : 
     915           96 :         // Create AuxFilesDirectory
     916           96 :         self.init_aux_dir()?;
     917              : 
     918           96 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     919           96 :             xids: HashSet::new(),
     920           96 :         })?;
     921           96 :         self.pending_directory_entries
     922           96 :             .push((DirectoryKind::TwoPhase, 0));
     923           96 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     924              : 
     925           96 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     926           96 :         let empty_dir = Value::Image(buf);
     927           96 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     928           96 :         self.pending_directory_entries
     929           96 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
     930           96 :         self.put(
     931           96 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     932           96 :             empty_dir.clone(),
     933           96 :         );
     934           96 :         self.pending_directory_entries
      935           96 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactMembers), 0));
     936           96 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     937           96 :         self.pending_directory_entries
     938           96 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
     939           96 : 
     940           96 :         Ok(())
     941           96 :     }
     942              : 
     943              :     #[cfg(test)]
     944           94 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     945           94 :         self.init_empty()?;
     946           94 :         self.put_control_file(bytes::Bytes::from_static(
     947           94 :             b"control_file contents do not matter",
     948           94 :         ))
     949           94 :         .context("put_control_file")?;
     950           94 :         self.put_checkpoint(bytes::Bytes::from_static(
     951           94 :             b"checkpoint_file contents do not matter",
     952           94 :         ))
     953           94 :         .context("put_checkpoint_file")?;
     954           94 :         Ok(())
     955           94 :     }
     956              : 
     957              :     /// Put a new page version that can be constructed from a WAL record
     958              :     ///
     959              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     960              :     /// current end-of-file. It's up to the caller to check that the relation size
     961              :     /// matches the blocks inserted!
     962       145630 :     pub fn put_rel_wal_record(
     963       145630 :         &mut self,
     964       145630 :         rel: RelTag,
     965       145630 :         blknum: BlockNumber,
     966       145630 :         rec: NeonWalRecord,
     967       145630 :     ) -> anyhow::Result<()> {
     968       145630 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     969       145630 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     970       145630 :         Ok(())
     971       145630 :     }
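                       : 
                       :     // Editor's illustration (hedged, not part of the coverage run): the NOTE
                       :     // above means a record beyond the current end-of-file must be paired with
                       :     // an explicit size update. `modification`, `rel`, `blknum`, `rec` and
                       :     // `ctx` are assumed to be in scope:
                       :     //
                       :     //     modification.put_rel_extend(rel, blknum + 1, ctx).await?; // grow EOF first
                       :     //     modification.put_rel_wal_record(rel, blknum, rec)?;       // then the record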
     972              : 
      973              :     // Same as put_rel_wal_record, but for an SLRU page.
     974            8 :     pub fn put_slru_wal_record(
     975            8 :         &mut self,
     976            8 :         kind: SlruKind,
     977            8 :         segno: u32,
     978            8 :         blknum: BlockNumber,
     979            8 :         rec: NeonWalRecord,
     980            8 :     ) -> anyhow::Result<()> {
     981            8 :         self.put(
     982            8 :             slru_block_to_key(kind, segno, blknum),
     983            8 :             Value::WalRecord(rec),
     984            8 :         );
     985            8 :         Ok(())
     986            8 :     }
     987              : 
      988              :     /// Like put_rel_wal_record, but with a ready-made image of the page.
     989       280864 :     pub fn put_rel_page_image(
     990       280864 :         &mut self,
     991       280864 :         rel: RelTag,
     992       280864 :         blknum: BlockNumber,
     993       280864 :         img: Bytes,
     994       280864 :     ) -> anyhow::Result<()> {
     995       280864 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     996       280864 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     997       280864 :         Ok(())
     998       280864 :     }
     999              : 
    1000            6 :     pub fn put_slru_page_image(
    1001            6 :         &mut self,
    1002            6 :         kind: SlruKind,
    1003            6 :         segno: u32,
    1004            6 :         blknum: BlockNumber,
    1005            6 :         img: Bytes,
    1006            6 :     ) -> anyhow::Result<()> {
    1007            6 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
    1008            6 :         Ok(())
    1009            6 :     }
    1010              : 
    1011              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1012           16 :     pub async fn put_relmap_file(
    1013           16 :         &mut self,
    1014           16 :         spcnode: Oid,
    1015           16 :         dbnode: Oid,
    1016           16 :         img: Bytes,
    1017           16 :         ctx: &RequestContext,
    1018           16 :     ) -> anyhow::Result<()> {
    1019              :         // Add it to the directory (if it doesn't exist already)
    1020           16 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1021           16 :         let mut dbdir = DbDirectory::des(&buf)?;
    1022              : 
    1023           16 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1024           16 :         if r.is_none() || r == Some(false) {
    1025              :             // The dbdir entry didn't exist, or it contained a
    1026              :             // 'false'. The 'insert' call already updated it with
    1027              :             // 'true', now write the updated 'dbdirs' map back.
    1028           16 :             let buf = DbDirectory::ser(&dbdir)?;
    1029           16 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1030           16 : 
    1031           16 :             // Create AuxFilesDirectory as well
    1032           16 :             self.init_aux_dir()?;
    1033            0 :         }
    1034           16 :         if r.is_none() {
    1035            8 :             // Create RelDirectory
    1036            8 :             let buf = RelDirectory::ser(&RelDirectory {
    1037            8 :                 rels: HashSet::new(),
    1038            8 :             })?;
    1039            8 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1040            8 :             self.put(
    1041            8 :                 rel_dir_to_key(spcnode, dbnode),
    1042            8 :                 Value::Image(Bytes::from(buf)),
    1043            8 :             );
    1044            8 :         }
    1045              : 
    1046           16 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1047           16 :         Ok(())
    1048           16 :     }
    1049              : 
    1050            0 :     pub async fn put_twophase_file(
    1051            0 :         &mut self,
    1052            0 :         xid: TransactionId,
    1053            0 :         img: Bytes,
    1054            0 :         ctx: &RequestContext,
    1055            0 :     ) -> anyhow::Result<()> {
    1056              :         // Add it to the directory entry
    1057            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1058            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1059            0 :         if !dir.xids.insert(xid) {
    1060            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1061            0 :         }
    1062            0 :         self.pending_directory_entries
    1063            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1064            0 :         self.put(
    1065            0 :             TWOPHASEDIR_KEY,
    1066            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1067              :         );
    1068              : 
    1069            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1070            0 :         Ok(())
    1071            0 :     }
    1072              : 
    1073           96 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1074           96 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1075           96 :         Ok(())
    1076           96 :     }
    1077              : 
    1078          110 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1079          110 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1080          110 :         Ok(())
    1081          110 :     }
    1082              : 
    1083            0 :     pub async fn drop_dbdir(
    1084            0 :         &mut self,
    1085            0 :         spcnode: Oid,
    1086            0 :         dbnode: Oid,
    1087            0 :         ctx: &RequestContext,
    1088            0 :     ) -> anyhow::Result<()> {
    1089            0 :         let total_blocks = self
    1090            0 :             .tline
    1091            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
    1092            0 :             .await?;
    1093              : 
    1094              :         // Remove entry from dbdir
    1095            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1096            0 :         let mut dir = DbDirectory::des(&buf)?;
    1097            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1098            0 :             let buf = DbDirectory::ser(&dir)?;
    1099            0 :             self.pending_directory_entries
    1100            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1101            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1102              :         } else {
    1103            0 :             warn!(
    1104            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1105            0 :                 spcnode, dbnode
    1106            0 :             );
    1107              :         }
    1108              : 
    1109              :         // Update logical database size.
    1110            0 :         self.pending_nblocks -= total_blocks as i64;
    1111            0 : 
    1112            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1113            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1114            0 :         Ok(())
    1115            0 :     }
    1116              : 
    1117              :     /// Create a relation fork.
    1118              :     ///
    1119              :     /// 'nblocks' is the initial size.
    1120         1920 :     pub async fn put_rel_creation(
    1121         1920 :         &mut self,
    1122         1920 :         rel: RelTag,
    1123         1920 :         nblocks: BlockNumber,
    1124         1920 :         ctx: &RequestContext,
    1125         1920 :     ) -> Result<(), RelationError> {
    1126         1920 :         if rel.relnode == 0 {
    1127            0 :             return Err(RelationError::InvalidRelnode);
    1128         1920 :         }
    1129              :         // It's possible that this is the first rel for this db in this
    1130              :         // tablespace.  Create the reldir entry for it if so.
    1131         1920 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1132         1920 :             .context("deserialize db")?;
    1133         1920 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1134         1920 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
    1135              :             // Didn't exist. Update dbdir
    1136            8 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
    1137            8 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1138            8 :             self.pending_directory_entries
    1139            8 :                 .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1140            8 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1141            8 : 
    1142            8 :             // and create the RelDirectory
    1143            8 :             RelDirectory::default()
    1144              :         } else {
    1145              :             // reldir already exists, fetch it
    1146         1912 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1147         1912 :                 .context("deserialize db")?
    1148              :         };
    1149              : 
    1150              :         // Add the new relation to the rel directory entry, and write it back
    1151         1920 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1152            0 :             return Err(RelationError::AlreadyExists);
    1153         1920 :         }
    1154         1920 : 
    1155         1920 :         self.pending_directory_entries
    1156         1920 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1157         1920 : 
    1158         1920 :         self.put(
    1159         1920 :             rel_dir_key,
    1160         1920 :             Value::Image(Bytes::from(
    1161         1920 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1162              :             )),
    1163              :         );
    1164              : 
    1165              :         // Put size
    1166         1920 :         let size_key = rel_size_to_key(rel);
    1167         1920 :         let buf = nblocks.to_le_bytes();
    1168         1920 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1169         1920 : 
    1170         1920 :         self.pending_nblocks += nblocks as i64;
    1171         1920 : 
    1172         1920 :         // Update relation size cache
    1173         1920 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1174         1920 : 
    1175         1920 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1176         1920 :         // caller.
    1177         1920 :         Ok(())
    1178         1920 :     }
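                       : 
                       :     // Editor's sketch (assumed names): the contract noted above — creation
                       :     // records the size only, and the caller inserts the actual blocks, for
                       :     // example zero-filled pages via the ZERO_PAGE constant defined below:
                       :     //
                       :     //     modification.put_rel_creation(rel, 1, ctx).await?;
                       :     //     modification.put_rel_page_image(rel, 0, ZERO_PAGE.clone())?;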
    1179              : 
    1180              :     /// Truncate relation
    1181         6012 :     pub async fn put_rel_truncation(
    1182         6012 :         &mut self,
    1183         6012 :         rel: RelTag,
    1184         6012 :         nblocks: BlockNumber,
    1185         6012 :         ctx: &RequestContext,
    1186         6012 :     ) -> anyhow::Result<()> {
    1187         6012 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1188         6012 :         if self
    1189         6012 :             .tline
    1190         6012 :             .get_rel_exists(rel, Version::Modified(self), true, ctx)
    1191            0 :             .await?
    1192              :         {
    1193         6012 :             let size_key = rel_size_to_key(rel);
    1194              :             // Fetch the old size first
    1195         6012 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1196         6012 : 
    1197         6012 :             // Update the entry with the new size.
    1198         6012 :             let buf = nblocks.to_le_bytes();
    1199         6012 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1200         6012 : 
    1201         6012 :             // Update relation size cache
    1202         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1206         6012 : 
    1207         6012 :             // Update logical database size.
    1208         6012 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1209            0 :         }
    1210         6012 :         Ok(())
    1211         6012 :     }
    1212              : 
    1213              :     /// Extend relation
    1214              :     /// If new size is smaller, do nothing.
    1215       276680 :     pub async fn put_rel_extend(
    1216       276680 :         &mut self,
    1217       276680 :         rel: RelTag,
    1218       276680 :         nblocks: BlockNumber,
    1219       276680 :         ctx: &RequestContext,
    1220       276680 :     ) -> anyhow::Result<()> {
    1221       276680 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1222              : 
    1223              :         // Put size
    1224       276680 :         let size_key = rel_size_to_key(rel);
    1225       276680 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1226       276680 : 
    1227       276680 :         // only extend relation here. never decrease the size
    1228       276680 :         if nblocks > old_size {
    1229       274788 :             let buf = nblocks.to_le_bytes();
    1230       274788 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1231       274788 : 
    1232       274788 :             // Update relation size cache
    1233       274788 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1234       274788 : 
    1235       274788 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1236       274788 :         }
    1237       276680 :         Ok(())
    1238       276680 :     }
    1239              : 
    1240              :     /// Drop a relation.
    1241            2 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1242            2 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1243              : 
    1244              :         // Remove it from the directory entry
    1245            2 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1246            2 :         let buf = self.get(dir_key, ctx).await?;
    1247            2 :         let mut dir = RelDirectory::des(&buf)?;
    1248              : 
    1249            2 :         self.pending_directory_entries
    1250            2 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1251            2 : 
    1252            2 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1253            2 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1254              :         } else {
    1255            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1256              :         }
    1257              : 
    1258              :         // update logical size
    1259            2 :         let size_key = rel_size_to_key(rel);
    1260            2 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1261            2 :         self.pending_nblocks -= old_size as i64;
    1262            2 : 
     1263            2 :         // Remove entry from relation size cache
    1264            2 :         self.tline.remove_cached_rel_size(&rel);
    1265            2 : 
    1266            2 :         // Delete size entry, as well as all blocks
    1267            2 :         self.delete(rel_key_range(rel));
    1268            2 : 
    1269            2 :         Ok(())
    1270            2 :     }
    1271              : 
    1272            6 :     pub async fn put_slru_segment_creation(
    1273            6 :         &mut self,
    1274            6 :         kind: SlruKind,
    1275            6 :         segno: u32,
    1276            6 :         nblocks: BlockNumber,
    1277            6 :         ctx: &RequestContext,
    1278            6 :     ) -> anyhow::Result<()> {
    1279            6 :         // Add it to the directory entry
    1280            6 :         let dir_key = slru_dir_to_key(kind);
    1281            6 :         let buf = self.get(dir_key, ctx).await?;
    1282            6 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1283              : 
    1284            6 :         if !dir.segments.insert(segno) {
    1285            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1286            6 :         }
    1287            6 :         self.pending_directory_entries
    1288            6 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1289            6 :         self.put(
    1290            6 :             dir_key,
    1291            6 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1292              :         );
    1293              : 
    1294              :         // Put size
    1295            6 :         let size_key = slru_segment_size_to_key(kind, segno);
    1296            6 :         let buf = nblocks.to_le_bytes();
    1297            6 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1298            6 : 
    1299            6 :         // even if nblocks > 0, we don't insert any actual blocks here
    1300            6 : 
    1301            6 :         Ok(())
    1302            6 :     }
    1303              : 
    1304              :     /// Extend SLRU segment
    1305            0 :     pub fn put_slru_extend(
    1306            0 :         &mut self,
    1307            0 :         kind: SlruKind,
    1308            0 :         segno: u32,
    1309            0 :         nblocks: BlockNumber,
    1310            0 :     ) -> anyhow::Result<()> {
    1311            0 :         // Put size
    1312            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1313            0 :         let buf = nblocks.to_le_bytes();
    1314            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1315            0 :         Ok(())
    1316            0 :     }
    1317              : 
    1318              :     /// This method is used for marking truncated SLRU files
    1319            0 :     pub async fn drop_slru_segment(
    1320            0 :         &mut self,
    1321            0 :         kind: SlruKind,
    1322            0 :         segno: u32,
    1323            0 :         ctx: &RequestContext,
    1324            0 :     ) -> anyhow::Result<()> {
    1325            0 :         // Remove it from the directory entry
    1326            0 :         let dir_key = slru_dir_to_key(kind);
    1327            0 :         let buf = self.get(dir_key, ctx).await?;
    1328            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1329              : 
    1330            0 :         if !dir.segments.remove(&segno) {
    1331            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1332            0 :         }
    1333            0 :         self.pending_directory_entries
    1334            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1335            0 :         self.put(
    1336            0 :             dir_key,
    1337            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1338              :         );
    1339              : 
    1340              :         // Delete size entry, as well as all blocks
    1341            0 :         self.delete(slru_segment_key_range(kind, segno));
    1342            0 : 
    1343            0 :         Ok(())
    1344            0 :     }
    1345              : 
    1346              :     /// Drop a relmapper file (pg_filenode.map)
    1347            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1348            0 :         // TODO
    1349            0 :         Ok(())
    1350            0 :     }
    1351              : 
     1352              :     /// Drop a twophase file
    1353            0 :     pub async fn drop_twophase_file(
    1354            0 :         &mut self,
    1355            0 :         xid: TransactionId,
    1356            0 :         ctx: &RequestContext,
    1357            0 :     ) -> anyhow::Result<()> {
    1358              :         // Remove it from the directory entry
    1359            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1360            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1361              : 
    1362            0 :         if !dir.xids.remove(&xid) {
    1363            0 :             warn!("twophase file for xid {} does not exist", xid);
    1364            0 :         }
    1365            0 :         self.pending_directory_entries
    1366            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1367            0 :         self.put(
    1368            0 :             TWOPHASEDIR_KEY,
    1369            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1370              :         );
    1371              : 
    1372              :         // Delete it
    1373            0 :         self.delete(twophase_key_range(xid));
    1374            0 : 
    1375            0 :         Ok(())
    1376            0 :     }
    1377              : 
    1378          112 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1379          112 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1380          112 :             files: HashMap::new(),
    1381          112 :         })?;
    1382          112 :         self.pending_directory_entries
    1383          112 :             .push((DirectoryKind::AuxFiles, 0));
    1384          112 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1385          112 :         Ok(())
    1386          112 :     }
    1387              : 
    1388            8 :     pub async fn put_file(
    1389            8 :         &mut self,
    1390            8 :         path: &str,
    1391            8 :         content: &[u8],
    1392            8 :         ctx: &RequestContext,
    1393            8 :     ) -> anyhow::Result<()> {
    1394            8 :         let file_path = path.to_string();
    1395            8 :         let content = if content.is_empty() {
    1396            2 :             None
    1397              :         } else {
    1398            6 :             Some(Bytes::copy_from_slice(content))
    1399              :         };
    1400              : 
    1401              :         let n_files;
    1402            8 :         let mut aux_files = self.tline.aux_files.lock().await;
    1403            8 :         if let Some(mut dir) = aux_files.dir.take() {
    1404              :             // We already updated aux files in `self`: emit a delta and update our latest value
    1405            6 :             dir.upsert(file_path.clone(), content.clone());
    1406            6 :             n_files = dir.files.len();
    1407            6 :             if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
    1408            0 :                 self.put(
    1409            0 :                     AUX_FILES_KEY,
    1410            0 :                     Value::Image(Bytes::from(
    1411            0 :                         AuxFilesDirectory::ser(&dir).context("serialize")?,
    1412              :                     )),
    1413              :                 );
    1414            0 :                 aux_files.n_deltas = 0;
    1415            6 :             } else {
    1416            6 :                 self.put(
    1417            6 :                     AUX_FILES_KEY,
    1418            6 :                     Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
    1419            6 :                 );
    1420            6 :                 aux_files.n_deltas += 1;
    1421            6 :             }
    1422            6 :             aux_files.dir = Some(dir);
    1423              :         } else {
    1424              :             // Check if the AUX_FILES_KEY is initialized
    1425            2 :             match self.get(AUX_FILES_KEY, ctx).await {
    1426            0 :                 Ok(dir_bytes) => {
    1427            0 :                     let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
    1428              :                     // Key is already set, we may append a delta
    1429            0 :                     self.put(
    1430            0 :                         AUX_FILES_KEY,
    1431            0 :                         Value::WalRecord(NeonWalRecord::AuxFile {
    1432            0 :                             file_path: file_path.clone(),
    1433            0 :                             content: content.clone(),
    1434            0 :                         }),
    1435            0 :                     );
    1436            0 :                     dir.upsert(file_path, content);
    1437            0 :                     n_files = dir.files.len();
    1438            0 :                     aux_files.dir = Some(dir);
    1439              :                 }
    1440              :                 Err(
    1441            0 :                     e @ (PageReconstructError::AncestorStopping(_)
    1442              :                     | PageReconstructError::Cancelled
    1443              :                     | PageReconstructError::AncestorLsnTimeout(_)),
    1444              :                 ) => {
    1445              :                     // Important that we do not interpret a shutdown error as "not found" and thereby
    1446              :                     // reset the map.
    1447            0 :                     return Err(e.into());
    1448              :                 }
    1449              :                 // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
     1450              :         // we are assuming that all _other_ possible errors represent a missing key.  If some
    1451              :                 // other error occurs, we may incorrectly reset the map of aux files.
    1452              :                 Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
    1453              :                     // Key is missing, we must insert an image as the basis for subsequent deltas.
    1454              : 
    1455            2 :                     let mut dir = AuxFilesDirectory {
    1456            2 :                         files: HashMap::new(),
    1457            2 :                     };
    1458            2 :                     dir.upsert(file_path, content);
    1459            2 :                     self.put(
    1460            2 :                         AUX_FILES_KEY,
    1461            2 :                         Value::Image(Bytes::from(
    1462            2 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1463              :                         )),
    1464              :                     );
    1465            2 :                     n_files = 1;
    1466            2 :                     aux_files.dir = Some(dir);
    1467              :                 }
    1468              :             }
    1469              :         }
    1470              : 
    1471            8 :         self.pending_directory_entries
    1472            8 :             .push((DirectoryKind::AuxFiles, n_files));
    1473            8 : 
    1474            8 :         Ok(())
    1475            8 :     }
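                       : 
                       :     // Editor's note: as the branches above show, put_file() treats empty
                       :     // content as a deletion (it becomes `None` and upsert drops the entry).
                       :     // Mirroring the round-trip test at the bottom of this file:
                       :     //
                       :     //     modification.put_file("foo/bar1", b"content1", ctx).await?; // upsert
                       :     //     modification.put_file("foo/bar1", b"", ctx).await?;         // delete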
    1476              : 
    1477              :     ///
    1478              :     /// Flush changes accumulated so far to the underlying repository.
    1479              :     ///
    1480              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1481              :     /// you to flush them to the underlying repository before the final `commit`.
     1482              :     /// That allows freeing up the memory used to hold the pending changes.
    1483              :     ///
    1484              :     /// Currently only used during bulk import of a data directory. In that
    1485              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1486              :     /// whole import fails and the timeline will be deleted anyway.
    1487              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1488              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1489              :     ///
    1490              :     /// Note: A consequence of flushing the pending operations is that they
    1491              :     /// won't be visible to subsequent operations until `commit`. The function
    1492              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1493              :     /// for bulk import, where you are just loading data pages and won't try to
    1494              :     /// modify the same pages twice.
    1495         1930 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1496         1930 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1497         1930 :         // to scan through the pending_updates list.
    1498         1930 :         let pending_nblocks = self.pending_nblocks;
    1499         1930 :         if pending_nblocks < 10000 {
    1500         1930 :             return Ok(());
    1501            0 :         }
    1502              : 
    1503            0 :         let mut writer = self.tline.writer().await;
    1504              : 
     1505              :         // Flush relation and SLRU data blocks, keep metadata.
    1506            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1507            0 :         for (key, values) in self.pending_updates.drain() {
    1508            0 :             for (lsn, value) in values {
    1509            0 :                 if is_rel_block_key(&key) || is_slru_block_key(key) {
    1510              :                     // This bails out on first error without modifying pending_updates.
    1511              :                     // That's Ok, cf this function's doc comment.
    1512            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1513            0 :                 } else {
    1514            0 :                     retained_pending_updates
    1515            0 :                         .entry(key)
    1516            0 :                         .or_default()
    1517            0 :                         .push((lsn, value));
    1518            0 :                 }
    1519              :             }
    1520              :         }
    1521              : 
    1522            0 :         self.pending_updates = retained_pending_updates;
    1523            0 : 
    1524            0 :         if pending_nblocks != 0 {
    1525            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1526            0 :             self.pending_nblocks = 0;
    1527            0 :         }
    1528              : 
    1529            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1530            0 :             writer.update_directory_entries_count(kind, count as u64);
    1531            0 :         }
    1532              : 
    1533            0 :         Ok(())
    1534         1930 :     }
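                       : 
                       :     // Editor's illustration of the bulk-import pattern the doc comment
                       :     // describes (hedged; `pages`, `modification` and `ctx` are assumed):
                       :     //
                       :     //     for (rel, blknum, img) in pages {
                       :     //         modification.put_rel_page_image(rel, blknum, img)?;
                       :     //         modification.flush(ctx).await?; // cheap no-op until enough blocks are pending
                       :     //     }
                       :     //     modification.commit(ctx).await?;   // metadata becomes visible here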
    1535              : 
    1536              :     ///
    1537              :     /// Finish this atomic update, writing all the updated keys to the
    1538              :     /// underlying timeline.
    1539              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1540              :     ///
    1541       742976 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
     1542              :             // The put_batch call below expects the inputs to be sorted by Lsn,
    1543              : 
    1544       742976 :         let pending_nblocks = self.pending_nblocks;
    1545       742976 :         self.pending_nblocks = 0;
    1546       742976 : 
    1547       742976 :         if !self.pending_updates.is_empty() {
    1548              :             // The put_batch call below expects expects the inputs to be sorted by Lsn,
    1549              :             // so we do that first.
    1550       413960 :             let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
    1551       413960 :                 self.pending_updates
    1552       413960 :                     .drain()
    1553       699856 :                     .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
    1554       413960 :                     .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
    1555       413960 :                 VecMapOrdering::GreaterOrEqual,
    1556       413960 :             );
    1557       413960 : 
    1558       413960 :             writer.put_batch(lsn_ordered_batch, ctx).await?;
    1559       329016 :         }
    1560              : 
    1561       742976 :         if !self.pending_deletions.is_empty() {
    1562            2 :             writer.delete_batch(&self.pending_deletions).await?;
    1563            2 :             self.pending_deletions.clear();
    1564       742974 :         }
    1565              : 
    1566       742976 :         self.pending_lsns.push(self.lsn);
    1567       888832 :         for pending_lsn in self.pending_lsns.drain(..) {
    1568       888832 :             // Ideally, we should be able to call writer.finish_write() only once
    1569       888832 :             // with the highest LSN. However, the last_record_lsn variable in the
    1570       888832 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1571       888832 :             // so we need to record every LSN to not leave a gap between them.
    1572       888832 :             writer.finish_write(pending_lsn);
    1573       888832 :         }
    1574              : 
    1575       742976 :         if pending_nblocks != 0 {
    1576       270570 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1577       472406 :         }
    1578              : 
    1579       742976 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1580         2544 :             writer.update_directory_entries_count(kind, count as u64);
    1581         2544 :         }
    1582              : 
    1583       742976 :         Ok(())
    1584       742976 :     }
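                       : 
                       :     // Editor's sketch (assumed names) of how set_lsn() and commit() interact:
                       :     // several LSNs can be batched into one modification, and commit() stamps
                       :     // and finishes each of them via the pending_lsns loop above:
                       :     //
                       :     //     let mut m = tline.begin_modification(lsn1);
                       :     //     m.put_rel_page_image(rel, 0, img1)?;
                       :     //     m.set_lsn(lsn2)?;          // advance within the same batch
                       :     //     m.put_rel_page_image(rel, 0, img2)?;
                       :     //     m.commit(ctx).await?;      // writes and finishes both LSNs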
    1585              : 
    1586       291704 :     pub(crate) fn len(&self) -> usize {
    1587       291704 :         self.pending_updates.len() + self.pending_deletions.len()
    1588       291704 :     }
    1589              : 
    1590              :     // Internal helper functions to batch the modifications
    1591              : 
    1592       286562 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
     1593              :         // Have we already updated the same key? If so, read the latest
     1594              :         // pending version.
    1595              :         //
    1596              :         // Note: we don't check pending_deletions. It is an error to request a
    1597              :         // value that has been removed, deletion only avoids leaking storage.
    1598       286562 :         if let Some(values) = self.pending_updates.get(&key) {
    1599        15928 :             if let Some((_, value)) = values.last() {
    1600        15928 :                 return if let Value::Image(img) = value {
    1601        15928 :                     Ok(img.clone())
    1602              :                 } else {
    1603              :                     // Currently, we never need to read back a WAL record that we
    1604              :                     // inserted in the same "transaction". All the metadata updates
    1605              :                     // work directly with Images, and we never need to read actual
    1606              :                     // data pages. We could handle this if we had to, by calling
    1607              :                     // the walredo manager, but let's keep it simple for now.
    1608            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1609            0 :                         "unexpected pending WAL record"
    1610            0 :                     )))
    1611              :                 };
    1612            0 :             }
    1613       270634 :         }
    1614       270634 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1615       270634 :         self.tline.get(key, lsn, ctx).await
    1616       286562 :     }
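                       : 
                       :     // Editor's note: this gives read-your-writes behavior inside a
                       :     // modification — a pending Image for the key wins over the timeline:
                       :     //
                       :     //     self.put(key, Value::Image(img.clone()));
                       :     //     assert_eq!(self.get(key, ctx).await?, img);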
    1617              : 
    1618       712016 :     fn put(&mut self, key: Key, val: Value) {
    1619       712016 :         let values = self.pending_updates.entry(key).or_default();
    1620              :         // Replace the previous value if it exists at the same lsn
    1621       712016 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1622        12164 :             if *last_lsn == self.lsn {
    1623        12160 :                 *last_value = val;
    1624        12160 :                 return;
    1625            4 :             }
    1626       699852 :         }
    1627       699856 :         values.push((self.lsn, val));
    1628       712016 :     }
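                       : 
                       :     // Editor's note: per the comment above, two puts for the same key at the
                       :     // same self.lsn keep only the second value:
                       :     //
                       :     //     self.put(key, Value::Image(a)); // replaced below
                       :     //     self.put(key, Value::Image(b)); // only (lsn, b) is retained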
    1629              : 
    1630            2 :     fn delete(&mut self, key_range: Range<Key>) {
    1631            2 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1632            2 :         self.pending_deletions.push((key_range, self.lsn));
    1633            2 :     }
    1634              : }
    1635              : 
    1636              : /// This struct facilitates accessing either a committed key from the timeline at a
    1637              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1638              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1639              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    1640              : /// need to look up the keys in the modification first before looking them up in the
    1641              : /// timeline to not miss the latest updates.
    1642              : #[derive(Clone, Copy)]
    1643              : pub enum Version<'a> {
    1644              :     Lsn(Lsn),
    1645              :     Modified(&'a DatadirModification<'a>),
    1646              : }
    1647              : 
    1648              : impl<'a> Version<'a> {
    1649        23542 :     async fn get(
    1650        23542 :         &self,
    1651        23542 :         timeline: &Timeline,
    1652        23542 :         key: Key,
    1653        23542 :         ctx: &RequestContext,
    1654        23542 :     ) -> Result<Bytes, PageReconstructError> {
    1655        23542 :         match self {
    1656        23532 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1657           10 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1658              :         }
    1659        23542 :     }
    1660              : 
    1661        30484 :     fn get_lsn(&self) -> Lsn {
    1662        30484 :         match self {
    1663        24438 :             Version::Lsn(lsn) => *lsn,
    1664         6046 :             Version::Modified(modification) => modification.lsn,
    1665              :         }
    1666        30484 :     }
    1667              : }
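                       : 
                       : // Editor's sketch of the two flavors described above (assumed variables):
                       : // committed data is read through an LSN, in-flight data through the
                       : // pending modification:
                       : //
                       : //     let committed = Version::Lsn(lsn).get(timeline, key, ctx).await?;
                       : //     let pending = Version::Modified(&modification).get(timeline, key, ctx).await?;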
    1668              : 
    1669              : //--- Metadata structs stored in key-value pairs in the repository.
    1670              : 
    1671         2132 : #[derive(Debug, Serialize, Deserialize)]
    1672              : struct DbDirectory {
    1673              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1674              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1675              : }
    1676              : 
    1677          196 : #[derive(Debug, Serialize, Deserialize)]
    1678              : struct TwoPhaseDirectory {
    1679              :     xids: HashSet<TransactionId>,
    1680              : }
    1681              : 
    1682         1932 : #[derive(Debug, Serialize, Deserialize, Default)]
    1683              : struct RelDirectory {
    1684              :     // Set of relations that exist. (relfilenode, forknum)
    1685              :     //
    1686              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1687              :     // key-value pairs, if you have a lot of relations
    1688              :     rels: HashSet<(Oid, u8)>,
    1689              : }
    1690              : 
    1691           24 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
    1692              : pub(crate) struct AuxFilesDirectory {
    1693              :     pub(crate) files: HashMap<String, Bytes>,
    1694              : }
    1695              : 
    1696              : impl AuxFilesDirectory {
    1697           24 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    1698           24 :         if let Some(value) = value {
    1699           18 :             self.files.insert(key, value);
    1700           18 :         } else {
    1701            6 :             self.files.remove(&key);
    1702            6 :         }
    1703           24 :     }
    1704              : }
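                       : 
                       : // Editor's note: upsert() treats Some as insert-or-replace and None as
                       : // removal, matching put_file's empty-content convention:
                       : //
                       : //     let mut dir = AuxFilesDirectory { files: HashMap::new() };
                       : //     dir.upsert("a".to_string(), Some(Bytes::from_static(b"x"))); // insert
                       : //     dir.upsert("a".to_string(), None);                            // remove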
    1705              : 
    1706            0 : #[derive(Debug, Serialize, Deserialize)]
    1707              : struct RelSizeEntry {
    1708              :     nblocks: u32,
    1709              : }
    1710              : 
    1711          594 : #[derive(Debug, Serialize, Deserialize, Default)]
    1712              : struct SlruSegmentDirectory {
    1713              :     // Set of SLRU segments that exist.
    1714              :     segments: HashSet<u32>,
    1715              : }
    1716              : 
    1717              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    1718              : #[repr(u8)]
    1719              : pub(crate) enum DirectoryKind {
    1720              :     Db,
    1721              :     TwoPhase,
    1722              :     Rel,
    1723              :     AuxFiles,
    1724              :     SlruSegment(SlruKind),
    1725              : }
    1726              : 
    1727              : impl DirectoryKind {
    1728              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    1729         5088 :     pub(crate) fn offset(&self) -> usize {
    1730         5088 :         self.into_usize()
    1731         5088 :     }
    1732              : }
    1733              : 
    1734              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1735              : 
    1736              : #[allow(clippy::bool_assert_comparison)]
    1737              : #[cfg(test)]
    1738              : mod tests {
    1739              :     use hex_literal::hex;
    1740              :     use utils::id::TimelineId;
    1741              : 
    1742              :     use super::*;
    1743              : 
    1744              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    1745              : 
    1746              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    1747              :     #[tokio::test]
    1748            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    1749            2 :         let name = "aux_files_round_trip";
    1750            2 :         let harness = TenantHarness::create(name)?;
    1751            2 : 
    1752            2 :         pub const TIMELINE_ID: TimelineId =
    1753            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    1754            2 : 
    1755            2 :         let (tenant, ctx) = harness.load().await;
    1756            2 :         let tline = tenant
    1757            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    1758            2 :             .await?;
    1759            2 :         let tline = tline.raw_timeline().unwrap();
    1760            2 : 
    1761            2 :         // First modification: insert two keys
    1762            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    1763            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    1764            2 :         modification.set_lsn(Lsn(0x1008))?;
    1765            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    1766            2 :         modification.commit(&ctx).await?;
    1767            2 :         let expect_1008 = HashMap::from([
    1768            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    1769            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    1770            2 :         ]);
    1771            2 : 
    1772            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1773            2 :         assert_eq!(readback, expect_1008);
    1774            2 : 
    1775            2 :         // Second modification: update one key, remove the other
    1776            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    1777            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    1778            2 :         modification.set_lsn(Lsn(0x2008))?;
    1779            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    1780            2 :         modification.commit(&ctx).await?;
    1781            2 :         let expect_2008 =
    1782            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    1783            2 : 
    1784            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    1785            2 :         assert_eq!(readback, expect_2008);
    1786            2 : 
    1787            2 :         // Reading back in time works
    1788            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    1789            2 :         assert_eq!(readback, expect_1008);
    1790            2 : 
    1791            2 :         Ok(())
    1792            2 :     }
    1793              : 
    1794              :     /*
    1795              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1796              :             let incremental = timeline.get_current_logical_size();
    1797              :             let non_incremental = timeline
    1798              :                 .get_current_logical_size_non_incremental(lsn)
    1799              :                 .unwrap();
    1800              :             assert_eq!(incremental, non_incremental);
    1801              :         }
    1802              :     */
    1803              : 
    1804              :     /*
    1805              :     ///
    1806              :     /// Test list_rels() function, with branches and dropped relations
    1807              :     ///
    1808              :     #[test]
    1809              :     fn test_list_rels_drop() -> Result<()> {
    1810              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1811              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1812              :         const TESTDB: u32 = 111;
    1813              : 
    1814              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1815              :         // after branching fails below
    1816              :         let mut writer = tline.begin_record(Lsn(0x10));
    1817              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1818              :         writer.finish()?;
    1819              : 
    1820              :         // Create a relation on the timeline
    1821              :         let mut writer = tline.begin_record(Lsn(0x20));
    1822              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1823              :         writer.finish()?;
    1824              : 
    1825              :         let writer = tline.begin_record(Lsn(0x00));
    1826              :         writer.finish()?;
    1827              : 
    1828              :         // Check that list_rels() lists it after LSN 2, but no before it
     1829              :         // Check that list_rels() lists it after LSN 2, but not before it
    1830              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1831              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1832              : 
    1833              :         // Create a branch, check that the relation is visible there
    1834              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1835              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1836              :             Some(timeline) => timeline,
    1837              :             None => panic!("Should have a local timeline"),
    1838              :         };
    1839              :         let newtline = DatadirTimelineImpl::new(newtline);
    1840              :         assert!(newtline
    1841              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1842              :             .contains(&TESTREL_A));
    1843              : 
    1844              :         // Drop it on the branch
    1845              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1846              :         new_writer.drop_relation(TESTREL_A)?;
    1847              :         new_writer.finish()?;
    1848              : 
    1849              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1850              :         assert!(newtline
    1851              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1852              :             .contains(&TESTREL_A));
    1853              :         assert!(!newtline
    1854              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1855              :             .contains(&TESTREL_A));
    1856              : 
    1857              :         // Run checkpoint and garbage collection and check that it's still not visible
    1858              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1859              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1860              : 
    1861              :         assert!(!newtline
    1862              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1863              :             .contains(&TESTREL_A));
    1864              : 
    1865              :         Ok(())
    1866              :     }
    1867              :      */
    1868              : 
    1869              :     /*
    1870              :     #[test]
    1871              :     fn test_read_beyond_eof() -> Result<()> {
    1872              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1873              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1874              : 
    1875              :         make_some_layers(&tline, Lsn(0x20))?;
    1876              :         let mut writer = tline.begin_record(Lsn(0x60));
    1877              :         walingest.put_rel_page_image(
    1878              :             &mut writer,
    1879              :             TESTREL_A,
    1880              :             0,
    1881              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1882              :         )?;
    1883              :         writer.finish()?;
    1884              : 
    1885              :         // Test read before rel creation. Should error out.
    1886              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1887              : 
    1888              :         // Read block beyond end of relation at different points in time.
    1889              :         // These reads should fall into different delta, image, and in-memory layers.
    1890              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1891              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1892              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1893              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1894              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1895              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1896              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1897              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1898              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1899              : 
    1900              :         // Test on an in-memory layer with no preceding layer
    1901              :         let mut writer = tline.begin_record(Lsn(0x70));
    1902              :         walingest.put_rel_page_image(
    1903              :             &mut writer,
    1904              :             TESTREL_B,
    1905              :             0,
    1906              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1907              :         )?;
    1908              :         writer.finish()?;
    1909              : 
     1910              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    1911              : 
    1912              :         Ok(())
    1913              :     }
    1914              :      */
    1915              : }
        

Generated by: LCOV version 2.1-beta