//!
//! This provides an abstraction to store PostgreSQL relations and other files
//! in the key-value store that implements the Repository interface.
//!
//! (TODO: The line between the PUT functions here and walingest.rs is a bit blurry, as
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that.)
//!
use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
    dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
    relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
    slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
    AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};

/// Max number of delta records appended to the AUX_FILES_KEY (for aux v1). The write path
/// writes a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;

/// Max number of aux-file-related delta layers. Compaction creates a new image layer
/// once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;

#[derive(Debug)]
pub enum LsnForTimestamp {
    /// Found commits both before and after the given timestamp
    Present(Lsn),

    /// Found no commits after the given timestamp: the newest data
    /// in the branch is older than the given timestamp.
    ///
    /// All commits <= LSN happened before the given timestamp
    Future(Lsn),

    /// The queried timestamp is past the horizon we can look back to (PITR)
    ///
    /// All commits > LSN happened after the given timestamp,
    /// but any commits < LSN might have happened before or after
    /// the given timestamp. We don't know, because no data before
    /// the given LSN is available.
    Past(Lsn),

    /// We have found no commit with a timestamp,
    /// so we can't return anything meaningful.
    ///
    /// The associated LSN is the lower-bound value at which we can safely
    /// create branches, but no statement is made about whether it is
    /// older or newer than the timestamp.
    ///
    /// This variant can e.g. be returned right after a
    /// cluster import.
    NoData(Lsn),
}
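
// A hedged sketch of how a caller might act on these variants; `branch_at` and
// `reject` are hypothetical helpers, not part of this module:
//
//     match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
//         // Commits exist on both sides of `ts`: branch exactly there.
//         LsnForTimestamp::Present(lsn) => branch_at(lsn),
//         // Everything in the branch predates `ts`: branching at `lsn` is safe.
//         LsnForTimestamp::Future(lsn) => branch_at(lsn),
//         // `ts` lies before the PITR horizon: too old to answer precisely.
//         LsnForTimestamp::Past(lsn) => reject(format!("timestamp precedes horizon at {lsn}")),
//         // No commit timestamps at all, e.g. right after a cluster import.
//         LsnForTimestamp::NoData(lsn) => branch_at(lsn),
//     }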

#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
    #[error("cancelled")]
    Cancelled,

    /// Something went wrong while reading the metadata we use to calculate logical size.
    /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
    /// in the `From` implementation for this variant.
    #[error(transparent)]
    PageRead(PageReconstructError),

    /// Something went wrong deserializing metadata that we read to calculate logical size
    #[error("decode error: {0}")]
    Decode(#[from] DeserializeError),
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
    #[error(transparent)]
    Decode(#[from] DeserializeError),
    #[error(transparent)]
    PageRead(PageReconstructError),
    #[error("cancelled")]
    Cancelled,
}

impl From<PageReconstructError> for CollectKeySpaceError {
    fn from(err: PageReconstructError) -> Self {
        match err {
            PageReconstructError::Cancelled => Self::Cancelled,
            err => Self::PageRead(err),
        }
    }
}

impl From<PageReconstructError> for CalculateLogicalSizeError {
    fn from(pre: PageReconstructError) -> Self {
        match pre {
            PageReconstructError::Cancelled => Self::Cancelled,
            _ => Self::PageRead(pre),
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum RelationError {
    #[error("Relation Already Exists")]
    AlreadyExists,
    #[error("invalid relnode")]
    InvalidRelnode,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
/// Timeline struct provides the key-value store.
///
/// This is a separate impl so that we can easily include all these functions in a Timeline
/// implementation; it might be moved into a separate struct later.
impl Timeline {
    /// Start ingesting a WAL record, or some other atomic modification of
    /// the timeline.
    ///
    /// This provides a transaction-like interface to perform a bunch
    /// of modifications atomically.
    ///
    /// To ingest a WAL record, call begin_modification(lsn) to get a
    /// DatadirModification object. Use the functions in the object to
    /// modify the repository state, updating all the pages and metadata
    /// that the WAL record affects. When you're done, call commit() to
    /// commit the changes.
    ///
    /// The LSN stored in the modification is advanced by `ingest_record` and
    /// is used by `commit()` to update `last_record_lsn`.
    ///
    /// Calling commit() will flush all the changes and reset the state,
    /// so the `DatadirModification` struct can be reused to perform the next modification.
    ///
    /// Note that any pending modifications you make through the
    /// modification object won't be visible to calls to the 'get' and 'list'
    /// functions of the timeline until you finish! And if you update the
    /// same page twice, the last update wins.
    ///
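    /// A hedged usage sketch; the mutator shown is illustrative, the real
    /// PUT functions live on `DatadirModification`:
    /// ```ignore
    /// let mut modification = timeline.begin_modification(lsn);
    /// modification.put_rel_page_image(rel, blknum, img)?;
    /// modification.commit(&ctx).await?;
    /// ```
    ///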
    pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
    where
        Self: Sized,
    {
        DatadirModification {
            tline: self,
            pending_lsns: Vec::new(),
            pending_updates: HashMap::new(),
            pending_deletions: Vec::new(),
            pending_nblocks: 0,
            pending_directory_entries: Vec::new(),
            lsn,
        }
    }

    //------------------------------------------------------------------------------
    // Public GET functions
    //------------------------------------------------------------------------------

    /// Look up given page version.
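    ///
    /// A hedged usage sketch; note that a read past the current relation size
    /// returns an all-zeros page rather than an error:
    /// ```ignore
    /// let page: Bytes = timeline
    ///     .get_rel_page_at_lsn(rel_tag, blknum, Version::Lsn(lsn), &ctx)
    ///     .await?;
    /// ```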
    pub(crate) async fn get_rel_page_at_lsn(
        &self,
        tag: RelTag,
        blknum: BlockNumber,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        if tag.relnode == 0 {
            return Err(PageReconstructError::Other(
                RelationError::InvalidRelnode.into(),
            ));
        }

        let nblocks = self.get_rel_size(tag, version, ctx).await?;
        if blknum >= nblocks {
            debug!(
                "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                tag,
                blknum,
                version.get_lsn(),
                nblocks
            );
            return Ok(ZERO_PAGE.clone());
        }

        let key = rel_block_to_key(tag, blknum);
        version.get(self, key, ctx).await
    }

    /// Get size of a database in blocks
    pub(crate) async fn get_db_size(
        &self,
        spcnode: Oid,
        dbnode: Oid,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<usize, PageReconstructError> {
        let mut total_blocks = 0;

        let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;

        for rel in rels {
            let n_blocks = self.get_rel_size(rel, version, ctx).await?;
            total_blocks += n_blocks as usize;
        }
        Ok(total_blocks)
    }

    /// Get size of a relation file
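    ///
    /// The size is stored under `rel_size_to_key(tag)` as a little-endian `u32`
    /// block count, so the decode below is a single `buf.get_u32_le()`.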
    pub(crate) async fn get_rel_size(
        &self,
        tag: RelTag,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<BlockNumber, PageReconstructError> {
        if tag.relnode == 0 {
            return Err(PageReconstructError::Other(
                RelationError::InvalidRelnode.into(),
            ));
        }

        if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
            return Ok(nblocks);
        }

        if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
            && !self.get_rel_exists(tag, version, ctx).await?
        {
            // FIXME: Postgres sometimes calls smgrcreate() to create
            // FSM, and smgrnblocks() on it immediately afterwards,
            // without extending it.  Tolerate that by claiming that
            // any non-existent FSM fork has size 0.
            return Ok(0);
        }

        let key = rel_size_to_key(tag);
        let mut buf = version.get(self, key, ctx).await?;
        let nblocks = buf.get_u32_le();

        self.update_cached_rel_size(tag, version.get_lsn(), nblocks);

        Ok(nblocks)
    }

    /// Does the relation exist?
    pub(crate) async fn get_rel_exists(
        &self,
        tag: RelTag,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        if tag.relnode == 0 {
            return Err(PageReconstructError::Other(
                RelationError::InvalidRelnode.into(),
            ));
        }

        // first, try to look up the relation in the cache
        if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
            return Ok(true);
        }
        // then check if the database was already initialized.
        // get_rel_exists can be called before dbdir is created.
        let buf = version.get(self, DBDIR_KEY, ctx).await?;
        let dbdirs = match DbDirectory::des(&buf).context("deserialization failure") {
            Ok(dir) => Ok(dir.dbdirs),
            Err(e) => Err(PageReconstructError::from(e)),
        }?;
        if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
            return Ok(false);
        }
        // fetch directory listing
        let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
        let buf = version.get(self, key, ctx).await?;

        match RelDirectory::des(&buf).context("deserialization failure") {
            Ok(dir) => {
                let exists = dir.rels.contains(&(tag.relnode, tag.forknum));
                Ok(exists)
            }
            Err(e) => Err(PageReconstructError::from(e)),
        }
    }

    /// Get a list of all existing relations in the given tablespace and database.
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
    pub(crate) async fn list_rels(
        &self,
        spcnode: Oid,
        dbnode: Oid,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<HashSet<RelTag>, PageReconstructError> {
        // fetch directory listing
        let key = rel_dir_to_key(spcnode, dbnode);
        let buf = version.get(self, key, ctx).await?;

        match RelDirectory::des(&buf).context("deserialization failure") {
            Ok(dir) => {
                let rels: HashSet<RelTag> =
                    HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
                        spcnode,
                        dbnode,
                        relnode: *relnode,
                        forknum: *forknum,
                    }));

                Ok(rels)
            }
            Err(e) => Err(PageReconstructError::from(e)),
        }
    }

    /// Get the whole SLRU segment
    pub(crate) async fn get_slru_segment(
        &self,
        kind: SlruKind,
        segno: u32,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        let n_blocks = self
            .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
            .await?;
        let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
        for blkno in 0..n_blocks {
            let block = self
                .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
                .await?;
            segment.extend_from_slice(&block[..BLCKSZ as usize]);
        }
        Ok(segment.freeze())
    }

    /// Look up given SLRU page version.
    pub(crate) async fn get_slru_page_at_lsn(
        &self,
        kind: SlruKind,
        segno: u32,
        blknum: BlockNumber,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        let key = slru_block_to_key(kind, segno, blknum);
        self.get(key, lsn, ctx).await
    }

    /// Get size of an SLRU segment
    pub(crate) async fn get_slru_segment_size(
        &self,
        kind: SlruKind,
        segno: u32,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<BlockNumber, PageReconstructError> {
        let key = slru_segment_size_to_key(kind, segno);
        let mut buf = version.get(self, key, ctx).await?;
        Ok(buf.get_u32_le())
    }

    /// Does the SLRU segment exist?
    pub(crate) async fn get_slru_segment_exists(
        &self,
        kind: SlruKind,
        segno: u32,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        // fetch directory listing
        let key = slru_dir_to_key(kind);
        let buf = version.get(self, key, ctx).await?;

        match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
            Ok(dir) => {
                let exists = dir.segments.contains(&segno);
                Ok(exists)
            }
            Err(e) => Err(PageReconstructError::from(e)),
        }
    }

    /// Locate an LSN such that all transactions that committed before
    /// 'search_timestamp' are visible, but nothing newer is.
    ///
    /// This is not exact. Commit timestamps are not guaranteed to be ordered,
    /// so it's not well defined which LSN you get if there were multiple commits
    /// "in flight" at that point in time.
    ///
    pub(crate) async fn find_lsn_for_timestamp(
        &self,
        search_timestamp: TimestampTz,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> Result<LsnForTimestamp, PageReconstructError> {
        pausable_failpoint!("find-lsn-for-timestamp-pausable");

        let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
        // We use this method to figure out the branching LSN for the new branch, but the
        // GC cutoff could be before the branching point and we cannot create a new branch
        // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
        // on the safe side.
        let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
        let max_lsn = self.get_last_record_lsn();

        // LSNs are always 8-byte aligned. low/mid/high represent the
        // LSN divided by 8.
        let mut low = min_lsn.0 / 8;
        let mut high = max_lsn.0 / 8 + 1;

        let mut found_smaller = false;
        let mut found_larger = false;

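        // Binary search for the lowest probe point (in units of 8 bytes) at
        // which a commit with timestamp >= `search_timestamp` is already
        // visible: a "true" probe shrinks the range from above, a "false"
        // probe means everything at or below `mid` is still too old.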
        while low < high {
            if cancel.is_cancelled() {
                return Err(PageReconstructError::Cancelled);
            }
            // cannot overflow, high and low are both smaller than u64::MAX / 2
            let mid = (high + low) / 2;

            let cmp = self
                .is_latest_commit_timestamp_ge_than(
                    search_timestamp,
                    Lsn(mid * 8),
                    &mut found_smaller,
                    &mut found_larger,
                    ctx,
                )
                .await?;

            if cmp {
                high = mid;
            } else {
                low = mid + 1;
            }
        }
        // If `found_smaller == true`, `low = t + 1`, where `t` is the target LSN,
        // i.e. the LSN of the last commit record before or at `search_timestamp`.
        // Remove one from `low` to get `t`.
        //
        // FIXME: it would be better to get the LSN of the previous commit.
        // Otherwise, if you restore to the returned LSN, the database will
        // include physical changes from later commits that will be marked
        // as aborted, and will need to be vacuumed away.
        let commit_lsn = Lsn((low - 1) * 8);
        match (found_smaller, found_larger) {
            (false, false) => {
                // This can happen if no commit records have been processed yet, e.g.
                // just after importing a cluster.
                Ok(LsnForTimestamp::NoData(min_lsn))
            }
            (false, true) => {
                // Didn't find any commit timestamps smaller than the request
                Ok(LsnForTimestamp::Past(min_lsn))
            }
            (true, _) if commit_lsn < min_lsn => {
                // The search above did set found_smaller to true, but it never increased the LSN.
                // Then `low` is still the old min_lsn, and the subtraction above gave a value
                // below the min_lsn. We should never do that.
                Ok(LsnForTimestamp::Past(min_lsn))
            }
            (true, false) => {
                // Only found commits with timestamps smaller than the request.
                // It's still a valid case for branch creation, so return it.
                // And `update_gc_info()` ignores the LSN for the `LsnForTimestamp::Future`
                // case anyway.
                Ok(LsnForTimestamp::Future(commit_lsn))
            }
            (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
        }
    }

    /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
    /// commits that committed after 'search_timestamp' at LSN 'probe_lsn'.
    ///
    /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
    /// with a smaller/larger timestamp.
    ///
    pub(crate) async fn is_latest_commit_timestamp_ge_than(
        &self,
        search_timestamp: TimestampTz,
        probe_lsn: Lsn,
        found_smaller: &mut bool,
        found_larger: &mut bool,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
            if timestamp >= search_timestamp {
                *found_larger = true;
                return ControlFlow::Break(true);
            } else {
                *found_smaller = true;
            }
            ControlFlow::Continue(())
        })
        .await
    }

    /// Obtain the latest commit timestamp at the given LSN.
    ///
    /// Returns `None` if the LSN has no commit timestamps; otherwise returns the
    /// maximum commit timestamp found.
    pub(crate) async fn get_timestamp_for_lsn(
        &self,
        probe_lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Option<TimestampTz>, PageReconstructError> {
        let mut max: Option<TimestampTz> = None;
        self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
            if let Some(max_prev) = max {
                max = Some(max_prev.max(timestamp));
            } else {
                max = Some(timestamp);
            }
            ControlFlow::Continue(())
        })
        .await?;

        Ok(max)
    }

    /// Runs the given function on all the timestamps for a given LSN.
    ///
    /// The return value is either given by the closure, or set to the `Default`
    /// impl's output.
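    ///
    /// A hedged sketch of the closure contract (`cutoff` is a hypothetical value):
    /// ```ignore
    /// let found: bool = self
    ///     .map_all_timestamps(probe_lsn, ctx, |ts| {
    ///         if ts >= cutoff {
    ///             ControlFlow::Break(true)
    ///         } else {
    ///             ControlFlow::Continue(())
    ///         }
    ///     })
    ///     .await?; // yields `bool::default()` (false) if nothing breaks the loop
    /// ```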
    async fn map_all_timestamps<T: Default>(
        &self,
        probe_lsn: Lsn,
        ctx: &RequestContext,
        mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
    ) -> Result<T, PageReconstructError> {
        for segno in self
            .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
            .await?
        {
            let nblocks = self
                .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
                .await?;
            for blknum in (0..nblocks).rev() {
                let clog_page = self
                    .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
                    .await?;

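                // A CLOG page that carries a commit timestamp has 8 extra bytes
                // (a big-endian TimestampTz) appended after the BLCKSZ payload;
                // plain pages are skipped.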
                if clog_page.len() == BLCKSZ as usize + 8 {
                    let mut timestamp_bytes = [0u8; 8];
                    timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
                    let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);

                    match f(timestamp) {
                        ControlFlow::Break(b) => return Ok(b),
                        ControlFlow::Continue(()) => (),
                    }
                }
            }
        }
        Ok(Default::default())
    }

    pub(crate) async fn get_slru_keyspace(
        &self,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<KeySpace, PageReconstructError> {
        let mut accum = KeySpaceAccum::new();

        for kind in SlruKind::iter() {
            let mut segments: Vec<u32> = self
                .list_slru_segments(kind, version, ctx)
                .await?
                .into_iter()
                .collect();
            segments.sort_unstable();

            for seg in segments {
                let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;

                accum.add_range(
                    slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
                );
            }
        }

        Ok(accum.to_keyspace())
    }

    /// Get a list of SLRU segments
    pub(crate) async fn list_slru_segments(
        &self,
        kind: SlruKind,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<HashSet<u32>, PageReconstructError> {
        // fetch directory entry
        let key = slru_dir_to_key(kind);

        let buf = version.get(self, key, ctx).await?;
        match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
            Ok(dir) => Ok(dir.segments),
            Err(e) => Err(PageReconstructError::from(e)),
        }
    }

    pub(crate) async fn get_relmap_file(
        &self,
        spcnode: Oid,
        dbnode: Oid,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        let key = relmap_file_key(spcnode, dbnode);

        let buf = version.get(self, key, ctx).await?;
        Ok(buf)
    }

    pub(crate) async fn list_dbdirs(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
        // fetch directory entry
        let buf = self.get(DBDIR_KEY, lsn, ctx).await?;

        match DbDirectory::des(&buf).context("deserialization failure") {
            Ok(dir) => Ok(dir.dbdirs),
            Err(e) => Err(PageReconstructError::from(e)),
        }
    }

    pub(crate) async fn get_twophase_file(
        &self,
        xid: TransactionId,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        let key = twophase_file_key(xid);
        let buf = self.get(key, lsn, ctx).await?;
        Ok(buf)
    }

    pub(crate) async fn list_twophase_files(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<HashSet<TransactionId>, PageReconstructError> {
        // fetch directory entry
        let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;

        match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
            Ok(dir) => Ok(dir.xids),
            Err(e) => Err(PageReconstructError::from(e)),
        }
    }

    pub(crate) async fn get_control_file(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        self.get(CONTROLFILE_KEY, lsn, ctx).await
    }

    pub(crate) async fn get_checkpoint(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        self.get(CHECKPOINT_KEY, lsn, ctx).await
    }

    async fn list_aux_files_v1(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
        match self.get(AUX_FILES_KEY, lsn, ctx).await {
            Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
                Ok(dir) => Ok(dir.files),
                Err(e) => Err(PageReconstructError::from(e)),
            },
            Err(e) => {
                // This is expected: historical databases do not have the key.
                debug!("Failed to get info about AUX files: {}", e);
                Ok(HashMap::new())
            }
        }
    }

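    // Aux v2 keeps files in the sparse metadata keyspace; a single stored value
    // may pack several (name, content) pairs, which
    // `aux_file::decode_file_value_bytes` unpacks in the scan below.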
    async fn list_aux_files_v2(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
        let kv = self
            .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
            .await
            .context("scan")?;
        let mut result = HashMap::new();
        let mut sz = 0;
        for (_, v) in kv {
            let v = v.context("get value")?;
            let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
            for (fname, content) in v {
                sz += fname.len();
                sz += content.len();
                result.insert(fname, content);
            }
        }
        self.aux_file_size_estimator.on_initial(sz);
        Ok(result)
    }

    pub(crate) async fn trigger_aux_file_size_computation(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<(), PageReconstructError> {
        let current_policy = self.last_aux_file_policy.load();
        if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
            self.list_aux_files_v2(lsn, ctx).await?;
        }
        Ok(())
    }

    pub(crate) async fn list_aux_files(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
        let current_policy = self.last_aux_file_policy.load();
        match current_policy {
            Some(AuxFilePolicy::V1) | None => self.list_aux_files_v1(lsn, ctx).await,
            Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
            Some(AuxFilePolicy::CrossValidation) => {
                let v1_result = self.list_aux_files_v1(lsn, ctx).await;
                let v2_result = self.list_aux_files_v2(lsn, ctx).await;
                match (v1_result, v2_result) {
                    (Ok(v1), Ok(v2)) => {
                        if v1 != v2 {
                            tracing::error!(
                                "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
                            );
                            return Err(PageReconstructError::Other(anyhow::anyhow!(
                                "unmatched aux file v1 v2 result"
                            )));
                        }
                        Ok(v1)
                    }
                    (Ok(_), Err(v2)) => {
                        tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
                        Err(v2)
                    }
                    (Err(v1), Ok(_)) => {
                        tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
                        Err(v1)
                    }
                    (Err(_), Err(v2)) => Err(v2),
                }
            }
        }
    }

    pub(crate) async fn get_replorigins(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
        let kv = self
            .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
            .await
            .context("scan")?;
        let mut result = HashMap::new();
        for (k, v) in kv {
            let v = v.context("get value")?;
            let origin_id = k.field6 as RepOriginId;
            let origin_lsn = Lsn::des(&v).unwrap();
            if origin_lsn != Lsn::INVALID {
                result.insert(origin_id, origin_lsn);
            }
        }
        Ok(result)
    }

    /// Does the same as get_current_logical_size, but computes the size on demand.
    /// Used to initialize the logical size tracking on startup.
    ///
    /// Only relation blocks are counted currently. That excludes metadata,
    /// SLRUs, twophase files etc.
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
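    ///
    /// In effect this computes `sum(nblocks(rel) for rel in all databases) * BLCKSZ`.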
    pub(crate) async fn get_current_logical_size_non_incremental(
        &self,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<u64, CalculateLogicalSizeError> {
        debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();

        // Fetch list of database dirs and iterate them
        let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
        let dbdir = DbDirectory::des(&buf)?;

        let mut total_size: u64 = 0;
        for (spcnode, dbnode) in dbdir.dbdirs.keys() {
            for rel in self
                .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
                .await?
            {
                if self.cancel.is_cancelled() {
                    return Err(CalculateLogicalSizeError::Cancelled);
                }
                let relsize_key = rel_size_to_key(rel);
                let mut buf = self.get(relsize_key, lsn, ctx).await?;
                let relsize = buf.get_u32_le();

                total_size += relsize as u64;
            }
        }
        Ok(total_size * BLCKSZ as u64)
    }

    ///
    /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
    /// Anything that's not listed may be removed from the underlying storage (from
    /// that LSN forwards).
    ///
    /// The return value is (dense keyspace, sparse keyspace).
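    ///
    /// A hedged usage sketch:
    /// ```ignore
    /// let (dense, sparse) = timeline.collect_keyspace(lsn, &ctx).await?;
    /// // `dense` covers relations, SLRUs, and fixed metadata keys; `sparse`
    /// // covers aux files and replication origins, which may contain holes.
    /// ```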
     855          274 :     pub(crate) async fn collect_keyspace(
     856          274 :         &self,
     857          274 :         lsn: Lsn,
     858          274 :         ctx: &RequestContext,
     859          274 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     860          274 :         // Iterate through key ranges, greedily packing them into partitions
     861          274 :         let mut result = KeySpaceAccum::new();
     862          274 : 
     863          274 :         // The dbdir metadata always exists
     864          274 :         result.add_key(DBDIR_KEY);
     865              : 
     866              :         // Fetch list of database dirs and iterate them
     867         3280 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
     868          274 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
     869          274 : 
     870          274 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
     871          274 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
     872            0 :             if has_relmap_file {
     873            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
     874            0 :             }
     875            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     876              : 
     877            0 :             let mut rels: Vec<RelTag> = self
     878            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
     879            0 :                 .await?
     880            0 :                 .into_iter()
     881            0 :                 .collect();
     882            0 :             rels.sort_unstable();
     883            0 :             for rel in rels {
     884            0 :                 let relsize_key = rel_size_to_key(rel);
     885            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     886            0 :                 let relsize = buf.get_u32_le();
     887            0 : 
     888            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     889            0 :                 result.add_key(relsize_key);
     890              :             }
     891              :         }
     892              : 
     893              :         // Iterate SLRUs next
     894          822 :         for kind in [
     895          274 :             SlruKind::Clog,
     896          274 :             SlruKind::MultiXactMembers,
     897          274 :             SlruKind::MultiXactOffsets,
     898              :         ] {
     899          822 :             let slrudir_key = slru_dir_to_key(kind);
     900          822 :             result.add_key(slrudir_key);
     901        10050 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     902          822 :             let dir = SlruSegmentDirectory::des(&buf)?;
     903          822 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     904          822 :             segments.sort_unstable();
     905          822 :             for segno in segments {
     906            0 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     907            0 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     908            0 :                 let segsize = buf.get_u32_le();
     909            0 : 
     910            0 :                 result.add_range(
     911            0 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     912            0 :                 );
     913            0 :                 result.add_key(segsize_key);
     914              :             }
     915              :         }
     916              : 
     917              :         // Then pg_twophase
     918          274 :         result.add_key(TWOPHASEDIR_KEY);
     919         3169 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     920          274 :         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
     921          274 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     922          274 :         xids.sort_unstable();
     923          274 :         for xid in xids {
     924            0 :             result.add_key(twophase_file_key(xid));
     925            0 :         }
     926              : 
     927          274 :         result.add_key(CONTROLFILE_KEY);
     928          274 :         result.add_key(CHECKPOINT_KEY);
     929          274 :         if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
     930          170 :             result.add_key(AUX_FILES_KEY);
     931          170 :         }
     932              : 
     933              :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
     934              :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
     935              :         // and the keys will not be garbage-colllected.
     936              :         #[cfg(test)]
     937              :         {
     938          274 :             let guard = self.extra_test_dense_keyspace.load();
     939          274 :             for kr in &guard.ranges {
     940            0 :                 result.add_range(kr.clone());
     941            0 :             }
     942              :         }
     943              : 
     944          274 :         let dense_keyspace = result.to_keyspace();
     945          274 :         let sparse_keyspace = SparseKeySpace(KeySpace {
     946          274 :             ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
     947          274 :         });
     948          274 : 
     949          274 :         if cfg!(debug_assertions) {
     950              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
     951              : 
     952              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
     953              :             // category of sparse keys are split into their own image/delta files. If there
     954              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
     955              :             // and we want the developer to keep the keyspaces separated.
     956              : 
     957          274 :             let ranges = &sparse_keyspace.0.ranges;
     958              : 
     959              :             // TODO: use a single overlaps_with across the codebase
     960          274 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
     961          274 :                 !(a.end <= b.start || b.end <= a.start)
     962          274 :             }
     963          548 :             for i in 0..ranges.len() {
     964          548 :                 for j in 0..i {
     965          274 :                     if overlaps_with(&ranges[i], &ranges[j]) {
     966            0 :                         panic!(
     967            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
     968            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
     969            0 :                         );
     970          274 :                     }
     971              :                 }
     972              :             }
     973          274 :             for i in 1..ranges.len() {
     974          274 :                 assert!(
     975          274 :                     ranges[i - 1].end <= ranges[i].start,
     976            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
     977            0 :                     ranges[i - 1].start,
     978            0 :                     ranges[i - 1].end,
     979            0 :                     ranges[i].start,
     980            0 :                     ranges[i].end
     981              :                 );
     982              :             }
     983            0 :         }
     984              : 
     985          274 :         Ok((dense_keyspace, sparse_keyspace))
     986          274 :     }
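                       :     // Added illustration (not part of the original source): `overlaps_with`
                       :     // treats key ranges as half-open, so ranges that merely touch do not
                       :     // count as overlapping:
                       :     //
                       :     //     assert!(!overlaps_with(&(0..10), &(10..20))); // touching: no overlap
                       :     //     assert!(overlaps_with(&(0..11), &(10..20)));  // both contain key 10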
     987              : 
      988              :     /// Get the cached size of a relation, if it has not been updated after the specified LSN
     989       448540 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     990       448540 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     991       448540 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
     992       448518 :             if lsn >= *cached_lsn {
     993       443372 :                 return Some(*nblocks);
     994         5146 :             }
     995           22 :         }
     996         5168 :         None
     997       448540 :     }
     998              : 
     999              :     /// Update cached relation size if there is no more recent update
    1000         5136 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1001         5136 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1002         5136 : 
    1003         5136 :         if lsn < rel_size_cache.complete_as_of {
    1004              :             // Do not cache old values. It's safe to cache the size on read, as long as
    1005              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
    1006              :             // never evict values from the cache, so if the relation size changed after
    1007              :             // 'lsn', the new value is already in the cache.
    1008            0 :             return;
    1009         5136 :         }
    1010         5136 : 
    1011         5136 :         match rel_size_cache.map.entry(tag) {
    1012         5136 :             hash_map::Entry::Occupied(mut entry) => {
    1013         5136 :                 let cached_lsn = entry.get_mut();
    1014         5136 :                 if lsn >= cached_lsn.0 {
    1015            0 :                     *cached_lsn = (lsn, nblocks);
    1016         5136 :                 }
    1017              :             }
    1018            0 :             hash_map::Entry::Vacant(entry) => {
    1019            0 :                 entry.insert((lsn, nblocks));
    1020            0 :             }
    1021              :         }
    1022         5136 :     }
    1023              : 
    1024              :     /// Store cached relation size
    1025       288732 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1026       288732 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1027       288732 :         rel_size_cache.map.insert(tag, (lsn, nblocks));
    1028       288732 :     }
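                       :     // Added illustration with hypothetical values: an entry answers only
                       :     // reads at or past the LSN it was stamped with; older reads may predate
                       :     // a resize and must fall back to a real lookup:
                       :     //
                       :     //     tline.set_cached_rel_size(tag, Lsn(0x20), 8);
                       :     //     assert_eq!(tline.get_cached_rel_size(&tag, Lsn(0x10)), None);    // too old
                       :     //     assert_eq!(tline.get_cached_rel_size(&tag, Lsn(0x30)), Some(8)); // ok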
    1029              : 
    1030              :     /// Remove cached relation size
    1031            2 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1032            2 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1033            2 :         rel_size_cache.map.remove(tag);
    1034            2 :     }
    1035              : }
    1036              : 
    1037              : /// DatadirModification represents an operation to ingest an atomic set of
    1038              : /// updates to the repository. It is created by the 'begin_record'
     1039              : /// function. It is called for each WAL record, so that all the modifications
     1040              : /// made by a single WAL record appear atomic.
    1041              : pub struct DatadirModification<'a> {
    1042              :     /// The timeline this modification applies to. You can access this to
    1043              :     /// read the state, but note that any pending updates are *not* reflected
    1044              :     /// in the state in 'tline' yet.
    1045              :     pub tline: &'a Timeline,
    1046              : 
    1047              :     /// Current LSN of the modification
    1048              :     lsn: Lsn,
    1049              : 
    1050              :     // The modifications are not applied directly to the underlying key-value store.
    1051              :     // The put-functions add the modifications here, and they are flushed to the
     1052              :     // underlying key-value store by the 'commit' function.
    1053              :     pending_lsns: Vec<Lsn>,
    1054              :     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
    1055              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1056              :     pending_nblocks: i64,
    1057              : 
    1058              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1059              :     /// if it was updated in this modification.
    1060              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
    1061              : }
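                       : // Added usage sketch (hedged, assuming the 'begin_record' constructor
                       : // mentioned above): apply the puts for one WAL record, then commit, which
                       : // stamps all pending changes with the modification's LSN:
                       : //
                       : //     let mut modification = tline.begin_record(lsn);
                       : //     modification.put_rel_page_image(rel, blknum, img)?;
                       : //     modification.commit(ctx).await?;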
    1062              : 
    1063              : impl<'a> DatadirModification<'a> {
    1064              :     /// Get the current lsn
    1065       418056 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1066       418056 :         self.lsn
    1067       418056 :     }
    1068              : 
    1069              :     /// Set the current lsn
    1070       145858 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
    1071       145858 :         ensure!(
    1072       145858 :             lsn >= self.lsn,
    1073            0 :             "setting an older lsn {} than {} is not allowed",
    1074              :             lsn,
    1075              :             self.lsn
    1076              :         );
    1077       145858 :         if lsn > self.lsn {
    1078       145858 :             self.pending_lsns.push(self.lsn);
    1079       145858 :             self.lsn = lsn;
    1080       145858 :         }
    1081       145858 :         Ok(())
    1082       145858 :     }
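                       :     // Added illustration: advancing the LSN pushes the previous one onto
                       :     // `pending_lsns`, so `commit` can later `finish_write` every LSN without
                       :     // leaving gaps; moving backwards is rejected:
                       :     //
                       :     //     modification.set_lsn(Lsn(0x30))?;                  // records old lsn
                       :     //     assert!(modification.set_lsn(Lsn(0x20)).is_err()); // older: rejected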
    1083              : 
    1084              :     /// Initialize a completely new repository.
    1085              :     ///
    1086              :     /// This inserts the directory metadata entries that are assumed to
    1087              :     /// always exist.
    1088          160 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1089          160 :         let buf = DbDirectory::ser(&DbDirectory {
    1090          160 :             dbdirs: HashMap::new(),
    1091          160 :         })?;
    1092          160 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
    1093          160 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1094          160 : 
    1095          160 :         // Create AuxFilesDirectory
    1096          160 :         self.init_aux_dir()?;
    1097              : 
    1098          160 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1099          160 :             xids: HashSet::new(),
    1100          160 :         })?;
    1101          160 :         self.pending_directory_entries
    1102          160 :             .push((DirectoryKind::TwoPhase, 0));
    1103          160 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1104              : 
    1105          160 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1106          160 :         let empty_dir = Value::Image(buf);
    1107          160 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1108          160 :         self.pending_directory_entries
    1109          160 :             .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1110          160 :         self.put(
    1111          160 :             slru_dir_to_key(SlruKind::MultiXactMembers),
    1112          160 :             empty_dir.clone(),
    1113          160 :         );
    1114          160 :         self.pending_directory_entries
     1115          160 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactMembers), 0));
    1116          160 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1117          160 :         self.pending_directory_entries
    1118          160 :             .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
    1119          160 : 
    1120          160 :         Ok(())
    1121          160 :     }
    1122              : 
    1123              :     #[cfg(test)]
    1124          158 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1125          158 :         self.init_empty()?;
    1126          158 :         self.put_control_file(bytes::Bytes::from_static(
    1127          158 :             b"control_file contents do not matter",
    1128          158 :         ))
    1129          158 :         .context("put_control_file")?;
    1130          158 :         self.put_checkpoint(bytes::Bytes::from_static(
    1131          158 :             b"checkpoint_file contents do not matter",
    1132          158 :         ))
    1133          158 :         .context("put_checkpoint_file")?;
    1134          158 :         Ok(())
    1135          158 :     }
    1136              : 
    1137              :     /// Put a new page version that can be constructed from a WAL record
    1138              :     ///
    1139              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1140              :     /// current end-of-file. It's up to the caller to check that the relation size
    1141              :     /// matches the blocks inserted!
    1142       145630 :     pub fn put_rel_wal_record(
    1143       145630 :         &mut self,
    1144       145630 :         rel: RelTag,
    1145       145630 :         blknum: BlockNumber,
    1146       145630 :         rec: NeonWalRecord,
    1147       145630 :     ) -> anyhow::Result<()> {
    1148       145630 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1149       145630 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1150       145630 :         Ok(())
    1151       145630 :     }
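                       :     // Added caller-side sketch (schematic; `current_rel_size` stands for
                       :     // whatever size the caller already tracks): extend the relation first
                       :     // when appending past end-of-file, since this function will not:
                       :     //
                       :     //     if blknum >= current_rel_size {
                       :     //         modification.put_rel_extend(rel, blknum + 1, ctx).await?;
                       :     //     }
                       :     //     modification.put_rel_wal_record(rel, blknum, rec)?;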
    1152              : 
    1153              :     // Same, but for an SLRU.
    1154            8 :     pub fn put_slru_wal_record(
    1155            8 :         &mut self,
    1156            8 :         kind: SlruKind,
    1157            8 :         segno: u32,
    1158            8 :         blknum: BlockNumber,
    1159            8 :         rec: NeonWalRecord,
    1160            8 :     ) -> anyhow::Result<()> {
    1161            8 :         self.put(
    1162            8 :             slru_block_to_key(kind, segno, blknum),
    1163            8 :             Value::WalRecord(rec),
    1164            8 :         );
    1165            8 :         Ok(())
    1166            8 :     }
    1167              : 
    1168              :     /// Like put_wal_record, but with ready-made image of the page.
    1169       280864 :     pub fn put_rel_page_image(
    1170       280864 :         &mut self,
    1171       280864 :         rel: RelTag,
    1172       280864 :         blknum: BlockNumber,
    1173       280864 :         img: Bytes,
    1174       280864 :     ) -> anyhow::Result<()> {
    1175       280864 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1176       280864 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1177       280864 :         Ok(())
    1178       280864 :     }
    1179              : 
    1180            6 :     pub fn put_slru_page_image(
    1181            6 :         &mut self,
    1182            6 :         kind: SlruKind,
    1183            6 :         segno: u32,
    1184            6 :         blknum: BlockNumber,
    1185            6 :         img: Bytes,
    1186            6 :     ) -> anyhow::Result<()> {
    1187            6 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
    1188            6 :         Ok(())
    1189            6 :     }
    1190              : 
    1191              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1192           16 :     pub async fn put_relmap_file(
    1193           16 :         &mut self,
    1194           16 :         spcnode: Oid,
    1195           16 :         dbnode: Oid,
    1196           16 :         img: Bytes,
    1197           16 :         ctx: &RequestContext,
    1198           16 :     ) -> anyhow::Result<()> {
    1199              :         // Add it to the directory (if it doesn't exist already)
    1200           16 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1201           16 :         let mut dbdir = DbDirectory::des(&buf)?;
    1202              : 
    1203           16 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1204           16 :         if r.is_none() || r == Some(false) {
    1205              :             // The dbdir entry didn't exist, or it contained a
    1206              :             // 'false'. The 'insert' call already updated it with
    1207              :             // 'true', now write the updated 'dbdirs' map back.
    1208           16 :             let buf = DbDirectory::ser(&dbdir)?;
    1209           16 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1210           16 : 
    1211           16 :             // Create AuxFilesDirectory as well
    1212           16 :             self.init_aux_dir()?;
    1213            0 :         }
    1214           16 :         if r.is_none() {
    1215            8 :             // Create RelDirectory
    1216            8 :             let buf = RelDirectory::ser(&RelDirectory {
    1217            8 :                 rels: HashSet::new(),
    1218            8 :             })?;
    1219            8 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1220            8 :             self.put(
    1221            8 :                 rel_dir_to_key(spcnode, dbnode),
    1222            8 :                 Value::Image(Bytes::from(buf)),
    1223            8 :             );
    1224            8 :         }
    1225              : 
    1226           16 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1227           16 :         Ok(())
    1228           16 :     }
    1229              : 
    1230            0 :     pub async fn put_twophase_file(
    1231            0 :         &mut self,
    1232            0 :         xid: TransactionId,
    1233            0 :         img: Bytes,
    1234            0 :         ctx: &RequestContext,
    1235            0 :     ) -> anyhow::Result<()> {
    1236              :         // Add it to the directory entry
    1237            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1238            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1239            0 :         if !dir.xids.insert(xid) {
    1240            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
    1241            0 :         }
    1242            0 :         self.pending_directory_entries
    1243            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1244            0 :         self.put(
    1245            0 :             TWOPHASEDIR_KEY,
    1246            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1247              :         );
    1248              : 
    1249            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1250            0 :         Ok(())
    1251            0 :     }
    1252              : 
    1253            0 :     pub async fn set_replorigin(
    1254            0 :         &mut self,
    1255            0 :         origin_id: RepOriginId,
    1256            0 :         origin_lsn: Lsn,
    1257            0 :     ) -> anyhow::Result<()> {
    1258            0 :         let key = repl_origin_key(origin_id);
    1259            0 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    1260            0 :         Ok(())
    1261            0 :     }
    1262              : 
    1263            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
    1264            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    1265            0 :     }
    1266              : 
    1267          160 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1268          160 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1269          160 :         Ok(())
    1270          160 :     }
    1271              : 
    1272          174 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1273          174 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1274          174 :         Ok(())
    1275          174 :     }
    1276              : 
    1277            0 :     pub async fn drop_dbdir(
    1278            0 :         &mut self,
    1279            0 :         spcnode: Oid,
    1280            0 :         dbnode: Oid,
    1281            0 :         ctx: &RequestContext,
    1282            0 :     ) -> anyhow::Result<()> {
    1283            0 :         let total_blocks = self
    1284            0 :             .tline
    1285            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1286            0 :             .await?;
    1287              : 
    1288              :         // Remove entry from dbdir
    1289            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1290            0 :         let mut dir = DbDirectory::des(&buf)?;
    1291            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1292            0 :             let buf = DbDirectory::ser(&dir)?;
    1293            0 :             self.pending_directory_entries
    1294            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1295            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1296              :         } else {
    1297            0 :             warn!(
    1298            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1299              :                 spcnode, dbnode
    1300              :             );
    1301              :         }
    1302              : 
    1303              :         // Update logical database size.
    1304            0 :         self.pending_nblocks -= total_blocks as i64;
    1305            0 : 
     1306            0 :         // Delete all relations and metadata files for the spcnode/dbnode
    1307            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1308            0 :         Ok(())
    1309            0 :     }
    1310              : 
    1311              :     /// Create a relation fork.
    1312              :     ///
    1313              :     /// 'nblocks' is the initial size.
    1314         1920 :     pub async fn put_rel_creation(
    1315         1920 :         &mut self,
    1316         1920 :         rel: RelTag,
    1317         1920 :         nblocks: BlockNumber,
    1318         1920 :         ctx: &RequestContext,
    1319         1920 :     ) -> Result<(), RelationError> {
    1320         1920 :         if rel.relnode == 0 {
    1321            0 :             return Err(RelationError::InvalidRelnode);
    1322         1920 :         }
    1323              :         // It's possible that this is the first rel for this db in this
    1324              :         // tablespace.  Create the reldir entry for it if so.
    1325         1920 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1326         1920 :             .context("deserialize db")?;
    1327         1920 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1328         1920 :         let mut rel_dir =
    1329         1920 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1330              :                 // Didn't exist. Update dbdir
    1331            8 :                 e.insert(false);
    1332            8 :                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1333            8 :                 self.pending_directory_entries
    1334            8 :                     .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1335            8 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1336            8 : 
    1337            8 :                 // and create the RelDirectory
    1338            8 :                 RelDirectory::default()
    1339              :             } else {
    1340              :                 // reldir already exists, fetch it
    1341         1912 :                 RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1342         1912 :                     .context("deserialize db")?
    1343              :             };
    1344              : 
    1345              :         // Add the new relation to the rel directory entry, and write it back
    1346         1920 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1347            0 :             return Err(RelationError::AlreadyExists);
    1348         1920 :         }
    1349         1920 : 
    1350         1920 :         self.pending_directory_entries
    1351         1920 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1352         1920 : 
    1353         1920 :         self.put(
    1354         1920 :             rel_dir_key,
    1355         1920 :             Value::Image(Bytes::from(
    1356         1920 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1357              :             )),
    1358              :         );
    1359              : 
    1360              :         // Put size
    1361         1920 :         let size_key = rel_size_to_key(rel);
    1362         1920 :         let buf = nblocks.to_le_bytes();
    1363         1920 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1364         1920 : 
    1365         1920 :         self.pending_nblocks += nblocks as i64;
    1366         1920 : 
    1367         1920 :         // Update relation size cache
    1368         1920 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1369         1920 : 
    1370         1920 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1371         1920 :         // caller.
    1372         1920 :         Ok(())
    1373         1920 :     }
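                       :     // Added usage sketch: create the relation with its initial size, then
                       :     // write the blocks separately (`page_image` is a stand-in for a
                       :     // caller-supplied page):
                       :     //
                       :     //     modification.put_rel_creation(rel, 1, ctx).await?;
                       :     //     modification.put_rel_page_image(rel, 0, page_image)?;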
    1374              : 
    1375              :     /// Truncate relation
    1376         6012 :     pub async fn put_rel_truncation(
    1377         6012 :         &mut self,
    1378         6012 :         rel: RelTag,
    1379         6012 :         nblocks: BlockNumber,
    1380         6012 :         ctx: &RequestContext,
    1381         6012 :     ) -> anyhow::Result<()> {
    1382         6012 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1383         6012 :         if self
    1384         6012 :             .tline
    1385         6012 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1386            0 :             .await?
    1387              :         {
    1388         6012 :             let size_key = rel_size_to_key(rel);
    1389              :             // Fetch the old size first
    1390         6012 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1391         6012 : 
    1392         6012 :             // Update the entry with the new size.
    1393         6012 :             let buf = nblocks.to_le_bytes();
    1394         6012 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1395         6012 : 
    1396         6012 :             // Update relation size cache
    1397         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1398         6012 : 
    1399         6012 :             // Update relation size cache
    1400         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1401         6012 : 
    1402         6012 :             // Update logical database size.
    1403         6012 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1404            0 :         }
    1405         6012 :         Ok(())
    1406         6012 :     }
    1407              : 
    1408              :     /// Extend relation
    1409              :     /// If new size is smaller, do nothing.
    1410       276680 :     pub async fn put_rel_extend(
    1411       276680 :         &mut self,
    1412       276680 :         rel: RelTag,
    1413       276680 :         nblocks: BlockNumber,
    1414       276680 :         ctx: &RequestContext,
    1415       276680 :     ) -> anyhow::Result<()> {
    1416       276680 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1417              : 
    1418              :         // Put size
    1419       276680 :         let size_key = rel_size_to_key(rel);
    1420       276680 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1421       276680 : 
    1422       276680 :         // only extend relation here. never decrease the size
    1423       276680 :         if nblocks > old_size {
    1424       274788 :             let buf = nblocks.to_le_bytes();
    1425       274788 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1426       274788 : 
    1427       274788 :             // Update relation size cache
    1428       274788 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1429       274788 : 
    1430       274788 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1431       274788 :         }
    1432       276680 :         Ok(())
    1433       276680 :     }
    1434              : 
    1435              :     /// Drop a relation.
    1436            2 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
    1437            2 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1438              : 
    1439              :         // Remove it from the directory entry
    1440            2 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1441            2 :         let buf = self.get(dir_key, ctx).await?;
    1442            2 :         let mut dir = RelDirectory::des(&buf)?;
    1443              : 
    1444            2 :         self.pending_directory_entries
    1445            2 :             .push((DirectoryKind::Rel, dir.rels.len()));
    1446            2 : 
    1447            2 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1448            2 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1449              :         } else {
    1450            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1451              :         }
    1452              : 
    1453              :         // update logical size
    1454            2 :         let size_key = rel_size_to_key(rel);
    1455            2 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1456            2 :         self.pending_nblocks -= old_size as i64;
    1457            2 : 
     1458            2 :         // Remove entry from relation size cache
    1459            2 :         self.tline.remove_cached_rel_size(&rel);
    1460            2 : 
    1461            2 :         // Delete size entry, as well as all blocks
    1462            2 :         self.delete(rel_key_range(rel));
    1463            2 : 
    1464            2 :         Ok(())
    1465            2 :     }
    1466              : 
    1467            6 :     pub async fn put_slru_segment_creation(
    1468            6 :         &mut self,
    1469            6 :         kind: SlruKind,
    1470            6 :         segno: u32,
    1471            6 :         nblocks: BlockNumber,
    1472            6 :         ctx: &RequestContext,
    1473            6 :     ) -> anyhow::Result<()> {
    1474            6 :         // Add it to the directory entry
    1475            6 :         let dir_key = slru_dir_to_key(kind);
    1476            6 :         let buf = self.get(dir_key, ctx).await?;
    1477            6 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1478              : 
    1479            6 :         if !dir.segments.insert(segno) {
    1480            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1481            6 :         }
    1482            6 :         self.pending_directory_entries
    1483            6 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1484            6 :         self.put(
    1485            6 :             dir_key,
    1486            6 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1487              :         );
    1488              : 
    1489              :         // Put size
    1490            6 :         let size_key = slru_segment_size_to_key(kind, segno);
    1491            6 :         let buf = nblocks.to_le_bytes();
    1492            6 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1493            6 : 
    1494            6 :         // even if nblocks > 0, we don't insert any actual blocks here
    1495            6 : 
    1496            6 :         Ok(())
    1497            6 :     }
    1498              : 
    1499              :     /// Extend SLRU segment
    1500            0 :     pub fn put_slru_extend(
    1501            0 :         &mut self,
    1502            0 :         kind: SlruKind,
    1503            0 :         segno: u32,
    1504            0 :         nblocks: BlockNumber,
    1505            0 :     ) -> anyhow::Result<()> {
    1506            0 :         // Put size
    1507            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1508            0 :         let buf = nblocks.to_le_bytes();
    1509            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1510            0 :         Ok(())
    1511            0 :     }
    1512              : 
    1513              :     /// This method is used for marking truncated SLRU files
    1514            0 :     pub async fn drop_slru_segment(
    1515            0 :         &mut self,
    1516            0 :         kind: SlruKind,
    1517            0 :         segno: u32,
    1518            0 :         ctx: &RequestContext,
    1519            0 :     ) -> anyhow::Result<()> {
    1520            0 :         // Remove it from the directory entry
    1521            0 :         let dir_key = slru_dir_to_key(kind);
    1522            0 :         let buf = self.get(dir_key, ctx).await?;
    1523            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1524              : 
    1525            0 :         if !dir.segments.remove(&segno) {
    1526            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1527            0 :         }
    1528            0 :         self.pending_directory_entries
    1529            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1530            0 :         self.put(
    1531            0 :             dir_key,
    1532            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1533              :         );
    1534              : 
    1535              :         // Delete size entry, as well as all blocks
    1536            0 :         self.delete(slru_segment_key_range(kind, segno));
    1537            0 : 
    1538            0 :         Ok(())
    1539            0 :     }
    1540              : 
    1541              :     /// Drop a relmapper file (pg_filenode.map)
    1542            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1543            0 :         // TODO
    1544            0 :         Ok(())
    1545            0 :     }
    1546              : 
     1547              :     /// This method is used for removing the twophase file of a completed prepared transaction
    1548            0 :     pub async fn drop_twophase_file(
    1549            0 :         &mut self,
    1550            0 :         xid: TransactionId,
    1551            0 :         ctx: &RequestContext,
    1552            0 :     ) -> anyhow::Result<()> {
    1553              :         // Remove it from the directory entry
    1554            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1555            0 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1556              : 
    1557            0 :         if !dir.xids.remove(&xid) {
    1558            0 :             warn!("twophase file for xid {} does not exist", xid);
    1559            0 :         }
    1560            0 :         self.pending_directory_entries
    1561            0 :             .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1562            0 :         self.put(
    1563            0 :             TWOPHASEDIR_KEY,
    1564            0 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1565              :         );
    1566              : 
    1567              :         // Delete it
    1568            0 :         self.delete(twophase_key_range(xid));
    1569            0 : 
    1570            0 :         Ok(())
    1571            0 :     }
    1572              : 
    1573          176 :     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
    1574          176 :         if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
    1575            2 :             return Ok(());
    1576          174 :         }
    1577          174 :         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
    1578          174 :             files: HashMap::new(),
    1579          174 :         })?;
    1580          174 :         self.pending_directory_entries
    1581          174 :             .push((DirectoryKind::AuxFiles, 0));
    1582          174 :         self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
    1583          174 :         Ok(())
    1584          176 :     }
    1585              : 
    1586           30 :     pub async fn put_file(
    1587           30 :         &mut self,
    1588           30 :         path: &str,
    1589           30 :         content: &[u8],
    1590           30 :         ctx: &RequestContext,
    1591           30 :     ) -> anyhow::Result<()> {
    1592           30 :         let switch_policy = self.tline.get_switch_aux_file_policy();
    1593              : 
    1594           30 :         let policy = {
    1595           30 :             let current_policy = self.tline.last_aux_file_policy.load();
    1596              :             // Allowed switch path:
     1597              :             // Allowed switch paths:
     1598              :             // * no aux files -> v1/v2/cross-validation
     1599              :             // * cross-validation -> v2
    1600           30 :             let current_policy = if current_policy.is_none() {
    1601              :                 // This path will only be hit once per tenant: we will decide the final policy in this code block.
    1602              :                 // The next call to `put_file` will always have `last_aux_file_policy != None`.
    1603           12 :                 let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1604           12 :                 let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
    1605           12 :                 if aux_files_key_v1.is_empty() {
    1606           10 :                     None
    1607              :                 } else {
    1608            2 :                     self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
    1609            2 :                     Some(AuxFilePolicy::V1)
    1610              :                 }
    1611              :             } else {
    1612           18 :                 current_policy
    1613              :             };
    1614              : 
    1615           30 :             if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
    1616           12 :                 self.tline.do_switch_aux_policy(switch_policy)?;
    1617           12 :                 info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
    1618           12 :                 switch_policy
    1619              :             } else {
    1620              :                 // This branch handles non-valid migration path, and the case that switch_policy == current_policy.
     1621              :                 // This branch handles invalid migration paths, as well as the case where switch_policy == current_policy.
     1622              :                 // In practice, because the migration path always allows unspecified -> *, this unwrap_or will never be hit.
    1623              :             }
    1624              :         };
    1625              : 
    1626           30 :         if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
    1627           10 :             let key = aux_file::encode_aux_file_key(path);
    1628              :             // retrieve the key from the engine
    1629           10 :             let old_val = match self.get(key, ctx).await {
    1630            2 :                 Ok(val) => Some(val),
    1631            8 :                 Err(PageReconstructError::MissingKey(_)) => None,
    1632            0 :                 Err(e) => return Err(e.into()),
    1633              :             };
    1634           10 :             let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    1635            2 :                 aux_file::decode_file_value(old_val)?
    1636              :             } else {
    1637            8 :                 Vec::new()
    1638              :             };
    1639           10 :             let mut other_files = Vec::with_capacity(files.len());
    1640           10 :             let mut modifying_file = None;
    1641           12 :             for file @ (p, content) in files {
    1642            2 :                 if path == p {
    1643            2 :                     assert!(
    1644            2 :                         modifying_file.is_none(),
    1645            0 :                         "duplicated entries found for {}",
    1646              :                         path
    1647              :                     );
    1648            2 :                     modifying_file = Some(content);
    1649            0 :                 } else {
    1650            0 :                     other_files.push(file);
    1651            0 :                 }
    1652              :             }
    1653           10 :             let mut new_files = other_files;
    1654           10 :             match (modifying_file, content.is_empty()) {
    1655            2 :                 (Some(old_content), false) => {
    1656            2 :                     self.tline
    1657            2 :                         .aux_file_size_estimator
    1658            2 :                         .on_update(old_content.len(), content.len());
    1659            2 :                     new_files.push((path, content));
    1660            2 :                 }
    1661            0 :                 (Some(old_content), true) => {
    1662            0 :                     self.tline
    1663            0 :                         .aux_file_size_estimator
    1664            0 :                         .on_remove(old_content.len());
    1665            0 :                     // not adding the file key to the final `new_files` vec.
    1666            0 :                 }
    1667            8 :                 (None, false) => {
    1668            8 :                     self.tline.aux_file_size_estimator.on_add(content.len());
    1669            8 :                     new_files.push((path, content));
    1670            8 :                 }
    1671            0 :                 (None, true) => warn!("removing non-existing aux file: {}", path),
    1672              :             }
    1673           10 :             let new_val = aux_file::encode_file_value(&new_files)?;
    1674           10 :             self.put(key, Value::Image(new_val.into()));
    1675           20 :         }
    1676              : 
    1677           30 :         if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
    1678           22 :             let file_path = path.to_string();
    1679           22 :             let content = if content.is_empty() {
    1680            2 :                 None
    1681              :             } else {
    1682           20 :                 Some(Bytes::copy_from_slice(content))
    1683              :             };
    1684              : 
    1685              :             let n_files;
    1686           22 :             let mut aux_files = self.tline.aux_files.lock().await;
    1687           22 :             if let Some(mut dir) = aux_files.dir.take() {
    1688              :                 // We already updated aux files in `self`: emit a delta and update our latest value.
    1689           10 :                 dir.upsert(file_path.clone(), content.clone());
    1690           10 :                 n_files = dir.files.len();
    1691           10 :                 if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
    1692            0 :                     self.put(
    1693            0 :                         AUX_FILES_KEY,
    1694            0 :                         Value::Image(Bytes::from(
    1695            0 :                             AuxFilesDirectory::ser(&dir).context("serialize")?,
    1696              :                         )),
    1697              :                     );
    1698            0 :                     aux_files.n_deltas = 0;
    1699           10 :                 } else {
    1700           10 :                     self.put(
    1701           10 :                         AUX_FILES_KEY,
    1702           10 :                         Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
    1703           10 :                     );
    1704           10 :                     aux_files.n_deltas += 1;
    1705           10 :                 }
    1706           10 :                 aux_files.dir = Some(dir);
    1707              :             } else {
    1708              :                 // Check if the AUX_FILES_KEY is initialized
    1709           12 :                 match self.get(AUX_FILES_KEY, ctx).await {
    1710            8 :                     Ok(dir_bytes) => {
    1711            8 :                         let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
     1712              :                         // Key is already set, so we may append a delta
    1713            8 :                         self.put(
    1714            8 :                             AUX_FILES_KEY,
    1715            8 :                             Value::WalRecord(NeonWalRecord::AuxFile {
    1716            8 :                                 file_path: file_path.clone(),
    1717            8 :                                 content: content.clone(),
    1718            8 :                             }),
    1719            8 :                         );
    1720            8 :                         dir.upsert(file_path, content);
    1721            8 :                         n_files = dir.files.len();
    1722            8 :                         aux_files.dir = Some(dir);
    1723              :                     }
    1724              :                     Err(
    1725            0 :                         e @ (PageReconstructError::Cancelled
    1726            0 :                         | PageReconstructError::AncestorLsnTimeout(_)),
    1727            0 :                     ) => {
    1728            0 :                         // Important that we do not interpret a shutdown error as "not found" and thereby
    1729            0 :                         // reset the map.
    1730            0 :                         return Err(e.into());
    1731              :                     }
     1732              :                     // Note: we added the missing-key error variant in https://github.com/neondatabase/neon/pull/7393 but
    1733              :                     // the original code assumes all other errors are missing keys. Therefore, we keep the code path
    1734              :                     // the same for now, though in theory, we should only match the `MissingKey` variant.
    1735              :                     Err(
    1736              :                         PageReconstructError::Other(_)
    1737              :                         | PageReconstructError::WalRedo(_)
    1738              :                         | PageReconstructError::MissingKey { .. },
    1739              :                     ) => {
     1740              :                         // Key is missing; we must insert an image as the basis for subsequent deltas.
    1741              : 
    1742            4 :                         let mut dir = AuxFilesDirectory {
    1743            4 :                             files: HashMap::new(),
    1744            4 :                         };
    1745            4 :                         dir.upsert(file_path, content);
    1746            4 :                         self.put(
    1747            4 :                             AUX_FILES_KEY,
    1748            4 :                             Value::Image(Bytes::from(
    1749            4 :                                 AuxFilesDirectory::ser(&dir).context("serialize")?,
    1750              :                             )),
    1751              :                         );
    1752            4 :                         n_files = 1;
    1753            4 :                         aux_files.dir = Some(dir);
    1754              :                     }
    1755              :                 }
    1756              :             }
    1757              : 
    1758           22 :             self.pending_directory_entries
    1759           22 :                 .push((DirectoryKind::AuxFiles, n_files));
    1760            8 :         }
    1761              : 
    1762           30 :         Ok(())
    1763           30 :     }
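                       :     // Added illustration of the allowed switch paths listed above (a hedged
                       :     // reading of that comment, not an exhaustive spec):
                       :     //
                       :     //     // unset -> anything is allowed
                       :     //     assert!(AuxFilePolicy::is_valid_migration_path(None, AuxFilePolicy::V2));
                       :     //     // cross-validation -> v2 is allowed
                       :     //     assert!(AuxFilePolicy::is_valid_migration_path(
                       :     //         Some(AuxFilePolicy::CrossValidation),
                       :     //         AuxFilePolicy::V2,
                       :     //     ));
                       :     //     // but v1 -> v2 directly is not
                       :     //     assert!(!AuxFilePolicy::is_valid_migration_path(
                       :     //         Some(AuxFilePolicy::V1),
                       :     //         AuxFilePolicy::V2,
                       :     //     ));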
    1764              : 
    1765              :     ///
    1766              :     /// Flush changes accumulated so far to the underlying repository.
    1767              :     ///
    1768              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1769              :     /// you to flush them to the underlying repository before the final `commit`.
     1770              :     /// That allows freeing up the memory used to hold the pending changes.
    1771              :     ///
    1772              :     /// Currently only used during bulk import of a data directory. In that
    1773              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1774              :     /// whole import fails and the timeline will be deleted anyway.
    1775              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1776              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1777              :     ///
    1778              :     /// Note: A consequence of flushing the pending operations is that they
    1779              :     /// won't be visible to subsequent operations until `commit`. The function
    1780              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1781              :     /// for bulk import, where you are just loading data pages and won't try to
    1782              :     /// modify the same pages twice.
    1783         1930 :     pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1784         1930 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1785         1930 :         // to scan through the pending_updates list.
    1786         1930 :         let pending_nblocks = self.pending_nblocks;
    1787         1930 :         if pending_nblocks < 10000 {
    1788         1930 :             return Ok(());
    1789            0 :         }
    1790              : 
    1791            0 :         let mut writer = self.tline.writer().await;
    1792              : 
     1793              :         // Flush relation and SLRU data blocks; keep metadata.
    1794            0 :         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
    1795            0 :         for (key, values) in self.pending_updates.drain() {
    1796            0 :             for (lsn, value) in values {
    1797            0 :                 if key.is_rel_block_key() || key.is_slru_block_key() {
    1798              :                     // This bails out on first error without modifying pending_updates.
    1799              :                     // That's Ok, cf this function's doc comment.
    1800            0 :                     writer.put(key, lsn, &value, ctx).await?;
    1801            0 :                 } else {
    1802            0 :                     retained_pending_updates
    1803            0 :                         .entry(key)
    1804            0 :                         .or_default()
    1805            0 :                         .push((lsn, value));
    1806            0 :                 }
    1807              :             }
    1808              :         }
    1809              : 
    1810            0 :         self.pending_updates = retained_pending_updates;
    1811            0 : 
    1812            0 :         if pending_nblocks != 0 {
    1813            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1814            0 :             self.pending_nblocks = 0;
    1815            0 :         }
    1816              : 
    1817            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1818            0 :             writer.update_directory_entries_count(kind, count as u64);
    1819            0 :         }
    1820              : 
    1821            0 :         Ok(())
    1822         1930 :     }
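                       :     // Added bulk-import sketch (the threshold is hypothetical): flush
                       :     // periodically so pending data pages don't accumulate in memory, then
                       :     // commit once at the end:
                       :     //
                       :     //     for (rel, blknum, img) in pages {
                       :     //         modification.put_rel_page_image(rel, blknum, img)?;
                       :     //         if modification.len() > 100_000 {
                       :     //             modification.flush(ctx).await?;
                       :     //         }
                       :     //     }
                       :     //     modification.commit(ctx).await?;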
    1823              : 
    1824              :     ///
    1825              :     /// Finish this atomic update, writing all the updated keys to the
    1826              :     /// underlying timeline.
    1827              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1828              :     ///
    1829       743062 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    1830       743062 :         let mut writer = self.tline.writer().await;
    1831              : 
    1832       743062 :         let pending_nblocks = self.pending_nblocks;
    1833       743062 :         self.pending_nblocks = 0;
    1834       743062 : 
    1835       743062 :         if !self.pending_updates.is_empty() {
     1836              :             // The put_batch call below expects the inputs to be sorted by Lsn,
    1837              :             // so we do that first.
    1838       414046 :             let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
    1839       414046 :                 self.pending_updates
    1840       414046 :                     .drain()
    1841       700392 :                     .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
    1842       414046 :                     .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
    1843       414046 :                 VecMapOrdering::GreaterOrEqual,
    1844       414046 :             );
    1845       414046 : 
    1846       414046 :             writer.put_batch(lsn_ordered_batch, ctx).await?;
    1847       329016 :         }
    1848              : 
    1849       743062 :         if !self.pending_deletions.is_empty() {
    1850            2 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    1851            2 :             self.pending_deletions.clear();
    1852       743060 :         }
    1853              : 
    1854       743062 :         self.pending_lsns.push(self.lsn);
    1855       888920 :         for pending_lsn in self.pending_lsns.drain(..) {
    1856       888920 :             // Ideally, we should be able to call writer.finish_write() only once
    1857       888920 :             // with the highest LSN. However, the last_record_lsn variable in the
    1858       888920 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    1859       888920 :             // so we need to record every LSN to not leave a gap between them.
    1860       888920 :             writer.finish_write(pending_lsn);
    1861       888920 :         }
    1862              : 
    1863       743062 :         if pending_nblocks != 0 {
    1864       270570 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1865       472492 :         }
    1866              : 
    1867       743062 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    1868         2940 :             writer.update_directory_entries_count(kind, count as u64);
    1869         2940 :         }
    1870              : 
    1871       743062 :         Ok(())
    1872       743062 :     }
    1873              : 
    1874       291704 :     pub(crate) fn len(&self) -> usize {
    1875       291704 :         self.pending_updates.len() + self.pending_deletions.len()
    1876       291704 :     }
    1877              : 
    1878              :     // Internal helper functions to batch the modifications
    1879              : 
    1880       286592 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
     1881              :         // Have we already updated the same key? If so, read back the latest
     1882              :         // pending version.
     1883              :         //
     1884              :         // Note: we don't check pending_deletions. It is an error to request a
     1885              :         // value that has been removed; deletion only avoids leaking storage.
    1886       286592 :         if let Some(values) = self.pending_updates.get(&key) {
    1887        15928 :             if let Some((_, value)) = values.last() {
    1888        15928 :                 return if let Value::Image(img) = value {
    1889        15928 :                     Ok(img.clone())
    1890              :                 } else {
    1891              :                     // Currently, we never need to read back a WAL record that we
    1892              :                     // inserted in the same "transaction". All the metadata updates
    1893              :                     // work directly with Images, and we never need to read actual
    1894              :                     // data pages. We could handle this if we had to, by calling
    1895              :                     // the walredo manager, but let's keep it simple for now.
    1896            0 :                     Err(PageReconstructError::from(anyhow::anyhow!(
    1897            0 :                         "unexpected pending WAL record"
    1898            0 :                     )))
    1899              :                 };
    1900            0 :             }
    1901       270664 :         }
    1902       270664 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1903       270664 :         self.tline.get(key, lsn, ctx).await
    1904       286592 :     }
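                      :
                      :     // Read-your-writes sketch (hypothetical `key` and `img`): an image put
                      :     // earlier in the same modification is returned by `get` before commit:
                      :     //
                      :     //   modification.put(key, Value::Image(img.clone()));
                      :     //   let read_back = modification.get(key, &ctx).await?; // yields `img`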
    1905              : 
    1906              :     /// Only used during unit tests, force putting a key into the modification.
    1907              :     #[cfg(test)]
    1908            2 :     pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
    1909            2 :         self.put(key, val);
    1910            2 :     }
    1911              : 
    1912       712552 :     fn put(&mut self, key: Key, val: Value) {
    1913       712552 :         let values = self.pending_updates.entry(key).or_default();
    1914              :         // Replace the previous value if it exists at the same lsn
    1915       712552 :         if let Some((last_lsn, last_value)) = values.last_mut() {
    1916        12166 :             if *last_lsn == self.lsn {
    1917        12160 :                 *last_value = val;
    1918        12160 :                 return;
    1919            6 :             }
    1920       700386 :         }
    1921       700392 :         values.push((self.lsn, val));
    1922       712552 :     }
    1923              : 
    1924            2 :     fn delete(&mut self, key_range: Range<Key>) {
    1925            2 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1926            2 :         self.pending_deletions.push((key_range, self.lsn));
    1927            2 :     }
    1928              : }
    1929              : 
     1930              : /// This enum facilitates accessing either a committed key from the timeline at a
    1931              : /// specific LSN, or the latest uncommitted key from a pending modification.
    1932              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    1933              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
     1934              : /// need to look up keys in the modification first, before consulting the
     1935              : /// timeline, so as not to miss the latest updates.
    1936              : #[derive(Clone, Copy)]
    1937              : pub enum Version<'a> {
    1938              :     Lsn(Lsn),
    1939              :     Modified(&'a DatadirModification<'a>),
    1940              : }
    1941              : 
    1942              : impl<'a> Version<'a> {
    1943        23560 :     async fn get(
    1944        23560 :         &self,
    1945        23560 :         timeline: &Timeline,
    1946        23560 :         key: Key,
    1947        23560 :         ctx: &RequestContext,
    1948        23560 :     ) -> Result<Bytes, PageReconstructError> {
    1949        23560 :         match self {
    1950        23540 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    1951           20 :             Version::Modified(modification) => modification.get(key, ctx).await,
    1952              :         }
    1953        23560 :     }
    1954              : 
    1955        35620 :     fn get_lsn(&self) -> Lsn {
    1956        35620 :         match self {
    1957        29574 :             Version::Lsn(lsn) => *lsn,
    1958         6046 :             Version::Modified(modification) => modification.lsn,
    1959              :         }
    1960        35620 :     }
    1961              : }
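                      :
                      : // Usage sketch (illustrative): WalIngest-style code can read a key either at
                      : // a committed LSN or through an in-progress modification, via the same call:
                      : //
                      : //   let committed = Version::Lsn(lsn).get(timeline, key, &ctx).await?;
                      : //   let pending = Version::Modified(&modification).get(timeline, key, &ctx).await?;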
    1962              : 
    1963              : //--- Metadata structs stored in key-value pairs in the repository.
    1964              : 
    1965         2228 : #[derive(Debug, Serialize, Deserialize)]
    1966              : struct DbDirectory {
    1967              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    1968              :     dbdirs: HashMap<(Oid, Oid), bool>,
    1969              : }
    1970              : 
    1971          276 : #[derive(Debug, Serialize, Deserialize)]
    1972              : struct TwoPhaseDirectory {
    1973              :     xids: HashSet<TransactionId>,
    1974              : }
    1975              : 
    1976         1932 : #[derive(Debug, Serialize, Deserialize, Default)]
    1977              : struct RelDirectory {
    1978              :     // Set of relations that exist. (relfilenode, forknum)
    1979              :     //
    1980              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    1981              :     // key-value pairs, if you have a lot of relations
    1982              :     rels: HashSet<(Oid, u8)>,
    1983              : }
    1984              : 
    1985           58 : #[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
    1986              : pub(crate) struct AuxFilesDirectory {
    1987              :     pub(crate) files: HashMap<String, Bytes>,
    1988              : }
    1989              : 
    1990              : impl AuxFilesDirectory {
    1991           48 :     pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
    1992           48 :         if let Some(value) = value {
    1993           42 :             self.files.insert(key, value);
    1994           42 :         } else {
    1995            6 :             self.files.remove(&key);
    1996            6 :         }
    1997           48 :     }
    1998              : }
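                      :
                      : // Upsert sketch (hypothetical key and value): `Some` inserts or overwrites an
                      : // entry, `None` removes it:
                      : //
                      : //   let mut dir = AuxFilesDirectory::default();
                      : //   dir.upsert("pg_logical/f".to_string(), Some(Bytes::from_static(b"v1")));
                      : //   dir.upsert("pg_logical/f".to_string(), None); // the entry is gone again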
    1999              : 
    2000            0 : #[derive(Debug, Serialize, Deserialize)]
    2001              : struct RelSizeEntry {
    2002              :     nblocks: u32,
    2003              : }
    2004              : 
    2005          828 : #[derive(Debug, Serialize, Deserialize, Default)]
    2006              : struct SlruSegmentDirectory {
    2007              :     // Set of SLRU segments that exist.
    2008              :     segments: HashSet<u32>,
    2009              : }
    2010              : 
    2011              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2012              : #[repr(u8)]
    2013              : pub(crate) enum DirectoryKind {
    2014              :     Db,
    2015              :     TwoPhase,
    2016              :     Rel,
    2017              :     AuxFiles,
    2018              :     SlruSegment(SlruKind),
    2019              : }
    2020              : 
    2021              : impl DirectoryKind {
    2022              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2023         5880 :     pub(crate) fn offset(&self) -> usize {
    2024         5880 :         self.into_usize()
    2025         5880 :     }
    2026              : }
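                      :
                      : // Sketch: `offset()` assigns each directory kind a stable slot index, so a
                      : // fixed-size array of length `KINDS_NUM` can keep one counter per kind:
                      : //
                      : //   let mut counts = [0u64; DirectoryKind::KINDS_NUM];
                      : //   counts[DirectoryKind::Rel.offset()] += 1;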
    2027              : 
    2028              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2029              : 
    2030              : #[allow(clippy::bool_assert_comparison)]
    2031              : #[cfg(test)]
    2032              : mod tests {
    2033              :     use hex_literal::hex;
    2034              :     use utils::id::TimelineId;
    2035              : 
    2036              :     use super::*;
    2037              : 
    2038              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    2039              : 
    2040              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2041              :     #[tokio::test]
    2042            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2043            2 :         let name = "aux_files_round_trip";
    2044            2 :         let harness = TenantHarness::create(name).await?;
    2045            2 : 
    2046            2 :         pub const TIMELINE_ID: TimelineId =
    2047            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2048            2 : 
    2049            8 :         let (tenant, ctx) = harness.load().await;
    2050            2 :         let tline = tenant
    2051            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    2052            2 :             .await?;
    2053            2 :         let tline = tline.raw_timeline().unwrap();
    2054            2 : 
    2055            2 :         // First modification: insert two keys
    2056            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2057            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2058            2 :         modification.set_lsn(Lsn(0x1008))?;
    2059            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2060            2 :         modification.commit(&ctx).await?;
    2061            2 :         let expect_1008 = HashMap::from([
    2062            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2063            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2064            2 :         ]);
    2065            2 : 
    2066            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2067            2 :         assert_eq!(readback, expect_1008);
    2068            2 : 
    2069            2 :         // Second modification: update one key, remove the other
    2070            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2071            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2072            2 :         modification.set_lsn(Lsn(0x2008))?;
    2073            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2074            2 :         modification.commit(&ctx).await?;
    2075            2 :         let expect_2008 =
    2076            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2077            2 : 
    2078            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    2079            2 :         assert_eq!(readback, expect_2008);
    2080            2 : 
    2081            2 :         // Reading back in time works
    2082            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2083            2 :         assert_eq!(readback, expect_1008);
    2084            2 : 
    2085            2 :         Ok(())
    2086            2 :     }
    2087              : 
    2088              :     /*
    2089              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    2090              :             let incremental = timeline.get_current_logical_size();
    2091              :             let non_incremental = timeline
    2092              :                 .get_current_logical_size_non_incremental(lsn)
    2093              :                 .unwrap();
    2094              :             assert_eq!(incremental, non_incremental);
    2095              :         }
    2096              :     */
    2097              : 
    2098              :     /*
    2099              :     ///
    2100              :     /// Test list_rels() function, with branches and dropped relations
    2101              :     ///
    2102              :     #[test]
    2103              :     fn test_list_rels_drop() -> Result<()> {
    2104              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    2105              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    2106              :         const TESTDB: u32 = 111;
    2107              : 
    2108              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    2109              :         // after branching fails below
    2110              :         let mut writer = tline.begin_record(Lsn(0x10));
    2111              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    2112              :         writer.finish()?;
    2113              : 
    2114              :         // Create a relation on the timeline
    2115              :         let mut writer = tline.begin_record(Lsn(0x20));
    2116              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    2117              :         writer.finish()?;
    2118              : 
    2119              :         let writer = tline.begin_record(Lsn(0x00));
    2120              :         writer.finish()?;
    2121              : 
     2122              :         // Check that list_rels() lists it at and after Lsn(0x20), but not before it
    2123              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    2124              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    2125              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    2126              : 
    2127              :         // Create a branch, check that the relation is visible there
    2128              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    2129              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    2130              :             Some(timeline) => timeline,
    2131              :             None => panic!("Should have a local timeline"),
    2132              :         };
    2133              :         let newtline = DatadirTimelineImpl::new(newtline);
    2134              :         assert!(newtline
    2135              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2136              :             .contains(&TESTREL_A));
    2137              : 
    2138              :         // Drop it on the branch
    2139              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    2140              :         new_writer.drop_relation(TESTREL_A)?;
    2141              :         new_writer.finish()?;
    2142              : 
    2143              :         // Check that it's no longer listed on the branch after the point where it was dropped
    2144              :         assert!(newtline
    2145              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2146              :             .contains(&TESTREL_A));
    2147              :         assert!(!newtline
    2148              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2149              :             .contains(&TESTREL_A));
    2150              : 
    2151              :         // Run checkpoint and garbage collection and check that it's still not visible
    2152              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    2153              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    2154              : 
    2155              :         assert!(!newtline
    2156              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2157              :             .contains(&TESTREL_A));
    2158              : 
    2159              :         Ok(())
    2160              :     }
    2161              :      */
    2162              : 
    2163              :     /*
    2164              :     #[test]
    2165              :     fn test_read_beyond_eof() -> Result<()> {
    2166              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    2167              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    2168              : 
    2169              :         make_some_layers(&tline, Lsn(0x20))?;
    2170              :         let mut writer = tline.begin_record(Lsn(0x60));
    2171              :         walingest.put_rel_page_image(
    2172              :             &mut writer,
    2173              :             TESTREL_A,
    2174              :             0,
    2175              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    2176              :         )?;
    2177              :         writer.finish()?;
    2178              : 
    2179              :         // Test read before rel creation. Should error out.
    2180              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2181              : 
    2182              :         // Read block beyond end of relation at different points in time.
    2183              :         // These reads should fall into different delta, image, and in-memory layers.
    2184              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2185              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2186              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2187              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2188              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2189              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2190              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2191              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2192              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2193              : 
    2194              :         // Test on an in-memory layer with no preceding layer
    2195              :         let mut writer = tline.begin_record(Lsn(0x70));
    2196              :         walingest.put_rel_page_image(
    2197              :             &mut writer,
    2198              :             TESTREL_B,
    2199              :             0,
    2200              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2201              :         )?;
    2202              :         writer.finish()?;
    2203              : 
     2204              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    2205              : 
    2206              :         Ok(())
    2207              :     }
    2208              :      */
    2209              : }
        

Generated by: LCOV version 2.1-beta