LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 59.0 % 1672 986
Test Date: 2025-01-07 20:58:07 Functions: 43.5 % 200 87

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::aux_file;
      11              : use crate::context::RequestContext;
      12              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      13              : use crate::metrics::{
      14              :     RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
      15              : };
      16              : use crate::span::{
      17              :     debug_assert_current_span_has_tenant_and_timeline_id,
      18              :     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
      19              : };
      20              : use crate::tenant::timeline::GetVectoredError;
      21              : use anyhow::{ensure, Context};
      22              : use bytes::{Buf, Bytes, BytesMut};
      23              : use enum_map::Enum;
      24              : use itertools::Itertools;
      25              : use pageserver_api::key::Key;
      26              : use pageserver_api::key::{
      27              :     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
      28              :     relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
      29              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      30              :     CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
      31              : };
      32              : use pageserver_api::keyspace::SparseKeySpace;
      33              : use pageserver_api::record::NeonWalRecord;
      34              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      35              : use pageserver_api::shard::ShardIdentity;
      36              : use pageserver_api::value::Value;
      37              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      38              : use postgres_ffi::BLCKSZ;
      39              : use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
      40              : use serde::{Deserialize, Serialize};
      41              : use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
      42              : use std::ops::ControlFlow;
      43              : use std::ops::Range;
      44              : use strum::IntoEnumIterator;
      45              : use tokio_util::sync::CancellationToken;
      46              : use tracing::{debug, trace, warn};
      47              : use utils::bin_ser::DeserializeError;
      48              : use utils::pausable_failpoint;
      49              : use utils::{bin_ser::BeSer, lsn::Lsn};
      50              : use wal_decoder::serialized_batch::SerializedValueBatch;
      51              : 
      52              : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
      53              : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
      54              : 
      55              : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
      56              : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
      57              : 
      58              : #[derive(Debug)]
      59              : pub enum LsnForTimestamp {
      60              :     /// Found commits both before and after the given timestamp
      61              :     Present(Lsn),
      62              : 
      63              :     /// Found no commits after the given timestamp, this means
      64              :     /// that the newest data in the branch is older than the given
      65              :     /// timestamp.
      66              :     ///
      67              :     /// All commits <= LSN happened before the given timestamp
      68              :     Future(Lsn),
      69              : 
      70              :     /// The queried timestamp is past our horizon we look back at (PITR)
      71              :     ///
      72              :     /// All commits > LSN happened after the given timestamp,
      73              :     /// but any commits < LSN might have happened before or after
      74              :     /// the given timestamp. We don't know because no data before
      75              :     /// the given lsn is available.
      76              :     Past(Lsn),
      77              : 
      78              :     /// We have found no commit with a timestamp,
      79              :     /// so we can't return anything meaningful.
      80              :     ///
      81              :     /// The associated LSN is the lower bound value we can safely
      82              :     /// create branches on, but no statement is made if it is
      83              :     /// older or newer than the timestamp.
      84              :     ///
      85              :     /// This variant can e.g. be returned right after a
      86              :     /// cluster import.
      87              :     NoData(Lsn),
      88              : }
      89              : 
      90              : #[derive(Debug, thiserror::Error)]
      91              : pub(crate) enum CalculateLogicalSizeError {
      92              :     #[error("cancelled")]
      93              :     Cancelled,
      94              : 
      95              :     /// Something went wrong while reading the metadata we use to calculate logical size
      96              :     /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
      97              :     /// in the `From` implementation for this variant.
      98              :     #[error(transparent)]
      99              :     PageRead(PageReconstructError),
     100              : 
     101              :     /// Something went wrong deserializing metadata that we read to calculate logical size
     102              :     #[error("decode error: {0}")]
     103              :     Decode(#[from] DeserializeError),
     104              : }
     105              : 
     106              : #[derive(Debug, thiserror::Error)]
     107              : pub(crate) enum CollectKeySpaceError {
     108              :     #[error(transparent)]
     109              :     Decode(#[from] DeserializeError),
     110              :     #[error(transparent)]
     111              :     PageRead(PageReconstructError),
     112              :     #[error("cancelled")]
     113              :     Cancelled,
     114              : }
     115              : 
     116              : impl From<PageReconstructError> for CollectKeySpaceError {
     117            0 :     fn from(err: PageReconstructError) -> Self {
     118            0 :         match err {
     119            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     120            0 :             err => Self::PageRead(err),
     121              :         }
     122            0 :     }
     123              : }
     124              : 
     125              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     126            0 :     fn from(pre: PageReconstructError) -> Self {
     127            0 :         match pre {
     128            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     129            0 :             _ => Self::PageRead(pre),
     130              :         }
     131            0 :     }
     132              : }
     133              : 
     134              : #[derive(Debug, thiserror::Error)]
     135              : pub enum RelationError {
     136              :     #[error("Relation Already Exists")]
     137              :     AlreadyExists,
     138              :     #[error("invalid relnode")]
     139              :     InvalidRelnode,
     140              :     #[error(transparent)]
     141              :     Other(#[from] anyhow::Error),
     142              : }
     143              : 
     144              : ///
     145              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     146              : /// and other special kinds of files, in a versioned key-value store. The
     147              : /// Timeline struct provides the key-value store.
     148              : ///
     149              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
     150              : /// implementation, and might be moved into a separate struct later.
     151              : impl Timeline {
     152              :     /// Start ingesting a WAL record, or other atomic modification of
     153              :     /// the timeline.
     154              :     ///
     155              :     /// This provides a transaction-like interface to perform a bunch
     156              :     /// of modifications atomically.
     157              :     ///
     158              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     159              :     /// DatadirModification object. Use the functions in the object to
     160              :     /// modify the repository state, updating all the pages and metadata
     161              :     /// that the WAL record affects. When you're done, call commit() to
     162              :     /// commit the changes.
     163              :     ///
     164              :     /// Lsn stored in modification is advanced by `ingest_record` and
     165              :     /// is used by `commit()` to update `last_record_lsn`.
     166              :     ///
     167              :     /// Calling commit() will flush all the changes and reset the state,
     168              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     169              :     ///
     170              :     /// Note that any pending modifications you make through the
     171              :     /// modification object won't be visible to calls to the 'get' and list
     172              :     /// functions of the timeline until you finish! And if you update the
     173              :     /// same page twice, the last update wins.
     174              :     ///
     175       268384 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     176       268384 :     where
     177       268384 :         Self: Sized,
     178       268384 :     {
     179       268384 :         DatadirModification {
     180       268384 :             tline: self,
     181       268384 :             pending_lsns: Vec::new(),
     182       268384 :             pending_metadata_pages: HashMap::new(),
     183       268384 :             pending_data_batch: None,
     184       268384 :             pending_deletions: Vec::new(),
     185       268384 :             pending_nblocks: 0,
     186       268384 :             pending_directory_entries: Vec::new(),
     187       268384 :             pending_metadata_bytes: 0,
     188       268384 :             lsn,
     189       268384 :         }
     190       268384 :     }
     191              : 
     192              :     //------------------------------------------------------------------------------
     193              :     // Public GET functions
     194              :     //------------------------------------------------------------------------------
     195              : 
     196              :     /// Look up given page version.
     197        18384 :     pub(crate) async fn get_rel_page_at_lsn(
     198        18384 :         &self,
     199        18384 :         tag: RelTag,
     200        18384 :         blknum: BlockNumber,
     201        18384 :         version: Version<'_>,
     202        18384 :         ctx: &RequestContext,
     203        18384 :     ) -> Result<Bytes, PageReconstructError> {
     204        18384 :         match version {
     205        18384 :             Version::Lsn(effective_lsn) => {
     206        18384 :                 let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
     207        18384 :                 let res = self
     208        18384 :                     .get_rel_page_at_lsn_batched(
     209        18384 :                         pages.iter().map(|(tag, blknum)| (tag, blknum)),
     210        18384 :                         effective_lsn,
     211        18384 :                         ctx,
     212        18384 :                     )
     213        18384 :                     .await;
     214        18384 :                 assert_eq!(res.len(), 1);
     215        18384 :                 res.into_iter().next().unwrap()
     216              :             }
     217            0 :             Version::Modified(modification) => {
     218            0 :                 if tag.relnode == 0 {
     219            0 :                     return Err(PageReconstructError::Other(
     220            0 :                         RelationError::InvalidRelnode.into(),
     221            0 :                     ));
     222            0 :                 }
     223              : 
     224            0 :                 let nblocks = self.get_rel_size(tag, version, ctx).await?;
     225            0 :                 if blknum >= nblocks {
     226            0 :                     debug!(
     227            0 :                         "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     228            0 :                         tag,
     229            0 :                         blknum,
     230            0 :                         version.get_lsn(),
     231              :                         nblocks
     232              :                     );
     233            0 :                     return Ok(ZERO_PAGE.clone());
     234            0 :                 }
     235            0 : 
     236            0 :                 let key = rel_block_to_key(tag, blknum);
     237            0 :                 modification.get(key, ctx).await
     238              :             }
     239              :         }
     240        18384 :     }
     241              : 
     242              :     /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
     243              :     ///
     244              :     /// The ordering of the returned vec corresponds to the ordering of `pages`.
     245        18384 :     pub(crate) async fn get_rel_page_at_lsn_batched(
     246        18384 :         &self,
     247        18384 :         pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
     248        18384 :         effective_lsn: Lsn,
     249        18384 :         ctx: &RequestContext,
     250        18384 :     ) -> Vec<Result<Bytes, PageReconstructError>> {
     251        18384 :         debug_assert_current_span_has_tenant_and_timeline_id();
     252        18384 : 
     253        18384 :         let mut slots_filled = 0;
     254        18384 :         let page_count = pages.len();
     255        18384 : 
     256        18384 :         // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
     257        18384 :         let mut result = Vec::with_capacity(pages.len());
     258        18384 :         let result_slots = result.spare_capacity_mut();
     259        18384 : 
     260        18384 :         let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
     261        18384 :         for (response_slot_idx, (tag, blknum)) in pages.enumerate() {
     262        18384 :             if tag.relnode == 0 {
     263            0 :                 result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
     264            0 :                     RelationError::InvalidRelnode.into(),
     265            0 :                 )));
     266            0 : 
     267            0 :                 slots_filled += 1;
     268            0 :                 continue;
     269        18384 :             }
     270              : 
     271        18384 :             let nblocks = match self
     272        18384 :                 .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
     273        18384 :                 .await
     274              :             {
     275        18384 :                 Ok(nblocks) => nblocks,
     276            0 :                 Err(err) => {
     277            0 :                     result_slots[response_slot_idx].write(Err(err));
     278            0 :                     slots_filled += 1;
     279            0 :                     continue;
     280              :                 }
     281              :             };
     282              : 
     283        18384 :             if *blknum >= nblocks {
     284            0 :                 debug!(
     285            0 :                     "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     286              :                     tag, blknum, effective_lsn, nblocks
     287              :                 );
     288            0 :                 result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
     289            0 :                 slots_filled += 1;
     290            0 :                 continue;
     291        18384 :             }
     292        18384 : 
     293        18384 :             let key = rel_block_to_key(*tag, *blknum);
     294        18384 : 
     295        18384 :             let key_slots = keys_slots.entry(key).or_default();
     296        18384 :             key_slots.push(response_slot_idx);
     297              :         }
     298              : 
     299        18384 :         let keyspace = {
     300              :             // add_key requires monotonicity
     301        18384 :             let mut acc = KeySpaceAccum::new();
     302        18384 :             for key in keys_slots
     303        18384 :                 .keys()
     304        18384 :                 // in fact it requires strong monotonicity
     305        18384 :                 .dedup()
     306        18384 :             {
     307        18384 :                 acc.add_key(*key);
     308        18384 :             }
     309        18384 :             acc.to_keyspace()
     310        18384 :         };
     311        18384 : 
     312        18384 :         match self.get_vectored(keyspace, effective_lsn, ctx).await {
     313        18384 :             Ok(results) => {
     314        36768 :                 for (key, res) in results {
     315        18384 :                     let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
     316        18384 :                     let first_slot = key_slots.next().unwrap();
     317              : 
     318        18384 :                     for slot in key_slots {
     319            0 :                         let clone = match &res {
     320            0 :                             Ok(buf) => Ok(buf.clone()),
     321            0 :                             Err(err) => Err(match err {
     322              :                                 PageReconstructError::Cancelled => {
     323            0 :                                     PageReconstructError::Cancelled
     324              :                                 }
     325              : 
     326            0 :                                 x @ PageReconstructError::Other(_) |
     327            0 :                                 x @ PageReconstructError::AncestorLsnTimeout(_) |
     328            0 :                                 x @ PageReconstructError::WalRedo(_) |
     329            0 :                                 x @ PageReconstructError::MissingKey(_) => {
     330            0 :                                     PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
     331              :                                 },
     332              :                             }),
     333              :                         };
     334              : 
     335            0 :                         result_slots[slot].write(clone);
     336            0 :                         slots_filled += 1;
     337              :                     }
     338              : 
     339        18384 :                     result_slots[first_slot].write(res);
     340        18384 :                     slots_filled += 1;
     341              :                 }
     342              :             }
     343            0 :             Err(err) => {
     344              :                 // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
     345              :                 // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
     346            0 :                 for slot in keys_slots.values().flatten() {
     347              :                     // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
     348              :                     // but without taking ownership of the GetVectoredError
     349            0 :                     let err = match &err {
     350              :                         GetVectoredError::Cancelled => {
     351            0 :                             Err(PageReconstructError::Cancelled)
     352              :                         }
     353              :                         // TODO: restructure get_vectored API to make this error per-key
     354            0 :                         GetVectoredError::MissingKey(err) => {
     355            0 :                             Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}")))
     356              :                         }
     357              :                         // TODO: restructure get_vectored API to make this error per-key
     358            0 :                         GetVectoredError::GetReadyAncestorError(err) => {
     359            0 :                             Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}")))
     360              :                         }
     361              :                         // TODO: restructure get_vectored API to make this error per-key
     362            0 :                         GetVectoredError::Other(err) => {
     363            0 :                             Err(PageReconstructError::Other(
     364            0 :                                 anyhow::anyhow!("whole vectored get request failed: {err:?}"),
     365            0 :                             ))
     366              :                         }
     367              :                         // TODO: we can prevent this error class by moving this check into the type system
     368            0 :                         GetVectoredError::InvalidLsn(e) => {
     369            0 :                             Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
     370              :                         }
     371              :                         // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
     372              :                         // TODO: we can prevent this error class by moving this check into the type system
     373            0 :                         GetVectoredError::Oversized(err) => {
     374            0 :                             Err(anyhow::anyhow!(
     375            0 :                                 "batching oversized: {err:?}"
     376            0 :                             )
     377            0 :                             .into())
     378              :                         }
     379              :                     };
     380              : 
     381            0 :                     result_slots[*slot].write(err);
     382              :                 }
     383              : 
     384            0 :                 slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
     385            0 :             }
     386              :         };
     387              : 
     388        18384 :         assert_eq!(slots_filled, page_count);
     389              :         // SAFETY:
     390              :         // 1. `result` and any of its uninint members are not read from until this point
     391              :         // 2. The length below is tracked at run-time and matches the number of requested pages.
     392        18384 :         unsafe {
     393        18384 :             result.set_len(page_count);
     394        18384 :         }
     395        18384 : 
     396        18384 :         result
     397        18384 :     }
     398              : 
     399              :     /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
     400              :     /// other shards, by only accounting for relations the shard has pages for, and only accounting
     401              :     /// for pages up to the highest page number it has stored.
     402            0 :     pub(crate) async fn get_db_size(
     403            0 :         &self,
     404            0 :         spcnode: Oid,
     405            0 :         dbnode: Oid,
     406            0 :         version: Version<'_>,
     407            0 :         ctx: &RequestContext,
     408            0 :     ) -> Result<usize, PageReconstructError> {
     409            0 :         let mut total_blocks = 0;
     410              : 
     411            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     412              : 
     413            0 :         for rel in rels {
     414            0 :             let n_blocks = self.get_rel_size(rel, version, ctx).await?;
     415            0 :             total_blocks += n_blocks as usize;
     416              :         }
     417            0 :         Ok(total_blocks)
     418            0 :     }
     419              : 
     420              :     /// Get size of a relation file. The relation must exist, otherwise an error is returned.
     421              :     ///
     422              :     /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
     423              :     /// page number stored in the shard.
     424        24434 :     pub(crate) async fn get_rel_size(
     425        24434 :         &self,
     426        24434 :         tag: RelTag,
     427        24434 :         version: Version<'_>,
     428        24434 :         ctx: &RequestContext,
     429        24434 :     ) -> Result<BlockNumber, PageReconstructError> {
     430        24434 :         if tag.relnode == 0 {
     431            0 :             return Err(PageReconstructError::Other(
     432            0 :                 RelationError::InvalidRelnode.into(),
     433            0 :             ));
     434        24434 :         }
     435              : 
     436        24434 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     437        19294 :             return Ok(nblocks);
     438         5140 :         }
     439         5140 : 
     440         5140 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     441            0 :             && !self.get_rel_exists(tag, version, ctx).await?
     442              :         {
     443              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     444              :             // FSM, and smgrnblocks() on it immediately afterwards,
     445              :             // without extending it.  Tolerate that by claiming that
     446              :             // any non-existent FSM fork has size 0.
     447            0 :             return Ok(0);
     448         5140 :         }
     449         5140 : 
     450         5140 :         let key = rel_size_to_key(tag);
     451         5140 :         let mut buf = version.get(self, key, ctx).await?;
     452         5136 :         let nblocks = buf.get_u32_le();
     453         5136 : 
     454         5136 :         self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     455         5136 : 
     456         5136 :         Ok(nblocks)
     457        24434 :     }
     458              : 
     459              :     /// Does the relation exist?
     460              :     ///
     461              :     /// Only shard 0 has a full view of the relations. Other shards only know about relations that
     462              :     /// the shard stores pages for.
     463         6050 :     pub(crate) async fn get_rel_exists(
     464         6050 :         &self,
     465         6050 :         tag: RelTag,
     466         6050 :         version: Version<'_>,
     467         6050 :         ctx: &RequestContext,
     468         6050 :     ) -> Result<bool, PageReconstructError> {
     469         6050 :         if tag.relnode == 0 {
     470            0 :             return Err(PageReconstructError::Other(
     471            0 :                 RelationError::InvalidRelnode.into(),
     472            0 :             ));
     473         6050 :         }
     474              : 
     475              :         // first try to lookup relation in cache
     476         6050 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     477         6032 :             return Ok(true);
     478           18 :         }
     479              :         // then check if the database was already initialized.
     480              :         // get_rel_exists can be called before dbdir is created.
     481           18 :         let buf = version.get(self, DBDIR_KEY, ctx).await?;
     482           18 :         let dbdirs = DbDirectory::des(&buf)?.dbdirs;
     483           18 :         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
     484            0 :             return Ok(false);
     485           18 :         }
     486           18 :         // fetch directory listing
     487           18 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     488           18 :         let buf = version.get(self, key, ctx).await?;
     489              : 
     490           18 :         let dir = RelDirectory::des(&buf)?;
     491           18 :         Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
     492         6050 :     }
     493              : 
     494              :     /// Get a list of all existing relations in given tablespace and database.
     495              :     ///
     496              :     /// Only shard 0 has a full view of the relations. Other shards only know about relations that
     497              :     /// the shard stores pages for.
     498              :     ///
     499              :     /// # Cancel-Safety
     500              :     ///
     501              :     /// This method is cancellation-safe.
     502            0 :     pub(crate) async fn list_rels(
     503            0 :         &self,
     504            0 :         spcnode: Oid,
     505            0 :         dbnode: Oid,
     506            0 :         version: Version<'_>,
     507            0 :         ctx: &RequestContext,
     508            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     509            0 :         // fetch directory listing
     510            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     511            0 :         let buf = version.get(self, key, ctx).await?;
     512              : 
     513            0 :         let dir = RelDirectory::des(&buf)?;
     514            0 :         let rels: HashSet<RelTag> =
     515            0 :             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     516            0 :                 spcnode,
     517            0 :                 dbnode,
     518            0 :                 relnode: *relnode,
     519            0 :                 forknum: *forknum,
     520            0 :             }));
     521            0 : 
     522            0 :         Ok(rels)
     523            0 :     }
     524              : 
     525              :     /// Get the whole SLRU segment
     526            0 :     pub(crate) async fn get_slru_segment(
     527            0 :         &self,
     528            0 :         kind: SlruKind,
     529            0 :         segno: u32,
     530            0 :         lsn: Lsn,
     531            0 :         ctx: &RequestContext,
     532            0 :     ) -> Result<Bytes, PageReconstructError> {
     533            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     534            0 :         let n_blocks = self
     535            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     536            0 :             .await?;
     537            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     538            0 :         for blkno in 0..n_blocks {
     539            0 :             let block = self
     540            0 :                 .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
     541            0 :                 .await?;
     542            0 :             segment.extend_from_slice(&block[..BLCKSZ as usize]);
     543              :         }
     544            0 :         Ok(segment.freeze())
     545            0 :     }
     546              : 
     547              :     /// Look up given SLRU page version.
     548            0 :     pub(crate) async fn get_slru_page_at_lsn(
     549            0 :         &self,
     550            0 :         kind: SlruKind,
     551            0 :         segno: u32,
     552            0 :         blknum: BlockNumber,
     553            0 :         lsn: Lsn,
     554            0 :         ctx: &RequestContext,
     555            0 :     ) -> Result<Bytes, PageReconstructError> {
     556            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     557            0 :         let key = slru_block_to_key(kind, segno, blknum);
     558            0 :         self.get(key, lsn, ctx).await
     559            0 :     }
     560              : 
     561              :     /// Get size of an SLRU segment
     562            0 :     pub(crate) async fn get_slru_segment_size(
     563            0 :         &self,
     564            0 :         kind: SlruKind,
     565            0 :         segno: u32,
     566            0 :         version: Version<'_>,
     567            0 :         ctx: &RequestContext,
     568            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     569            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     570            0 :         let key = slru_segment_size_to_key(kind, segno);
     571            0 :         let mut buf = version.get(self, key, ctx).await?;
     572            0 :         Ok(buf.get_u32_le())
     573            0 :     }
     574              : 
     575              :     /// Get size of an SLRU segment
     576            0 :     pub(crate) async fn get_slru_segment_exists(
     577            0 :         &self,
     578            0 :         kind: SlruKind,
     579            0 :         segno: u32,
     580            0 :         version: Version<'_>,
     581            0 :         ctx: &RequestContext,
     582            0 :     ) -> Result<bool, PageReconstructError> {
     583            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     584              :         // fetch directory listing
     585            0 :         let key = slru_dir_to_key(kind);
     586            0 :         let buf = version.get(self, key, ctx).await?;
     587              : 
     588            0 :         let dir = SlruSegmentDirectory::des(&buf)?;
     589            0 :         Ok(dir.segments.contains(&segno))
     590            0 :     }
     591              : 
     592              :     /// Locate LSN, such that all transactions that committed before
     593              :     /// 'search_timestamp' are visible, but nothing newer is.
     594              :     ///
     595              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     596              :     /// so it's not well defined which LSN you get if there were multiple commits
     597              :     /// "in flight" at that point in time.
     598              :     ///
     599            0 :     pub(crate) async fn find_lsn_for_timestamp(
     600            0 :         &self,
     601            0 :         search_timestamp: TimestampTz,
     602            0 :         cancel: &CancellationToken,
     603            0 :         ctx: &RequestContext,
     604            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     605            0 :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     606              : 
     607            0 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     608            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     609            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     610            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     611            0 :         // on the safe side.
     612            0 :         let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
     613            0 :         let max_lsn = self.get_last_record_lsn();
     614            0 : 
     615            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     616            0 :         // LSN divided by 8.
     617            0 :         let mut low = min_lsn.0 / 8;
     618            0 :         let mut high = max_lsn.0 / 8 + 1;
     619            0 : 
     620            0 :         let mut found_smaller = false;
     621            0 :         let mut found_larger = false;
     622              : 
     623            0 :         while low < high {
     624            0 :             if cancel.is_cancelled() {
     625            0 :                 return Err(PageReconstructError::Cancelled);
     626            0 :             }
     627            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     628            0 :             let mid = (high + low) / 2;
     629              : 
     630            0 :             let cmp = self
     631            0 :                 .is_latest_commit_timestamp_ge_than(
     632            0 :                     search_timestamp,
     633            0 :                     Lsn(mid * 8),
     634            0 :                     &mut found_smaller,
     635            0 :                     &mut found_larger,
     636            0 :                     ctx,
     637            0 :                 )
     638            0 :                 .await?;
     639              : 
     640            0 :             if cmp {
     641            0 :                 high = mid;
     642            0 :             } else {
     643            0 :                 low = mid + 1;
     644            0 :             }
     645              :         }
     646              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     647              :         // so the LSN of the last commit record before or at `search_timestamp`.
     648              :         // Remove one from `low` to get `t`.
     649              :         //
     650              :         // FIXME: it would be better to get the LSN of the previous commit.
     651              :         // Otherwise, if you restore to the returned LSN, the database will
     652              :         // include physical changes from later commits that will be marked
     653              :         // as aborted, and will need to be vacuumed away.
     654            0 :         let commit_lsn = Lsn((low - 1) * 8);
     655            0 :         match (found_smaller, found_larger) {
     656              :             (false, false) => {
     657              :                 // This can happen if no commit records have been processed yet, e.g.
     658              :                 // just after importing a cluster.
     659            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     660              :             }
     661              :             (false, true) => {
     662              :                 // Didn't find any commit timestamps smaller than the request
     663            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     664              :             }
     665            0 :             (true, _) if commit_lsn < min_lsn => {
     666            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     667            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     668            0 :                 // below the min_lsn. We should never do that.
     669            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     670              :             }
     671              :             (true, false) => {
     672              :                 // Only found commits with timestamps smaller than the request.
     673              :                 // It's still a valid case for branch creation, return it.
     674              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     675              :                 // case, anyway.
     676            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     677              :             }
     678            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     679              :         }
     680            0 :     }
     681              : 
     682              :     /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
     683              :     /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
     684              :     ///
     685              :     /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
     686              :     /// with a smaller/larger timestamp.
     687              :     ///
     688            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     689            0 :         &self,
     690            0 :         search_timestamp: TimestampTz,
     691            0 :         probe_lsn: Lsn,
     692            0 :         found_smaller: &mut bool,
     693            0 :         found_larger: &mut bool,
     694            0 :         ctx: &RequestContext,
     695            0 :     ) -> Result<bool, PageReconstructError> {
     696            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     697            0 :             if timestamp >= search_timestamp {
     698            0 :                 *found_larger = true;
     699            0 :                 return ControlFlow::Break(true);
     700            0 :             } else {
     701            0 :                 *found_smaller = true;
     702            0 :             }
     703            0 :             ControlFlow::Continue(())
     704            0 :         })
     705            0 :         .await
     706            0 :     }
     707              : 
     708              :     /// Obtain the possible timestamp range for the given lsn.
     709              :     ///
     710              :     /// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
     711            0 :     pub(crate) async fn get_timestamp_for_lsn(
     712            0 :         &self,
     713            0 :         probe_lsn: Lsn,
     714            0 :         ctx: &RequestContext,
     715            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     716            0 :         let mut max: Option<TimestampTz> = None;
     717            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
     718            0 :             if let Some(max_prev) = max {
     719            0 :                 max = Some(max_prev.max(timestamp));
     720            0 :             } else {
     721            0 :                 max = Some(timestamp);
     722            0 :             }
     723            0 :             ControlFlow::Continue(())
     724            0 :         })
     725            0 :         .await?;
     726              : 
     727            0 :         Ok(max)
     728            0 :     }
     729              : 
     730              :     /// Runs the given function on all the timestamps for a given lsn
     731              :     ///
     732              :     /// The return value is either given by the closure, or set to the `Default`
     733              :     /// impl's output.
     734            0 :     async fn map_all_timestamps<T: Default>(
     735            0 :         &self,
     736            0 :         probe_lsn: Lsn,
     737            0 :         ctx: &RequestContext,
     738            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     739            0 :     ) -> Result<T, PageReconstructError> {
     740            0 :         for segno in self
     741            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     742            0 :             .await?
     743              :         {
     744            0 :             let nblocks = self
     745            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     746            0 :                 .await?;
     747            0 :             for blknum in (0..nblocks).rev() {
     748            0 :                 let clog_page = self
     749            0 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     750            0 :                     .await?;
     751              : 
     752            0 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     753            0 :                     let mut timestamp_bytes = [0u8; 8];
     754            0 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     755            0 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     756            0 : 
     757            0 :                     match f(timestamp) {
     758            0 :                         ControlFlow::Break(b) => return Ok(b),
     759            0 :                         ControlFlow::Continue(()) => (),
     760              :                     }
     761            0 :                 }
     762              :             }
     763              :         }
     764            0 :         Ok(Default::default())
     765            0 :     }
     766              : 
     767            0 :     pub(crate) async fn get_slru_keyspace(
     768            0 :         &self,
     769            0 :         version: Version<'_>,
     770            0 :         ctx: &RequestContext,
     771            0 :     ) -> Result<KeySpace, PageReconstructError> {
     772            0 :         let mut accum = KeySpaceAccum::new();
     773              : 
     774            0 :         for kind in SlruKind::iter() {
     775            0 :             let mut segments: Vec<u32> = self
     776            0 :                 .list_slru_segments(kind, version, ctx)
     777            0 :                 .await?
     778            0 :                 .into_iter()
     779            0 :                 .collect();
     780            0 :             segments.sort_unstable();
     781              : 
     782            0 :             for seg in segments {
     783            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     784              : 
     785            0 :                 accum.add_range(
     786            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     787            0 :                 );
     788              :             }
     789              :         }
     790              : 
     791            0 :         Ok(accum.to_keyspace())
     792            0 :     }
     793              : 
     794              :     /// Get a list of SLRU segments
     795            0 :     pub(crate) async fn list_slru_segments(
     796            0 :         &self,
     797            0 :         kind: SlruKind,
     798            0 :         version: Version<'_>,
     799            0 :         ctx: &RequestContext,
     800            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     801            0 :         // fetch directory entry
     802            0 :         let key = slru_dir_to_key(kind);
     803              : 
     804            0 :         let buf = version.get(self, key, ctx).await?;
     805            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
     806            0 :     }
     807              : 
     808            0 :     pub(crate) async fn get_relmap_file(
     809            0 :         &self,
     810            0 :         spcnode: Oid,
     811            0 :         dbnode: Oid,
     812            0 :         version: Version<'_>,
     813            0 :         ctx: &RequestContext,
     814            0 :     ) -> Result<Bytes, PageReconstructError> {
     815            0 :         let key = relmap_file_key(spcnode, dbnode);
     816              : 
     817            0 :         let buf = version.get(self, key, ctx).await?;
     818            0 :         Ok(buf)
     819            0 :     }
     820              : 
     821          296 :     pub(crate) async fn list_dbdirs(
     822          296 :         &self,
     823          296 :         lsn: Lsn,
     824          296 :         ctx: &RequestContext,
     825          296 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     826              :         // fetch directory entry
     827          296 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     828              : 
     829          296 :         Ok(DbDirectory::des(&buf)?.dbdirs)
     830          296 :     }
     831              : 
     832            0 :     pub(crate) async fn get_twophase_file(
     833            0 :         &self,
     834            0 :         xid: u64,
     835            0 :         lsn: Lsn,
     836            0 :         ctx: &RequestContext,
     837            0 :     ) -> Result<Bytes, PageReconstructError> {
     838            0 :         let key = twophase_file_key(xid);
     839            0 :         let buf = self.get(key, lsn, ctx).await?;
     840            0 :         Ok(buf)
     841            0 :     }
     842              : 
     843          298 :     pub(crate) async fn list_twophase_files(
     844          298 :         &self,
     845          298 :         lsn: Lsn,
     846          298 :         ctx: &RequestContext,
     847          298 :     ) -> Result<HashSet<u64>, PageReconstructError> {
     848              :         // fetch directory entry
     849          298 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     850              : 
     851          298 :         if self.pg_version >= 17 {
     852            0 :             Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
     853              :         } else {
     854          298 :             Ok(TwoPhaseDirectory::des(&buf)?
     855              :                 .xids
     856          298 :                 .iter()
     857          298 :                 .map(|x| u64::from(*x))
     858          298 :                 .collect())
     859              :         }
     860          298 :     }
     861              : 
     862            0 :     pub(crate) async fn get_control_file(
     863            0 :         &self,
     864            0 :         lsn: Lsn,
     865            0 :         ctx: &RequestContext,
     866            0 :     ) -> Result<Bytes, PageReconstructError> {
     867            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     868            0 :     }
     869              : 
     870           12 :     pub(crate) async fn get_checkpoint(
     871           12 :         &self,
     872           12 :         lsn: Lsn,
     873           12 :         ctx: &RequestContext,
     874           12 :     ) -> Result<Bytes, PageReconstructError> {
     875           12 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     876           12 :     }
     877              : 
     878           12 :     async fn list_aux_files_v2(
     879           12 :         &self,
     880           12 :         lsn: Lsn,
     881           12 :         ctx: &RequestContext,
     882           12 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     883           12 :         let kv = self
     884           12 :             .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
     885           12 :             .await?;
     886           12 :         let mut result = HashMap::new();
     887           12 :         let mut sz = 0;
     888           30 :         for (_, v) in kv {
     889           18 :             let v = v?;
     890           18 :             let v = aux_file::decode_file_value_bytes(&v)
     891           18 :                 .context("value decode")
     892           18 :                 .map_err(PageReconstructError::Other)?;
     893           34 :             for (fname, content) in v {
     894           16 :                 sz += fname.len();
     895           16 :                 sz += content.len();
     896           16 :                 result.insert(fname, content);
     897           16 :             }
     898              :         }
     899           12 :         self.aux_file_size_estimator.on_initial(sz);
     900           12 :         Ok(result)
     901           12 :     }
     902              : 
     903            0 :     pub(crate) async fn trigger_aux_file_size_computation(
     904            0 :         &self,
     905            0 :         lsn: Lsn,
     906            0 :         ctx: &RequestContext,
     907            0 :     ) -> Result<(), PageReconstructError> {
     908            0 :         self.list_aux_files_v2(lsn, ctx).await?;
     909            0 :         Ok(())
     910            0 :     }
     911              : 
     912           12 :     pub(crate) async fn list_aux_files(
     913           12 :         &self,
     914           12 :         lsn: Lsn,
     915           12 :         ctx: &RequestContext,
     916           12 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
     917           12 :         self.list_aux_files_v2(lsn, ctx).await
     918           12 :     }
     919              : 
     920            0 :     pub(crate) async fn get_replorigins(
     921            0 :         &self,
     922            0 :         lsn: Lsn,
     923            0 :         ctx: &RequestContext,
     924            0 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
     925            0 :         let kv = self
     926            0 :             .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
     927            0 :             .await?;
     928            0 :         let mut result = HashMap::new();
     929            0 :         for (k, v) in kv {
     930            0 :             let v = v?;
     931            0 :             let origin_id = k.field6 as RepOriginId;
     932            0 :             let origin_lsn = Lsn::des(&v).unwrap();
     933            0 :             if origin_lsn != Lsn::INVALID {
     934            0 :                 result.insert(origin_id, origin_lsn);
     935            0 :             }
     936              :         }
     937            0 :         Ok(result)
     938            0 :     }
     939              : 
     940              :     /// Does the same as get_current_logical_size but counted on demand.
     941              :     /// Used to initialize the logical size tracking on startup.
     942              :     ///
     943              :     /// Only relation blocks are counted currently. That excludes metadata,
     944              :     /// SLRUs, twophase files etc.
     945              :     ///
     946              :     /// # Cancel-Safety
     947              :     ///
     948              :     /// This method is cancellation-safe.
     949            0 :     pub(crate) async fn get_current_logical_size_non_incremental(
     950            0 :         &self,
     951            0 :         lsn: Lsn,
     952            0 :         ctx: &RequestContext,
     953            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
     954            0 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     955              : 
     956              :         // Fetch list of database dirs and iterate them
     957            0 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     958            0 :         let dbdir = DbDirectory::des(&buf)?;
     959              : 
     960            0 :         let mut total_size: u64 = 0;
     961            0 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     962            0 :             for rel in self
     963            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
     964            0 :                 .await?
     965              :             {
     966            0 :                 if self.cancel.is_cancelled() {
     967            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
     968            0 :                 }
     969            0 :                 let relsize_key = rel_size_to_key(rel);
     970            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     971            0 :                 let relsize = buf.get_u32_le();
     972            0 : 
     973            0 :                 total_size += relsize as u64;
     974              :             }
     975              :         }
     976            0 :         Ok(total_size * BLCKSZ as u64)
     977            0 :     }
     978              : 
     979              :     /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
     980              :     /// for gc-compaction.
     981              :     ///
     982              :     /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
     983              :     /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
     984              :     /// be kept only for a specific range of LSN.
     985              :     ///
     986              :     /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
     987              :     /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
     988              :     /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
     989              :     /// determine which keys to retain/drop for gc-compaction.
     990              :     ///
     991              :     /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
     992              :     /// to be retained for each of the branch LSN.
     993              :     ///
     994              :     /// The return value is (dense keyspace, sparse keyspace).
     995           52 :     pub(crate) async fn collect_gc_compaction_keyspace(
     996           52 :         &self,
     997           52 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
     998           52 :         let metadata_key_begin = Key::metadata_key_range().start;
     999           52 :         let aux_v1_key = AUX_FILES_KEY;
    1000           52 :         let dense_keyspace = KeySpace {
    1001           52 :             ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
    1002           52 :         };
    1003           52 :         Ok((
    1004           52 :             dense_keyspace,
    1005           52 :             SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
    1006           52 :         ))
    1007           52 :     }
    1008              : 
    1009              :     ///
    1010              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
    1011              :     /// Anything that's not listed maybe removed from the underlying storage (from
    1012              :     /// that LSN forwards).
    1013              :     ///
    1014              :     /// The return value is (dense keyspace, sparse keyspace).
    1015          296 :     pub(crate) async fn collect_keyspace(
    1016          296 :         &self,
    1017          296 :         lsn: Lsn,
    1018          296 :         ctx: &RequestContext,
    1019          296 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1020          296 :         // Iterate through key ranges, greedily packing them into partitions
    1021          296 :         let mut result = KeySpaceAccum::new();
    1022          296 : 
    1023          296 :         // The dbdir metadata always exists
    1024          296 :         result.add_key(DBDIR_KEY);
    1025              : 
    1026              :         // Fetch list of database dirs and iterate them
    1027          296 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
    1028          296 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
    1029          296 : 
    1030          296 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
    1031          296 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
    1032            0 :             if has_relmap_file {
    1033            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
    1034            0 :             }
    1035            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
    1036              : 
    1037            0 :             let mut rels: Vec<RelTag> = self
    1038            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
    1039            0 :                 .await?
    1040            0 :                 .into_iter()
    1041            0 :                 .collect();
    1042            0 :             rels.sort_unstable();
    1043            0 :             for rel in rels {
    1044            0 :                 let relsize_key = rel_size_to_key(rel);
    1045            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1046            0 :                 let relsize = buf.get_u32_le();
    1047            0 : 
    1048            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
    1049            0 :                 result.add_key(relsize_key);
    1050              :             }
    1051              :         }
    1052              : 
    1053              :         // Iterate SLRUs next
    1054          296 :         if self.tenant_shard_id.is_shard_zero() {
    1055          870 :             for kind in [
    1056          290 :                 SlruKind::Clog,
    1057          290 :                 SlruKind::MultiXactMembers,
    1058          290 :                 SlruKind::MultiXactOffsets,
    1059              :             ] {
    1060          870 :                 let slrudir_key = slru_dir_to_key(kind);
    1061          870 :                 result.add_key(slrudir_key);
    1062          870 :                 let buf = self.get(slrudir_key, lsn, ctx).await?;
    1063          870 :                 let dir = SlruSegmentDirectory::des(&buf)?;
    1064          870 :                 let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
    1065          870 :                 segments.sort_unstable();
    1066          870 :                 for segno in segments {
    1067            0 :                     let segsize_key = slru_segment_size_to_key(kind, segno);
    1068            0 :                     let mut buf = self.get(segsize_key, lsn, ctx).await?;
    1069            0 :                     let segsize = buf.get_u32_le();
    1070            0 : 
    1071            0 :                     result.add_range(
    1072            0 :                         slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
    1073            0 :                     );
    1074            0 :                     result.add_key(segsize_key);
    1075              :                 }
    1076              :             }
    1077            6 :         }
    1078              : 
    1079              :         // Then pg_twophase
    1080          296 :         result.add_key(TWOPHASEDIR_KEY);
    1081              : 
    1082          296 :         let mut xids: Vec<u64> = self
    1083          296 :             .list_twophase_files(lsn, ctx)
    1084          296 :             .await?
    1085          296 :             .iter()
    1086          296 :             .cloned()
    1087          296 :             .collect();
    1088          296 :         xids.sort_unstable();
    1089          296 :         for xid in xids {
    1090            0 :             result.add_key(twophase_file_key(xid));
    1091            0 :         }
    1092              : 
    1093          296 :         result.add_key(CONTROLFILE_KEY);
    1094          296 :         result.add_key(CHECKPOINT_KEY);
    1095          296 : 
    1096          296 :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
    1097          296 :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
    1098          296 :         // and the keys will not be garbage-colllected.
    1099          296 :         #[cfg(test)]
    1100          296 :         {
    1101          296 :             let guard = self.extra_test_dense_keyspace.load();
    1102          296 :             for kr in &guard.ranges {
    1103            0 :                 result.add_range(kr.clone());
    1104            0 :             }
    1105            0 :         }
    1106            0 : 
    1107          296 :         let dense_keyspace = result.to_keyspace();
    1108          296 :         let sparse_keyspace = SparseKeySpace(KeySpace {
    1109          296 :             ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
    1110          296 :         });
    1111          296 : 
    1112          296 :         if cfg!(debug_assertions) {
    1113              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
    1114              : 
    1115              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
    1116              :             // category of sparse keys are split into their own image/delta files. If there
    1117              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
    1118              :             // and we want the developer to keep the keyspaces separated.
    1119              : 
    1120          296 :             let ranges = &sparse_keyspace.0.ranges;
    1121              : 
    1122              :             // TODO: use a single overlaps_with across the codebase
    1123          296 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    1124          296 :                 !(a.end <= b.start || b.end <= a.start)
    1125          296 :             }
    1126          592 :             for i in 0..ranges.len() {
    1127          592 :                 for j in 0..i {
    1128          296 :                     if overlaps_with(&ranges[i], &ranges[j]) {
    1129            0 :                         panic!(
    1130            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
    1131            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
    1132            0 :                         );
    1133          296 :                     }
    1134              :                 }
    1135              :             }
    1136          296 :             for i in 1..ranges.len() {
    1137          296 :                 assert!(
    1138          296 :                     ranges[i - 1].end <= ranges[i].start,
    1139            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
    1140            0 :                     ranges[i - 1].start,
    1141            0 :                     ranges[i - 1].end,
    1142            0 :                     ranges[i].start,
    1143            0 :                     ranges[i].end
    1144              :                 );
    1145              :             }
    1146            0 :         }
    1147              : 
    1148          296 :         Ok((dense_keyspace, sparse_keyspace))
    1149          296 :     }
    1150              : 
    1151              :     /// Get cached size of relation if it not updated after specified LSN
    1152       448540 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
    1153       448540 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
    1154       448540 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
    1155       448518 :             if lsn >= *cached_lsn {
    1156       443372 :                 RELSIZE_CACHE_HITS.inc();
    1157       443372 :                 return Some(*nblocks);
    1158         5146 :             }
    1159         5146 :             RELSIZE_CACHE_MISSES_OLD.inc();
    1160           22 :         }
    1161         5168 :         RELSIZE_CACHE_MISSES.inc();
    1162         5168 :         None
    1163       448540 :     }
    1164              : 
    1165              :     /// Update cached relation size if there is no more recent update
    1166         5136 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1167         5136 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1168         5136 : 
    1169         5136 :         if lsn < rel_size_cache.complete_as_of {
    1170              :             // Do not cache old values. It's safe to cache the size on read, as long as
    1171              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
    1172              :             // never evict values from the cache, so if the relation size changed after
    1173              :             // 'lsn', the new value is already in the cache.
    1174            0 :             return;
    1175         5136 :         }
    1176         5136 : 
    1177         5136 :         match rel_size_cache.map.entry(tag) {
    1178         5136 :             hash_map::Entry::Occupied(mut entry) => {
    1179         5136 :                 let cached_lsn = entry.get_mut();
    1180         5136 :                 if lsn >= cached_lsn.0 {
    1181            0 :                     *cached_lsn = (lsn, nblocks);
    1182         5136 :                 }
    1183              :             }
    1184            0 :             hash_map::Entry::Vacant(entry) => {
    1185            0 :                 entry.insert((lsn, nblocks));
    1186            0 :                 RELSIZE_CACHE_ENTRIES.inc();
    1187            0 :             }
    1188              :         }
    1189         5136 :     }
    1190              : 
    1191              :     /// Store cached relation size
    1192       282720 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1193       282720 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1194       282720 :         if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
    1195         1920 :             RELSIZE_CACHE_ENTRIES.inc();
    1196       280800 :         }
    1197       282720 :     }
    1198              : 
    1199              :     /// Remove cached relation size
    1200            2 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1201            2 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1202            2 :         if rel_size_cache.map.remove(tag).is_some() {
    1203            2 :             RELSIZE_CACHE_ENTRIES.dec();
    1204            2 :         }
    1205            2 :     }
    1206              : }
    1207              : 
    1208              : /// DatadirModification represents an operation to ingest an atomic set of
    1209              : /// updates to the repository.
    1210              : ///
    1211              : /// It is created by the 'begin_record' function. It is called for each WAL
    1212              : /// record, so that all the modifications by a one WAL record appear atomic.
    1213              : pub struct DatadirModification<'a> {
    1214              :     /// The timeline this modification applies to. You can access this to
    1215              :     /// read the state, but note that any pending updates are *not* reflected
    1216              :     /// in the state in 'tline' yet.
    1217              :     pub tline: &'a Timeline,
    1218              : 
    1219              :     /// Current LSN of the modification
    1220              :     lsn: Lsn,
    1221              : 
    1222              :     // The modifications are not applied directly to the underlying key-value store.
    1223              :     // The put-functions add the modifications here, and they are flushed to the
    1224              :     // underlying key-value store by the 'finish' function.
    1225              :     pending_lsns: Vec<Lsn>,
    1226              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1227              :     pending_nblocks: i64,
    1228              : 
    1229              :     /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    1230              :     /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    1231              :     pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
    1232              : 
    1233              :     /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    1234              :     /// which keys are stored here.
    1235              :     pending_data_batch: Option<SerializedValueBatch>,
    1236              : 
    1237              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1238              :     /// if it was updated in this modification.
    1239              :     pending_directory_entries: Vec<(DirectoryKind, usize)>,
    1240              : 
    1241              :     /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
    1242              :     pending_metadata_bytes: usize,
    1243              : }
    1244              : 
    1245              : impl DatadirModification<'_> {
    1246              :     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    1247              :     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    1248              :     // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    1249              :     pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
    1250              : 
    1251              :     /// Get the current lsn
    1252       418058 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1253       418058 :         self.lsn
    1254       418058 :     }
    1255              : 
    1256            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1257            0 :         self.pending_data_batch
    1258            0 :             .as_ref()
    1259            0 :             .map_or(0, |b| b.buffer_size())
    1260            0 :             + self.pending_metadata_bytes
    1261            0 :     }
    1262              : 
    1263            0 :     pub(crate) fn has_dirty_data(&self) -> bool {
    1264            0 :         self.pending_data_batch
    1265            0 :             .as_ref()
    1266            0 :             .is_some_and(|b| b.has_data())
    1267            0 :     }
    1268              : 
    1269              :     /// Set the current lsn
    1270       145858 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
    1271       145858 :         ensure!(
    1272       145858 :             lsn >= self.lsn,
    1273            0 :             "setting an older lsn {} than {} is not allowed",
    1274              :             lsn,
    1275              :             self.lsn
    1276              :         );
    1277              : 
    1278       145858 :         if lsn > self.lsn {
    1279       145858 :             self.pending_lsns.push(self.lsn);
    1280       145858 :             self.lsn = lsn;
    1281       145858 :         }
    1282       145858 :         Ok(())
    1283       145858 :     }
    1284              : 
    1285              :     /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
    1286              :     /// keys that represent literal blocks that postgres can read.  So data includes relation blocks and
    1287              :     /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
    1288              :     ///
    1289              :     /// The distinction is important because data keys are handled on a fast path where dirty writes are
    1290              :     /// not readable until this modification is committed, whereas metadata keys are visible for read
    1291              :     /// via [`Self::get`] as soon as their record has been ingested.
    1292       850442 :     fn is_data_key(key: &Key) -> bool {
    1293       850442 :         key.is_rel_block_key() || key.is_slru_block_key()
    1294       850442 :     }
    1295              : 
    1296              :     /// Initialize a completely new repository.
    1297              :     ///
    1298              :     /// This inserts the directory metadata entries that are assumed to
    1299              :     /// always exist.
    1300          182 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1301          182 :         let buf = DbDirectory::ser(&DbDirectory {
    1302          182 :             dbdirs: HashMap::new(),
    1303          182 :         })?;
    1304          182 :         self.pending_directory_entries.push((DirectoryKind::Db, 0));
    1305          182 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1306              : 
    1307          182 :         let buf = if self.tline.pg_version >= 17 {
    1308            0 :             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
    1309            0 :                 xids: HashSet::new(),
    1310            0 :             })
    1311              :         } else {
    1312          182 :             TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1313          182 :                 xids: HashSet::new(),
    1314          182 :             })
    1315            0 :         }?;
    1316          182 :         self.pending_directory_entries
    1317          182 :             .push((DirectoryKind::TwoPhase, 0));
    1318          182 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1319              : 
    1320          182 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1321          182 :         let empty_dir = Value::Image(buf);
    1322          182 : 
    1323          182 :         // Initialize SLRUs on shard 0 only: creating these on other shards would be
    1324          182 :         // harmless but they'd just be dropped on later compaction.
    1325          182 :         if self.tline.tenant_shard_id.is_shard_zero() {
    1326          176 :             self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1327          176 :             self.pending_directory_entries
    1328          176 :                 .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1329          176 :             self.put(
    1330          176 :                 slru_dir_to_key(SlruKind::MultiXactMembers),
    1331          176 :                 empty_dir.clone(),
    1332          176 :             );
    1333          176 :             self.pending_directory_entries
    1334          176 :                 .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
    1335          176 :             self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1336          176 :             self.pending_directory_entries
    1337          176 :                 .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
    1338          176 :         }
    1339              : 
    1340          182 :         Ok(())
    1341          182 :     }
    1342              : 
    1343              :     #[cfg(test)]
    1344          180 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1345          180 :         self.init_empty()?;
    1346          180 :         self.put_control_file(bytes::Bytes::from_static(
    1347          180 :             b"control_file contents do not matter",
    1348          180 :         ))
    1349          180 :         .context("put_control_file")?;
    1350          180 :         self.put_checkpoint(bytes::Bytes::from_static(
    1351          180 :             b"checkpoint_file contents do not matter",
    1352          180 :         ))
    1353          180 :         .context("put_checkpoint_file")?;
    1354          180 :         Ok(())
    1355          180 :     }
    1356              : 
    1357              :     /// Creates a relation if it is not already present.
    1358              :     /// Returns the current size of the relation
    1359       418056 :     pub(crate) async fn create_relation_if_required(
    1360       418056 :         &mut self,
    1361       418056 :         rel: RelTag,
    1362       418056 :         ctx: &RequestContext,
    1363       418056 :     ) -> Result<u32, PageReconstructError> {
    1364              :         // Get current size and put rel creation if rel doesn't exist
    1365              :         //
    1366              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1367              :         //       check the cache too. This is because eagerly checking the cache results in
    1368              :         //       less work overall and 10% better performance. It's more work on cache miss
    1369              :         //       but cache miss is rare.
    1370       418056 :         if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
    1371       418046 :             Ok(nblocks)
    1372           10 :         } else if !self
    1373           10 :             .tline
    1374           10 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1375           10 :             .await?
    1376              :         {
    1377              :             // create it with 0 size initially, the logic below will extend it
    1378           10 :             self.put_rel_creation(rel, 0, ctx)
    1379           10 :                 .await
    1380           10 :                 .context("Relation Error")?;
    1381           10 :             Ok(0)
    1382              :         } else {
    1383            0 :             self.tline
    1384            0 :                 .get_rel_size(rel, Version::Modified(self), ctx)
    1385            0 :                 .await
    1386              :         }
    1387       418056 :     }
    1388              : 
    1389              :     /// Given a block number for a relation (which represents a newly written block),
    1390              :     /// the previous block count of the relation, and the shard info, find the gaps
    1391              :     /// that were created by the newly written block if any.
    1392       145670 :     fn find_gaps(
    1393       145670 :         rel: RelTag,
    1394       145670 :         blkno: u32,
    1395       145670 :         previous_nblocks: u32,
    1396       145670 :         shard: &ShardIdentity,
    1397       145670 :     ) -> Option<KeySpace> {
    1398       145670 :         let mut key = rel_block_to_key(rel, blkno);
    1399       145670 :         let mut gap_accum = None;
    1400              : 
    1401       145670 :         for gap_blkno in previous_nblocks..blkno {
    1402           32 :             key.field6 = gap_blkno;
    1403           32 : 
    1404           32 :             if shard.get_shard_number(&key) != shard.number {
    1405            8 :                 continue;
    1406           24 :             }
    1407           24 : 
    1408           24 :             gap_accum
    1409           24 :                 .get_or_insert_with(KeySpaceAccum::new)
    1410           24 :                 .add_key(key);
    1411              :         }
    1412              : 
    1413       145670 :         gap_accum.map(|accum| accum.to_keyspace())
    1414       145670 :     }
    1415              : 
    1416       145852 :     pub async fn ingest_batch(
    1417       145852 :         &mut self,
    1418       145852 :         mut batch: SerializedValueBatch,
    1419       145852 :         // TODO(vlad): remove this argument and replace the shard check with is_key_local
    1420       145852 :         shard: &ShardIdentity,
    1421       145852 :         ctx: &RequestContext,
    1422       145852 :     ) -> anyhow::Result<()> {
    1423       145852 :         let mut gaps_at_lsns = Vec::default();
    1424              : 
    1425       145852 :         for meta in batch.metadata.iter() {
    1426       145642 :             let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
    1427       145642 :             let new_nblocks = blkno + 1;
    1428              : 
    1429       145642 :             let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
    1430       145642 :             if new_nblocks > old_nblocks {
    1431         2390 :                 self.put_rel_extend(rel, new_nblocks, ctx).await?;
    1432       143252 :             }
    1433              : 
    1434       145642 :             if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
    1435            0 :                 gaps_at_lsns.push((gaps, meta.lsn()));
    1436       145642 :             }
    1437              :         }
    1438              : 
    1439       145852 :         if !gaps_at_lsns.is_empty() {
    1440            0 :             batch.zero_gaps(gaps_at_lsns);
    1441       145852 :         }
    1442              : 
    1443       145852 :         match self.pending_data_batch.as_mut() {
    1444           20 :             Some(pending_batch) => {
    1445           20 :                 pending_batch.extend(batch);
    1446           20 :             }
    1447       145832 :             None if batch.has_data() => {
    1448       145630 :                 self.pending_data_batch = Some(batch);
    1449       145630 :             }
    1450          202 :             None => {
    1451          202 :                 // Nothing to initialize the batch with
    1452          202 :             }
    1453              :         }
    1454              : 
    1455       145852 :         Ok(())
    1456       145852 :     }
    1457              : 
    1458              :     /// Put a new page version that can be constructed from a WAL record
    1459              :     ///
    1460              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1461              :     /// current end-of-file. It's up to the caller to check that the relation size
    1462              :     /// matches the blocks inserted!
    1463           12 :     pub fn put_rel_wal_record(
    1464           12 :         &mut self,
    1465           12 :         rel: RelTag,
    1466           12 :         blknum: BlockNumber,
    1467           12 :         rec: NeonWalRecord,
    1468           12 :     ) -> anyhow::Result<()> {
    1469           12 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1470           12 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1471           12 :         Ok(())
    1472           12 :     }
    1473              : 
    1474              :     // Same, but for an SLRU.
    1475            8 :     pub fn put_slru_wal_record(
    1476            8 :         &mut self,
    1477            8 :         kind: SlruKind,
    1478            8 :         segno: u32,
    1479            8 :         blknum: BlockNumber,
    1480            8 :         rec: NeonWalRecord,
    1481            8 :     ) -> anyhow::Result<()> {
    1482            8 :         if !self.tline.tenant_shard_id.is_shard_zero() {
    1483            0 :             return Ok(());
    1484            8 :         }
    1485            8 : 
    1486            8 :         self.put(
    1487            8 :             slru_block_to_key(kind, segno, blknum),
    1488            8 :             Value::WalRecord(rec),
    1489            8 :         );
    1490            8 :         Ok(())
    1491            8 :     }
    1492              : 
    1493              :     /// Like put_wal_record, but with ready-made image of the page.
    1494       277842 :     pub fn put_rel_page_image(
    1495       277842 :         &mut self,
    1496       277842 :         rel: RelTag,
    1497       277842 :         blknum: BlockNumber,
    1498       277842 :         img: Bytes,
    1499       277842 :     ) -> anyhow::Result<()> {
    1500       277842 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1501       277842 :         let key = rel_block_to_key(rel, blknum);
    1502       277842 :         if !key.is_valid_key_on_write_path() {
    1503            0 :             anyhow::bail!(
    1504            0 :                 "the request contains data not supported by pageserver at {}",
    1505            0 :                 key
    1506            0 :             );
    1507       277842 :         }
    1508       277842 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1509       277842 :         Ok(())
    1510       277842 :     }
    1511              : 
    1512            6 :     pub fn put_slru_page_image(
    1513            6 :         &mut self,
    1514            6 :         kind: SlruKind,
    1515            6 :         segno: u32,
    1516            6 :         blknum: BlockNumber,
    1517            6 :         img: Bytes,
    1518            6 :     ) -> anyhow::Result<()> {
    1519            6 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1520              : 
    1521            6 :         let key = slru_block_to_key(kind, segno, blknum);
    1522            6 :         if !key.is_valid_key_on_write_path() {
    1523            0 :             anyhow::bail!(
    1524            0 :                 "the request contains data not supported by pageserver at {}",
    1525            0 :                 key
    1526            0 :             );
    1527            6 :         }
    1528            6 :         self.put(key, Value::Image(img));
    1529            6 :         Ok(())
    1530            6 :     }
    1531              : 
    1532         2998 :     pub(crate) fn put_rel_page_image_zero(
    1533         2998 :         &mut self,
    1534         2998 :         rel: RelTag,
    1535         2998 :         blknum: BlockNumber,
    1536         2998 :     ) -> anyhow::Result<()> {
    1537         2998 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1538         2998 :         let key = rel_block_to_key(rel, blknum);
    1539         2998 :         if !key.is_valid_key_on_write_path() {
    1540            0 :             anyhow::bail!(
    1541            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1542            0 :                 key,
    1543            0 :                 self.lsn
    1544            0 :             );
    1545         2998 :         }
    1546         2998 : 
    1547         2998 :         let batch = self
    1548         2998 :             .pending_data_batch
    1549         2998 :             .get_or_insert_with(SerializedValueBatch::default);
    1550         2998 : 
    1551         2998 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1552         2998 : 
    1553         2998 :         Ok(())
    1554         2998 :     }
    1555              : 
    1556            0 :     pub(crate) fn put_slru_page_image_zero(
    1557            0 :         &mut self,
    1558            0 :         kind: SlruKind,
    1559            0 :         segno: u32,
    1560            0 :         blknum: BlockNumber,
    1561            0 :     ) -> anyhow::Result<()> {
    1562            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1563            0 :         let key = slru_block_to_key(kind, segno, blknum);
    1564            0 :         if !key.is_valid_key_on_write_path() {
    1565            0 :             anyhow::bail!(
    1566            0 :                 "the request contains data not supported by pageserver: {} @ {}",
    1567            0 :                 key,
    1568            0 :                 self.lsn
    1569            0 :             );
    1570            0 :         }
    1571            0 : 
    1572            0 :         let batch = self
    1573            0 :             .pending_data_batch
    1574            0 :             .get_or_insert_with(SerializedValueBatch::default);
    1575            0 : 
    1576            0 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1577            0 : 
    1578            0 :         Ok(())
    1579            0 :     }
    1580              : 
    1581              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1582           16 :     pub async fn put_relmap_file(
    1583           16 :         &mut self,
    1584           16 :         spcnode: Oid,
    1585           16 :         dbnode: Oid,
    1586           16 :         img: Bytes,
    1587           16 :         ctx: &RequestContext,
    1588           16 :     ) -> anyhow::Result<()> {
    1589              :         // Add it to the directory (if it doesn't exist already)
    1590           16 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1591           16 :         let mut dbdir = DbDirectory::des(&buf)?;
    1592              : 
    1593           16 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1594           16 :         if r.is_none() || r == Some(false) {
    1595              :             // The dbdir entry didn't exist, or it contained a
    1596              :             // 'false'. The 'insert' call already updated it with
    1597              :             // 'true', now write the updated 'dbdirs' map back.
    1598           16 :             let buf = DbDirectory::ser(&dbdir)?;
    1599           16 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1600            0 :         }
    1601           16 :         if r.is_none() {
    1602              :             // Create RelDirectory
    1603            8 :             let buf = RelDirectory::ser(&RelDirectory {
    1604            8 :                 rels: HashSet::new(),
    1605            8 :             })?;
    1606            8 :             self.pending_directory_entries.push((DirectoryKind::Rel, 0));
    1607            8 :             self.put(
    1608            8 :                 rel_dir_to_key(spcnode, dbnode),
    1609            8 :                 Value::Image(Bytes::from(buf)),
    1610            8 :             );
    1611            8 :         }
    1612              : 
    1613           16 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1614           16 :         Ok(())
    1615           16 :     }
    1616              : 
    1617            0 :     pub async fn put_twophase_file(
    1618            0 :         &mut self,
    1619            0 :         xid: u64,
    1620            0 :         img: Bytes,
    1621            0 :         ctx: &RequestContext,
    1622            0 :     ) -> anyhow::Result<()> {
    1623              :         // Add it to the directory entry
    1624            0 :         let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1625            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1626            0 :             let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
    1627            0 :             if !dir.xids.insert(xid) {
    1628            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1629            0 :             }
    1630            0 :             self.pending_directory_entries
    1631            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1632            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1633              :         } else {
    1634            0 :             let xid = xid as u32;
    1635            0 :             let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
    1636            0 :             if !dir.xids.insert(xid) {
    1637            0 :                 anyhow::bail!("twophase file for xid {} already exists", xid);
    1638            0 :             }
    1639            0 :             self.pending_directory_entries
    1640            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1641            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1642              :         };
    1643            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1644            0 : 
    1645            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1646            0 :         Ok(())
    1647            0 :     }
    1648              : 
    1649            0 :     pub async fn set_replorigin(
    1650            0 :         &mut self,
    1651            0 :         origin_id: RepOriginId,
    1652            0 :         origin_lsn: Lsn,
    1653            0 :     ) -> anyhow::Result<()> {
    1654            0 :         let key = repl_origin_key(origin_id);
    1655            0 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    1656            0 :         Ok(())
    1657            0 :     }
    1658              : 
    1659            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
    1660            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    1661            0 :     }
    1662              : 
    1663          182 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
    1664          182 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1665          182 :         Ok(())
    1666          182 :     }
    1667              : 
    1668          196 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
    1669          196 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1670          196 :         Ok(())
    1671          196 :     }
    1672              : 
    1673            0 :     pub async fn drop_dbdir(
    1674            0 :         &mut self,
    1675            0 :         spcnode: Oid,
    1676            0 :         dbnode: Oid,
    1677            0 :         ctx: &RequestContext,
    1678            0 :     ) -> anyhow::Result<()> {
    1679            0 :         let total_blocks = self
    1680            0 :             .tline
    1681            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1682            0 :             .await?;
    1683              : 
    1684              :         // Remove entry from dbdir
    1685            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1686            0 :         let mut dir = DbDirectory::des(&buf)?;
    1687            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1688            0 :             let buf = DbDirectory::ser(&dir)?;
    1689            0 :             self.pending_directory_entries
    1690            0 :                 .push((DirectoryKind::Db, dir.dbdirs.len()));
    1691            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1692              :         } else {
    1693            0 :             warn!(
    1694            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1695              :                 spcnode, dbnode
    1696              :             );
    1697              :         }
    1698              : 
    1699              :         // Update logical database size.
    1700            0 :         self.pending_nblocks -= total_blocks as i64;
    1701            0 : 
    1702            0 :         // Delete all relations and metadata files for the spcnode/dnode
    1703            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1704            0 :         Ok(())
    1705            0 :     }
    1706              : 
    1707              :     /// Create a relation fork.
    1708              :     ///
    1709              :     /// 'nblocks' is the initial size.
    1710         1920 :     pub async fn put_rel_creation(
    1711         1920 :         &mut self,
    1712         1920 :         rel: RelTag,
    1713         1920 :         nblocks: BlockNumber,
    1714         1920 :         ctx: &RequestContext,
    1715         1920 :     ) -> Result<(), RelationError> {
    1716         1920 :         if rel.relnode == 0 {
    1717            0 :             return Err(RelationError::InvalidRelnode);
    1718         1920 :         }
    1719              :         // It's possible that this is the first rel for this db in this
    1720              :         // tablespace.  Create the reldir entry for it if so.
    1721         1920 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
    1722         1920 :             .context("deserialize db")?;
    1723         1920 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1724         1920 :         let mut rel_dir =
    1725         1920 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1726              :                 // Didn't exist. Update dbdir
    1727            8 :                 e.insert(false);
    1728            8 :                 let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
    1729            8 :                 self.pending_directory_entries
    1730            8 :                     .push((DirectoryKind::Db, dbdir.dbdirs.len()));
    1731            8 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1732            8 : 
    1733            8 :                 // and create the RelDirectory
    1734            8 :                 RelDirectory::default()
    1735              :             } else {
    1736              :                 // reldir already exists, fetch it
    1737         1912 :                 RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
    1738         1912 :                     .context("deserialize db")?
    1739              :             };
    1740              : 
    1741              :         // Add the new relation to the rel directory entry, and write it back
    1742         1920 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    1743            0 :             return Err(RelationError::AlreadyExists);
    1744         1920 :         }
    1745         1920 : 
    1746         1920 :         self.pending_directory_entries
    1747         1920 :             .push((DirectoryKind::Rel, rel_dir.rels.len()));
    1748         1920 : 
    1749         1920 :         self.put(
    1750         1920 :             rel_dir_key,
    1751         1920 :             Value::Image(Bytes::from(
    1752         1920 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
    1753              :             )),
    1754              :         );
    1755              : 
    1756              :         // Put size
    1757         1920 :         let size_key = rel_size_to_key(rel);
    1758         1920 :         let buf = nblocks.to_le_bytes();
    1759         1920 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1760         1920 : 
    1761         1920 :         self.pending_nblocks += nblocks as i64;
    1762         1920 : 
    1763         1920 :         // Update relation size cache
    1764         1920 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1765         1920 : 
    1766         1920 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    1767         1920 :         // caller.
    1768         1920 :         Ok(())
    1769         1920 :     }
    1770              : 
    1771              :     /// Truncate relation
    1772         6012 :     pub async fn put_rel_truncation(
    1773         6012 :         &mut self,
    1774         6012 :         rel: RelTag,
    1775         6012 :         nblocks: BlockNumber,
    1776         6012 :         ctx: &RequestContext,
    1777         6012 :     ) -> anyhow::Result<()> {
    1778         6012 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1779         6012 :         if self
    1780         6012 :             .tline
    1781         6012 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1782         6012 :             .await?
    1783              :         {
    1784         6012 :             let size_key = rel_size_to_key(rel);
    1785              :             // Fetch the old size first
    1786         6012 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1787         6012 : 
    1788         6012 :             // Update the entry with the new size.
    1789         6012 :             let buf = nblocks.to_le_bytes();
    1790         6012 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1791         6012 : 
    1792         6012 :             // Update relation size cache
    1793         6012 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1794         6012 : 
    1795         6012 :             // Update logical database size.
    1796         6012 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    1797            0 :         }
    1798         6012 :         Ok(())
    1799         6012 :     }
    1800              : 
    1801              :     /// Extend relation
    1802              :     /// If new size is smaller, do nothing.
    1803       276680 :     pub async fn put_rel_extend(
    1804       276680 :         &mut self,
    1805       276680 :         rel: RelTag,
    1806       276680 :         nblocks: BlockNumber,
    1807       276680 :         ctx: &RequestContext,
    1808       276680 :     ) -> anyhow::Result<()> {
    1809       276680 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
    1810              : 
    1811              :         // Put size
    1812       276680 :         let size_key = rel_size_to_key(rel);
    1813       276680 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1814       276680 : 
    1815       276680 :         // only extend relation here. never decrease the size
    1816       276680 :         if nblocks > old_size {
    1817       274788 :             let buf = nblocks.to_le_bytes();
    1818       274788 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1819       274788 : 
    1820       274788 :             // Update relation size cache
    1821       274788 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    1822       274788 : 
    1823       274788 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    1824       274788 :         }
    1825       276680 :         Ok(())
    1826       276680 :     }
    1827              : 
    1828              :     /// Drop some relations
    1829           10 :     pub(crate) async fn put_rel_drops(
    1830           10 :         &mut self,
    1831           10 :         drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
    1832           10 :         ctx: &RequestContext,
    1833           10 :     ) -> anyhow::Result<()> {
    1834           12 :         for ((spc_node, db_node), rel_tags) in drop_relations {
    1835            2 :             let dir_key = rel_dir_to_key(spc_node, db_node);
    1836            2 :             let buf = self.get(dir_key, ctx).await?;
    1837            2 :             let mut dir = RelDirectory::des(&buf)?;
    1838              : 
    1839            2 :             let mut dirty = false;
    1840            4 :             for rel_tag in rel_tags {
    1841            2 :                 if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
    1842            2 :                     dirty = true;
    1843            2 : 
    1844            2 :                     // update logical size
    1845            2 :                     let size_key = rel_size_to_key(rel_tag);
    1846            2 :                     let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1847            2 :                     self.pending_nblocks -= old_size as i64;
    1848            2 : 
    1849            2 :                     // Remove entry from relation size cache
    1850            2 :                     self.tline.remove_cached_rel_size(&rel_tag);
    1851            2 : 
    1852            2 :                     // Delete size entry, as well as all blocks
    1853            2 :                     self.delete(rel_key_range(rel_tag));
    1854            0 :                 }
    1855              :             }
    1856              : 
    1857            2 :             if dirty {
    1858            2 :                 self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1859            2 :                 self.pending_directory_entries
    1860            2 :                     .push((DirectoryKind::Rel, dir.rels.len()));
    1861            0 :             }
    1862              :         }
    1863              : 
    1864           10 :         Ok(())
    1865           10 :     }
    1866              : 
    1867            6 :     pub async fn put_slru_segment_creation(
    1868            6 :         &mut self,
    1869            6 :         kind: SlruKind,
    1870            6 :         segno: u32,
    1871            6 :         nblocks: BlockNumber,
    1872            6 :         ctx: &RequestContext,
    1873            6 :     ) -> anyhow::Result<()> {
    1874            6 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1875              : 
    1876              :         // Add it to the directory entry
    1877            6 :         let dir_key = slru_dir_to_key(kind);
    1878            6 :         let buf = self.get(dir_key, ctx).await?;
    1879            6 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1880              : 
    1881            6 :         if !dir.segments.insert(segno) {
    1882            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1883            6 :         }
    1884            6 :         self.pending_directory_entries
    1885            6 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1886            6 :         self.put(
    1887            6 :             dir_key,
    1888            6 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1889              :         );
    1890              : 
    1891              :         // Put size
    1892            6 :         let size_key = slru_segment_size_to_key(kind, segno);
    1893            6 :         let buf = nblocks.to_le_bytes();
    1894            6 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1895            6 : 
    1896            6 :         // even if nblocks > 0, we don't insert any actual blocks here
    1897            6 : 
    1898            6 :         Ok(())
    1899            6 :     }
    1900              : 
    1901              :     /// Extend SLRU segment
    1902            0 :     pub fn put_slru_extend(
    1903            0 :         &mut self,
    1904            0 :         kind: SlruKind,
    1905            0 :         segno: u32,
    1906            0 :         nblocks: BlockNumber,
    1907            0 :     ) -> anyhow::Result<()> {
    1908            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1909              : 
    1910              :         // Put size
    1911            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    1912            0 :         let buf = nblocks.to_le_bytes();
    1913            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1914            0 :         Ok(())
    1915            0 :     }
    1916              : 
    1917              :     /// This method is used for marking truncated SLRU files
    1918            0 :     pub async fn drop_slru_segment(
    1919            0 :         &mut self,
    1920            0 :         kind: SlruKind,
    1921            0 :         segno: u32,
    1922            0 :         ctx: &RequestContext,
    1923            0 :     ) -> anyhow::Result<()> {
    1924            0 :         // Remove it from the directory entry
    1925            0 :         let dir_key = slru_dir_to_key(kind);
    1926            0 :         let buf = self.get(dir_key, ctx).await?;
    1927            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1928              : 
    1929            0 :         if !dir.segments.remove(&segno) {
    1930            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1931            0 :         }
    1932            0 :         self.pending_directory_entries
    1933            0 :             .push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
    1934            0 :         self.put(
    1935            0 :             dir_key,
    1936            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1937              :         );
    1938              : 
    1939              :         // Delete size entry, as well as all blocks
    1940            0 :         self.delete(slru_segment_key_range(kind, segno));
    1941            0 : 
    1942            0 :         Ok(())
    1943            0 :     }
    1944              : 
    1945              :     /// Drop a relmapper file (pg_filenode.map)
    1946            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1947            0 :         // TODO
    1948            0 :         Ok(())
    1949            0 :     }
    1950              : 
    1951              :     /// This method is used for marking truncated SLRU files
    1952            0 :     pub async fn drop_twophase_file(
    1953            0 :         &mut self,
    1954            0 :         xid: u64,
    1955            0 :         ctx: &RequestContext,
    1956            0 :     ) -> anyhow::Result<()> {
    1957              :         // Remove it from the directory entry
    1958            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1959            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1960            0 :             let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
    1961              : 
    1962            0 :             if !dir.xids.remove(&xid) {
    1963            0 :                 warn!("twophase file for xid {} does not exist", xid);
    1964            0 :             }
    1965            0 :             self.pending_directory_entries
    1966            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1967            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1968              :         } else {
    1969            0 :             let xid: u32 = u32::try_from(xid)?;
    1970            0 :             let mut dir = TwoPhaseDirectory::des(&buf)?;
    1971              : 
    1972            0 :             if !dir.xids.remove(&xid) {
    1973            0 :                 warn!("twophase file for xid {} does not exist", xid);
    1974            0 :             }
    1975            0 :             self.pending_directory_entries
    1976            0 :                 .push((DirectoryKind::TwoPhase, dir.xids.len()));
    1977            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1978              :         };
    1979            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1980            0 : 
    1981            0 :         // Delete it
    1982            0 :         self.delete(twophase_key_range(xid));
    1983            0 : 
    1984            0 :         Ok(())
    1985            0 :     }
    1986              : 
    1987           16 :     pub async fn put_file(
    1988           16 :         &mut self,
    1989           16 :         path: &str,
    1990           16 :         content: &[u8],
    1991           16 :         ctx: &RequestContext,
    1992           16 :     ) -> anyhow::Result<()> {
    1993           16 :         let key = aux_file::encode_aux_file_key(path);
    1994              :         // retrieve the key from the engine
    1995           16 :         let old_val = match self.get(key, ctx).await {
    1996            4 :             Ok(val) => Some(val),
    1997           12 :             Err(PageReconstructError::MissingKey(_)) => None,
    1998            0 :             Err(e) => return Err(e.into()),
    1999              :         };
    2000           16 :         let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    2001            4 :             aux_file::decode_file_value(old_val)?
    2002              :         } else {
    2003           12 :             Vec::new()
    2004              :         };
    2005           16 :         let mut other_files = Vec::with_capacity(files.len());
    2006           16 :         let mut modifying_file = None;
    2007           20 :         for file @ (p, content) in files {
    2008            4 :             if path == p {
    2009            4 :                 assert!(
    2010            4 :                     modifying_file.is_none(),
    2011            0 :                     "duplicated entries found for {}",
    2012              :                     path
    2013              :                 );
    2014            4 :                 modifying_file = Some(content);
    2015            0 :             } else {
    2016            0 :                 other_files.push(file);
    2017            0 :             }
    2018              :         }
    2019           16 :         let mut new_files = other_files;
    2020           16 :         match (modifying_file, content.is_empty()) {
    2021            2 :             (Some(old_content), false) => {
    2022            2 :                 self.tline
    2023            2 :                     .aux_file_size_estimator
    2024            2 :                     .on_update(old_content.len(), content.len());
    2025            2 :                 new_files.push((path, content));
    2026            2 :             }
    2027            2 :             (Some(old_content), true) => {
    2028            2 :                 self.tline
    2029            2 :                     .aux_file_size_estimator
    2030            2 :                     .on_remove(old_content.len());
    2031            2 :                 // not adding the file key to the final `new_files` vec.
    2032            2 :             }
    2033           12 :             (None, false) => {
    2034           12 :                 self.tline.aux_file_size_estimator.on_add(content.len());
    2035           12 :                 new_files.push((path, content));
    2036           12 :             }
    2037            0 :             (None, true) => warn!("removing non-existing aux file: {}", path),
    2038              :         }
    2039           16 :         let new_val = aux_file::encode_file_value(&new_files)?;
    2040           16 :         self.put(key, Value::Image(new_val.into()));
    2041           16 : 
    2042           16 :         Ok(())
    2043           16 :     }
    2044              : 
    2045              :     ///
    2046              :     /// Flush changes accumulated so far to the underlying repository.
    2047              :     ///
    2048              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    2049              :     /// you to flush them to the underlying repository before the final `commit`.
    2050              :     /// That allows to free up the memory used to hold the pending changes.
    2051              :     ///
    2052              :     /// Currently only used during bulk import of a data directory. In that
    2053              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    2054              :     /// whole import fails and the timeline will be deleted anyway.
    2055              :     /// (Or to be precise, it will be left behind for debugging purposes and
    2056              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    2057              :     ///
    2058              :     /// Note: A consequence of flushing the pending operations is that they
    2059              :     /// won't be visible to subsequent operations until `commit`. The function
    2060              :     /// retains all the metadata, but data pages are flushed. That's again OK
    2061              :     /// for bulk import, where you are just loading data pages and won't try to
    2062              :     /// modify the same pages twice.
    2063         1930 :     pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2064         1930 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    2065         1930 :         // to scan through the pending_updates list.
    2066         1930 :         let pending_nblocks = self.pending_nblocks;
    2067         1930 :         if pending_nblocks < 10000 {
    2068         1930 :             return Ok(());
    2069            0 :         }
    2070              : 
    2071            0 :         let mut writer = self.tline.writer().await;
    2072              : 
    2073              :         // Flush relation and  SLRU data blocks, keep metadata.
    2074            0 :         if let Some(batch) = self.pending_data_batch.take() {
    2075            0 :             tracing::debug!(
    2076            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2077            0 :                 batch.max_lsn,
    2078            0 :                 self.tline.get_last_record_lsn()
    2079              :             );
    2080              : 
    2081              :             // This bails out on first error without modifying pending_updates.
    2082              :             // That's Ok, cf this function's doc comment.
    2083            0 :             writer.put_batch(batch, ctx).await?;
    2084            0 :         }
    2085              : 
    2086            0 :         if pending_nblocks != 0 {
    2087            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2088            0 :             self.pending_nblocks = 0;
    2089            0 :         }
    2090              : 
    2091            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2092            0 :             writer.update_directory_entries_count(kind, count as u64);
    2093            0 :         }
    2094              : 
    2095            0 :         Ok(())
    2096         1930 :     }
    2097              : 
    2098              :     ///
    2099              :     /// Finish this atomic update, writing all the updated keys to the
    2100              :     /// underlying timeline.
    2101              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    2102              :     ///
    2103       743068 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2104       743068 :         let mut writer = self.tline.writer().await;
    2105              : 
    2106       743068 :         let pending_nblocks = self.pending_nblocks;
    2107       743068 :         self.pending_nblocks = 0;
    2108              : 
    2109              :         // Ordering: the items in this batch do not need to be in any global order, but values for
    2110              :         // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    2111              :         // this to do efficient updates to its index.  See [`wal_decoder::serialized_batch`] for
    2112              :         // more details.
    2113              : 
    2114       743068 :         let metadata_batch = {
    2115       743068 :             let pending_meta = self
    2116       743068 :                 .pending_metadata_pages
    2117       743068 :                 .drain()
    2118       743068 :                 .flat_map(|(key, values)| {
    2119       273836 :                     values
    2120       273836 :                         .into_iter()
    2121       273836 :                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
    2122       743068 :                 })
    2123       743068 :                 .collect::<Vec<_>>();
    2124       743068 : 
    2125       743068 :             if pending_meta.is_empty() {
    2126       472278 :                 None
    2127              :             } else {
    2128       270790 :                 Some(SerializedValueBatch::from_values(pending_meta))
    2129              :             }
    2130              :         };
    2131              : 
    2132       743068 :         let data_batch = self.pending_data_batch.take();
    2133              : 
    2134       743068 :         let maybe_batch = match (data_batch, metadata_batch) {
    2135       264556 :             (Some(mut data), Some(metadata)) => {
    2136       264556 :                 data.extend(metadata);
    2137       264556 :                 Some(data)
    2138              :             }
    2139       143262 :             (Some(data), None) => Some(data),
    2140         6234 :             (None, Some(metadata)) => Some(metadata),
    2141       329016 :             (None, None) => None,
    2142              :         };
    2143              : 
    2144       743068 :         if let Some(batch) = maybe_batch {
    2145       414052 :             tracing::debug!(
    2146            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2147            0 :                 batch.max_lsn,
    2148            0 :                 self.tline.get_last_record_lsn()
    2149              :             );
    2150              : 
    2151              :             // This bails out on first error without modifying pending_updates.
    2152              :             // That's Ok, cf this function's doc comment.
    2153       414052 :             writer.put_batch(batch, ctx).await?;
    2154       329016 :         }
    2155              : 
    2156       743068 :         if !self.pending_deletions.is_empty() {
    2157            2 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    2158            2 :             self.pending_deletions.clear();
    2159       743066 :         }
    2160              : 
    2161       743068 :         self.pending_lsns.push(self.lsn);
    2162       888926 :         for pending_lsn in self.pending_lsns.drain(..) {
    2163       888926 :             // TODO(vlad): pretty sure the comment below is not valid anymore
    2164       888926 :             // and we can call finish write with the latest LSN
    2165       888926 :             //
    2166       888926 :             // Ideally, we should be able to call writer.finish_write() only once
    2167       888926 :             // with the highest LSN. However, the last_record_lsn variable in the
    2168       888926 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    2169       888926 :             // so we need to record every LSN to not leave a gap between them.
    2170       888926 :             writer.finish_write(pending_lsn);
    2171       888926 :         }
    2172              : 
    2173       743068 :         if pending_nblocks != 0 {
    2174       270570 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2175       472498 :         }
    2176              : 
    2177       743068 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2178         2836 :             writer.update_directory_entries_count(kind, count as u64);
    2179         2836 :         }
    2180              : 
    2181       743068 :         self.pending_metadata_bytes = 0;
    2182       743068 : 
    2183       743068 :         Ok(())
    2184       743068 :     }
    2185              : 
    2186       291704 :     pub(crate) fn len(&self) -> usize {
    2187       291704 :         self.pending_metadata_pages.len()
    2188       291704 :             + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
    2189       291704 :             + self.pending_deletions.len()
    2190       291704 :     }
    2191              : 
    2192              :     /// Read a page from the Timeline we are writing to.  For metadata pages, this passes through
    2193              :     /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
    2194              :     /// in the modification.
    2195              :     ///
    2196              :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
    2197              :     /// page must ensure that the pages they read are already committed in Timeline, for example
    2198              :     /// DB create operations are always preceded by a call to commit().  This is special cased because
    2199              :     /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
    2200              :     /// and not data pages.
    2201       286586 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    2202       286586 :         if !Self::is_data_key(&key) {
    2203              :             // Have we already updated the same key? Read the latest pending updated
    2204              :             // version in that case.
    2205              :             //
    2206              :             // Note: we don't check pending_deletions. It is an error to request a
    2207              :             // value that has been removed, deletion only avoids leaking storage.
    2208       286586 :             if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
    2209        15928 :                 if let Some((_, _, value)) = values.last() {
    2210        15928 :                     return if let Value::Image(img) = value {
    2211        15928 :                         Ok(img.clone())
    2212              :                     } else {
    2213              :                         // Currently, we never need to read back a WAL record that we
    2214              :                         // inserted in the same "transaction". All the metadata updates
    2215              :                         // work directly with Images, and we never need to read actual
    2216              :                         // data pages. We could handle this if we had to, by calling
    2217              :                         // the walredo manager, but let's keep it simple for now.
    2218            0 :                         Err(PageReconstructError::Other(anyhow::anyhow!(
    2219            0 :                             "unexpected pending WAL record"
    2220            0 :                         )))
    2221              :                     };
    2222            0 :                 }
    2223       270658 :             }
    2224              :         } else {
    2225              :             // This is an expensive check, so we only do it in debug mode. If reading a data key,
    2226              :             // this key should never be present in pending_data_pages. We ensure this by committing
    2227              :             // modifications before ingesting DB create operations, which are the only kind that reads
    2228              :             // data pages during ingest.
    2229            0 :             if cfg!(debug_assertions) {
    2230            0 :                 assert!(!self
    2231            0 :                     .pending_data_batch
    2232            0 :                     .as_ref()
    2233            0 :                     .is_some_and(|b| b.updates_key(&key)));
    2234            0 :             }
    2235              :         }
    2236              : 
    2237              :         // Metadata page cache miss, or we're reading a data page.
    2238       270658 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    2239       270658 :         self.tline.get(key, lsn, ctx).await
    2240       286586 :     }
    2241              : 
    2242       563856 :     fn put(&mut self, key: Key, val: Value) {
    2243       563856 :         if Self::is_data_key(&key) {
    2244       277868 :             self.put_data(key.to_compact(), val)
    2245              :         } else {
    2246       285988 :             self.put_metadata(key.to_compact(), val)
    2247              :         }
    2248       563856 :     }
    2249              : 
    2250       277868 :     fn put_data(&mut self, key: CompactKey, val: Value) {
    2251       277868 :         let batch = self
    2252       277868 :             .pending_data_batch
    2253       277868 :             .get_or_insert_with(SerializedValueBatch::default);
    2254       277868 :         batch.put(key, val, self.lsn);
    2255       277868 :     }
    2256              : 
    2257       285988 :     fn put_metadata(&mut self, key: CompactKey, val: Value) {
    2258       285988 :         let values = self.pending_metadata_pages.entry(key).or_default();
    2259              :         // Replace the previous value if it exists at the same lsn
    2260       285988 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    2261        12152 :             if *last_lsn == self.lsn {
    2262              :                 // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
    2263        12152 :                 self.pending_metadata_bytes -= *last_value_ser_size;
    2264        12152 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    2265        12152 :                 self.pending_metadata_bytes += *last_value_ser_size;
    2266        12152 : 
    2267        12152 :                 // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
    2268        12152 :                 // have been generated by synthesized zero page writes prior to the first real write to a page.
    2269        12152 :                 *last_value = val;
    2270        12152 :                 return;
    2271            0 :             }
    2272       273836 :         }
    2273              : 
    2274       273836 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    2275       273836 :         self.pending_metadata_bytes += val_serialized_size;
    2276       273836 :         values.push((self.lsn, val_serialized_size, val));
    2277       273836 : 
    2278       273836 :         if key == CHECKPOINT_KEY.to_compact() {
    2279          196 :             tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
    2280       273640 :         }
    2281       285988 :     }
    2282              : 
    2283            2 :     fn delete(&mut self, key_range: Range<Key>) {
    2284            2 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    2285            2 :         self.pending_deletions.push((key_range, self.lsn));
    2286            2 :     }
    2287              : }
    2288              : 
    2289              : /// This struct facilitates accessing either a committed key from the timeline at a
    2290              : /// specific LSN, or the latest uncommitted key from a pending modification.
    2291              : ///
    2292              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    2293              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    2294              : /// need to look up the keys in the modification first before looking them up in the
    2295              : /// timeline to not miss the latest updates.
    2296              : #[derive(Clone, Copy)]
    2297              : pub enum Version<'a> {
    2298              :     Lsn(Lsn),
    2299              :     Modified(&'a DatadirModification<'a>),
    2300              : }
    2301              : 
    2302              : impl Version<'_> {
    2303         5176 :     async fn get(
    2304         5176 :         &self,
    2305         5176 :         timeline: &Timeline,
    2306         5176 :         key: Key,
    2307         5176 :         ctx: &RequestContext,
    2308         5176 :     ) -> Result<Bytes, PageReconstructError> {
    2309         5176 :         match self {
    2310         5156 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    2311           20 :             Version::Modified(modification) => modification.get(key, ctx).await,
    2312              :         }
    2313         5176 :     }
    2314              : 
    2315        35620 :     fn get_lsn(&self) -> Lsn {
    2316        35620 :         match self {
    2317        29574 :             Version::Lsn(lsn) => *lsn,
    2318         6046 :             Version::Modified(modification) => modification.lsn,
    2319              :         }
    2320        35620 :     }
    2321              : }
    2322              : 
    2323              : //--- Metadata structs stored in key-value pairs in the repository.
    2324              : 
    2325         2250 : #[derive(Debug, Serialize, Deserialize)]
    2326              : pub(crate) struct DbDirectory {
    2327              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    2328              :     pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
    2329              : }
    2330              : 
    2331              : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
    2332              : // pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs.  Previously, the files
    2333              : // were named like "pg_twophase/000002E5", now they're like
    2334              : // "pg_twophsae/0000000A000002E4".
    2335              : 
    2336          298 : #[derive(Debug, Serialize, Deserialize)]
    2337              : pub(crate) struct TwoPhaseDirectory {
    2338              :     pub(crate) xids: HashSet<TransactionId>,
    2339              : }
    2340              : 
    2341            0 : #[derive(Debug, Serialize, Deserialize)]
    2342              : struct TwoPhaseDirectoryV17 {
    2343              :     xids: HashSet<u64>,
    2344              : }
    2345              : 
    2346         1932 : #[derive(Debug, Serialize, Deserialize, Default)]
    2347              : pub(crate) struct RelDirectory {
    2348              :     // Set of relations that exist. (relfilenode, forknum)
    2349              :     //
    2350              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    2351              :     // key-value pairs, if you have a lot of relations
    2352              :     pub(crate) rels: HashSet<(Oid, u8)>,
    2353              : }
    2354              : 
    2355            0 : #[derive(Debug, Serialize, Deserialize)]
    2356              : struct RelSizeEntry {
    2357              :     nblocks: u32,
    2358              : }
    2359              : 
    2360          876 : #[derive(Debug, Serialize, Deserialize, Default)]
    2361              : pub(crate) struct SlruSegmentDirectory {
    2362              :     // Set of SLRU segments that exist.
    2363              :     pub(crate) segments: HashSet<u32>,
    2364              : }
    2365              : 
    2366              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2367              : #[repr(u8)]
    2368              : pub(crate) enum DirectoryKind {
    2369              :     Db,
    2370              :     TwoPhase,
    2371              :     Rel,
    2372              :     AuxFiles,
    2373              :     SlruSegment(SlruKind),
    2374              : }
    2375              : 
    2376              : impl DirectoryKind {
    2377              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2378         5672 :     pub(crate) fn offset(&self) -> usize {
    2379         5672 :         self.into_usize()
    2380         5672 :     }
    2381              : }
    2382              : 
    2383              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2384              : 
    2385              : #[allow(clippy::bool_assert_comparison)]
    2386              : #[cfg(test)]
    2387              : mod tests {
    2388              :     use hex_literal::hex;
    2389              :     use pageserver_api::{models::ShardParameters, shard::ShardStripeSize};
    2390              :     use utils::{
    2391              :         id::TimelineId,
    2392              :         shard::{ShardCount, ShardNumber},
    2393              :     };
    2394              : 
    2395              :     use super::*;
    2396              : 
    2397              :     use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
    2398              : 
    2399              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2400              :     #[tokio::test]
    2401            2 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2402            2 :         let name = "aux_files_round_trip";
    2403            2 :         let harness = TenantHarness::create(name).await?;
    2404            2 : 
    2405            2 :         pub const TIMELINE_ID: TimelineId =
    2406            2 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2407            2 : 
    2408            2 :         let (tenant, ctx) = harness.load().await;
    2409            2 :         let tline = tenant
    2410            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2411            2 :             .await?;
    2412            2 :         let tline = tline.raw_timeline().unwrap();
    2413            2 : 
    2414            2 :         // First modification: insert two keys
    2415            2 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2416            2 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2417            2 :         modification.set_lsn(Lsn(0x1008))?;
    2418            2 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2419            2 :         modification.commit(&ctx).await?;
    2420            2 :         let expect_1008 = HashMap::from([
    2421            2 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2422            2 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2423            2 :         ]);
    2424            2 : 
    2425            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2426            2 :         assert_eq!(readback, expect_1008);
    2427            2 : 
    2428            2 :         // Second modification: update one key, remove the other
    2429            2 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2430            2 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2431            2 :         modification.set_lsn(Lsn(0x2008))?;
    2432            2 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2433            2 :         modification.commit(&ctx).await?;
    2434            2 :         let expect_2008 =
    2435            2 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2436            2 : 
    2437            2 :         let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
    2438            2 :         assert_eq!(readback, expect_2008);
    2439            2 : 
    2440            2 :         // Reading back in time works
    2441            2 :         let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
    2442            2 :         assert_eq!(readback, expect_1008);
    2443            2 : 
    2444            2 :         Ok(())
    2445            2 :     }
    2446              : 
    2447              :     #[test]
    2448            2 :     fn gap_finding() {
    2449            2 :         let rel = RelTag {
    2450            2 :             spcnode: 1663,
    2451            2 :             dbnode: 208101,
    2452            2 :             relnode: 2620,
    2453            2 :             forknum: 0,
    2454            2 :         };
    2455            2 :         let base_blkno = 1;
    2456            2 : 
    2457            2 :         let base_key = rel_block_to_key(rel, base_blkno);
    2458            2 :         let before_base_key = rel_block_to_key(rel, base_blkno - 1);
    2459            2 : 
    2460            2 :         let shard = ShardIdentity::unsharded();
    2461            2 : 
    2462            2 :         let mut previous_nblocks = 0;
    2463           22 :         for i in 0..10 {
    2464           20 :             let crnt_blkno = base_blkno + i;
    2465           20 :             let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
    2466           20 : 
    2467           20 :             previous_nblocks = crnt_blkno + 1;
    2468           20 : 
    2469           20 :             if i == 0 {
    2470              :                 // The first block we write is 1, so we should find the gap.
    2471            2 :                 assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
    2472              :             } else {
    2473           18 :                 assert!(gaps.is_none());
    2474              :             }
    2475              :         }
    2476              : 
    2477              :         // This is an update to an already existing block. No gaps here.
    2478            2 :         let update_blkno = 5;
    2479            2 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    2480            2 :         assert!(gaps.is_none());
    2481              : 
    2482              :         // This is an update past the current end block.
    2483            2 :         let after_gap_blkno = 20;
    2484            2 :         let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
    2485            2 : 
    2486            2 :         let gap_start_key = rel_block_to_key(rel, previous_nblocks);
    2487            2 :         let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
    2488            2 :         assert_eq!(
    2489            2 :             gaps.unwrap(),
    2490            2 :             KeySpace::single(gap_start_key..after_gap_key)
    2491            2 :         );
    2492            2 :     }
    2493              : 
    2494              :     #[test]
    2495            2 :     fn sharded_gap_finding() {
    2496            2 :         let rel = RelTag {
    2497            2 :             spcnode: 1663,
    2498            2 :             dbnode: 208101,
    2499            2 :             relnode: 2620,
    2500            2 :             forknum: 0,
    2501            2 :         };
    2502            2 : 
    2503            2 :         let first_blkno = 6;
    2504            2 : 
    2505            2 :         // This shard will get the even blocks
    2506            2 :         let shard = ShardIdentity::from_params(
    2507            2 :             ShardNumber(0),
    2508            2 :             &ShardParameters {
    2509            2 :                 count: ShardCount(2),
    2510            2 :                 stripe_size: ShardStripeSize(1),
    2511            2 :             },
    2512            2 :         );
    2513            2 : 
    2514            2 :         // Only keys belonging to this shard are considered as gaps.
    2515            2 :         let mut previous_nblocks = 0;
    2516            2 :         let gaps =
    2517            2 :             DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
    2518            2 :         assert!(!gaps.ranges.is_empty());
    2519            6 :         for gap_range in gaps.ranges {
    2520            4 :             let mut k = gap_range.start;
    2521            8 :             while k != gap_range.end {
    2522            4 :                 assert_eq!(shard.get_shard_number(&k), shard.number);
    2523            4 :                 k = k.next();
    2524              :             }
    2525              :         }
    2526              : 
    2527            2 :         previous_nblocks = first_blkno;
    2528            2 : 
    2529            2 :         let update_blkno = 2;
    2530            2 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    2531            2 :         assert!(gaps.is_none());
    2532            2 :     }
    2533              : 
    2534              :     /*
    2535              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    2536              :             let incremental = timeline.get_current_logical_size();
    2537              :             let non_incremental = timeline
    2538              :                 .get_current_logical_size_non_incremental(lsn)
    2539              :                 .unwrap();
    2540              :             assert_eq!(incremental, non_incremental);
    2541              :         }
    2542              :     */
    2543              : 
    2544              :     /*
    2545              :     ///
    2546              :     /// Test list_rels() function, with branches and dropped relations
    2547              :     ///
    2548              :     #[test]
    2549              :     fn test_list_rels_drop() -> Result<()> {
    2550              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    2551              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    2552              :         const TESTDB: u32 = 111;
    2553              : 
    2554              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    2555              :         // after branching fails below
    2556              :         let mut writer = tline.begin_record(Lsn(0x10));
    2557              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    2558              :         writer.finish()?;
    2559              : 
    2560              :         // Create a relation on the timeline
    2561              :         let mut writer = tline.begin_record(Lsn(0x20));
    2562              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    2563              :         writer.finish()?;
    2564              : 
    2565              :         let writer = tline.begin_record(Lsn(0x00));
    2566              :         writer.finish()?;
    2567              : 
    2568              :         // Check that list_rels() lists it after LSN 2, but no before it
    2569              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    2570              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    2571              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    2572              : 
    2573              :         // Create a branch, check that the relation is visible there
    2574              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    2575              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    2576              :             Some(timeline) => timeline,
    2577              :             None => panic!("Should have a local timeline"),
    2578              :         };
    2579              :         let newtline = DatadirTimelineImpl::new(newtline);
    2580              :         assert!(newtline
    2581              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2582              :             .contains(&TESTREL_A));
    2583              : 
    2584              :         // Drop it on the branch
    2585              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    2586              :         new_writer.drop_relation(TESTREL_A)?;
    2587              :         new_writer.finish()?;
    2588              : 
    2589              :         // Check that it's no longer listed on the branch after the point where it was dropped
    2590              :         assert!(newtline
    2591              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2592              :             .contains(&TESTREL_A));
    2593              :         assert!(!newtline
    2594              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2595              :             .contains(&TESTREL_A));
    2596              : 
    2597              :         // Run checkpoint and garbage collection and check that it's still not visible
    2598              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    2599              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    2600              : 
    2601              :         assert!(!newtline
    2602              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2603              :             .contains(&TESTREL_A));
    2604              : 
    2605              :         Ok(())
    2606              :     }
    2607              :      */
    2608              : 
    2609              :     /*
    2610              :     #[test]
    2611              :     fn test_read_beyond_eof() -> Result<()> {
    2612              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    2613              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    2614              : 
    2615              :         make_some_layers(&tline, Lsn(0x20))?;
    2616              :         let mut writer = tline.begin_record(Lsn(0x60));
    2617              :         walingest.put_rel_page_image(
    2618              :             &mut writer,
    2619              :             TESTREL_A,
    2620              :             0,
    2621              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    2622              :         )?;
    2623              :         writer.finish()?;
    2624              : 
    2625              :         // Test read before rel creation. Should error out.
    2626              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    2627              : 
    2628              :         // Read block beyond end of relation at different points in time.
    2629              :         // These reads should fall into different delta, image, and in-memory layers.
    2630              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    2631              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    2632              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    2633              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    2634              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    2635              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    2636              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    2637              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    2638              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    2639              : 
    2640              :         // Test on an in-memory layer with no preceding layer
    2641              :         let mut writer = tline.begin_record(Lsn(0x70));
    2642              :         walingest.put_rel_page_image(
    2643              :             &mut writer,
    2644              :             TESTREL_B,
    2645              :             0,
    2646              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    2647              :         )?;
    2648              :         writer.finish()?;
    2649              : 
    2650              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
    2651              : 
    2652              :         Ok(())
    2653              :     }
    2654              :      */
    2655              : }
        

Generated by: LCOV version 2.1-beta