LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test:      07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02

             Coverage      Total      Hit
Lines:       54.7 %        1888       1032
Functions:   42.8 %        194        83

            Line data    Source code
       1              : //!
        2              : //! This module provides an abstraction for storing PostgreSQL relations and other
        3              : //! files in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::aux_file;
      11              : use crate::context::RequestContext;
      12              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      13              : use crate::metrics::{
      14              :     RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
      15              : };
      16              : use crate::span::{
      17              :     debug_assert_current_span_has_tenant_and_timeline_id,
      18              :     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
      19              : };
      20              : use crate::tenant::storage_layer::IoConcurrency;
      21              : use crate::tenant::timeline::GetVectoredError;
      22              : use anyhow::{ensure, Context};
      23              : use bytes::{Buf, Bytes, BytesMut};
      24              : use enum_map::Enum;
      25              : use itertools::Itertools;
      26              : use pageserver_api::key::{
      27              :     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
      28              :     rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
      29              :     slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
      30              :     twophase_file_key, twophase_key_range, CompactKey, RelDirExists, AUX_FILES_KEY, CHECKPOINT_KEY,
      31              :     CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      32              : };
      33              : use pageserver_api::key::{rel_tag_sparse_key, Key};
      34              : use pageserver_api::keyspace::SparseKeySpace;
      35              : use pageserver_api::record::NeonWalRecord;
      36              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      37              : use pageserver_api::shard::ShardIdentity;
      38              : use pageserver_api::value::Value;
      39              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      40              : use postgres_ffi::BLCKSZ;
      41              : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
      42              : use serde::{Deserialize, Serialize};
      43              : use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
      44              : use std::ops::ControlFlow;
      45              : use std::ops::Range;
      46              : use strum::IntoEnumIterator;
      47              : use tokio_util::sync::CancellationToken;
      48              : use tracing::{debug, info, trace, warn};
      49              : use utils::bin_ser::DeserializeError;
      50              : use utils::pausable_failpoint;
      51              : use utils::{bin_ser::BeSer, lsn::Lsn};
      52              : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
      53              : 
      54              : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
      55              : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
      56              : 
       57              : /// Max number of aux-file-related delta layers. Compaction will create a new image layer once this threshold is reached.
      58              : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
      59              : 
      60              : #[derive(Debug)]
      61              : pub enum LsnForTimestamp {
      62              :     /// Found commits both before and after the given timestamp
      63              :     Present(Lsn),
      64              : 
       65              :     /// Found no commits after the given timestamp; this means
       66              :     /// that the newest data in the branch is older than the
       67              :     /// given timestamp.
      68              :     ///
      69              :     /// All commits <= LSN happened before the given timestamp
      70              :     Future(Lsn),
      71              : 
       72              :     /// The queried timestamp is beyond the PITR horizon we can look back to
      73              :     ///
      74              :     /// All commits > LSN happened after the given timestamp,
      75              :     /// but any commits < LSN might have happened before or after
      76              :     /// the given timestamp. We don't know because no data before
      77              :     /// the given lsn is available.
      78              :     Past(Lsn),
      79              : 
      80              :     /// We have found no commit with a timestamp,
      81              :     /// so we can't return anything meaningful.
      82              :     ///
       83              :     /// The associated LSN is the lower bound value we can safely
       84              :     /// create branches on; no statement is made about whether it
       85              :     /// is older or newer than the timestamp.
      86              :     ///
      87              :     /// This variant can e.g. be returned right after a
      88              :     /// cluster import.
      89              :     NoData(Lsn),
      90              : }
      91              : 
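
A minimal sketch of how a caller might act on these variants when choosing a
branch point; the function name and error string here are illustrative, not
from this file:

    fn pick_branch_lsn(res: LsnForTimestamp) -> Result<Lsn, &'static str> {
        match res {
            // Commits on both sides of the timestamp: use the bisected LSN.
            LsnForTimestamp::Present(lsn) => Ok(lsn),
            // Everything on the branch is older than the timestamp; the LSN
            // of the newest commit is still a valid branch point.
            LsnForTimestamp::Future(lsn) => Ok(lsn),
            // The timestamp predates the PITR horizon: no safe LSN exists.
            LsnForTimestamp::Past(_) => Err("timestamp is older than the PITR horizon"),
            // No commit timestamps at all (e.g. right after an import): fall
            // back to the lower bound that is safe to branch from.
            LsnForTimestamp::NoData(lsn) => Ok(lsn),
        }
    }
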
      92              : #[derive(Debug, thiserror::Error)]
      93              : pub(crate) enum CalculateLogicalSizeError {
      94              :     #[error("cancelled")]
      95              :     Cancelled,
      96              : 
       97              :     /// Something went wrong while reading the metadata we use to calculate logical size.
      98              :     /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
      99              :     /// in the `From` implementation for this variant.
     100              :     #[error(transparent)]
     101              :     PageRead(PageReconstructError),
     102              : 
     103              :     /// Something went wrong deserializing metadata that we read to calculate logical size
     104              :     #[error("decode error: {0}")]
     105              :     Decode(#[from] DeserializeError),
     106              : }
     107              : 
     108              : #[derive(Debug, thiserror::Error)]
     109              : pub(crate) enum CollectKeySpaceError {
     110              :     #[error(transparent)]
     111              :     Decode(#[from] DeserializeError),
     112              :     #[error(transparent)]
     113              :     PageRead(PageReconstructError),
     114              :     #[error("cancelled")]
     115              :     Cancelled,
     116              : }
     117              : 
     118              : impl From<PageReconstructError> for CollectKeySpaceError {
     119            0 :     fn from(err: PageReconstructError) -> Self {
     120            0 :         match err {
     121            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     122            0 :             err => Self::PageRead(err),
     123              :         }
     124            0 :     }
     125              : }
     126              : 
     127              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     128            0 :     fn from(pre: PageReconstructError) -> Self {
     129            0 :         match pre {
     130            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     131            0 :             _ => Self::PageRead(pre),
     132              :         }
     133            0 :     }
     134              : }
     135              : 
     136              : #[derive(Debug, thiserror::Error)]
     137              : pub enum RelationError {
      138              :     #[error("relation already exists")]
     139              :     AlreadyExists,
     140              :     #[error("invalid relnode")]
     141              :     InvalidRelnode,
     142              :     #[error(transparent)]
     143              :     Other(#[from] anyhow::Error),
     144              : }
     145              : 
     146              : ///
      147              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
      148              : /// and other special kinds of files in a versioned key-value store. The
      149              : /// Timeline struct provides the key-value store.
      150              : ///
      151              : /// This is a separate impl so that all these functions can easily be included in the
      152              : /// Timeline implementation; it might be moved into a separate struct later.
     153              : impl Timeline {
     154              :     /// Start ingesting a WAL record, or other atomic modification of
     155              :     /// the timeline.
     156              :     ///
     157              :     /// This provides a transaction-like interface to perform a bunch
     158              :     /// of modifications atomically.
     159              :     ///
     160              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     161              :     /// DatadirModification object. Use the functions in the object to
     162              :     /// modify the repository state, updating all the pages and metadata
     163              :     /// that the WAL record affects. When you're done, call commit() to
     164              :     /// commit the changes.
     165              :     ///
      166              :     /// The LSN stored in the modification is advanced by `ingest_record` and
      167              :     /// is used by `commit()` to update `last_record_lsn`.
     168              :     ///
     169              :     /// Calling commit() will flush all the changes and reset the state,
     170              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     171              :     ///
     172              :     /// Note that any pending modifications you make through the
     173              :     /// modification object won't be visible to calls to the 'get' and list
     174              :     /// functions of the timeline until you finish! And if you update the
     175              :     /// same page twice, the last update wins.
     176              :     ///
     177       536820 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     178       536820 :     where
     179       536820 :         Self: Sized,
     180       536820 :     {
     181       536820 :         DatadirModification {
     182       536820 :             tline: self,
     183       536820 :             pending_lsns: Vec::new(),
     184       536820 :             pending_metadata_pages: HashMap::new(),
     185       536820 :             pending_data_batch: None,
     186       536820 :             pending_deletions: Vec::new(),
     187       536820 :             pending_nblocks: 0,
     188       536820 :             pending_directory_entries: Vec::new(),
     189       536820 :             pending_metadata_bytes: 0,
     190       536820 :             lsn,
     191       536820 :         }
     192       536820 :     }
     193              : 
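
A minimal usage sketch of the begin/modify/commit cycle described above,
assuming `put_rel_page_image` and `commit` from the wider
`DatadirModification` API (not shown in this excerpt):

    async fn ingest_one_page(
        timeline: &Timeline,
        lsn: Lsn,
        rel: RelTag,
        blknum: BlockNumber,
        img: Bytes,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let mut modification = timeline.begin_modification(lsn);
        // Stage the change; readers do not see it until commit().
        modification.put_rel_page_image(rel, blknum, img)?; // assumed API
        // commit() flushes the staged changes and advances last_record_lsn,
        // leaving the modification ready for reuse.
        modification.commit(ctx).await?;
        Ok(())
    }
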
     194              :     //------------------------------------------------------------------------------
     195              :     // Public GET functions
     196              :     //------------------------------------------------------------------------------
     197              : 
      198              :     /// Look up the given page version.
     199        36768 :     pub(crate) async fn get_rel_page_at_lsn(
     200        36768 :         &self,
     201        36768 :         tag: RelTag,
     202        36768 :         blknum: BlockNumber,
     203        36768 :         version: Version<'_>,
     204        36768 :         ctx: &RequestContext,
     205        36768 :         io_concurrency: IoConcurrency,
     206        36768 :     ) -> Result<Bytes, PageReconstructError> {
     207        36768 :         match version {
     208        36768 :             Version::Lsn(effective_lsn) => {
     209        36768 :                 let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
     210        36768 :                 let res = self
     211        36768 :                     .get_rel_page_at_lsn_batched(
     212        36768 :                         pages.iter().map(|(tag, blknum)| (tag, blknum)),
     213        36768 :                         effective_lsn,
     214        36768 :                         io_concurrency.clone(),
     215        36768 :                         ctx,
     216        36768 :                     )
     217        36768 :                     .await;
     218        36768 :                 assert_eq!(res.len(), 1);
     219        36768 :                 res.into_iter().next().unwrap()
     220              :             }
     221            0 :             Version::Modified(modification) => {
     222            0 :                 if tag.relnode == 0 {
     223            0 :                     return Err(PageReconstructError::Other(
     224            0 :                         RelationError::InvalidRelnode.into(),
     225            0 :                     ));
     226            0 :                 }
     227              : 
     228            0 :                 let nblocks = self.get_rel_size(tag, version, ctx).await?;
     229            0 :                 if blknum >= nblocks {
     230            0 :                     debug!(
     231            0 :                         "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     232            0 :                         tag,
     233            0 :                         blknum,
     234            0 :                         version.get_lsn(),
     235              :                         nblocks
     236              :                     );
     237            0 :                     return Ok(ZERO_PAGE.clone());
     238            0 :                 }
     239            0 : 
     240            0 :                 let key = rel_block_to_key(tag, blknum);
     241            0 :                 modification.get(key, ctx).await
     242              :             }
     243              :         }
     244        36768 :     }
     245              : 
     246              :     /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
     247              :     ///
     248              :     /// The ordering of the returned vec corresponds to the ordering of `pages`.
     249        36768 :     pub(crate) async fn get_rel_page_at_lsn_batched(
     250        36768 :         &self,
     251        36768 :         pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
     252        36768 :         effective_lsn: Lsn,
     253        36768 :         io_concurrency: IoConcurrency,
     254        36768 :         ctx: &RequestContext,
     255        36768 :     ) -> Vec<Result<Bytes, PageReconstructError>> {
     256        36768 :         debug_assert_current_span_has_tenant_and_timeline_id();
     257        36768 : 
     258        36768 :         let mut slots_filled = 0;
     259        36768 :         let page_count = pages.len();
     260        36768 : 
     261        36768 :         // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
     262        36768 :         let mut result = Vec::with_capacity(pages.len());
     263        36768 :         let result_slots = result.spare_capacity_mut();
     264        36768 : 
     265        36768 :         let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
     266        36768 :         for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
     267        36768 :             if tag.relnode == 0 {
     268            0 :                 result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
     269            0 :                     RelationError::InvalidRelnode.into(),
     270            0 :                 )));
     271            0 : 
     272            0 :                 slots_filled += 1;
     273            0 :                 continue;
     274        36768 :             }
     275              : 
     276        36768 :             let nblocks = match self
     277        36768 :                 .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
     278        36768 :                 .await
     279              :             {
     280        36768 :                 Ok(nblocks) => nblocks,
     281            0 :                 Err(err) => {
     282            0 :                     result_slots[response_slot_idx].write(Err(err));
     283            0 :                     slots_filled += 1;
     284            0 :                     continue;
     285              :                 }
     286              :             };
     287              : 
     288        36768 :             if *blknum >= nblocks {
     289            0 :                 debug!(
     290            0 :                     "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     291              :                     tag, blknum, effective_lsn, nblocks
     292              :                 );
     293            0 :                 result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
     294            0 :                 slots_filled += 1;
     295            0 :                 continue;
     296        36768 :             }
     297        36768 : 
     298        36768 :             let key = rel_block_to_key(*tag, *blknum);
     299        36768 : 
     300        36768 :             let key_slots = keys_slots.entry(key).or_default();
     301        36768 :             key_slots.push(response_slot_idx);
     302              :         }
     303              : 
     304        36768 :         let keyspace = {
     305              :             // add_key requires monotonicity
     306        36768 :             let mut acc = KeySpaceAccum::new();
     307        36768 :             for key in keys_slots
     308        36768 :                 .keys()
     309        36768 :                 // in fact it requires strong monotonicity
     310        36768 :                 .dedup()
     311        36768 :             {
     312        36768 :                 acc.add_key(*key);
     313        36768 :             }
     314        36768 :             acc.to_keyspace()
     315        36768 :         };
     316        36768 : 
     317        36768 :         match self
     318        36768 :             .get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
     319        36768 :             .await
     320              :         {
     321        36768 :             Ok(results) => {
     322        73536 :                 for (key, res) in results {
     323        36768 :                     let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
     324        36768 :                     let first_slot = key_slots.next().unwrap();
     325              : 
     326        36768 :                     for slot in key_slots {
     327            0 :                         let clone = match &res {
     328            0 :                             Ok(buf) => Ok(buf.clone()),
     329            0 :                             Err(err) => Err(match err {
     330              :                                 PageReconstructError::Cancelled => {
     331            0 :                                     PageReconstructError::Cancelled
     332              :                                 }
     333              : 
     334            0 :                                 x @ PageReconstructError::Other(_) |
     335            0 :                                 x @ PageReconstructError::AncestorLsnTimeout(_) |
     336            0 :                                 x @ PageReconstructError::WalRedo(_) |
     337            0 :                                 x @ PageReconstructError::MissingKey(_) => {
     338            0 :                                     PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
     339              :                                 },
     340              :                             }),
     341              :                         };
     342              : 
     343            0 :                         result_slots[slot].write(clone);
     344            0 :                         slots_filled += 1;
     345              :                     }
     346              : 
     347        36768 :                     result_slots[first_slot].write(res);
     348        36768 :                     slots_filled += 1;
     349              :                 }
     350              :             }
     351            0 :             Err(err) => {
     352              :                 // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
     353              :                 // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
     354            0 :                 for slot in keys_slots.values().flatten() {
     355              :                     // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
     356              :                     // but without taking ownership of the GetVectoredError
     357            0 :                     let err = match &err {
     358              :                         GetVectoredError::Cancelled => {
     359            0 :                             Err(PageReconstructError::Cancelled)
     360              :                         }
     361              :                         // TODO: restructure get_vectored API to make this error per-key
     362            0 :                         GetVectoredError::MissingKey(err) => {
     363            0 :                             Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}")))
     364              :                         }
     365              :                         // TODO: restructure get_vectored API to make this error per-key
     366            0 :                         GetVectoredError::GetReadyAncestorError(err) => {
     367            0 :                             Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}")))
     368              :                         }
     369              :                         // TODO: restructure get_vectored API to make this error per-key
     370            0 :                         GetVectoredError::Other(err) => {
     371            0 :                             Err(PageReconstructError::Other(
     372            0 :                                 anyhow::anyhow!("whole vectored get request failed: {err:?}"),
     373            0 :                             ))
     374              :                         }
     375              :                         // TODO: we can prevent this error class by moving this check into the type system
     376            0 :                         GetVectoredError::InvalidLsn(e) => {
     377            0 :                             Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
     378              :                         }
     379              :                         // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
     380              :                         // TODO: we can prevent this error class by moving this check into the type system
     381            0 :                         GetVectoredError::Oversized(err) => {
     382            0 :                             Err(anyhow::anyhow!(
     383            0 :                                 "batching oversized: {err:?}"
     384            0 :                             )
     385            0 :                             .into())
     386              :                         }
     387              :                     };
     388              : 
     389            0 :                     result_slots[*slot].write(err);
     390              :                 }
     391              : 
     392            0 :                 slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
     393            0 :             }
     394              :         };
     395              : 
     396        36768 :         assert_eq!(slots_filled, page_count);
     397              :         // SAFETY:
      398              :         // 1. `result` and any of its uninit members are not read from until this point
     399              :         // 2. The length below is tracked at run-time and matches the number of requested pages.
     400        36768 :         unsafe {
     401        36768 :             result.set_len(page_count);
     402        36768 :         }
     403        36768 : 
     404        36768 :         result
     405        36768 :     }
     406              : 
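
A self-contained sketch of the slot-filling pattern used above (illustrative
names): write into a `Vec`'s spare capacity out of order, count the writes,
and call `set_len` only once every slot is initialized:

    use std::mem::MaybeUninit;

    fn fill_out_of_order(n: usize) -> Vec<usize> {
        let mut result: Vec<usize> = Vec::with_capacity(n);
        let slots: &mut [MaybeUninit<usize>] = result.spare_capacity_mut();
        let mut filled = 0;
        // Write slots in reverse to mimic "completion order != request order".
        for idx in (0..n).rev() {
            slots[idx].write(idx * 2);
            filled += 1;
        }
        assert_eq!(filled, n);
        // SAFETY: all n slots were initialized above.
        unsafe { result.set_len(n) };
        result
    }
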
      407              :     /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
      408              :     /// other shards, since it only accounts for relations the shard has pages for, and only for
      409              :     /// pages up to the highest page number it has stored.
     410            0 :     pub(crate) async fn get_db_size(
     411            0 :         &self,
     412            0 :         spcnode: Oid,
     413            0 :         dbnode: Oid,
     414            0 :         version: Version<'_>,
     415            0 :         ctx: &RequestContext,
     416            0 :     ) -> Result<usize, PageReconstructError> {
     417            0 :         let mut total_blocks = 0;
     418              : 
     419            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     420              : 
     421            0 :         for rel in rels {
     422            0 :             let n_blocks = self.get_rel_size(rel, version, ctx).await?;
     423            0 :             total_blocks += n_blocks as usize;
     424              :         }
     425            0 :         Ok(total_blocks)
     426            0 :     }
     427              : 
      428              :     /// Get size of a relation file. The relation must exist; otherwise an error is returned.
     429              :     ///
     430              :     /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
     431              :     /// page number stored in the shard.
     432        48868 :     pub(crate) async fn get_rel_size(
     433        48868 :         &self,
     434        48868 :         tag: RelTag,
     435        48868 :         version: Version<'_>,
     436        48868 :         ctx: &RequestContext,
     437        48868 :     ) -> Result<BlockNumber, PageReconstructError> {
     438        48868 :         if tag.relnode == 0 {
     439            0 :             return Err(PageReconstructError::Other(
     440            0 :                 RelationError::InvalidRelnode.into(),
     441            0 :             ));
     442        48868 :         }
     443              : 
     444        48868 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     445        38588 :             return Ok(nblocks);
     446        10280 :         }
     447        10280 : 
     448        10280 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     449            0 :             && !self.get_rel_exists(tag, version, ctx).await?
     450              :         {
     451              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     452              :             // FSM, and smgrnblocks() on it immediately afterwards,
     453              :             // without extending it.  Tolerate that by claiming that
     454              :             // any non-existent FSM fork has size 0.
     455            0 :             return Ok(0);
     456        10280 :         }
     457        10280 : 
     458        10280 :         let key = rel_size_to_key(tag);
     459        10280 :         let mut buf = version.get(self, key, ctx).await?;
     460        10272 :         let nblocks = buf.get_u32_le();
     461        10272 : 
     462        10272 :         self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     463        10272 : 
     464        10272 :         Ok(nblocks)
     465        48868 :     }
     466              : 
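
The block count read above lives under `rel_size_to_key(tag)` as a
little-endian u32. A round-trip sketch with the same `bytes` calls (helper
names are illustrative):

    use bytes::{Buf, BufMut, Bytes, BytesMut};

    fn encode_nblocks(nblocks: u32) -> Bytes {
        let mut buf = BytesMut::with_capacity(4);
        buf.put_u32_le(nblocks);
        buf.freeze()
    }

    fn decode_nblocks(mut buf: Bytes) -> u32 {
        buf.get_u32_le() // mirrors the read in get_rel_size()
    }
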
     467              :     /// Does the relation exist?
     468              :     ///
     469              :     /// Only shard 0 has a full view of the relations. Other shards only know about relations that
     470              :     /// the shard stores pages for.
     471        12100 :     pub(crate) async fn get_rel_exists(
     472        12100 :         &self,
     473        12100 :         tag: RelTag,
     474        12100 :         version: Version<'_>,
     475        12100 :         ctx: &RequestContext,
     476        12100 :     ) -> Result<bool, PageReconstructError> {
     477        12100 :         if tag.relnode == 0 {
     478            0 :             return Err(PageReconstructError::Other(
     479            0 :                 RelationError::InvalidRelnode.into(),
     480            0 :             ));
     481        12100 :         }
     482              : 
      483              :         // first try to look up the relation in the cache
     484        12100 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     485        12064 :             return Ok(true);
     486           36 :         }
     487              :         // then check if the database was already initialized.
     488              :         // get_rel_exists can be called before dbdir is created.
     489           36 :         let buf = version.get(self, DBDIR_KEY, ctx).await?;
     490           36 :         let dbdirs = DbDirectory::des(&buf)?.dbdirs;
     491           36 :         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
     492            0 :             return Ok(false);
     493           36 :         }
     494           36 : 
     495           36 :         // Read path: first read the new reldir keyspace. Early return if the relation exists.
     496           36 :         // Otherwise, read the old reldir keyspace.
     497           36 :         // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
     498           36 : 
     499           36 :         if self.get_rel_size_v2_enabled() {
     500              :             // fetch directory listing (new)
     501            0 :             let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
     502            0 :             let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
     503            0 :                 .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
     504            0 :             let exists_v2 = buf == RelDirExists::Exists;
     505            0 :             // Fast path: if the relation exists in the new format, return true.
     506            0 :             // TODO: we should have a verification mode that checks both keyspaces
     507            0 :             // to ensure the relation only exists in one of them.
     508            0 :             if exists_v2 {
     509            0 :                 return Ok(true);
     510            0 :             }
     511           36 :         }
     512              : 
     513              :         // fetch directory listing (old)
     514              : 
     515           36 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     516           36 :         let buf = version.get(self, key, ctx).await?;
     517              : 
     518           36 :         let dir = RelDirectory::des(&buf)?;
     519           36 :         let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
     520           36 :         Ok(exists_v1)
     521        12100 :     }
     522              : 
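
A minimal sketch of the read order above, with the v1 lookup abstracted as a
closure (names are illustrative): the new keyspace is consulted first, and
the old directory only when v2 yields no positive entry:

    fn rel_exists_during_migration(
        v2_enabled: bool,
        v2_entry_exists: bool,
        v1_lookup: impl FnOnce() -> bool,
    ) -> bool {
        if v2_enabled && v2_entry_exists {
            return true; // fast path: recorded in the new keyspace
        }
        v1_lookup() // otherwise the old directory is authoritative
    }
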
     523              :     /// Get a list of all existing relations in given tablespace and database.
     524              :     ///
     525              :     /// Only shard 0 has a full view of the relations. Other shards only know about relations that
     526              :     /// the shard stores pages for.
     527              :     ///
     528              :     /// # Cancel-Safety
     529              :     ///
     530              :     /// This method is cancellation-safe.
     531            0 :     pub(crate) async fn list_rels(
     532            0 :         &self,
     533            0 :         spcnode: Oid,
     534            0 :         dbnode: Oid,
     535            0 :         version: Version<'_>,
     536            0 :         ctx: &RequestContext,
     537            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     538            0 :         // fetch directory listing (old)
     539            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     540            0 :         let buf = version.get(self, key, ctx).await?;
     541              : 
     542            0 :         let dir = RelDirectory::des(&buf)?;
     543            0 :         let rels_v1: HashSet<RelTag> =
     544            0 :             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     545            0 :                 spcnode,
     546            0 :                 dbnode,
     547            0 :                 relnode: *relnode,
     548            0 :                 forknum: *forknum,
     549            0 :             }));
     550            0 : 
     551            0 :         if !self.get_rel_size_v2_enabled() {
     552            0 :             return Ok(rels_v1);
     553            0 :         }
     554            0 : 
     555            0 :         // scan directory listing (new), merge with the old results
     556            0 :         let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
     557            0 :         let io_concurrency = IoConcurrency::spawn_from_conf(
     558            0 :             self.conf,
     559            0 :             self.gate
     560            0 :                 .enter()
     561            0 :                 .map_err(|_| PageReconstructError::Cancelled)?,
     562              :         );
     563            0 :         let results = self
     564            0 :             .scan(
     565            0 :                 KeySpace::single(key_range),
     566            0 :                 version.get_lsn(),
     567            0 :                 ctx,
     568            0 :                 io_concurrency,
     569            0 :             )
     570            0 :             .await?;
     571            0 :         let mut rels = rels_v1;
     572            0 :         for (key, val) in results {
     573            0 :             let val = RelDirExists::decode(&val?)
     574            0 :                 .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
     575            0 :             assert_eq!(key.field6, 1);
     576            0 :             assert_eq!(key.field2, spcnode);
     577            0 :             assert_eq!(key.field3, dbnode);
     578            0 :             let tag = RelTag {
     579            0 :                 spcnode,
     580            0 :                 dbnode,
     581            0 :                 relnode: key.field4,
     582            0 :                 forknum: key.field5,
     583            0 :             };
     584            0 :             if val == RelDirExists::Removed {
     585            0 :                 debug_assert!(!rels.contains(&tag), "removed reltag in v2");
     586            0 :                 continue;
     587            0 :             }
     588            0 :             let did_not_contain = rels.insert(tag);
     589            0 :             debug_assert!(did_not_contain, "duplicate reltag in v2");
     590              :         }
     591            0 :         Ok(rels)
     592            0 :     }
     593              : 
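
A self-contained sketch of the merge above, with `u32` standing in for
`RelTag` and illustrative names: start from the v1 set, fold in the v2 scan,
and skip tombstoned entries:

    use std::collections::HashSet;

    #[derive(PartialEq)]
    enum DirEntry {
        Exists,
        Removed,
    }

    fn merge_reldirs(v1: HashSet<u32>, v2: impl Iterator<Item = (u32, DirEntry)>) -> HashSet<u32> {
        let mut rels = v1;
        for (relnode, entry) in v2 {
            if entry == DirEntry::Removed {
                debug_assert!(!rels.contains(&relnode), "removed reltag in v2");
                continue;
            }
            let was_new = rels.insert(relnode);
            debug_assert!(was_new, "duplicate reltag in v2");
        }
        rels
    }
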
     594              :     /// Get the whole SLRU segment
     595            0 :     pub(crate) async fn get_slru_segment(
     596            0 :         &self,
     597            0 :         kind: SlruKind,
     598            0 :         segno: u32,
     599            0 :         lsn: Lsn,
     600            0 :         ctx: &RequestContext,
     601            0 :     ) -> Result<Bytes, PageReconstructError> {
     602            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     603            0 :         let n_blocks = self
     604            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     605            0 :             .await?;
     606            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     607            0 :         for blkno in 0..n_blocks {
     608            0 :             let block = self
     609            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     610            0 :                 .await?;
     611            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     612              :         }
     613            0 :         Ok(segment.freeze())
     614            0 :     }
     615              : 
     616              :     /// Look up given SLRU page version.
     617            0 :     pub(crate) async fn get_slru_page_at_lsn(
     618            0 :         &self,
     619            0 :         kind: SlruKind,
     620            0 :         segno: u32,
     621            0 :         blknum: BlockNumber,
     622            0 :         lsn: Lsn,
     623            0 :         ctx: &RequestContext,
     624            0 :     ) -> Result<Bytes, PageReconstructError> {
     625            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     626            0 :         let key = slru_block_to_key(kind, segno, blknum);
     627            0 :         self.get(key, lsn, ctx).await
     628            0 :     }
     629              : 
     630              :     /// Get size of an SLRU segment
     631            0 :     pub(crate) async fn get_slru_segment_size(
     632            0 :         &self,
     633            0 :         kind: SlruKind,
     634            0 :         segno: u32,
     635            0 :         version: Version<'_>,
     636            0 :         ctx: &RequestContext,
     637            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     638            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     639            0 :         let key = slru_segment_size_to_key(kind, segno);
     640            0 :         let mut buf = version.get(self, key, ctx).await?;
     641            0 :         Ok(buf.get_u32_le())
     642            0 :     }
     643              : 
      644              :     /// Does the SLRU segment exist?
     645            0 :     pub(crate) async fn get_slru_segment_exists(
     646            0 :         &self,
     647            0 :         kind: SlruKind,
     648            0 :         segno: u32,
     649            0 :         version: Version<'_>,
     650            0 :         ctx: &RequestContext,
     651            0 :     ) -> Result<bool, PageReconstructError> {
     652            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     653              :         // fetch directory listing
     654            0 :         let key = slru_dir_to_key(kind);
     655            0 :         let buf = version.get(self, key, ctx).await?;
     656              : 
     657            0 :         let dir = SlruSegmentDirectory::des(&buf)?;
     658            0 :         Ok(dir.segments.contains(&segno))
     659            0 :     }
     660              : 
      661              :     /// Locate the LSN such that all transactions that committed before
      662              :     /// 'search_timestamp' are visible, but nothing newer is.
     663              :     ///
     664              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     665              :     /// so it's not well defined which LSN you get if there were multiple commits
     666              :     /// "in flight" at that point in time.
     667              :     ///
     668            0 :     pub(crate) async fn find_lsn_for_timestamp(
     669            0 :         &self,
     670            0 :         search_timestamp: TimestampTz,
     671            0 :         cancel: &CancellationToken,
     672            0 :         ctx: &RequestContext,
     673            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     674            0 :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     675              : 
     676            0 :         let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
     677            0 :         let gc_cutoff_planned = {
     678            0 :             let gc_info = self.gc_info.read().unwrap();
     679            0 :             gc_info.min_cutoff()
     680            0 :         };
     681            0 :         // Usually the planned cutoff is newer than the cutoff of the last gc run,
     682            0 :         // but let's be defensive.
     683            0 :         let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
     684            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     685            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     686            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     687            0 :         // on the safe side.
     688            0 :         let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
     689            0 :         let max_lsn = self.get_last_record_lsn();
     690            0 : 
     691            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     692            0 :         // LSN divided by 8.
     693            0 :         let mut low = min_lsn.0 / 8;
     694            0 :         let mut high = max_lsn.0 / 8 + 1;
     695            0 : 
     696            0 :         let mut found_smaller = false;
     697            0 :         let mut found_larger = false;
     698              : 
     699            0 :         while low < high {
     700            0 :             if cancel.is_cancelled() {
     701            0 :                 return Err(PageReconstructError::Cancelled);
     702            0 :             }
     703            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     704            0 :             let mid = (high + low) / 2;
     705              : 
     706            0 :             let cmp = match self
     707            0 :                 .is_latest_commit_timestamp_ge_than(
     708            0 :                     search_timestamp,
     709            0 :                     Lsn(mid * 8),
     710            0 :                     &mut found_smaller,
     711            0 :                     &mut found_larger,
     712            0 :                     ctx,
     713            0 :                 )
     714            0 :                 .await
     715              :             {
     716            0 :                 Ok(res) => res,
     717            0 :                 Err(PageReconstructError::MissingKey(e)) => {
      718            0 :                     warn!("Missing key during find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}", e);
      719              :                     // Return that we didn't find any commits smaller than the LSN; the error was logged above.
     720            0 :                     return Ok(LsnForTimestamp::Past(min_lsn));
     721              :                 }
     722            0 :                 Err(e) => return Err(e),
     723              :             };
     724              : 
     725            0 :             if cmp {
     726            0 :                 high = mid;
     727            0 :             } else {
     728            0 :                 low = mid + 1;
     729            0 :             }
     730              :         }
     731              : 
     732              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     733              :         // so the LSN of the last commit record before or at `search_timestamp`.
     734              :         // Remove one from `low` to get `t`.
     735              :         //
     736              :         // FIXME: it would be better to get the LSN of the previous commit.
     737              :         // Otherwise, if you restore to the returned LSN, the database will
     738              :         // include physical changes from later commits that will be marked
     739              :         // as aborted, and will need to be vacuumed away.
     740            0 :         let commit_lsn = Lsn((low - 1) * 8);
     741            0 :         match (found_smaller, found_larger) {
     742              :             (false, false) => {
     743              :                 // This can happen if no commit records have been processed yet, e.g.
     744              :                 // just after importing a cluster.
     745            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     746              :             }
     747              :             (false, true) => {
     748              :                 // Didn't find any commit timestamps smaller than the request
     749            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     750              :             }
     751            0 :             (true, _) if commit_lsn < min_lsn => {
     752            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     753            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     754            0 :                 // below the min_lsn. We should never do that.
     755            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     756              :             }
     757              :             (true, false) => {
     758              :                 // Only found commits with timestamps smaller than the request.
     759              :                 // It's still a valid case for branch creation, return it.
     760              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     761              :                 // case, anyway.
     762            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     763              :             }
     764            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     765              :         }
     766            0 :     }
     767              : 
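
The loop above is a bisection over a monotone predicate on 8-byte-aligned
LSNs. The same search shape in isolation (illustrative names; `commit_ts_ge`
stands in for the probe):

    /// First value in [lo, hi) for which `pred` is true, assuming `pred` is
    /// monotone (all false, then all true) -- the shape of the loop above.
    fn partition_point(mut lo: u64, mut hi: u64, pred: impl Fn(u64) -> bool) -> u64 {
        while lo < hi {
            let mid = lo + (hi - lo) / 2; // cannot overflow
            if pred(mid) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        lo
    }

    // With LSNs divided by 8, as above, the call would look like:
    // partition_point(min_lsn.0 / 8, max_lsn.0 / 8 + 1, |m| commit_ts_ge(Lsn(m * 8)))
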
      768              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
      769              :     /// commits at LSN 'probe_lsn' that committed after 'search_timestamp'.
      770              :     ///
      771              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any
      772              :     /// commits with a smaller/larger timestamp.
     773              :     ///
     774            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     775            0 :         &self,
     776            0 :         search_timestamp: TimestampTz,
     777            0 :         probe_lsn: Lsn,
     778            0 :         found_smaller: &mut bool,
     779            0 :         found_larger: &mut bool,
     780            0 :         ctx: &RequestContext,
     781            0 :     ) -> Result<bool, PageReconstructError> {
     782            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     783            0 :             if timestamp >= search_timestamp {
     784            0 :                 *found_larger = true;
     785            0 :                 return ControlFlow::Break(true);
     786            0 :             } else {
     787            0 :                 *found_smaller = true;
     788            0 :             }
     789            0 :             ControlFlow::Continue(())
     790            0 :         })
     791            0 :         .await
     792            0 :     }
     793              : 
      794              :     /// Obtain the latest commit timestamp at the given lsn.
      795              :     ///
      796              :     /// Returns `None` if the lsn has no commit timestamps; otherwise returns the largest one found.
     797            0 :     pub(crate) async fn get_timestamp_for_lsn(
     798            0 :         &self,
     799            0 :         probe_lsn: Lsn,
     800            0 :         ctx: &RequestContext,
     801            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     802            0 :         let mut max: Option<TimestampTz> = None;
     803            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
     804            0 :             if let Some(max_prev) = max {
     805            0 :                 max = Some(max_prev.max(timestamp));
     806            0 :             } else {
     807            0 :                 max = Some(timestamp);
     808            0 :             }
     809            0 :             ControlFlow::Continue(())
     810            0 :         })
     811            0 :         .await?;
     812              : 
     813            0 :         Ok(max)
     814            0 :     }
     815              : 
      816              :     /// Runs the given function on all the timestamps for a given lsn.
     817              :     ///
     818              :     /// The return value is either given by the closure, or set to the `Default`
     819              :     /// impl's output.
     820            0 :     async fn map_all_timestamps<T: Default>(
     821            0 :         &self,
     822            0 :         probe_lsn: Lsn,
     823            0 :         ctx: &RequestContext,
     824            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     825            0 :     ) -> Result<T, PageReconstructError> {
     826            0 :         for segno in self
     827            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     828            0 :             .await?
     829              :         {
     830            0 :             let nblocks = self
     831            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     832            0 :                 .await?;
     833            0 :             for blknum in (0..nblocks).rev() {
     834            0 :                 let clog_page = self
     835            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     836            0 :                     .await?;
     837              : 
     838            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     839            0 :                     let mut timestamp_bytes = [0u8; 8];
     840            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     841            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     842            0 : 
     843            0 :                     match f(timestamp) {
     844            0 :                         ControlFlow::Break(b) => return Ok(b),
     845            0 :                         ControlFlow::Continue(()) => (),
     846              :                     }
     847            0 :                 }
     848              :             }
     849              :         }
     850            0 :         Ok(Default::default())
     851            0 :     }
     852              : 
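
A self-contained sketch of the decoding step above: a CLOG page may carry an
8-byte big-endian commit timestamp appended after the BLCKSZ-byte page image;
`TimestampTz` is an `i64` (helper name illustrative):

    const BLCKSZ: usize = 8192;

    fn clog_page_timestamp(page: &[u8]) -> Option<i64> {
        if page.len() == BLCKSZ + 8 {
            let mut ts = [0u8; 8];
            ts.copy_from_slice(&page[BLCKSZ..]);
            Some(i64::from_be_bytes(ts)) // big-endian, as in the code above
        } else {
            None // bare page: no timestamp was appended
        }
    }
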
     853            0 :     pub(crate) async fn get_slru_keyspace(
     854            0 :         &self,
     855            0 :         version: Version<'_>,
     856            0 :         ctx: &RequestContext,
     857            0 :     ) -> Result<KeySpace, PageReconstructError> {
     858            0 :         let mut accum = KeySpaceAccum::new();
     859              : 
     860            0 :         for kind in SlruKind::iter() {
     861            0 :             let mut segments: Vec<u32> = self
     862            0 :                 .list_slru_segments(kind, version, ctx)
     863            0 :                 .await?
     864            0 :                 .into_iter()
     865            0 :                 .collect();
     866            0 :             segments.sort_unstable();
     867              : 
     868            0 :             for seg in segments {
     869            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     870              : 
     871            0 :                 accum.add_range(
     872            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     873            0 :                 );
     874              :             }
     875              :         }
     876              : 
     877            0 :         Ok(accum.to_keyspace())
     878            0 :     }
     879              : 
     880              :     /// Get a list of SLRU segments
     881            0 :     pub(crate) async fn list_slru_segments(
     882            0 :         &self,
     883            0 :         kind: SlruKind,
     884            0 :         version: Version<'_>,
     885            0 :         ctx: &RequestContext,
     886            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     887            0 :         // fetch directory entry
     888            0 :         let key = slru_dir_to_key(kind);
     889              : 
     890            0 :         let buf = version.get(self, key, ctx).await?;
     891            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
     892            0 :     }
     893              : 
     894            0 :     pub(crate) async fn get_relmap_file(
     895            0 :         &self,
     896            0 :         spcnode: Oid,
     897            0 :         dbnode: Oid,
     898            0 :         version: Version<'_>,
     899            0 :         ctx: &RequestContext,
     900            0 :     ) -> Result<Bytes, PageReconstructError> {
     901            0 :         let key = relmap_file_key(spcnode, dbnode);
     902              : 
     903            0 :         let buf = version.get(self, key, ctx).await?;
     904            0 :         Ok(buf)
     905            0 :     }
     906              : 
     907          644 :     pub(crate) async fn list_dbdirs(
     908          644 :         &self,
     909          644 :         lsn: Lsn,
     910          644 :         ctx: &RequestContext,
     911          644 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     912              :         // fetch directory entry
     913          644 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     914              : 
     915          644 :         Ok(DbDirectory::des(&buf)?.dbdirs)
     916          644 :     }
     917              : 
     918            0 :     pub(crate) async fn get_twophase_file(
     919            0 :         &self,
     920            0 :         xid: u64,
     921            0 :         lsn: Lsn,
     922            0 :         ctx: &RequestContext,
     923            0 :     ) -> Result<Bytes, PageReconstructError> {
     924            0 :         let key = twophase_file_key(xid);
     925            0 :         let buf = self.get(key, lsn, ctx).await?;
     926            0 :         Ok(buf)
     927            0 :     }
     928              : 
     929          648 :     pub(crate) async fn list_twophase_files(
     930          648 :         &self,
     931          648 :         lsn: Lsn,
     932          648 :         ctx: &RequestContext,
     933          648 :     ) -> Result<HashSet<u64>, PageReconstructError> {
     934              :         // fetch directory entry
     935          648 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     936              : 
     937          648 :         if self.pg_version >= 17 {
     938            0 :             Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
     939              :         } else {
     940          648 :             Ok(TwoPhaseDirectory::des(&buf)?
     941              :                 .xids
     942          648 :                 .iter()
     943          648 :                 .map(|x| u64::from(*x))
     944          648 :                 .collect())
     945              :         }
     946          648 :     }
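    // Note on xid width (a reasoning sketch, not load-bearing code): before
    // PostgreSQL 17, two-phase state files are identified by a 32-bit
    // TransactionId, so the directory stores u32 xids and they are widened to
    // u64 above; from v17 onwards the directory (TwoPhaseDirectoryV17) stores
    // 64-bit xids natively, presumably matching the 64-bit (FullTransactionId)
    // file naming introduced in v17. Both branches therefore return HashSet<u64>.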
     947              : 
     948            0 :     pub(crate) async fn get_control_file(
     949            0 :         &self,
     950            0 :         lsn: Lsn,
     951            0 :         ctx: &RequestContext,
     952            0 :     ) -> Result<Bytes, PageReconstructError> {
     953            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     954            0 :     }
     955              : 
     956           24 :     pub(crate) async fn get_checkpoint(
     957           24 :         &self,
     958           24 :         lsn: Lsn,
     959           24 :         ctx: &RequestContext,
     960           24 :     ) -> Result<Bytes, PageReconstructError> {
     961           24 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     962           24 :     }
     963              : 
     964           24 :     async fn list_aux_files_v2(
     965           24 :         &self,
     966           24 :         lsn: Lsn,
     967           24 :         ctx: &RequestContext,
     968           24 :         io_concurrency: IoConcurrency,
     969           24 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     970           24 :         let kv = self
     971           24 :             .scan(
     972           24 :                 KeySpace::single(Key::metadata_aux_key_range()),
     973           24 :                 lsn,
     974           24 :                 ctx,
     975           24 :                 io_concurrency,
     976           24 :             )
     977           24 :             .await?;
     978           24 :         let mut result = HashMap::new();
     979           24 :         let mut sz = 0;
     980           60 :         for (_, v) in kv {
     981           36 :             let v = v?;
     982           36 :             let v = aux_file::decode_file_value_bytes(&v)
     983           36 :                 .context("value decode")
     984           36 :                 .map_err(PageReconstructError::Other)?;
     985           68 :             for (fname, content) in v {
     986           32 :                 sz += fname.len();
     987           32 :                 sz += content.len();
     988           32 :                 result.insert(fname, content);
     989           32 :             }
     990              :         }
     991           24 :         self.aux_file_size_estimator.on_initial(sz);
     992           24 :         Ok(result)
     993           24 :     }
     994              : 
     995            0 :     pub(crate) async fn trigger_aux_file_size_computation(
     996            0 :         &self,
     997            0 :         lsn: Lsn,
     998            0 :         ctx: &RequestContext,
     999            0 :         io_concurrency: IoConcurrency,
    1000            0 :     ) -> Result<(), PageReconstructError> {
    1001            0 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
    1002            0 :         Ok(())
    1003            0 :     }
    1004              : 
    1005           24 :     pub(crate) async fn list_aux_files(
    1006           24 :         &self,
    1007           24 :         lsn: Lsn,
    1008           24 :         ctx: &RequestContext,
    1009           24 :         io_concurrency: IoConcurrency,
    1010           24 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
    1011           24 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await
    1012           24 :     }
    1013              : 
    1014            0 :     pub(crate) async fn get_replorigins(
    1015            0 :         &self,
    1016            0 :         lsn: Lsn,
    1017            0 :         ctx: &RequestContext,
    1018            0 :         io_concurrency: IoConcurrency,
    1019            0 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
    1020            0 :         let kv = self
    1021            0 :             .scan(
    1022            0 :                 KeySpace::single(repl_origin_key_range()),
    1023            0 :                 lsn,
    1024            0 :                 ctx,
    1025            0 :                 io_concurrency,
    1026            0 :             )
    1027            0 :             .await?;
    1028            0 :         let mut result = HashMap::new();
    1029            0 :         for (k, v) in kv {
    1030            0 :             let v = v?;
    1031            0 :             let origin_id = k.field6 as RepOriginId;
    1032            0 :             let origin_lsn = Lsn::des(&v).unwrap();
    1033            0 :             if origin_lsn != Lsn::INVALID {
    1034            0 :                 result.insert(origin_id, origin_lsn);
    1035            0 :             }
    1036              :         }
    1037            0 :         Ok(result)
    1038            0 :     }
    1039              : 
     1040              :     /// Does the same as get_current_logical_size, but computes the size on demand.
    1041              :     /// Used to initialize the logical size tracking on startup.
    1042              :     ///
    1043              :     /// Only relation blocks are counted currently. That excludes metadata,
    1044              :     /// SLRUs, twophase files etc.
    1045              :     ///
    1046              :     /// # Cancel-Safety
    1047              :     ///
    1048              :     /// This method is cancellation-safe.
    1049            0 :     pub(crate) async fn get_current_logical_size_non_incremental(
    1050            0 :         &self,
    1051            0 :         lsn: Lsn,
    1052            0 :         ctx: &RequestContext,
    1053            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
    1054            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
    1055              : 
    1056              :         // Fetch list of database dirs and iterate them
    1057            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
    1058            0 :         let dbdir = DbDirectory::des(&buf)?;
    1059              : 
    1060            0 :         let mut total_size: u64 = 0;
    1061            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
    1062            0 :             for rel in self
    1063            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
    1064            0 :                 .await?
    1065              :             {
    1066            0 :                 if self.cancel.is_cancelled() {
    1067            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
    1068            0 :                 }
    1069            0 :                 let relsize_key = rel_size_to_key(rel);
    1070            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1071            0 :                 let relsize = buf.get_u32_le();
    1072            0 : 
    1073            0 :                 total_size += relsize as u64;
    1074              :             }
    1075              :         }
    1076            0 :         Ok(total_size * BLCKSZ as u64)
    1077            0 :     }
    1078              : 
    1079              :     /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
    1080              :     /// for gc-compaction.
    1081              :     ///
    1082              :     /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
     1083              :     /// processes data at multiple LSNs and must be aware that some key ranges might need to
     1084              :     /// be kept only for a specific range of LSNs.
    1085              :     ///
    1086              :     /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
    1087              :     /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
    1088              :     /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
    1089              :     /// determine which keys to retain/drop for gc-compaction.
    1090              :     ///
    1091              :     /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
     1092              :     /// to be retained for each of the branch LSNs.
    1093              :     ///
    1094              :     /// The return value is (dense keyspace, sparse keyspace).
    1095          104 :     pub(crate) async fn collect_gc_compaction_keyspace(
    1096          104 :         &self,
    1097          104 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1098          104 :         let metadata_key_begin = Key::metadata_key_range().start;
    1099          104 :         let aux_v1_key = AUX_FILES_KEY;
    1100          104 :         let dense_keyspace = KeySpace {
    1101          104 :             ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
    1102          104 :         };
    1103          104 :         Ok((
    1104          104 :             dense_keyspace,
    1105          104 :             SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
    1106          104 :         ))
    1107          104 :     }
    1108              : 
    1109              :     ///
    1110              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     1111              :     /// Anything that's not listed may be removed from the underlying storage (from
    1112              :     /// that LSN forwards).
    1113              :     ///
    1114              :     /// The return value is (dense keyspace, sparse keyspace).
    1115          644 :     pub(crate) async fn collect_keyspace(
    1116          644 :         &self,
    1117          644 :         lsn: Lsn,
    1118          644 :         ctx: &RequestContext,
    1119          644 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1120          644 :         // Iterate through key ranges, greedily packing them into partitions
    1121          644 :         let mut result = KeySpaceAccum::new();
    1122          644 : 
    1123          644 :         // The dbdir metadata always exists
    1124          644 :         result.add_key(DBDIR_KEY);
    1125              : 
    1126              :         // Fetch list of database dirs and iterate them
    1127          644 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
    1128          644 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
    1129          644 : 
    1130          644 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
    1131          644 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
    1132            0 :             if has_relmap_file {
    1133            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
    1134            0 :             }
    1135            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
    1136              : 
    1137            0 :             let mut rels: Vec<RelTag> = self
    1138            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
    1139            0 :                 .await?
    1140            0 :                 .into_iter()
    1141            0 :                 .collect();
    1142            0 :             rels.sort_unstable();
    1143            0 :             for rel in rels {
    1144            0 :                 let relsize_key = rel_size_to_key(rel);
    1145            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1146            0 :                 let relsize = buf.get_u32_le();
    1147            0 : 
    1148            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
    1149            0 :                 result.add_key(relsize_key);
    1150              :             }
    1151              :         }
    1152              : 
    1153              :         // Iterate SLRUs next
    1154          644 :         if self.tenant_shard_id.is_shard_zero() {
    1155         1896 :             for kind in [
    1156          632 :                 SlruKind::Clog,
    1157          632 :                 SlruKind::MultiXactMembers,
    1158          632 :                 SlruKind::MultiXactOffsets,
    1159              :             ] {
    1160         1896 :                 let slrudir_key = slru_dir_to_key(kind);
    1161         1896 :                 result.add_key(slrudir_key);
    1162         1896 :                 let buf = self.get(slrudir_key, lsn, ctx).await?;
    1163         1896 :                 let dir = SlruSegmentDirectory::des(&buf)?;
    1164         1896 :                 let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
    1165         1896 :                 segments.sort_unstable();
    1166         1896 :                 for segno in segments {
    1167            0 :                     let segsize_key = slru_segment_size_to_key(kind, segno);
    1168            0 :                     let mut buf = self.get(segsize_key, lsn, ctx).await?;
    1169            0 :                     let segsize = buf.get_u32_le();
    1170            0 : 
    1171            0 :                     result.add_range(
    1172            0 :                         slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
    1173            0 :                     );
    1174            0 :                     result.add_key(segsize_key);
    1175              :                 }
    1176              :             }
    1177           12 :         }
    1178              : 
    1179              :         // Then pg_twophase
    1180          644 :         result.add_key(TWOPHASEDIR_KEY);
    1181              : 
    1182          644 :         let mut xids: Vec<u64> = self
    1183          644 :             .list_twophase_files(lsn, ctx)
    1184          644 :             .await?
    1185          644 :             .iter()
    1186          644 :             .cloned()
    1187          644 :             .collect();
    1188          644 :         xids.sort_unstable();
    1189          644 :         for xid in xids {
    1190            0 :             result.add_key(twophase_file_key(xid));
    1191            0 :         }
    1192              : 
    1193          644 :         result.add_key(CONTROLFILE_KEY);
    1194          644 :         result.add_key(CHECKPOINT_KEY);
    1195          644 : 
    1196          644 :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
    1197          644 :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
     1198          644 :         // and the keys will not be garbage-collected.
    1199          644 :         #[cfg(test)]
    1200          644 :         {
    1201          644 :             let guard = self.extra_test_dense_keyspace.load();
    1202          644 :             for kr in &guard.ranges {
    1203            0 :                 result.add_range(kr.clone());
    1204            0 :             }
    1205            0 :         }
    1206            0 : 
    1207          644 :         let dense_keyspace = result.to_keyspace();
    1208          644 :         let sparse_keyspace = SparseKeySpace(KeySpace {
    1209          644 :             ranges: vec![
    1210          644 :                 Key::metadata_aux_key_range(),
    1211          644 :                 repl_origin_key_range(),
    1212          644 :                 Key::rel_dir_sparse_key_range(),
    1213          644 :             ],
    1214          644 :         });
    1215          644 : 
    1216          644 :         if cfg!(debug_assertions) {
    1217              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
    1218              : 
    1219              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
    1220              :             // category of sparse keys are split into their own image/delta files. If there
    1221              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
    1222              :             // and we want the developer to keep the keyspaces separated.
    1223              : 
    1224          644 :             let ranges = &sparse_keyspace.0.ranges;
    1225              : 
    1226              :             // TODO: use a single overlaps_with across the codebase
    1227         1932 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    1228         1932 :                 !(a.end <= b.start || b.end <= a.start)
    1229         1932 :             }
    1230         1932 :             for i in 0..ranges.len() {
    1231         1932 :                 for j in 0..i {
    1232         1932 :                     if overlaps_with(&ranges[i], &ranges[j]) {
    1233            0 :                         panic!(
    1234            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
    1235            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
    1236            0 :                         );
    1237         1932 :                     }
    1238              :                 }
    1239              :             }
    1240         1288 :             for i in 1..ranges.len() {
    1241         1288 :                 assert!(
    1242         1288 :                     ranges[i - 1].end <= ranges[i].start,
    1243            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
    1244            0 :                     ranges[i - 1].start,
    1245            0 :                     ranges[i - 1].end,
    1246            0 :                     ranges[i].start,
    1247            0 :                     ranges[i].end
    1248              :                 );
    1249              :             }
    1250            0 :         }
    1251              : 
    1252          644 :         Ok((dense_keyspace, sparse_keyspace))
    1253          644 :     }
    1254              : 
     1255              :     /// Get the cached size of a relation, if it was not updated after the specified LSN.
    1256       897080 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
    1257       897080 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
    1258       897080 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
    1259       897036 :             if lsn >= *cached_lsn {
    1260       886744 :                 RELSIZE_CACHE_HITS.inc();
    1261       886744 :                 return Some(*nblocks);
    1262        10292 :             }
    1263        10292 :             RELSIZE_CACHE_MISSES_OLD.inc();
    1264           44 :         }
    1265        10336 :         RELSIZE_CACHE_MISSES.inc();
    1266        10336 :         None
    1267       897080 :     }
    1268              : 
    1269              :     /// Update cached relation size if there is no more recent update
    1270        10272 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1271        10272 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1272        10272 : 
    1273        10272 :         if lsn < rel_size_cache.complete_as_of {
    1274              :             // Do not cache old values. It's safe to cache the size on read, as long as
    1275              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
    1276              :             // never evict values from the cache, so if the relation size changed after
    1277              :             // 'lsn', the new value is already in the cache.
    1278            0 :             return;
    1279        10272 :         }
    1280        10272 : 
    1281        10272 :         match rel_size_cache.map.entry(tag) {
    1282        10272 :             hash_map::Entry::Occupied(mut entry) => {
    1283        10272 :                 let cached_lsn = entry.get_mut();
    1284        10272 :                 if lsn >= cached_lsn.0 {
    1285            0 :                     *cached_lsn = (lsn, nblocks);
    1286        10272 :                 }
    1287              :             }
    1288            0 :             hash_map::Entry::Vacant(entry) => {
    1289            0 :                 entry.insert((lsn, nblocks));
    1290            0 :                 RELSIZE_CACHE_ENTRIES.inc();
    1291            0 :             }
    1292              :         }
    1293        10272 :     }
    1294              : 
    1295              :     /// Store cached relation size
    1296       565440 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1297       565440 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1298       565440 :         if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
    1299         3840 :             RELSIZE_CACHE_ENTRIES.inc();
    1300       561600 :         }
    1301       565440 :     }
    1302              : 
    1303              :     /// Remove cached relation size
    1304            4 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1305            4 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1306            4 :         if rel_size_cache.map.remove(tag).is_some() {
    1307            4 :             RELSIZE_CACHE_ENTRIES.dec();
    1308            4 :         }
    1309            4 :     }
    1310              : }
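// A minimal sketch of the read-through pattern the relation size cache above is
// designed for (hypothetical caller: `tline`, `tag`, `lsn`, and `compute_nblocks`
// are illustrative names, not part of this file):
//
//     let nblocks = match tline.get_cached_rel_size(&tag, lsn) {
//         // Hit: the cached value was set at or before `lsn` and is still valid,
//         // because entries are never evicted and any later size change would
//         // have overwritten the entry at a higher LSN.
//         Some(n) => n,
//         None => {
//             let n = compute_nblocks(&tag, lsn)?; // e.g. read the relsize key
//             tline.update_cached_rel_size(tag, lsn, n);
//             n
//         }
//     };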
    1311              : 
    1312              : /// DatadirModification represents an operation to ingest an atomic set of
    1313              : /// updates to the repository.
    1314              : ///
    1315              : /// It is created by the 'begin_record' function. It is called for each WAL
     1316              : /// record, so that all the modifications by one WAL record appear atomic.
    1317              : pub struct DatadirModification<'a> {
    1318              :     /// The timeline this modification applies to. You can access this to
    1319              :     /// read the state, but note that any pending updates are *not* reflected
    1320              :     /// in the state in 'tline' yet.
    1321              :     pub tline: &'a Timeline,
    1322              : 
    1323              :     /// Current LSN of the modification
    1324              :     lsn: Lsn,
    1325              : 
    1326              :     // The modifications are not applied directly to the underlying key-value store.
    1327              :     // The put-functions add the modifications here, and they are flushed to the
    1328              :     // underlying key-value store by the 'finish' function.
    1329              :     pending_lsns: Vec<Lsn>,
    1330              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1331              :     pending_nblocks: i64,
    1332              : 
    1333              :     /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    1334              :     /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    1335              :     pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
    1336              : 
    1337              :     /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    1338              :     /// which keys are stored here.
    1339              :     pending_data_batch: Option<SerializedValueBatch>,
    1340              : 
    1341              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1342              :     /// if it was updated in this modification.
    1343              :     pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
    1344              : 
    1345              :     /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
    1346              :     pending_metadata_bytes: usize,
    1347              : }
    1348              : 
    1349              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    1350              : pub enum MetricsUpdate {
     1351              :     /// Set the metric to this value
     1352              :     Set(u64),
     1353              :     /// Increment the metric by this value
     1354              :     Add(u64),
     1355              :     /// Decrement the metric by this value
     1356              :     Sub(u64),
    1357              : }
    1358              : 
    1359              : impl DatadirModification<'_> {
    1360              :     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    1361              :     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    1362              :     // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    1363              :     pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
    1364              : 
    1365              :     /// Get the current lsn
    1366       836116 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1367       836116 :         self.lsn
    1368       836116 :     }
    1369              : 
    1370            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1371            0 :         self.pending_data_batch
    1372            0 :             .as_ref()
    1373            0 :             .map_or(0, |b| b.buffer_size())
    1374            0 :             + self.pending_metadata_bytes
    1375            0 :     }
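    // A minimal sketch of how MAX_PENDING_BYTES and approx_pending_bytes() are
    // meant to interact in an ingest loop (hypothetical caller; the
    // `modification` / `commit` wiring below is an assumption, not part of
    // this file):
    //
    //     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
    //         modification.commit(ctx).await?; // flush before the batch grows unbounded
    //     }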
    1376              : 
    1377            0 :     pub(crate) fn has_dirty_data(&self) -> bool {
    1378            0 :         self.pending_data_batch
    1379            0 :             .as_ref()
    1380            0 :             .is_some_and(|b| b.has_data())
    1381            0 :     }
    1382              : 
    1383              :     /// Returns statistics about the currently pending modifications.
    1384            0 :     pub(crate) fn stats(&self) -> DatadirModificationStats {
    1385            0 :         let mut stats = DatadirModificationStats::default();
    1386            0 :         for (_, _, value) in self.pending_metadata_pages.values().flatten() {
    1387            0 :             match value {
    1388            0 :                 Value::Image(_) => stats.metadata_images += 1,
    1389            0 :                 Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
    1390            0 :                 Value::WalRecord(_) => stats.metadata_deltas += 1,
    1391              :             }
    1392              :         }
    1393            0 :         for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
    1394            0 :             match valuemeta {
    1395            0 :                 ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
    1396            0 :                 ValueMeta::Serialized(_) => stats.data_deltas += 1,
    1397            0 :                 ValueMeta::Observed(_) => {}
    1398              :             }
    1399              :         }
    1400            0 :         stats
    1401            0 :     }
    1402              : 
    1403              :     /// Set the current lsn
    1404       291716 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
    1405       291716 :         ensure!(
    1406       291716 :             lsn >= self.lsn,
    1407            0 :             "setting an older lsn {} than {} is not allowed",
    1408              :             lsn,
    1409              :             self.lsn
    1410              :         );
    1411              : 
    1412       291716 :         if lsn > self.lsn {
    1413       291716 :             self.pending_lsns.push(self.lsn);
    1414       291716 :             self.lsn = lsn;
    1415       291716 :         }
    1416       291716 :         Ok(())
    1417       291716 :     }
    1418              : 
    1419              :     /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
    1420              :     /// keys that represent literal blocks that postgres can read.  So data includes relation blocks and
    1421              :     /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
    1422              :     ///
    1423              :     /// The distinction is important because data keys are handled on a fast path where dirty writes are
    1424              :     /// not readable until this modification is committed, whereas metadata keys are visible for read
    1425              :     /// via [`Self::get`] as soon as their record has been ingested.
    1426      1701248 :     fn is_data_key(key: &Key) -> bool {
    1427      1701248 :         key.is_rel_block_key() || key.is_slru_block_key()
    1428      1701248 :     }
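    // For example: `rel_block_to_key(rel, blkno)` and
    // `slru_block_to_key(kind, segno, blkno)` produce data keys (buffered in
    // `pending_data_batch`, not readable until commit), while DBDIR_KEY,
    // CHECKPOINT_KEY, or `rel_size_to_key(rel)` produce metadata keys that
    // become readable via `Self::get` as soon as they are ingested.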
    1429              : 
    1430              :     /// Initialize a completely new repository.
    1431              :     ///
    1432              :     /// This inserts the directory metadata entries that are assumed to
    1433              :     /// always exist.
    1434          416 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1435          416 :         let buf = DbDirectory::ser(&DbDirectory {
    1436          416 :             dbdirs: HashMap::new(),
    1437          416 :         })?;
    1438          416 :         self.pending_directory_entries
    1439          416 :             .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
    1440          416 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1441              : 
    1442          416 :         let buf = if self.tline.pg_version >= 17 {
    1443            0 :             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
    1444            0 :                 xids: HashSet::new(),
    1445            0 :             })
    1446              :         } else {
    1447          416 :             TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1448          416 :                 xids: HashSet::new(),
    1449          416 :             })
    1450            0 :         }?;
    1451          416 :         self.pending_directory_entries
    1452          416 :             .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
    1453          416 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1454              : 
    1455          416 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1456          416 :         let empty_dir = Value::Image(buf);
    1457          416 : 
    1458          416 :         // Initialize SLRUs on shard 0 only: creating these on other shards would be
    1459          416 :         // harmless but they'd just be dropped on later compaction.
    1460          416 :         if self.tline.tenant_shard_id.is_shard_zero() {
    1461          404 :             self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1462          404 :             self.pending_directory_entries.push((
    1463          404 :                 DirectoryKind::SlruSegment(SlruKind::Clog),
    1464          404 :                 MetricsUpdate::Set(0),
    1465          404 :             ));
    1466          404 :             self.put(
    1467          404 :                 slru_dir_to_key(SlruKind::MultiXactMembers),
    1468          404 :                 empty_dir.clone(),
    1469          404 :             );
    1470          404 :             self.pending_directory_entries.push((
     1471          404 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
    1472          404 :                 MetricsUpdate::Set(0),
    1473          404 :             ));
    1474          404 :             self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1475          404 :             self.pending_directory_entries.push((
    1476          404 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
    1477          404 :                 MetricsUpdate::Set(0),
    1478          404 :             ));
    1479          404 :         }
    1480              : 
    1481          416 :         Ok(())
    1482          416 :     }
    1483              : 
    1484              :     #[cfg(test)]
    1485          412 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1486          412 :         self.init_empty()?;
    1487          412 :         self.put_control_file(bytes::Bytes::from_static(
    1488          412 :             b"control_file contents do not matter",
    1489          412 :         ))
    1490          412 :         .context("put_control_file")?;
    1491          412 :         self.put_checkpoint(bytes::Bytes::from_static(
    1492          412 :             b"checkpoint_file contents do not matter",
    1493          412 :         ))
    1494          412 :         .context("put_checkpoint_file")?;
    1495          412 :         Ok(())
    1496          412 :     }
    1497              : 
    1498              :     /// Creates a relation if it is not already present.
    1499              :     /// Returns the current size of the relation
    1500       836112 :     pub(crate) async fn create_relation_if_required(
    1501       836112 :         &mut self,
    1502       836112 :         rel: RelTag,
    1503       836112 :         ctx: &RequestContext,
    1504       836112 :     ) -> Result<u32, PageReconstructError> {
    1505              :         // Get current size and put rel creation if rel doesn't exist
    1506              :         //
    1507              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1508              :         //       check the cache too. This is because eagerly checking the cache results in
    1509              :         //       less work overall and 10% better performance. It's more work on cache miss
     1510              :         //       but cache misses are rare.
    1511       836112 :         if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
    1512       836092 :             Ok(nblocks)
    1513           20 :         } else if !self
    1514           20 :             .tline
    1515           20 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1516           20 :             .await?
    1517              :         {
    1518              :             // create it with 0 size initially, the logic below will extend it
    1519           20 :             self.put_rel_creation(rel, 0, ctx)
    1520           20 :                 .await
    1521           20 :                 .context("Relation Error")?;
    1522           20 :             Ok(0)
    1523              :         } else {
    1524            0 :             self.tline
    1525            0 :                 .get_rel_size(rel, Version::Modified(self), ctx)
    1526            0 :                 .await
    1527              :         }
    1528       836112 :     }
    1529              : 
    1530              :     /// Given a block number for a relation (which represents a newly written block),
    1531              :     /// the previous block count of the relation, and the shard info, find the gaps
     1532              :     /// that were created by the newly written block, if any.
    1533       291340 :     fn find_gaps(
    1534       291340 :         rel: RelTag,
    1535       291340 :         blkno: u32,
    1536       291340 :         previous_nblocks: u32,
    1537       291340 :         shard: &ShardIdentity,
    1538       291340 :     ) -> Option<KeySpace> {
    1539       291340 :         let mut key = rel_block_to_key(rel, blkno);
    1540       291340 :         let mut gap_accum = None;
    1541              : 
    1542       291340 :         for gap_blkno in previous_nblocks..blkno {
    1543           64 :             key.field6 = gap_blkno;
    1544           64 : 
    1545           64 :             if shard.get_shard_number(&key) != shard.number {
    1546           16 :                 continue;
    1547           48 :             }
    1548           48 : 
    1549           48 :             gap_accum
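    // Worked example: with `previous_nblocks == 2` and a write to `blkno == 5`,
    // blocks 2, 3 and 4 were skipped over; `find_gaps` returns a KeySpace
    // covering whichever of those block keys live on this shard (or `None` if
    // none do, e.g. for an in-place update where `blkno < previous_nblocks`).
    // `ingest_batch` below then zero-fills the returned gaps via `zero_gaps`.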
    1550           48 :                 .get_or_insert_with(KeySpaceAccum::new)
    1551           48 :                 .add_key(key);
    1552              :         }
    1553              : 
    1554       291340 :         gap_accum.map(|accum| accum.to_keyspace())
    1555       291340 :     }
    1556              : 
    1557       291704 :     pub async fn ingest_batch(
    1558       291704 :         &mut self,
    1559       291704 :         mut batch: SerializedValueBatch,
    1560       291704 :         // TODO(vlad): remove this argument and replace the shard check with is_key_local
    1561       291704 :         shard: &ShardIdentity,
    1562       291704 :         ctx: &RequestContext,
    1563       291704 :     ) -> anyhow::Result<()> {
    1564       291704 :         let mut gaps_at_lsns = Vec::default();
    1565              : 
    1566       291704 :         for meta in batch.metadata.iter() {
    1567       291284 :             let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
    1568       291284 :             let new_nblocks = blkno + 1;
    1569              : 
    1570       291284 :             let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
    1571       291284 :             if new_nblocks > old_nblocks {
    1572         4780 :                 self.put_rel_extend(rel, new_nblocks, ctx).await?;
    1573       286504 :             }
    1574              : 
    1575       291284 :             if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
    1576            0 :                 gaps_at_lsns.push((gaps, meta.lsn()));
    1577       291284 :             }
    1578              :         }
    1579              : 
    1580       291704 :         if !gaps_at_lsns.is_empty() {
    1581            0 :             batch.zero_gaps(gaps_at_lsns);
    1582       291704 :         }
    1583              : 
    1584       291704 :         match self.pending_data_batch.as_mut() {
    1585           40 :             Some(pending_batch) => {
    1586           40 :                 pending_batch.extend(batch);
    1587           40 :             }
    1588       291664 :             None if batch.has_data() => {
    1589       291260 :                 self.pending_data_batch = Some(batch);
    1590       291260 :             }
    1591          404 :             None => {
    1592          404 :                 // Nothing to initialize the batch with
    1593          404 :             }
    1594              :         }
    1595              : 
    1596       291704 :         Ok(())
    1597       291704 :     }
    1598              : 
    1599              :     /// Put a new page version that can be constructed from a WAL record
    1600              :     ///
    1601              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1602              :     /// current end-of-file. It's up to the caller to check that the relation size
    1603              :     /// matches the blocks inserted!
    1604           24 :     pub fn put_rel_wal_record(
    1605           24 :         &mut self,
    1606           24 :         rel: RelTag,
    1607           24 :         blknum: BlockNumber,
    1608           24 :         rec: NeonWalRecord,
    1609           24 :     ) -> anyhow::Result<()> {
    1610           24 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1611           24 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1612           24 :         Ok(())
    1613           24 :     }
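    // A sketch of the caller-side contract described in the NOTE above
    // (hypothetical caller; `modification` and `old_nblocks` are illustrative):
    //
    //     if blknum >= old_nblocks {
    //         modification.put_rel_extend(rel, blknum + 1, ctx).await?;
    //     }
    //     modification.put_rel_wal_record(rel, blknum, rec)?;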
    1614              : 
    1615              :     // Same, but for an SLRU.
    1616           16 :     pub fn put_slru_wal_record(
    1617           16 :         &mut self,
    1618           16 :         kind: SlruKind,
    1619           16 :         segno: u32,
    1620           16 :         blknum: BlockNumber,
    1621           16 :         rec: NeonWalRecord,
    1622           16 :     ) -> anyhow::Result<()> {
    1623           16 :         if !self.tline.tenant_shard_id.is_shard_zero() {
    1624            0 :             return Ok(());
    1625           16 :         }
    1626           16 : 
    1627           16 :         self.put(
    1628           16 :             slru_block_to_key(kind, segno, blknum),
    1629           16 :             Value::WalRecord(rec),
    1630           16 :         );
    1631           16 :         Ok(())
    1632           16 :     }
    1633              : 
     1634              :     /// Like put_wal_record, but with a ready-made image of the page.
    1635       555684 :     pub fn put_rel_page_image(
    1636       555684 :         &mut self,
    1637       555684 :         rel: RelTag,
    1638       555684 :         blknum: BlockNumber,
    1639       555684 :         img: Bytes,
    1640       555684 :     ) -> anyhow::Result<()> {
    1641       555684 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1642       555684 :         let key = rel_block_to_key(rel, blknum);
     1643       555684 :         self.put(key, Value::Image(img));
    1644            0 :             anyhow::bail!(
    1645            0 :                 "the request contains data not supported by pageserver at {}",
    1646            0 :                 key
    1647            0 :             );
    1648       555684 :         }
    1649       555684 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1650       555684 :         Ok(())
    1651       555684 :     }
    1652              : 
    1653           12 :     pub fn put_slru_page_image(
    1654           12 :         &mut self,
    1655           12 :         kind: SlruKind,
    1656           12 :         segno: u32,
    1657           12 :         blknum: BlockNumber,
    1658           12 :         img: Bytes,
    1659           12 :     ) -> anyhow::Result<()> {
    1660           12 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1661              : 
    1662           12 :         let key = slru_block_to_key(kind, segno, blknum);
    1663           12 :         if !key.is_valid_key_on_write_path() {
    1664            0 :             anyhow::bail!(
    1665            0 :                 "the request contains data not supported by pageserver at {}",
    1666            0 :                 key
    1667            0 :             );
    1668           12 :         }
    1669           12 :         self.put(key, Value::Image(img));
    1670           12 :         Ok(())
    1671           12 :     }
    1672              : 
    1673         5996 :     pub(crate) fn put_rel_page_image_zero(
    1674         5996 :         &mut self,
    1675         5996 :         rel: RelTag,
    1676         5996 :         blknum: BlockNumber,
    1677         5996 :     ) -> anyhow::Result<()> {
    1678         5996 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1679         5996 :         let key = rel_block_to_key(rel, blknum);
    1680         5996 :         if !key.is_valid_key_on_write_path() {
    1681            0 :             anyhow::bail!(
    1682            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1683            0 :                 key,
    1684            0 :                 self.lsn
    1685            0 :             );
    1686         5996 :         }
    1687         5996 : 
    1688         5996 :         let batch = self
    1689         5996 :             .pending_data_batch
    1690         5996 :             .get_or_insert_with(SerializedValueBatch::default);
    1691         5996 : 
    1692         5996 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1693         5996 : 
    1694         5996 :         Ok(())
    1695         5996 :     }
    1696              : 
    1697            0 :     pub(crate) fn put_slru_page_image_zero(
    1698            0 :         &mut self,
    1699            0 :         kind: SlruKind,
    1700            0 :         segno: u32,
    1701            0 :         blknum: BlockNumber,
    1702            0 :     ) -> anyhow::Result<()> {
    1703            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1704            0 :         let key = slru_block_to_key(kind, segno, blknum);
    1705            0 :         if !key.is_valid_key_on_write_path() {
    1706            0 :             anyhow::bail!(
    1707            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1708            0 :                 key,
    1709            0 :                 self.lsn
    1710            0 :             );
    1711            0 :         }
    1712            0 : 
    1713            0 :         let batch = self
    1714            0 :             .pending_data_batch
    1715            0 :             .get_or_insert_with(SerializedValueBatch::default);
    1716            0 : 
    1717            0 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1718            0 : 
    1719            0 :         Ok(())
    1720            0 :     }
    1721              : 
    1722              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1723           32 :     pub async fn put_relmap_file(
    1724           32 :         &mut self,
    1725           32 :         spcnode: Oid,
    1726           32 :         dbnode: Oid,
    1727           32 :         img: Bytes,
    1728           32 :         ctx: &RequestContext,
    1729           32 :     ) -> anyhow::Result<()> {
    1730              :         // Add it to the directory (if it doesn't exist already)
    1731           32 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1732           32 :         let mut dbdir = DbDirectory::des(&buf)?;
    1733              : 
    1734           32 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1735           32 :         if r.is_none() || r == Some(false) {
    1736              :             // The dbdir entry didn't exist, or it contained a
    1737              :             // 'false'. The 'insert' call already updated it with
    1738              :             // 'true', now write the updated 'dbdirs' map back.
    1739           32 :             let buf = DbDirectory::ser(&dbdir)?;
    1740           32 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1741            0 :         }
    1742           32 :         if r.is_none() {
    1743              :             // Create RelDirectory
    1744              :             // TODO: if we have fully migrated to v2, no need to create this directory
    1745           16 :             let buf = RelDirectory::ser(&RelDirectory {
    1746           16 :                 rels: HashSet::new(),
    1747           16 :             })?;
    1748           16 :             self.pending_directory_entries
    1749           16 :                 .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
    1750           16 :             if self.tline.get_rel_size_v2_enabled() {
    1751            0 :                 self.pending_directory_entries
    1752            0 :                     .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    1753           16 :             }
    1754           16 :             self.put(
    1755           16 :                 rel_dir_to_key(spcnode, dbnode),
    1756           16 :                 Value::Image(Bytes::from(buf)),
    1757           16 :             );
    1758           16 :         }
    1759              : 
    1760           32 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1761           32 :         Ok(())
    1762           32 :     }
    1763              : 
    1764            0 :     pub async fn put_twophase_file(
    1765            0 :         &mut self,
    1766            0 :         xid: u64,
    1767            0 :         img: Bytes,
    1768            0 :         ctx: &RequestContext,
    1769            0 :     ) -> anyhow::Result<()> {
    1770              :         // Add it to the directory entry
    1771            0 :         let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1772            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1773            0 :             let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
    1774            0 :             if !dir.xids.insert(xid) {
    1775            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1776            0 :             }
    1777            0 :             self.pending_directory_entries.push((
    1778            0 :                 DirectoryKind::TwoPhase,
    1779            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    1780            0 :             ));
    1781            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1782              :         } else {
    1783            0 :             let xid = xid as u32;
    1784            0 :             let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
    1785            0 :             if !dir.xids.insert(xid) {
    1786            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1787            0 :             }
    1788            0 :             self.pending_directory_entries.push((
    1789            0 :                 DirectoryKind::TwoPhase,
    1790            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    1791            0 :             ));
    1792            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1793              :         };
    1794            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1795            0 : 
    1796            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1797            0 :         Ok(())
    1798            0 :     }
    1799              : 
    1800            0 :     pub async fn set_replorigin(
    1801            0 :         &mut self,
    1802            0 :         origin_id: RepOriginId,
    1803            0 :         origin_lsn: Lsn,
    1804            0 :     ) -> anyhow::Result<()> {
    1805            0 :         let key = repl_origin_key(origin_id);
    1806            0 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    1807            0 :         Ok(())
    1808            0 :     }
    1809              : 
    1810            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
    1811            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    1812            0 :     }
    1813              : 
    1814          416 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1815          416 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1816          416 :         Ok(())
    1817          416 :     }
    1818              : 
    1819          444 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1820          444 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1821          444 :         Ok(())
    1822          444 :     }
    1823              : 
    1824            0 :     pub async fn drop_dbdir(
    1825            0 :         &mut self,
    1826            0 :         spcnode: Oid,
    1827            0 :         dbnode: Oid,
    1828            0 :         ctx: &RequestContext,
    1829            0 :     ) -> anyhow::Result<()> {
    1830            0 :         let total_blocks = self
    1831            0 :             .tline
    1832            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1833            0 :             .await?;
    1834              : 
    1835              :         // Remove entry from dbdir
    1836            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1837            0 :         let mut dir = DbDirectory::des(&buf)?;
    1838            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1839            0 :             let buf = DbDirectory::ser(&dir)?;
    1840            0 :             self.pending_directory_entries.push((
    1841            0 :                 DirectoryKind::Db,
    1842            0 :                 MetricsUpdate::Set(dir.dbdirs.len() as u64),
    1843            0 :             ));
    1844            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1845              :         } else {
    1846            0 :             warn!(
    1847            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1848              :                 spcnode, dbnode
    1849              :             );
    1850              :         }
    1851              : 
    1852              :         // Update logical database size.
    1853            0 :         self.pending_nblocks -= total_blocks as i64;
    1854            0 : 
    1855            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1856            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1857            0 :         Ok(())
    1858            0 :     }
    1859              : 
    1860              :     /// Create a relation fork.
    1861              :     ///
    1862              :     /// 'nblocks' is the initial size.
    1863         3840 :     pub async fn put_rel_creation(
    1864         3840 :         &mut self,
    1865         3840 :         rel: RelTag,
    1866         3840 :         nblocks: BlockNumber,
    1867         3840 :         ctx: &RequestContext,
    1868         3840 :     ) -> Result<(), RelationError> {
    1869         3840 :         if rel.relnode == 0 {
    1870            0 :             return Err(RelationError::InvalidRelnode);
    1871         3840 :         }
    1872              :         // It's possible that this is the first rel for this db in this
    1873              :         // tablespace.  Create the reldir entry for it if so.
    1874         3840 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1875         3840 :             .context("deserialize db")?;
    1876              : 
    1877         3840 :         let dbdir_exists =
    1878         3840 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1879              :                 // Didn't exist. Update dbdir
    1880           16 :                 e.insert(false);
    1881           16 :                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1882           16 :                 self.pending_directory_entries.push((
    1883           16 :                     DirectoryKind::Db,
    1884           16 :                     MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
    1885           16 :                 ));
    1886           16 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1887           16 :                 false
    1888              :             } else {
    1889         3824 :                 true
    1890              :             };
    1891              : 
    1892         3840 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1893         3840 :         let mut rel_dir = if !dbdir_exists {
    1894              :             // Create the RelDirectory
    1895           16 :             RelDirectory::default()
    1896              :         } else {
    1897              :             // reldir already exists, fetch it
    1898         3824 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1899         3824 :                 .context("deserialize db")?
    1900              :         };
    1901              : 
    1902              :         // Add the new relation to the rel directory entry, and write it back
    1903         3840 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1904            0 :             return Err(RelationError::AlreadyExists);
    1905         3840 :         }
    1906         3840 : 
    1907         3840 :         if self.tline.get_rel_size_v2_enabled() {
    1908            0 :             let sparse_rel_dir_key =
    1909            0 :                 rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
    1910              :             // check if the rel_dir_key exists in v2
    1911            0 :             let val = self
    1912            0 :                 .sparse_get(sparse_rel_dir_key, ctx)
    1913            0 :                 .await
    1914            0 :                 .map_err(|e| RelationError::Other(e.into()))?;
    1915            0 :             let val = RelDirExists::decode_option(val)
    1916            0 :                 .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
    1917            0 :             if val == RelDirExists::Exists {
    1918            0 :                 return Err(RelationError::AlreadyExists);
    1919            0 :             }
    1920            0 :             self.put(
    1921            0 :                 sparse_rel_dir_key,
    1922            0 :                 Value::Image(RelDirExists::Exists.encode()),
    1923            0 :             );
    1924            0 :             if !dbdir_exists {
    1925            0 :                 self.pending_directory_entries
    1926            0 :                     .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
    1927            0 :                 self.pending_directory_entries
    1928            0 :                     .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    1929            0 :                 // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
    1930            0 :                 // TODO: once we have fully migrated to v2, there is no need to create this directory. Until
    1931            0 :                 // then, rel_size_v2 reads will hit key-not-found errors if we don't create an empty one.
    1932            0 :                 self.put(
    1933            0 :                     rel_dir_key,
    1934            0 :                     Value::Image(Bytes::from(
    1935            0 :                         RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
    1936              :                     )),
    1937              :                 );
    1938            0 :             }
    1939            0 :             self.pending_directory_entries
    1940            0 :                 .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
    1941              :         } else {
    1942         3840 :             if !dbdir_exists {
    1943           16 :                 self.pending_directory_entries
    1944           16 :                     .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
    1945         3824 :             }
    1946         3840 :             self.pending_directory_entries
    1947         3840 :                 .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
    1948         3840 :             self.put(
    1949         3840 :                 rel_dir_key,
    1950         3840 :                 Value::Image(Bytes::from(
    1951         3840 :                     RelDirectory::ser(&rel_dir).context("serialize")?,
    1952              :                 )),
    1953              :             );
    1954              :         }
    1955              :         // Put size
    1956         3840 :         let size_key = rel_size_to_key(rel);
    1957         3840 :         let buf = nblocks.to_le_bytes();
    1958         3840 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1959         3840 : 
    1960         3840 :         self.pending_nblocks += nblocks as i64;
    1961         3840 : 
    1962         3840 :         // Update relation size cache
    1963         3840 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1964         3840 : 
    1965         3840 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1966         3840 :         // caller.
    1967         3840 :         Ok(())
    1968         3840 :     }
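    // [Editor's sketch, not part of the source or the coverage run] The
    // dbdir-exists check above relies on the std entry API; a minimal,
    // self-contained analogue (names like `note_db` are hypothetical):
    //
    //     use std::collections::{hash_map, HashMap};
    //
    //     /// Returns whether the (spcnode, dbnode) pair already had a dbdir entry,
    //     /// inserting a placeholder entry when it did not.
    //     fn note_db(dbdirs: &mut HashMap<(u32, u32), bool>, spc: u32, db: u32) -> bool {
    //         if let hash_map::Entry::Vacant(e) = dbdirs.entry((spc, db)) {
    //             // `false` mirrors "relmapper file not yet written" in DbDirectory.
    //             e.insert(false);
    //             false
    //         } else {
    //             true
    //         }
    //     }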
    1969              : 
    1970              :     /// Truncate relation
    1971        12024 :     pub async fn put_rel_truncation(
    1972        12024 :         &mut self,
    1973        12024 :         rel: RelTag,
    1974        12024 :         nblocks: BlockNumber,
    1975        12024 :         ctx: &RequestContext,
    1976        12024 :     ) -> anyhow::Result<()> {
    1977        12024 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1978        12024 :         if self
    1979        12024 :             .tline
    1980        12024 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1981        12024 :             .await?
    1982              :         {
    1983        12024 :             let size_key = rel_size_to_key(rel);
    1984              :             // Fetch the old size first
    1985        12024 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1986        12024 : 
    1987        12024 :             // Update the entry with the new size.
    1988        12024 :             let buf = nblocks.to_le_bytes();
    1989        12024 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1990        12024 : 
    1991        12024 :             // Update relation size cache
    1992        12024 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1993        12024 : 
    1994        12024 :             // Update logical database size.
    1995        12024 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1996            0 :         }
    1997        12024 :         Ok(())
    1998        12024 :     }
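    // [Editor's sketch] Truncation shrinks the pending logical-size delta by the
    // number of removed blocks; the arithmetic in isolation (hypothetical helper):
    //
    //     fn truncation_delta(old_size: u32, new_size: u32) -> i64 {
    //         -(old_size as i64 - new_size as i64)
    //     }
    //
    //     assert_eq!(truncation_delta(10, 4), -6); // 6 blocks removed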
    1999              : 
    2000              :     /// Extend a relation.
    2001              :     /// If the new size is smaller than the current size, do nothing.
    2002       553360 :     pub async fn put_rel_extend(
    2003       553360 :         &mut self,
    2004       553360 :         rel: RelTag,
    2005       553360 :         nblocks: BlockNumber,
    2006       553360 :         ctx: &RequestContext,
    2007       553360 :     ) -> anyhow::Result<()> {
    2008       553360 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    2009              : 
    2010              :         // Put size
    2011       553360 :         let size_key = rel_size_to_key(rel);
    2012       553360 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2013       553360 : 
    2014       553360 :         // only extend relation here. never decrease the size
    2015       553360 :         if nblocks > old_size {
    2016       549576 :             let buf = nblocks.to_le_bytes();
    2017       549576 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2018       549576 : 
    2019       549576 :             // Update relation size cache
    2020       549576 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2021       549576 : 
    2022       549576 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    2023       549576 :         }
    2024       553360 :         Ok(())
    2025       553360 :     }
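    // [Editor's sketch] Relation sizes are stored as four little-endian bytes
    // (written via `to_le_bytes`, read back with `get_u32_le`); a std-only round
    // trip of that encoding, with hypothetical helper names:
    //
    //     fn encode_nblocks(nblocks: u32) -> Vec<u8> {
    //         nblocks.to_le_bytes().to_vec()
    //     }
    //
    //     fn decode_nblocks(buf: &[u8]) -> u32 {
    //         u32::from_le_bytes(buf[..4].try_into().unwrap())
    //     }
    //
    //     assert_eq!(decode_nblocks(&encode_nblocks(42)), 42);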
    2026              : 
    2027              :     /// Drop some relations
    2028           20 :     pub(crate) async fn put_rel_drops(
    2029           20 :         &mut self,
    2030           20 :         drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
    2031           20 :         ctx: &RequestContext,
    2032           20 :     ) -> anyhow::Result<()> {
    2033           24 :         for ((spc_node, db_node), rel_tags) in drop_relations {
    2034            4 :             let dir_key = rel_dir_to_key(spc_node, db_node);
    2035            4 :             let buf = self.get(dir_key, ctx).await?;
    2036            4 :             let mut dir = RelDirectory::des(&buf)?;
    2037              : 
    2038            4 :             let mut dirty = false;
    2039            8 :             for rel_tag in rel_tags {
    2040            4 :                 let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
    2041            4 :                     self.pending_directory_entries
    2042            4 :                         .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
    2043            4 :                     dirty = true;
    2044            4 :                     true
    2045            0 :                 } else if self.tline.get_rel_size_v2_enabled() {
    2046              :                     // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
    2047              :                     // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
    2048              :                     // logic).
    2049            0 :                     let key =
    2050            0 :                         rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
    2051            0 :                     let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
    2052            0 :                         .map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
    2053            0 :                     if val == RelDirExists::Exists {
    2054            0 :                         self.pending_directory_entries
    2055            0 :                             .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
    2056            0 :                         // put tombstone
    2057            0 :                         self.put(key, Value::Image(RelDirExists::Removed.encode()));
    2058            0 :                         // no need to set dirty to true
    2059            0 :                         true
    2060              :                     } else {
    2061            0 :                         false
    2062              :                     }
    2063              :                 } else {
    2064            0 :                     false
    2065              :                 };
    2066              : 
    2067            4 :                 if found {
    2068              :                     // update logical size
    2069            4 :                     let size_key = rel_size_to_key(rel_tag);
    2070            4 :                     let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2071            4 :                     self.pending_nblocks -= old_size as i64;
    2072            4 : 
    2073            4 :                     // Remove entry from relation size cache
    2074            4 :                     self.tline.remove_cached_rel_size(&rel_tag);
    2075            4 : 
    2076            4 :                     // Delete size entry, as well as all blocks
    2077            4 :                     self.delete(rel_key_range(rel_tag));
    2078            0 :                 }
    2079              :             }
    2080              : 
    2081            4 :             if dirty {
    2082            4 :                 self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    2083            0 :             }
    2084              :         }
    2085              : 
    2086           20 :         Ok(())
    2087           20 :     }
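    // [Editor's sketch] A relation lives in exactly one of the two reldir
    // keyspaces, so the drop path tries the v1 HashSet first and only then the
    // v2 sparse keyspace, where removal is recorded as a tombstone. A
    // hypothetical distillation of that branch:
    //
    //     enum DropOutcome {
    //         V1Removed,    // removed from the serialized RelDirectory
    //         V2Tombstoned, // RelDirExists::Removed written to the sparse key
    //         NotFound,
    //     }
    //
    //     fn drop_outcome(in_v1: bool, v2_enabled: bool, v2_exists: bool) -> DropOutcome {
    //         if in_v1 {
    //             DropOutcome::V1Removed
    //         } else if v2_enabled && v2_exists {
    //             DropOutcome::V2Tombstoned
    //         } else {
    //             DropOutcome::NotFound
    //         }
    //     }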
    2088              : 
    2089           12 :     pub async fn put_slru_segment_creation(
    2090           12 :         &mut self,
    2091           12 :         kind: SlruKind,
    2092           12 :         segno: u32,
    2093           12 :         nblocks: BlockNumber,
    2094           12 :         ctx: &RequestContext,
    2095           12 :     ) -> anyhow::Result<()> {
    2096           12 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2097              : 
    2098              :         // Add it to the directory entry
    2099           12 :         let dir_key = slru_dir_to_key(kind);
    2100           12 :         let buf = self.get(dir_key, ctx).await?;
    2101           12 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2102              : 
    2103           12 :         if !dir.segments.insert(segno) {
    2104            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    2105           12 :         }
    2106           12 :         self.pending_directory_entries.push((
    2107           12 :             DirectoryKind::SlruSegment(kind),
    2108           12 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2109           12 :         ));
    2110           12 :         self.put(
    2111           12 :             dir_key,
    2112           12 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2113              :         );
    2114              : 
    2115              :         // Put size
    2116           12 :         let size_key = slru_segment_size_to_key(kind, segno);
    2117           12 :         let buf = nblocks.to_le_bytes();
    2118           12 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2119           12 : 
    2120           12 :         // even if nblocks > 0, we don't insert any actual blocks here
    2121           12 : 
    2122           12 :         Ok(())
    2123           12 :     }
    2124              : 
    2125              :     /// Extend SLRU segment
    2126            0 :     pub fn put_slru_extend(
    2127            0 :         &mut self,
    2128            0 :         kind: SlruKind,
    2129            0 :         segno: u32,
    2130            0 :         nblocks: BlockNumber,
    2131            0 :     ) -> anyhow::Result<()> {
    2132            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2133              : 
    2134              :         // Put size
    2135            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    2136            0 :         let buf = nblocks.to_le_bytes();
    2137            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2138            0 :         Ok(())
    2139            0 :     }
    2140              : 
    2141              :     /// Drop an SLRU segment. This is used when SLRU truncation removes an entire segment file.
    2142            0 :     pub async fn drop_slru_segment(
    2143            0 :         &mut self,
    2144            0 :         kind: SlruKind,
    2145            0 :         segno: u32,
    2146            0 :         ctx: &RequestContext,
    2147            0 :     ) -> anyhow::Result<()> {
    2148            0 :         // Remove it from the directory entry
    2149            0 :         let dir_key = slru_dir_to_key(kind);
    2150            0 :         let buf = self.get(dir_key, ctx).await?;
    2151            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2152              : 
    2153            0 :         if !dir.segments.remove(&segno) {
    2154            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    2155            0 :         }
    2156            0 :         self.pending_directory_entries.push((
    2157            0 :             DirectoryKind::SlruSegment(kind),
    2158            0 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2159            0 :         ));
    2160            0 :         self.put(
    2161            0 :             dir_key,
    2162            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2163              :         );
    2164              : 
    2165              :         // Delete size entry, as well as all blocks
    2166            0 :         self.delete(slru_segment_key_range(kind, segno));
    2167            0 : 
    2168            0 :         Ok(())
    2169            0 :     }
    2170              : 
    2171              :     /// Drop a relmapper file (pg_filenode.map)
    2172            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    2173            0 :         // TODO
    2174            0 :         Ok(())
    2175            0 :     }
    2176              : 
    2177              :     /// Drop the twophase state file for transaction 'xid'.
    2178            0 :     pub async fn drop_twophase_file(
    2179            0 :         &mut self,
    2180            0 :         xid: u64,
    2181            0 :         ctx: &RequestContext,
    2182            0 :     ) -> anyhow::Result<()> {
    2183              :         // Remove it from the directory entry
    2184            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    2185            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    2186            0 :             let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
    2187              : 
    2188            0 :             if !dir.xids.remove(&xid) {
    2189            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2190            0 :             }
    2191            0 :             self.pending_directory_entries.push((
    2192            0 :                 DirectoryKind::TwoPhase,
    2193            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2194            0 :             ));
    2195            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    2196              :         } else {
    2197            0 :             let xid: u32 = u32::try_from(xid)?;
    2198            0 :             let mut dir = TwoPhaseDirectory::des(&buf)?;
    2199              : 
    2200            0 :             if !dir.xids.remove(&xid) {
    2201            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2202            0 :             }
    2203            0 :             self.pending_directory_entries.push((
    2204            0 :                 DirectoryKind::TwoPhase,
    2205            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2206            0 :             ));
    2207            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    2208              :         };
    2209            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    2210            0 : 
    2211            0 :         // Delete it
    2212            0 :         self.delete(twophase_key_range(xid));
    2213            0 : 
    2214            0 :         Ok(())
    2215            0 :     }
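    // [Editor's sketch] pg_twophase file names carry 32-bit XIDs before v17 and
    // 64-bit XIDs from v17 on, which is why the directory is deserialized into
    // two different types above. The narrowing step in isolation (hypothetical
    // helper):
    //
    //     fn twophase_xid(pg_version: u32, xid: u64) -> Result<u64, std::num::TryFromIntError> {
    //         if pg_version >= 17 {
    //             Ok(xid)
    //         } else {
    //             u32::try_from(xid).map(u64::from)
    //         }
    //     }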
    2216              : 
    2217           32 :     pub async fn put_file(
    2218           32 :         &mut self,
    2219           32 :         path: &str,
    2220           32 :         content: &[u8],
    2221           32 :         ctx: &RequestContext,
    2222           32 :     ) -> anyhow::Result<()> {
    2223           32 :         let key = aux_file::encode_aux_file_key(path);
    2224              :         // retrieve the current value for the key from the engine
    2225           32 :         let old_val = match self.get(key, ctx).await {
    2226            8 :             Ok(val) => Some(val),
    2227           24 :             Err(PageReconstructError::MissingKey(_)) => None,
    2228            0 :             Err(e) => return Err(e.into()),
    2229              :         };
    2230           32 :         let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    2231            8 :             aux_file::decode_file_value(old_val)?
    2232              :         } else {
    2233           24 :             Vec::new()
    2234              :         };
    2235           32 :         let mut other_files = Vec::with_capacity(files.len());
    2236           32 :         let mut modifying_file = None;
    2237           40 :         for file @ (p, content) in files {
    2238            8 :             if path == p {
    2239            8 :                 assert!(
    2240            8 :                     modifying_file.is_none(),
    2241            0 :                     "duplicated entries found for {}",
    2242              :                     path
    2243              :                 );
    2244            8 :                 modifying_file = Some(content);
    2245            0 :             } else {
    2246            0 :                 other_files.push(file);
    2247            0 :             }
    2248              :         }
    2249           32 :         let mut new_files = other_files;
    2250           32 :         match (modifying_file, content.is_empty()) {
    2251            4 :             (Some(old_content), false) => {
    2252            4 :                 self.tline
    2253            4 :                     .aux_file_size_estimator
    2254            4 :                     .on_update(old_content.len(), content.len());
    2255            4 :                 new_files.push((path, content));
    2256            4 :             }
    2257            4 :             (Some(old_content), true) => {
    2258            4 :                 self.tline
    2259            4 :                     .aux_file_size_estimator
    2260            4 :                     .on_remove(old_content.len());
    2261            4 :                 // not adding the file key to the final `new_files` vec.
    2262            4 :             }
    2263           24 :             (None, false) => {
    2264           24 :                 self.tline.aux_file_size_estimator.on_add(content.len());
    2265           24 :                 new_files.push((path, content));
    2266           24 :             }
    2267              :             // Compute may request deletion of the old version of a pgstat AUX file if the new one exceeds the size limit.
    2268              :             // Compute doesn't know whether a previous version of this file exists, so an
    2269              :             // attempt to delete a non-existing file can cause this message.
    2270              :             // To avoid false alarms, log it as info rather than warning.
    2271            0 :             (None, true) if path.starts_with("pg_stat/") => {
    2272            0 :                 info!("removing non-existing pg_stat file: {}", path)
    2273              :             }
    2274            0 :             (None, true) => warn!("removing non-existing aux file: {}", path),
    2275              :         }
    2276           32 :         let new_val = aux_file::encode_file_value(&new_files)?;
    2277           32 :         self.put(key, Value::Image(new_val.into()));
    2278           32 : 
    2279           32 :         Ok(())
    2280           32 :     }
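    // [Editor's sketch] The rebuild above keeps every other file and re-adds the
    // modified path unless its new content is empty, which encodes deletion. The
    // same merge over plain slices (hypothetical helper):
    //
    //     fn merge_aux<'a>(
    //         files: Vec<(&'a str, &'a [u8])>,
    //         path: &'a str,
    //         content: &'a [u8],
    //     ) -> Vec<(&'a str, &'a [u8])> {
    //         let mut out: Vec<_> = files.into_iter().filter(|(p, _)| *p != path).collect();
    //         if !content.is_empty() {
    //             out.push((path, content));
    //         }
    //         out
    //     }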
    2281              : 
    2282              :     ///
    2283              :     /// Flush changes accumulated so far to the underlying repository.
    2284              :     ///
    2285              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    2286              :     /// you to flush them to the underlying repository before the final `commit`.
    2287              :     /// That frees up the memory used to hold the pending changes.
    2288              :     ///
    2289              :     /// Currently only used during bulk import of a data directory. In that
    2290              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    2291              :     /// whole import fails and the timeline will be deleted anyway.
    2292              :     /// (Or to be precise, it will be left behind for debugging purposes and
    2293              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    2294              :     ///
    2295              :     /// Note: A consequence of flushing the pending operations is that they
    2296              :     /// won't be visible to subsequent operations until `commit`. The function
    2297              :     /// retains all the metadata, but data pages are flushed. That's again OK
    2298              :     /// for bulk import, where you are just loading data pages and won't try to
    2299              :     /// modify the same pages twice.
    2300         3860 :     pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2301         3860 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    2302         3860 :         // to scan through the pending_updates list.
    2303         3860 :         let pending_nblocks = self.pending_nblocks;
    2304         3860 :         if pending_nblocks < 10000 {
    2305         3860 :             return Ok(());
    2306            0 :         }
    2307              : 
    2308            0 :         let mut writer = self.tline.writer().await;
    2309              : 
    2310              :         // Flush relation and SLRU data blocks, keep metadata.
    2311            0 :         if let Some(batch) = self.pending_data_batch.take() {
    2312            0 :             tracing::debug!(
    2313            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2314            0 :                 batch.max_lsn,
    2315            0 :                 self.tline.get_last_record_lsn()
    2316              :             );
    2317              : 
    2318              :             // This bails out on first error without modifying pending_updates.
    2319              :             // That's Ok, cf this function's doc comment.
    2320            0 :             writer.put_batch(batch, ctx).await?;
    2321            0 :         }
    2322              : 
    2323            0 :         if pending_nblocks != 0 {
    2324            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2325            0 :             self.pending_nblocks = 0;
    2326            0 :         }
    2327              : 
    2328            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2329            0 :             writer.update_directory_entries_count(kind, count);
    2330            0 :         }
    2331              : 
    2332            0 :         Ok(())
    2333         3860 :     }
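    // [Editor's note] The early return above makes flush a no-op until the
    // pending logical-size delta reaches 10,000 blocks; the guard in isolation
    // (hypothetical helper):
    //
    //     fn worth_flushing(pending_nblocks: i64) -> bool {
    //         pending_nblocks >= 10000
    //     }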
    2334              : 
    2335              :     ///
    2336              :     /// Finish this atomic update, writing all the updated keys to the
    2337              :     /// underlying timeline.
    2338              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    2339              :     ///
    2340      1486188 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2341      1486188 :         let mut writer = self.tline.writer().await;
    2342              : 
    2343      1486188 :         let pending_nblocks = self.pending_nblocks;
    2344      1486188 :         self.pending_nblocks = 0;
    2345              : 
    2346              :         // Ordering: the items in this batch do not need to be in any global order, but values for
    2347              :         // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    2348              :         // this to do efficient updates to its index.  See [`wal_decoder::serialized_batch`] for
    2349              :         // more details.
    2350              : 
    2351      1486188 :         let metadata_batch = {
    2352      1486188 :             let pending_meta = self
    2353      1486188 :                 .pending_metadata_pages
    2354      1486188 :                 .drain()
    2355      1486188 :                 .flat_map(|(key, values)| {
    2356       548036 :                     values
    2357       548036 :                         .into_iter()
    2358       548036 :                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
    2359      1486188 :                 })
    2360      1486188 :                 .collect::<Vec<_>>();
    2361      1486188 : 
    2362      1486188 :             if pending_meta.is_empty() {
    2363       944556 :                 None
    2364              :             } else {
    2365       541632 :                 Some(SerializedValueBatch::from_values(pending_meta))
    2366              :             }
    2367              :         };
    2368              : 
    2369      1486188 :         let data_batch = self.pending_data_batch.take();
    2370              : 
    2371      1486188 :         let maybe_batch = match (data_batch, metadata_batch) {
    2372       529112 :             (Some(mut data), Some(metadata)) => {
    2373       529112 :                 data.extend(metadata);
    2374       529112 :                 Some(data)
    2375              :             }
    2376       286524 :             (Some(data), None) => Some(data),
    2377        12520 :             (None, Some(metadata)) => Some(metadata),
    2378       658032 :             (None, None) => None,
    2379              :         };
    2380              : 
    2381      1486188 :         if let Some(batch) = maybe_batch {
    2382       828156 :             tracing::debug!(
    2383            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2384            0 :                 batch.max_lsn,
    2385            0 :                 self.tline.get_last_record_lsn()
    2386              :             );
    2387              : 
    2388              :             // This bails out on first error without modifying pending_updates.
    2389              :             // That's Ok, cf this function's doc comment.
    2390       828156 :             writer.put_batch(batch, ctx).await?;
    2391       658032 :         }
    2392              : 
    2393      1486188 :         if !self.pending_deletions.is_empty() {
    2394            4 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    2395            4 :             self.pending_deletions.clear();
    2396      1486184 :         }
    2397              : 
    2398      1486188 :         self.pending_lsns.push(self.lsn);
    2399      1777904 :         for pending_lsn in self.pending_lsns.drain(..) {
    2400      1777904 :             // TODO(vlad): pretty sure the comment below is not valid anymore
    2401      1777904 :             // and we can call finish write with the latest LSN
    2402      1777904 :             //
    2403      1777904 :             // Ideally, we should be able to call writer.finish_write() only once
    2404      1777904 :             // with the highest LSN. However, the last_record_lsn variable in the
    2405      1777904 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    2406      1777904 :             // so we need to record every LSN to not leave a gap between them.
    2407      1777904 :             writer.finish_write(pending_lsn);
    2408      1777904 :         }
    2409              : 
    2410      1486188 :         if pending_nblocks != 0 {
    2411       541140 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2412       945048 :         }
    2413              : 
    2414      1486188 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2415         5948 :             writer.update_directory_entries_count(kind, count);
    2416         5948 :         }
    2417              : 
    2418      1486188 :         self.pending_metadata_bytes = 0;
    2419      1486188 : 
    2420      1486188 :         Ok(())
    2421      1486188 :     }
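    // [Editor's sketch] Data and metadata batches are merged with a four-way
    // match so that a batch is only built when something is pending; the same
    // shape over plain Vecs (the real type is SerializedValueBatch):
    //
    //     fn combine<T>(data: Option<Vec<T>>, metadata: Option<Vec<T>>) -> Option<Vec<T>> {
    //         match (data, metadata) {
    //             (Some(mut d), Some(m)) => {
    //                 d.extend(m);
    //                 Some(d)
    //             }
    //             (Some(d), None) => Some(d),
    //             (None, Some(m)) => Some(m),
    //             (None, None) => None,
    //         }
    //     }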
    2422              : 
    2423       583408 :     pub(crate) fn len(&self) -> usize {
    2424       583408 :         self.pending_metadata_pages.len()
    2425       583408 :             + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
    2426       583408 :             + self.pending_deletions.len()
    2427       583408 :     }
    2428              : 
    2429              :     /// Read a page from the Timeline we are writing to.  For metadata pages, this passes through
    2430              :     /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
    2431              :     /// in the modification.
    2432              :     ///
    2433              :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
    2434              :     /// page must ensure that the pages they read are already committed in Timeline, for example
    2435              :     /// DB create operations are always preceded by a call to commit().  This is special cased because
    2436              :     /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
    2437              :     /// and not data pages.
    2438       573172 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    2439       573172 :         if !Self::is_data_key(&key) {
    2440              :             // Have we already updated the same key? Read the latest pending updated
    2441              :             // version in that case.
    2442              :             //
    2443              :             // Note: we don't check pending_deletions. It is an error to request a
    2444              :             // value that has been removed, deletion only avoids leaking storage.
    2445       573172 :             if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
    2446        31856 :                 if let Some((_, _, value)) = values.last() {
    2447        31856 :                     return if let Value::Image(img) = value {
    2448        31856 :                         Ok(img.clone())
    2449              :                     } else {
    2450              :                         // Currently, we never need to read back a WAL record that we
    2451              :                         // inserted in the same "transaction". All the metadata updates
    2452              :                         // work directly with Images, and we never need to read actual
    2453              :                         // data pages. We could handle this if we had to, by calling
    2454              :                         // the walredo manager, but let's keep it simple for now.
    2455            0 :                         Err(PageReconstructError::Other(anyhow::anyhow!(
    2456            0 :                             "unexpected pending WAL record"
    2457            0 :                         )))
    2458              :                     };
    2459            0 :                 }
    2460       541316 :             }
    2461              :         } else {
    2462              :             // This is an expensive check, so we only do it in debug mode. If reading a data key,
    2463              :             // this key should never be present in pending_data_batch. We ensure this by committing
    2464              :             // modifications before ingesting DB create operations, which are the only kind that reads
    2465              :             // data pages during ingest.
    2466            0 :             if cfg!(debug_assertions) {
    2467            0 :                 assert!(!self
    2468            0 :                     .pending_data_batch
    2469            0 :                     .as_ref()
    2470            0 :                     .is_some_and(|b| b.updates_key(&key)));
    2471            0 :             }
    2472              :         }
    2473              : 
    2474              :         // Metadata page cache miss, or we're reading a data page.
    2475       541316 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    2476       541316 :         self.tline.get(key, lsn, ctx).await
    2477       573172 :     }
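    // [Editor's note] Cache misses read from the timeline at the later of the
    // last committed record LSN and this modification's LSN; the clamp in
    // isolation, with Lsn stood in by u64 (hypothetical helper):
    //
    //     fn read_lsn(last_record_lsn: u64, modification_lsn: u64) -> u64 {
    //         std::cmp::max(last_record_lsn, modification_lsn)
    //     }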
    2478              : 
    2479              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    2480              :     /// and the empty value into None.
    2481            0 :     async fn sparse_get(
    2482            0 :         &self,
    2483            0 :         key: Key,
    2484            0 :         ctx: &RequestContext,
    2485            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    2486            0 :         let val = self.get(key, ctx).await;
    2487            0 :         match val {
    2488            0 :             Ok(val) if val.is_empty() => Ok(None),
    2489            0 :             Ok(val) => Ok(Some(val)),
    2490            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    2491            0 :             Err(e) => Err(e),
    2492              :         }
    2493            0 :     }
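    // [Editor's sketch] Sparse-keyspace reads treat a missing key and an empty
    // value identically; the mapping with a stand-in error type (hypothetical):
    //
    //     enum GetError {
    //         MissingKey,
    //         Other(String),
    //     }
    //
    //     fn sparse(val: Result<Vec<u8>, GetError>) -> Result<Option<Vec<u8>>, GetError> {
    //         match val {
    //             Ok(v) if v.is_empty() => Ok(None),
    //             Ok(v) => Ok(Some(v)),
    //             Err(GetError::MissingKey) => Ok(None),
    //             Err(e) => Err(e),
    //         }
    //     }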
    2494              : 
    2495      1128076 :     fn put(&mut self, key: Key, val: Value) {
    2496      1128076 :         if Self::is_data_key(&key) {
    2497       555736 :             self.put_data(key.to_compact(), val)
    2498              :         } else {
    2499       572340 :             self.put_metadata(key.to_compact(), val)
    2500              :         }
    2501      1128076 :     }
    2502              : 
    2503       555736 :     fn put_data(&mut self, key: CompactKey, val: Value) {
    2504       555736 :         let batch = self
    2505       555736 :             .pending_data_batch
    2506       555736 :             .get_or_insert_with(SerializedValueBatch::default);
    2507       555736 :         batch.put(key, val, self.lsn);
    2508       555736 :     }
    2509              : 
    2510       572340 :     fn put_metadata(&mut self, key: CompactKey, val: Value) {
    2511       572340 :         let values = self.pending_metadata_pages.entry(key).or_default();
    2512              :         // Replace the previous value if it exists at the same lsn
    2513       572340 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    2514        24304 :             if *last_lsn == self.lsn {
    2515              :                 // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
    2516        24304 :                 self.pending_metadata_bytes -= *last_value_ser_size;
    2517        24304 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    2518        24304 :                 self.pending_metadata_bytes += *last_value_ser_size;
    2519        24304 : 
    2520        24304 :                 // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
    2521        24304 :                 // have been generated by synthesized zero-page writes prior to the first real write to a page.
    2522        24304 :                 *last_value = val;
    2523        24304 :                 return;
    2524            0 :             }
    2525       548036 :         }
    2526              : 
    2527       548036 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    2528       548036 :         self.pending_metadata_bytes += val_serialized_size;
    2529       548036 :         values.push((self.lsn, val_serialized_size, val));
    2530       548036 : 
    2531       548036 :         if key == CHECKPOINT_KEY.to_compact() {
    2532          444 :             tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
    2533       547592 :         }
    2534       572340 :     }
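    // [Editor's sketch] Writing twice at the same LSN replaces the previous
    // value in place while keeping the running serialized-size total correct;
    // the bookkeeping over plain types (hypothetical helper; the real code uses
    // `Value::serialized_size`, not the byte length):
    //
    //     fn push_or_replace(
    //         values: &mut Vec<(u64, usize, Vec<u8>)>, // (lsn, ser_size, value)
    //         total: &mut usize,
    //         lsn: u64,
    //         val: Vec<u8>,
    //     ) {
    //         if let Some((last_lsn, last_size, last_val)) = values.last_mut() {
    //             if *last_lsn == lsn {
    //                 *total -= *last_size;
    //                 *last_size = val.len();
    //                 *total += *last_size;
    //                 *last_val = val;
    //                 return;
    //             }
    //         }
    //         *total += val.len();
    //         values.push((lsn, val.len(), val));
    //     }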
    2535              : 
    2536            4 :     fn delete(&mut self, key_range: Range<Key>) {
    2537            4 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    2538            4 :         self.pending_deletions.push((key_range, self.lsn));
    2539            4 :     }
    2540              : }
    2541              : 
    2542              : /// Statistics for a DatadirModification.
    2543              : #[derive(Default)]
    2544              : pub struct DatadirModificationStats {
    2545              :     pub metadata_images: u64,
    2546              :     pub metadata_deltas: u64,
    2547              :     pub data_images: u64,
    2548              :     pub data_deltas: u64,
    2549              : }
    2550              : 
    2551              : /// This struct facilitates accessing either a committed key from the timeline at a
    2552              : /// specific LSN, or the latest uncommitted key from a pending modification.
    2553              : ///
    2554              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    2555              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    2556              : /// need to look up the keys in the modification first before looking them up in the
    2557              : /// timeline to not miss the latest updates.
    2558              : #[derive(Clone, Copy)]
    2559              : pub enum Version<'a> {
    2560              :     Lsn(Lsn),
    2561              :     Modified(&'a DatadirModification<'a>),
    2562              : }
    2563              : 
    2564              : impl Version<'_> {
    2565        10352 :     async fn get(
    2566        10352 :         &self,
    2567        10352 :         timeline: &Timeline,
    2568        10352 :         key: Key,
    2569        10352 :         ctx: &RequestContext,
    2570        10352 :     ) -> Result<Bytes, PageReconstructError> {
    2571        10352 :         match self {
    2572        10312 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    2573           40 :             Version::Modified(modification) => modification.get(key, ctx).await,
    2574              :         }
    2575        10352 :     }
    2576              : 
    2577              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    2578              :     /// and the empty value into None.
    2579            0 :     async fn sparse_get(
    2580            0 :         &self,
    2581            0 :         timeline: &Timeline,
    2582            0 :         key: Key,
    2583            0 :         ctx: &RequestContext,
    2584            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    2585            0 :         let val = self.get(timeline, key, ctx).await;
    2586            0 :         match val {
    2587            0 :             Ok(val) if val.is_empty() => Ok(None),
    2588            0 :             Ok(val) => Ok(Some(val)),
    2589            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    2590            0 :             Err(e) => Err(e),
    2591              :         }
    2592            0 :     }
    2593              : 
    2594        71240 :     fn get_lsn(&self) -> Lsn {
    2595        71240 :         match self {
    2596        59148 :             Version::Lsn(lsn) => *lsn,
    2597        12092 :             Version::Modified(modification) => modification.lsn,
    2598              :         }
    2599        71240 :     }
    2600              : }
    2601              : 
    2602              : //--- Metadata structs stored in key-value pairs in the repository.
    2603              : 
    2604            0 : #[derive(Debug, Serialize, Deserialize)]
    2605              : pub(crate) struct DbDirectory {
    2606              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    2607              :     pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
    2608              : }
    2609              : 
    2610              : // The format of TwoPhaseDirectory changed in PostgreSQL v17 because the filenames of
    2611              : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs.  Previously, the files
    2612              : // were named like "pg_twophase/000002E5"; now they're like
    2613              : // "pg_twophase/0000000A000002E4".
    2614              : 
    2615            0 : #[derive(Debug, Serialize, Deserialize)]
    2616              : pub(crate) struct TwoPhaseDirectory {
    2617              :     pub(crate) xids: HashSet<TransactionId>,
    2618              : }
    2619              : 
    2620            0 : #[derive(Debug, Serialize, Deserialize)]
    2621              : struct TwoPhaseDirectoryV17 {
    2622              :     xids: HashSet<u64>,
    2623              : }
    2624              : 
    2625            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    2626              : pub(crate) struct RelDirectory {
    2627              :     // Set of relations that exist. (relfilenode, forknum)
    2628              :     //
    2629              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    2630              :     // key-value pairs, if you have a lot of relations
    2631              :     pub(crate) rels: HashSet<(Oid, u8)>,
    2632              : }
    2633              : 
    2634            0 : #[derive(Debug, Serialize, Deserialize)]
    2635              : struct RelSizeEntry {
    2636              :     nblocks: u32,
    2637              : }
    2638              : 
    2639            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    2640              : pub(crate) struct SlruSegmentDirectory {
    2641              :     // Set of SLRU segments that exist.
    2642              :     pub(crate) segments: HashSet<u32>,
    2643              : }
    2644              : 
    2645              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2646              : #[repr(u8)]
    2647              : pub(crate) enum DirectoryKind {
    2648              :     Db,
    2649              :     TwoPhase,
    2650              :     Rel,
    2651              :     AuxFiles,
    2652              :     SlruSegment(SlruKind),
    2653              :     RelV2,
    2654              : }
    2655              : 
    2656              : impl DirectoryKind {
    2657              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2658        17848 :     pub(crate) fn offset(&self) -> usize {
    2659        17848 :         self.into_usize()
    2660        17848 :     }
    2661              : }
    2662              : 
    2663              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2664              : 
    2665              : #[allow(clippy::bool_assert_comparison)]
    2666              : #[cfg(test)]
    2667              : mod tests {
    2668              :     use hex_literal::hex;
    2669              :     use pageserver_api::{models::ShardParameters, shard::ShardStripeSize};
    2670              :     use utils::{
    2671              :         id::TimelineId,
    2672              :         shard::{ShardCount, ShardNumber},
    2673              :     };
    2674              : 
    2675              :     use super::*;
    2676              : 
    2677              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    2678              : 
    2679              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2680              :     #[tokio::test]
    2681            4 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2682            4 :         let name = "aux_files_round_trip";
    2683            4 :         let harness = TenantHarness::create(name).await?;
    2684            4 : 
    2685            4 :         pub const TIMELINE_ID: TimelineId =
    2686            4 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2687            4 : 
    2688            4 :         let (tenant, ctx) = harness.load().await;
    2689            4 :         let tline = tenant
    2690            4 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2691            4 :             .await?;
    2692            4 :         let tline = tline.raw_timeline().unwrap();
    2693            4 : 
    2694            4 :         // First modification: insert two keys
    2695            4 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2696            4 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2697            4 :         modification.set_lsn(Lsn(0x1008))?;
    2698            4 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2699            4 :         modification.commit(&ctx).await?;
    2700            4 :         let expect_1008 = HashMap::from([
    2701            4 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2702            4 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2703            4 :         ]);
    2704            4 : 
    2705            4 :         let io_concurrency = IoConcurrency::spawn_for_test();
    2706            4 : 
    2707            4 :         let readback = tline
    2708            4 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    2709            4 :             .await?;
    2710            4 :         assert_eq!(readback, expect_1008);
    2711            4 : 
    2712            4 :         // Second modification: update one key, remove the other
    2713            4 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2714            4 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2715            4 :         modification.set_lsn(Lsn(0x2008))?;
    2716            4 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2717            4 :         modification.commit(&ctx).await?;
    2718            4 :         let expect_2008 =
    2719            4 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2720            4 : 
    2721            4 :         let readback = tline
    2722            4 :             .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
    2723            4 :             .await?;
    2724            4 :         assert_eq!(readback, expect_2008);
    2725            4 : 
    2726            4 :         // Reading back in time works
    2727            4 :         let readback = tline
    2728            4 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    2729            4 :             .await?;
    2730            4 :         assert_eq!(readback, expect_1008);
    2731            4 : 
    2732            4 :         Ok(())
    2733            4 :     }
    2734              : 
    2735              :     #[test]
    2736            4 :     fn gap_finding() {
    2737            4 :         let rel = RelTag {
    2738            4 :             spcnode: 1663,
    2739            4 :             dbnode: 208101,
    2740            4 :             relnode: 2620,
    2741            4 :             forknum: 0,
    2742            4 :         };
    2743            4 :         let base_blkno = 1;
    2744            4 : 
    2745            4 :         let base_key = rel_block_to_key(rel, base_blkno);
    2746            4 :         let before_base_key = rel_block_to_key(rel, base_blkno - 1);
    2747            4 : 
    2748            4 :         let shard = ShardIdentity::unsharded();
    2749            4 : 
    2750            4 :         let mut previous_nblocks = 0;
    2751           44 :         for i in 0..10 {
    2752           40 :             let crnt_blkno = base_blkno + i;
    2753           40 :             let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
    2754           40 : 
    2755           40 :             previous_nblocks = crnt_blkno + 1;
    2756           40 : 
    2757           40 :             if i == 0 {
    2758              :                 // The first block we write is 1, so we should find the gap.
    2759            4 :                 assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
    2760              :             } else {
    2761           36 :                 assert!(gaps.is_none());
    2762              :             }
    2763              :         }
    2764              : 
    2765              :         // This is an update to an already existing block. No gaps here.
    2766            4 :         let update_blkno = 5;
    2767            4 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    2768            4 :         assert!(gaps.is_none());
    2769              : 
    2770              :         // This is an update past the current end block.
    2771            4 :         let after_gap_blkno = 20;
    2772            4 :         let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
    2773            4 : 
    2774            4 :         let gap_start_key = rel_block_to_key(rel, previous_nblocks);
    2775            4 :         let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
    2776            4 :         assert_eq!(
    2777            4 :             gaps.unwrap(),
    2778            4 :             KeySpace::single(gap_start_key..after_gap_key)
    2779            4 :         );
    2780            4 :     }
    2781              : 
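The behavior `gap_finding` exercises can be summarized as follows: writing block `blkno` into a relation that previously held `previous_nblocks` blocks leaves blocks `previous_nblocks..blkno` unwritten, and those unwritten blocks form the gap. The sketch below, `find_gaps_unsharded`, is a hypothetical simplification of `DatadirModification::find_gaps`; the real function additionally filters the range down to the keys owned by the calling shard:

    // Hypothetical unsharded sketch: a gap exists only when the write lands
    // strictly past the old end of the relation.
    fn find_gaps_unsharded(
        rel: RelTag,
        blkno: BlockNumber,
        previous_nblocks: BlockNumber,
    ) -> Option<KeySpace> {
        if blkno <= previous_nblocks {
            // In-place update, or an append exactly at the old end: no gap.
            return None;
        }
        // Blocks previous_nblocks..blkno were skipped over and are the gap.
        Some(KeySpace::single(
            rel_block_to_key(rel, previous_nblocks)..rel_block_to_key(rel, blkno),
        ))
    }

On the first iteration of the loop above (`previous_nblocks == 0`, `blkno == 1`) this yields exactly `before_base_key..base_key`, matching the test's assertion.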
    2782              :     #[test]
    2783            4 :     fn sharded_gap_finding() {
    2784            4 :         let rel = RelTag {
    2785            4 :             spcnode: 1663,
    2786            4 :             dbnode: 208101,
    2787            4 :             relnode: 2620,
    2788            4 :             forknum: 0,
    2789            4 :         };
    2790            4 : 
    2791            4 :         let first_blkno = 6;
    2792            4 : 
    2793            4 :         // With a stripe size of one block and two shards, this shard (number 0) gets the even blocks
    2794            4 :         let shard = ShardIdentity::from_params(
    2795            4 :             ShardNumber(0),
    2796            4 :             &ShardParameters {
    2797            4 :                 count: ShardCount(2),
    2798            4 :                 stripe_size: ShardStripeSize(1),
    2799            4 :             },
    2800            4 :         );
    2801            4 : 
    2802            4 :         // Only keys belonging to this shard are considered as gaps.
    2803            4 :         let mut previous_nblocks = 0;
    2804            4 :         let gaps =
    2805            4 :             DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
    2806            4 :         assert!(!gaps.ranges.is_empty());
    2807           12 :         for gap_range in gaps.ranges {
    2808            8 :             let mut k = gap_range.start;
    2809           16 :             while k != gap_range.end {
    2810            8 :                 assert_eq!(shard.get_shard_number(&k), shard.number);
    2811            8 :                 k = k.next();
    2812              :             }
    2813              :         }
    2814              : 
    2815            4 :         previous_nblocks = first_blkno;
    2816            4 : 
    2817            4 :         let update_blkno = 2;
    2818            4 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    2819            4 :         assert!(gaps.is_none());
    2820            4 :     }
    2821              : 
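The shard filtering in that test comes from block striping: consecutive stripes of `stripe_size` blocks are spread across the shards, so with a stripe size of one block and two shards, block ownership alternates. `shard_of_block` below is a deliberately naive sketch of that arithmetic; the real `ShardIdentity::get_shard_number` operates on keys and may also mix relation identity into the placement, which is why the test verifies ownership through `get_shard_number` rather than hard-coding block parity:

    // Naive round-robin striping (hypothetical): the stripe index is
    // blkno / stripe_size, and stripes rotate across the shards.
    fn shard_of_block(blkno: u32, stripe_size: u32, shard_count: u32) -> u32 {
        (blkno / stripe_size) % shard_count
    }

    // With stripe_size = 1 and shard_count = 2:
    //   shard_of_block(6, 1, 2) == 0   (even blocks)
    //   shard_of_block(7, 1, 2) == 1   (odd blocks)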
    2822              :     /*
    2823              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    2824              :             let incremental = timeline.get_current_logical_size();
    2825              :             let non_incremental = timeline
    2826              :                 .get_current_logical_size_non_incremental(lsn)
    2827              :                 .unwrap();
    2828              :             assert_eq!(incremental, non_incremental);
    2829              :         }
    2830              :     */
    2831              : 
    2832              :     /*
    2833              :     ///
    2834              :     /// Test list_rels() function, with branches and dropped relations
    2835              :     ///
    2836              :     #[test]
    2837              :     fn test_list_rels_drop() -> Result<()> {
    2838              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    2839              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    2840              :         const TESTDB: u32 = 111;
    2841              : 
    2842              :         // Import an initial dummy checkpoint record; otherwise the get_timeline() call
    2843              :         // after branching below fails
    2844              :         let mut writer = tline.begin_record(Lsn(0x10));
    2845              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    2846              :         writer.finish()?;
    2847              : 
    2848              :         // Create a relation on the timeline
    2849              :         let mut writer = tline.begin_record(Lsn(0x20));
    2850              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    2851              :         writer.finish()?;
    2852              : 
    2853              :         let writer = tline.begin_record(Lsn(0x00));
    2854              :         writer.finish()?;
    2855              : 
    2856              :         // Check that list_rels() lists it at and after Lsn(0x20), but not before it
    2857              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    2858              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    2859              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    2860              : 
    2861              :         // Create a branch, check that the relation is visible there
    2862              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    2863              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    2864              :             Some(timeline) => timeline,
    2865              :             None => panic!("Should have a local timeline"),
    2866              :         };
    2867              :         let newtline = DatadirTimelineImpl::new(newtline);
    2868              :         assert!(newtline
    2869              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2870              :             .contains(&TESTREL_A));
    2871              : 
    2872              :         // Drop it on the branch
    2873              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    2874              :         new_writer.drop_relation(TESTREL_A)?;
    2875              :         new_writer.finish()?;
    2876              : 
    2877              :         // Check that it's no longer listed on the branch after the point where it was dropped
    2878              :         assert!(newtline
    2879              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2880              :             .contains(&TESTREL_A));
    2881              :         assert!(!newtline
    2882              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2883              :             .contains(&TESTREL_A));
    2884              : 
    2885              :         // Run checkpoint and garbage collection and check that it's still not visible
    2886              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    2887              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    2888              : 
    2889              :         assert!(!newtline
    2890              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2891              :             .contains(&TESTREL_A));
    2892              : 
    2893              :         Ok(())
    2894              :     }
    2895              :      */
    2896              : 
    2897              :     /*
    2898              :     #[test]
    2899              :     fn test_read_beyond_eof() -> Result<()> {
    2900              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    2901              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    2902              : 
    2903              :         make_some_layers(&tline, Lsn(0x20))?;
    2904              :         let mut writer = tline.begin_record(Lsn(0x60));
    2905              :         walingest.put_rel_page_image(
    2906              :             &mut writer,
    2907              :             TESTREL_A,
    2908              :             0,
    2909              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    2910              :         )?;
    2911              :         writer.finish()?;
    2912              : 
    2913              :         // Test read before rel creation. Should error out.
    2914              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2915              : 
    2916              :         // Read block beyond end of relation at different points in time.
    2917              :         // These reads should fall into different delta, image, and in-memory layers.
    2918              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2919              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2920              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2921              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2922              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2923              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2924              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2925              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2926              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2927              : 
    2928              :         // Test on an in-memory layer with no preceding layer
    2929              :         let mut writer = tline.begin_record(Lsn(0x70));
    2930              :         walingest.put_rel_page_image(
    2931              :             &mut writer,
    2932              :             TESTREL_B,
    2933              :             0,
    2934              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2935              :         )?;
    2936              :         writer.finish()?;
    2937              : 
    2938              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    2939              : 
    2940              :         Ok(())
    2941              :     }
    2942              :      */
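The disabled test above documents one invariant worth keeping visible: reading a block past a relation's last written block returns an all-zeros page rather than an error. A minimal hedged sketch of the page those assertions compare against, assuming the `BLCKSZ` constant from postgres_ffi (the helper itself is hypothetical):

    // An all-zeros page of BLCKSZ bytes, the value the disabled assertions
    // above refer to as ZERO_PAGE.
    fn zeroed_page() -> Bytes {
        Bytes::from(vec![0u8; BLCKSZ as usize])
    }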
    2943              : }
        

Generated by: LCOV version 2.1-beta