//!
//! This provides an abstraction to store PostgreSQL relations and other files
//! in the key-value store that implements the Repository interface.
//!
//! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use std::collections::{BTreeSet, HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use std::sync::Arc;

use crate::walingest::{WalIngestError, WalIngestErrorKind};
use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
    AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
    TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
    rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
    repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
    slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
};
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{BLCKSZ, PgMajorVersion, TransactionId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId, TimestampTz};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn};
use utils::bin_ser::{BeSer, DeserializeError};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};

use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
    RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
    RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
    RELSIZE_SNAPSHOT_CACHE_MISSES,
};
use crate::span::{
    debug_assert_current_span_has_tenant_and_timeline_id,
    debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};

/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;

/// Max number of aux-file-related delta layers. Compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;

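/// Result of looking up the LSN that corresponds to a given timestamp.
///
/// A hedged sketch of how a caller might consume this enum (the lookup method
/// name `find_lsn_for_timestamp` and the handler functions are assumptions,
/// used here only for illustration):
///
/// ```ignore
/// match timeline.find_lsn_for_timestamp(timestamp, &cancel, &ctx).await? {
///     LsnForTimestamp::Present(lsn) => create_branch_at(lsn),
///     LsnForTimestamp::Future(lsn) => create_branch_at(lsn), // newest data predates the timestamp
///     LsnForTimestamp::Past(lsn) => reject_before_horizon(lsn), // beyond the PITR horizon
///     LsnForTimestamp::NoData(lsn) => create_branch_at(lsn), // e.g. right after an import
/// }
/// ```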
#[derive(Debug)]
pub enum LsnForTimestamp {
    /// Found commits both before and after the given timestamp
    Present(Lsn),

    /// Found no commits after the given timestamp; this means
    /// that the newest data in the branch is older than the given
    /// timestamp.
    ///
    /// All commits <= LSN happened before the given timestamp
    Future(Lsn),

    /// The queried timestamp is past the horizon we can look back to (PITR)
    ///
    /// All commits > LSN happened after the given timestamp,
    /// but any commits < LSN might have happened before or after
    /// the given timestamp. We don't know because no data before
    /// the given lsn is available.
    Past(Lsn),

    /// We have found no commit with a timestamp,
    /// so we can't return anything meaningful.
    ///
    /// The associated LSN is the lower bound value we can safely
    /// create branches on, but no statement is made about whether it is
    /// older or newer than the timestamp.
    ///
    /// This variant can e.g. be returned right after a
    /// cluster import.
    NoData(Lsn),
}

/// Each request to the page server contains an LSN range: `not_modified_since..request_lsn`.
/// See the comments in libs/pageserver_api/src/models.rs.
/// Based on this range and `last_record_lsn`, the pageserver calculates `effective_lsn`.
/// But to distinguish requests from the primary and from replicas, we also need to pass `request_lsn`.
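///
/// A hedged sketch of the two cases (the values here are illustrative only):
///
/// ```ignore
/// // A "latest" request, e.g. from a primary: request_lsn is Lsn::MAX.
/// let latest = LsnRange { effective_lsn: last_record_lsn, request_lsn: Lsn::MAX };
/// assert!(latest.is_latest());
/// // A request pinned at a specific LSN, e.g. from a static replica.
/// let pinned = LsnRange::at(Lsn(0x1234_5678));
/// assert!(!pinned.is_latest());
/// ```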
#[derive(Debug, Clone, Copy, Default)]
pub struct LsnRange {
    pub effective_lsn: Lsn,
    pub request_lsn: Lsn,
}

impl LsnRange {
    pub fn at(lsn: Lsn) -> LsnRange {
        LsnRange {
            effective_lsn: lsn,
            request_lsn: lsn,
        }
    }
    pub fn is_latest(&self) -> bool {
        self.request_lsn == Lsn::MAX
    }
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
    #[error("cancelled")]
    Cancelled,

    /// Something went wrong while reading the metadata we use to calculate logical size.
    /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
    /// in the `From` implementation for this variant.
    #[error(transparent)]
    PageRead(PageReconstructError),

    /// Something went wrong deserializing metadata that we read to calculate logical size
    #[error("decode error: {0}")]
    Decode(#[from] DeserializeError),
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
    #[error(transparent)]
    Decode(#[from] DeserializeError),
    #[error(transparent)]
    PageRead(PageReconstructError),
    #[error("cancelled")]
    Cancelled,
}

impl CollectKeySpaceError {
    pub(crate) fn is_cancel(&self) -> bool {
        match self {
            CollectKeySpaceError::Decode(_) => false,
            CollectKeySpaceError::PageRead(e) => e.is_cancel(),
            CollectKeySpaceError::Cancelled => true,
        }
    }
    pub(crate) fn into_anyhow(self) -> anyhow::Error {
        match self {
            CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
            CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
            CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
        }
    }
}

impl From<PageReconstructError> for CollectKeySpaceError {
    fn from(err: PageReconstructError) -> Self {
        match err {
            PageReconstructError::Cancelled => Self::Cancelled,
            err => Self::PageRead(err),
        }
    }
}

impl From<PageReconstructError> for CalculateLogicalSizeError {
    fn from(pre: PageReconstructError) -> Self {
        match pre {
            PageReconstructError::Cancelled => Self::Cancelled,
            _ => Self::PageRead(pre),
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum RelationError {
    #[error("invalid relnode")]
    InvalidRelnode,
}

///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
/// Timeline struct provides the key-value store.
///
/// This is a separate impl so that we can easily include all these functions in the
/// Timeline implementation; it might be moved into a separate struct later.
impl Timeline {
    /// Start ingesting a WAL record, or other atomic modification of
    /// the timeline.
    ///
    /// This provides a transaction-like interface to perform a bunch
    /// of modifications atomically.
    ///
    /// To ingest a WAL record, call begin_modification(lsn) to get a
    /// DatadirModification object. Use the functions in the object to
    /// modify the repository state, updating all the pages and metadata
    /// that the WAL record affects. When you're done, call commit() to
    /// commit the changes.
    ///
    /// The LSN stored in the modification is advanced by `ingest_record` and
    /// is used by `commit()` to update `last_record_lsn`.
    ///
    /// Calling commit() will flush all the changes and reset the state,
    /// so the `DatadirModification` struct can be reused to perform the next modification.
    ///
    /// Note that any pending modifications you make through the
    /// modification object won't be visible to calls to the 'get' and list
    /// functions of the timeline until you finish! And if you update the
    /// same page twice, the last update wins.
    ///
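    /// A minimal usage sketch (hedged: the put-style methods are defined on
    /// `DatadirModification` elsewhere in this file, and the exact method and
    /// error handling shown here are illustrative):
    ///
    /// ```ignore
    /// let mut modification = timeline.begin_modification(lsn);
    /// modification.put_rel_page_image(rel, blknum, img)?;
    /// modification.commit(ctx).await?;
    /// ```
    ///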
    pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
    where
        Self: Sized,
    {
        DatadirModification {
            tline: self,
            pending_lsns: Vec::new(),
            pending_metadata_pages: HashMap::new(),
            pending_data_batch: None,
            pending_deletions: Vec::new(),
            pending_nblocks: 0,
            pending_directory_entries: Vec::new(),
            pending_metadata_bytes: 0,
            is_importing_pgdata: false,
            lsn,
        }
    }

    pub fn begin_modification_for_import(&self, lsn: Lsn) -> DatadirModification
    where
        Self: Sized,
    {
        DatadirModification {
            tline: self,
            pending_lsns: Vec::new(),
            pending_metadata_pages: HashMap::new(),
            pending_data_batch: None,
            pending_deletions: Vec::new(),
            pending_nblocks: 0,
            pending_directory_entries: Vec::new(),
            pending_metadata_bytes: 0,
            is_importing_pgdata: true,
            lsn,
        }
    }

    //------------------------------------------------------------------------------
    // Public GET functions
    //------------------------------------------------------------------------------

    /// Look up given page version.
    pub(crate) async fn get_rel_page_at_lsn(
        &self,
        tag: RelTag,
        blknum: BlockNumber,
        version: Version<'_>,
        ctx: &RequestContext,
        io_concurrency: IoConcurrency,
    ) -> Result<Bytes, PageReconstructError> {
        match version {
            Version::LsnRange(lsns) => {
                let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
                let res = self
                    .get_rel_page_at_lsn_batched(
                        pages
                            .iter()
                            .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
                        io_concurrency.clone(),
                        ctx,
                    )
                    .await;
                assert_eq!(res.len(), 1);
                res.into_iter().next().unwrap()
            }
            Version::Modified(modification) => {
                if tag.relnode == 0 {
                    return Err(PageReconstructError::Other(
                        RelationError::InvalidRelnode.into(),
                    ));
                }

                let nblocks = self.get_rel_size(tag, version, ctx).await?;
                if blknum >= nblocks {
                    debug!(
                        "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                        tag,
                        blknum,
                        version.get_lsn(),
                        nblocks
                    );
                    return Ok(ZERO_PAGE.clone());
                }

                let key = rel_block_to_key(tag, blknum);
                modification.get(key, ctx).await
            }
        }
    }

    /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
    ///
    /// The ordering of the returned vec corresponds to the ordering of `pages`.
    ///
    /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
    /// if the client goes away (e.g. due to timeout or cancellation).
    /// TODO: verify that it actually is cancellation-safe.
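    ///
    /// Illustrative note on the ordering contract (hedged; not a doctest): if
    /// `pages` yields requests for `(rel_a, 0)` and then `(rel_b, 7)`, element 0
    /// of the returned vec is the result for `(rel_a, 0)` and element 1 the
    /// result for `(rel_b, 7)`, even though duplicate keys are fetched only once
    /// internally and fanned out to every requesting slot.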
    pub(crate) async fn get_rel_page_at_lsn_batched(
        &self,
        pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
        io_concurrency: IoConcurrency,
        ctx: &RequestContext,
    ) -> Vec<Result<Bytes, PageReconstructError>> {
        debug_assert_current_span_has_tenant_and_timeline_id();

        let mut slots_filled = 0;
        let page_count = pages.len();

        // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
        let mut result = Vec::with_capacity(pages.len());
        let result_slots = result.spare_capacity_mut();

        let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
            HashMap::with_capacity(pages.len());

        let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
            HashMap::with_capacity(pages.len());

        for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
            if tag.relnode == 0 {
                result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
                    RelationError::InvalidRelnode.into(),
                )));

                slots_filled += 1;
                continue;
            }
            let lsn = lsns.effective_lsn;
            let nblocks = {
                let ctx = RequestContextBuilder::from(&ctx)
                    .perf_span(|crnt_perf_span| {
                        info_span!(
                            target: PERF_TRACE_TARGET,
                            parent: crnt_perf_span,
                            "GET_REL_SIZE",
                            reltag=%tag,
                            lsn=%lsn,
                        )
                    })
                    .attached_child();

                match self
                    .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
                    .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
                    .await
                {
                    Ok(nblocks) => nblocks,
                    Err(err) => {
                        result_slots[response_slot_idx].write(Err(err));
                        slots_filled += 1;
                        continue;
                    }
                }
            };

            if *blknum >= nblocks {
                debug!(
                    "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                    tag, blknum, lsn, nblocks
                );
                result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
                slots_filled += 1;
                continue;
            }

            let key = rel_block_to_key(*tag, *blknum);

            let ctx = RequestContextBuilder::from(&ctx)
                .perf_span(|crnt_perf_span| {
                    info_span!(
                        target: PERF_TRACE_TARGET,
                        parent: crnt_perf_span,
                        "GET_BATCH",
                        batch_size = %page_count,
                    )
                })
                .attached_child();

            let key_slots = keys_slots.entry(key).or_default();
            key_slots.push((response_slot_idx, ctx));

            let acc = req_keyspaces.entry(lsn).or_default();
            acc.add_key(key);
        }

        let query: Vec<(Lsn, KeySpace)> = req_keyspaces
            .into_iter()
            .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
            .collect();

        let query = VersionedKeySpaceQuery::scattered(query);
        let res = self
            .get_vectored(query, io_concurrency, ctx)
            .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
            .await;

        match res {
            Ok(results) => {
                for (key, res) in results {
                    let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
                    let (first_slot, first_req_ctx) = key_slots.next().unwrap();

                    for (slot, req_ctx) in key_slots {
                        let clone = match &res {
                            Ok(buf) => Ok(buf.clone()),
                            Err(err) => Err(match err {
                                PageReconstructError::Cancelled => PageReconstructError::Cancelled,

                                x @ PageReconstructError::Other(_)
                                | x @ PageReconstructError::AncestorLsnTimeout(_)
                                | x @ PageReconstructError::WalRedo(_)
                                | x @ PageReconstructError::MissingKey(_) => {
                                    PageReconstructError::Other(anyhow::anyhow!(
                                        "there was more than one request for this key in the batch, error logged once: {x:?}"
                                    ))
                                }
                            }),
                        };

                        result_slots[slot].write(clone);
                        // There is no standardized way to express that the batched span followed from N request spans.
                        // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
                        // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
                        req_ctx.perf_follows_from(ctx);
                        slots_filled += 1;
                    }

                    result_slots[first_slot].write(res);
                    first_req_ctx.perf_follows_from(ctx);
                    slots_filled += 1;
                }
            }
            Err(err) => {
                // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
                // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
                for (slot, req_ctx) in keys_slots.values().flatten() {
                    // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
                    // but without taking ownership of the GetVectoredError
                    let err = match &err {
                        GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
                        // TODO: restructure get_vectored API to make this error per-key
                        GetVectoredError::MissingKey(err) => {
                            Err(PageReconstructError::Other(anyhow::anyhow!(
                                "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
                            )))
                        }
                        // TODO: restructure get_vectored API to make this error per-key
                        GetVectoredError::GetReadyAncestorError(err) => {
                            Err(PageReconstructError::Other(anyhow::anyhow!(
                                "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
                            )))
                        }
                        // TODO: restructure get_vectored API to make this error per-key
                        GetVectoredError::Other(err) => Err(PageReconstructError::Other(
                            anyhow::anyhow!("whole vectored get request failed: {err:?}"),
                        )),
                        // TODO: we can prevent this error class by moving this check into the type system
                        GetVectoredError::InvalidLsn(e) => {
                            Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
                        }
                        // NB: this should never happen in practice because we limit batch size to be smaller than max_get_vectored_keys
                        // TODO: we can prevent this error class by moving this check into the type system
                        GetVectoredError::Oversized(err, max) => {
                            Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
                        }
                    };

                    req_ctx.perf_follows_from(ctx);
                    result_slots[*slot].write(err);
                }

                slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
            }
        };

        assert_eq!(slots_filled, page_count);
        // SAFETY:
        // 1. `result` and any of its uninit members are not read from until this point
        // 2. The length below is tracked at run-time and matches the number of requested pages.
        unsafe {
            result.set_len(page_count);
        }

        result
    }

    /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
    /// other shards, by only accounting for relations the shard has pages for, and only accounting
    /// for pages up to the highest page number it has stored.
    pub(crate) async fn get_db_size(
        &self,
        spcnode: Oid,
        dbnode: Oid,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<usize, PageReconstructError> {
        let mut total_blocks = 0;

        let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;

        if rels.is_empty() {
            return Ok(0);
        }

        // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
        // TODO: refactor this read path to use reldir v2.
        let reldir_key = rel_dir_to_key(spcnode, dbnode);
        let buf = version.get(self, reldir_key, ctx).await?;
        let reldir = RelDirectory::des(&buf)?;

        for rel in rels {
            let n_blocks = self
                .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), false, ctx)
                .await?
                .expect("allow_missing=false");
            total_blocks += n_blocks as usize;
        }
        Ok(total_blocks)
    }

    /// Get size of a relation file. The relation must exist, otherwise an error is returned.
    ///
    /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
    /// page number stored in the shard.
    pub(crate) async fn get_rel_size(
        &self,
        tag: RelTag,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<BlockNumber, PageReconstructError> {
        Ok(self
            .get_rel_size_in_reldir(tag, version, None, false, ctx)
            .await?
            .expect("allow_missing=false"))
    }

    /// Get size of a relation file. If `allow_missing` is true, returns None for missing relations,
    /// otherwise errors.
    ///
    /// INVARIANT: never returns None if `allow_missing=false`.
    ///
    /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
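    ///
    /// A hedged caller-side sketch of the `allow_missing = true` contract:
    ///
    /// ```ignore
    /// match tline.get_rel_size_in_reldir(tag, version, None, true, ctx).await? {
    ///     Some(nblocks) => { /* relation exists and has `nblocks` blocks */ }
    ///     None => { /* relation missing; only possible with allow_missing = true */ }
    /// }
    /// ```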
    pub(crate) async fn get_rel_size_in_reldir(
        &self,
        tag: RelTag,
        version: Version<'_>,
        deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
        allow_missing: bool,
        ctx: &RequestContext,
    ) -> Result<Option<BlockNumber>, PageReconstructError> {
        if tag.relnode == 0 {
            return Err(PageReconstructError::Other(
                RelationError::InvalidRelnode.into(),
            ));
        }

        if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
            return Ok(Some(nblocks));
        }

        if allow_missing
            && !self
                .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
                .await?
        {
            return Ok(None);
        }

        if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
            && !self
                .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
                .await?
        {
            // FIXME: Postgres sometimes calls smgrcreate() to create
            // FSM, and smgrnblocks() on it immediately afterwards,
            // without extending it.  Tolerate that by claiming that
            // any non-existent FSM fork has size 0.
            return Ok(Some(0));
        }

        let key = rel_size_to_key(tag);
        let mut buf = version.get(self, key, ctx).await?;
        let nblocks = buf.get_u32_le();

        self.update_cached_rel_size(tag, version, nblocks);

        Ok(Some(nblocks))
    }

    /// Does the relation exist?
    ///
    /// Only shard 0 has a full view of the relations. Other shards only know about relations that
    /// the shard stores pages for.
    ///
    pub(crate) async fn get_rel_exists(
        &self,
        tag: RelTag,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        self.get_rel_exists_in_reldir(tag, version, None, ctx).await
    }

    async fn get_rel_exists_in_reldir_v1(
        &self,
        tag: RelTag,
        version: Version<'_>,
        deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
        if let Some((cached_key, dir)) = deserialized_reldir_v1 {
            if cached_key == key {
                return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
            } else if cfg!(test) || cfg!(feature = "testing") {
                panic!("cached reldir key mismatch: {cached_key} != {key}");
            } else {
                warn!("cached reldir key mismatch: {cached_key} != {key}");
            }
            // Fallback to reading the directory from the datadir.
        }

        let buf = version.get(self, key, ctx).await?;

        let dir = RelDirectory::des(&buf)?;
        Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
    }

    async fn get_rel_exists_in_reldir_v2(
        &self,
        tag: RelTag,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
        let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?).map_err(
            |_| {
                PageReconstructError::Other(anyhow::anyhow!(
                    "invalid reldir key: decode failed, {}",
                    key
                ))
            },
        )?;
        let exists_v2 = buf == RelDirExists::Exists;
        Ok(exists_v2)
    }

    /// Does the relation exist? With a cached deserialized `RelDirectory`.
    ///
    /// There are some cases where the caller loops across all relations. In that specific case,
    /// the caller should obtain the deserialized `RelDirectory` first and then call this function,
    /// to avoid duplicated deserialization work. This is a hack and should be removed by introducing
    /// a new API (e.g., `get_rel_exists_batched`).
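    ///
    /// A hedged sketch of that caller pattern (cf. `get_db_size` above, which
    /// does the same for relation sizes):
    ///
    /// ```ignore
    /// let reldir_key = rel_dir_to_key(spcnode, dbnode);
    /// let buf = version.get(self, reldir_key, ctx).await?;
    /// let reldir = RelDirectory::des(&buf)?;
    /// for rel in rels {
    ///     let exists = self
    ///         .get_rel_exists_in_reldir(rel, version, Some((reldir_key, &reldir)), ctx)
    ///         .await?;
    ///     // ...
    /// }
    /// ```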
     669         3025 :     pub(crate) async fn get_rel_exists_in_reldir(
     670         3025 :         &self,
     671         3025 :         tag: RelTag,
     672         3025 :         version: Version<'_>,
     673         3025 :         deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
     674         3025 :         ctx: &RequestContext,
     675         3025 :     ) -> Result<bool, PageReconstructError> {
     676         3025 :         if tag.relnode == 0 {
     677            0 :             return Err(PageReconstructError::Other(
     678            0 :                 RelationError::InvalidRelnode.into(),
     679            0 :             ));
     680         3025 :         }
     681              : 
     682              :         // first try to lookup relation in cache
     683         3025 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
     684         3016 :             return Ok(true);
     685            9 :         }
     686              :         // then check if the database was already initialized.
     687              :         // get_rel_exists can be called before dbdir is created.
     688            9 :         let buf = version.get(self, DBDIR_KEY, ctx).await?;
     689            9 :         let dbdirs = DbDirectory::des(&buf)?.dbdirs;
     690            9 :         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
     691            0 :             return Ok(false);
     692            9 :         }
     693              : 
     694            9 :         let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
     695              : 
     696            0 :         match v2_status {
     697              :             RelSizeMigration::Legacy => {
     698            9 :                 let v1_exists = self
     699            9 :                     .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
     700            9 :                     .await?;
     701            9 :                 Ok(v1_exists)
     702              :             }
     703              :             RelSizeMigration::Migrating | RelSizeMigration::Migrated
     704            0 :                 if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
     705              :             {
     706              :                 // For requests below the migrated LSN, we still use the v1 read path.
     707            0 :                 let v1_exists = self
     708            0 :                     .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
     709            0 :                     .await?;
     710            0 :                 Ok(v1_exists)
     711              :             }
     712              :             RelSizeMigration::Migrating => {
     713            0 :                 let v1_exists = self
     714            0 :                     .get_rel_exists_in_reldir_v1(tag, version, deserialized_reldir_v1, ctx)
     715            0 :                     .await?;
     716            0 :                 let v2_exists_res = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await;
     717            0 :                 match v2_exists_res {
     718            0 :                     Ok(v2_exists) if v1_exists == v2_exists => {}
     719            0 :                     Ok(v2_exists) => {
     720            0 :                         tracing::warn!(
     721            0 :                             "inconsistent v1/v2 reldir keyspace for rel {}: v1_exists={}, v2_exists={}",
     722              :                             tag,
     723              :                             v1_exists,
     724              :                             v2_exists,
     725              :                         );
     726              :                     }
     727            0 :                     Err(e) if e.is_cancel() => {
     728            0 :                         // Cancellation errors are fine to ignore, do not log.
     729            0 :                     }
     730            0 :                     Err(e) => {
     731            0 :                         tracing::warn!("failed to get rel exists in v2: {e}");
     732              :                     }
     733              :                 }
     734            0 :                 Ok(v1_exists)
     735              :             }
     736              :             RelSizeMigration::Migrated => {
     737            0 :                 let v2_exists = self.get_rel_exists_in_reldir_v2(tag, version, ctx).await?;
     738            0 :                 Ok(v2_exists)
     739              :             }
     740              :         }
     741         3025 :     }
     742              : 
     743            0 :     async fn list_rels_v1(
     744            0 :         &self,
     745            0 :         spcnode: Oid,
     746            0 :         dbnode: Oid,
     747            0 :         version: Version<'_>,
     748            0 :         ctx: &RequestContext,
     749            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     750            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     751            0 :         let buf = version.get(self, key, ctx).await?;
     752            0 :         let dir = RelDirectory::des(&buf)?;
     753            0 :         let rels_v1: HashSet<RelTag> =
     754            0 :             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     755            0 :                 spcnode,
     756            0 :                 dbnode,
     757            0 :                 relnode: *relnode,
     758            0 :                 forknum: *forknum,
     759            0 :             }));
     760            0 :         Ok(rels_v1)
     761            0 :     }
     762              : 
     763            0 :     async fn list_rels_v2(
     764            0 :         &self,
     765            0 :         spcnode: Oid,
     766            0 :         dbnode: Oid,
     767            0 :         version: Version<'_>,
     768            0 :         ctx: &RequestContext,
     769            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     770            0 :         let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
     771            0 :         let io_concurrency = IoConcurrency::spawn_from_conf(
     772            0 :             self.conf.get_vectored_concurrent_io,
     773            0 :             self.gate
     774            0 :                 .enter()
     775            0 :                 .map_err(|_| PageReconstructError::Cancelled)?,
     776              :         );
     777            0 :         let results = self
     778            0 :             .scan(
     779            0 :                 KeySpace::single(key_range),
     780            0 :                 version.get_lsn(),
     781            0 :                 ctx,
     782            0 :                 io_concurrency,
     783            0 :             )
     784            0 :             .await?;
     785            0 :         let mut rels = HashSet::new();
     786            0 :         for (key, val) in results {
     787            0 :             let val = RelDirExists::decode(&val?).map_err(|_| {
     788            0 :                 PageReconstructError::Other(anyhow::anyhow!(
     789            0 :                     "invalid reldir key: decode failed, {}",
     790            0 :                     key
     791            0 :                 ))
     792            0 :             })?;
     793            0 :             if key.field6 != 1 {
     794            0 :                 return Err(PageReconstructError::Other(anyhow::anyhow!(
     795            0 :                     "invalid reldir key: field6 != 1, {}",
     796            0 :                     key
     797            0 :                 )));
     798            0 :             }
     799            0 :             if key.field2 != spcnode {
     800            0 :                 return Err(PageReconstructError::Other(anyhow::anyhow!(
     801            0 :                     "invalid reldir key: field2 != spcnode, {}",
     802            0 :                     key
     803            0 :                 )));
     804            0 :             }
     805            0 :             if key.field3 != dbnode {
     806            0 :                 return Err(PageReconstructError::Other(anyhow::anyhow!(
     807            0 :                     "invalid reldir key: field3 != dbnode, {}",
     808            0 :                     key
     809            0 :                 )));
     810            0 :             }
     811            0 :             let tag = RelTag {
     812            0 :                 spcnode,
     813            0 :                 dbnode,
     814            0 :                 relnode: key.field4,
     815            0 :                 forknum: key.field5,
     816            0 :             };
     817            0 :             if val == RelDirExists::Removed {
     818            0 :                 debug_assert!(!rels.contains(&tag), "removed reltag in v2: {tag}");
     819            0 :                 continue;
     820            0 :             }
     821            0 :             let did_not_contain = rels.insert(tag);
     822            0 :             debug_assert!(did_not_contain, "duplicate reltag in v2: {tag}");
     823              :         }
     824            0 :         Ok(rels)
     825            0 :     }
     826              : 
     827              :     /// Get a list of all existing relations in given tablespace and database.
     828              :     ///
     829              :     /// Only shard 0 has a full view of the relations. Other shards only know about relations that
     830              :     /// the shard stores pages for.
     831              :     ///
     832              :     /// # Cancel-Safety
     833              :     ///
     834              :     /// This method is cancellation-safe.
     835            0 :     pub(crate) async fn list_rels(
     836            0 :         &self,
     837            0 :         spcnode: Oid,
     838            0 :         dbnode: Oid,
     839            0 :         version: Version<'_>,
     840            0 :         ctx: &RequestContext,
     841            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     842            0 :         let (v2_status, migrated_lsn) = self.get_rel_size_v2_status();
     843              : 
     844            0 :         match v2_status {
     845              :             RelSizeMigration::Legacy => {
     846            0 :                 let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
     847            0 :                 Ok(rels_v1)
     848              :             }
     849              :             RelSizeMigration::Migrating | RelSizeMigration::Migrated
     850            0 :                 if version.get_lsn() < migrated_lsn.unwrap_or(Lsn(0)) =>
     851              :             {
     852              :                 // For requests below the migrated LSN, we still use the v1 read path.
     853            0 :                 let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
     854            0 :                 Ok(rels_v1)
     855              :             }
     856              :             RelSizeMigration::Migrating => {
     857            0 :                 let rels_v1 = self.list_rels_v1(spcnode, dbnode, version, ctx).await?;
     858            0 :                 let rels_v2_res = self.list_rels_v2(spcnode, dbnode, version, ctx).await;
     859            0 :                 match rels_v2_res {
     860            0 :                     Ok(rels_v2) if rels_v1 == rels_v2 => {}
     861            0 :                     Ok(rels_v2) => {
     862            0 :                         tracing::warn!(
     863            0 :                             "inconsistent v1/v2 reldir keyspace for db {} {}: v1_rels.len()={}, v2_rels.len()={}",
     864              :                             spcnode,
     865              :                             dbnode,
     866            0 :                             rels_v1.len(),
     867            0 :                             rels_v2.len()
     868              :                         );
     869              :                     }
     870            0 :                     Err(e) if e.is_cancel() => {
     871            0 :                         // Cancellation errors are fine to ignore, do not log.
     872            0 :                     }
     873            0 :                     Err(e) => {
     874            0 :                         tracing::warn!("failed to list rels in v2: {e}");
     875              :                     }
     876              :                 }
     877            0 :                 Ok(rels_v1)
     878              :             }
     879              :             RelSizeMigration::Migrated => {
     880            0 :                 let rels_v2 = self.list_rels_v2(spcnode, dbnode, version, ctx).await?;
     881            0 :                 Ok(rels_v2)
     882              :             }
     883              :         }
     884            0 :     }
     885              : 
     886              :     /// Get the whole SLRU segment
     887            0 :     pub(crate) async fn get_slru_segment(
     888            0 :         &self,
     889            0 :         kind: SlruKind,
     890            0 :         segno: u32,
     891            0 :         lsn: Lsn,
     892            0 :         ctx: &RequestContext,
     893            0 :     ) -> Result<Bytes, PageReconstructError> {
     894            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     895            0 :         let n_blocks = self
     896            0 :             .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
     897            0 :             .await?;
     898              : 
     899            0 :         let keyspace = KeySpace::single(
     900            0 :             slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
     901              :         );
     902              : 
     903            0 :         let batches = keyspace.partition(
     904            0 :             self.get_shard_identity(),
     905            0 :             self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
     906            0 :             BLCKSZ as u64,
     907              :         );
     908              : 
     909            0 :         let io_concurrency = IoConcurrency::spawn_from_conf(
     910            0 :             self.conf.get_vectored_concurrent_io,
     911            0 :             self.gate
     912            0 :                 .enter()
     913            0 :                 .map_err(|_| PageReconstructError::Cancelled)?,
     914              :         );
     915              : 
     916            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     917            0 :         for batch in batches.parts {
     918            0 :             let query = VersionedKeySpaceQuery::uniform(batch, lsn);
     919            0 :             let blocks = self
     920            0 :                 .get_vectored(query, io_concurrency.clone(), ctx)
     921            0 :                 .await?;
     922              : 
     923            0 :             for (_key, block) in blocks {
     924            0 :                 let block = block?;
     925            0 :                 segment.extend_from_slice(&block[..BLCKSZ as usize]);
     926              :             }
     927              :         }
     928              : 
     929            0 :         Ok(segment.freeze())
     930            0 :     }
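
    // A minimal sketch (hypothetical helper, not from the original source) of the
    // batching above: a range of n_blocks BLCKSZ-sized pages is split into chunks of
    // at most max_keys_per_batch keys, one vectored read per chunk.
    fn partition_blocks_sketch(n_blocks: u32, max_keys_per_batch: u32) -> Vec<std::ops::Range<u32>> {
        assert!(max_keys_per_batch > 0);
        let mut batches = Vec::new();
        let mut start = 0;
        while start < n_blocks {
            let end = start.saturating_add(max_keys_per_batch).min(n_blocks);
            batches.push(start..end); // one future vectored get per chunk
            start = end;
        }
        batches
    }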
     931              : 
     932              :     /// Get size of an SLRU segment
     933            0 :     pub(crate) async fn get_slru_segment_size(
     934            0 :         &self,
     935            0 :         kind: SlruKind,
     936            0 :         segno: u32,
     937            0 :         version: Version<'_>,
     938            0 :         ctx: &RequestContext,
     939            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     940            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     941            0 :         let key = slru_segment_size_to_key(kind, segno);
     942            0 :         let mut buf = version.get(self, key, ctx).await?;
     943            0 :         Ok(buf.get_u32_le())
     944            0 :     }
     945              : 
     946              :     /// Does the SLRU segment exist?
     947            0 :     pub(crate) async fn get_slru_segment_exists(
     948            0 :         &self,
     949            0 :         kind: SlruKind,
     950            0 :         segno: u32,
     951            0 :         version: Version<'_>,
     952            0 :         ctx: &RequestContext,
     953            0 :     ) -> Result<bool, PageReconstructError> {
     954            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     955              :         // fetch directory listing
     956            0 :         let key = slru_dir_to_key(kind);
     957            0 :         let buf = version.get(self, key, ctx).await?;
     958              : 
     959            0 :         let dir = SlruSegmentDirectory::des(&buf)?;
     960            0 :         Ok(dir.segments.contains(&segno))
     961            0 :     }
     962              : 
     963              :     /// Locate the LSN such that all transactions that committed before
     964              :     /// 'search_timestamp' are visible, but nothing newer is.
     965              :     ///
     966              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     967              :     /// so it's not well defined which LSN you get if there were multiple commits
     968              :     /// "in flight" at that point in time.
     969              :     ///
     970            0 :     pub(crate) async fn find_lsn_for_timestamp(
     971            0 :         &self,
     972            0 :         search_timestamp: TimestampTz,
     973            0 :         cancel: &CancellationToken,
     974            0 :         ctx: &RequestContext,
     975            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     976            0 :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     977              : 
     978            0 :         let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
     979            0 :         let gc_cutoff_planned = {
     980            0 :             let gc_info = self.gc_info.read().unwrap();
     981            0 :             info!(cutoffs=?gc_info.cutoffs, applied_cutoff=%*gc_cutoff_lsn_guard, "starting find_lsn_for_timestamp");
     982            0 :             gc_info.min_cutoff()
     983              :         };
     984              :         // Usually the planned cutoff is newer than the cutoff of the last gc run,
     985              :         // but let's be defensive.
     986            0 :         let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
     987              :         // We use this method to figure out the branching LSN for the new branch, but the
     988              :         // GC cutoff could be before the branching point and we cannot create a new branch
     989              :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     990              :         // on the safe side.
     991            0 :         let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
     992            0 :         let max_lsn = self.get_last_record_lsn();
     993              : 
     994              :         // LSNs are always 8-byte aligned. low/mid/high represent the
     995              :         // LSN divided by 8.
     996            0 :         let mut low = min_lsn.0 / 8;
     997            0 :         let mut high = max_lsn.0 / 8 + 1;
     998              : 
     999            0 :         let mut found_smaller = false;
    1000            0 :         let mut found_larger = false;
    1001              : 
    1002            0 :         while low < high {
    1003            0 :             if cancel.is_cancelled() {
    1004            0 :                 return Err(PageReconstructError::Cancelled);
    1005            0 :             }
    1006              :             // cannot overflow, high and low are both smaller than u64::MAX / 2
    1007            0 :             let mid = (high + low) / 2;
    1008              : 
    1009            0 :             let cmp = match self
    1010            0 :                 .is_latest_commit_timestamp_ge_than(
    1011            0 :                     search_timestamp,
    1012            0 :                     Lsn(mid * 8),
    1013            0 :                     &mut found_smaller,
    1014            0 :                     &mut found_larger,
    1015            0 :                     ctx,
    1016              :                 )
    1017            0 :                 .await
    1018              :             {
    1019            0 :                 Ok(res) => res,
    1020            0 :                 Err(PageReconstructError::MissingKey(e)) => {
    1021            0 :                     warn!(
    1022            0 :                         "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
    1023              :                         e
    1024              :                     );
    1025              :                     // Log the error and report that we didn't find any commits smaller than the LSN.
    1026            0 :                     return Ok(LsnForTimestamp::Past(min_lsn));
    1027              :                 }
    1028            0 :                 Err(e) => return Err(e),
    1029              :             };
    1030              : 
    1031            0 :             if cmp {
    1032            0 :                 high = mid;
    1033            0 :             } else {
    1034            0 :                 low = mid + 1;
    1035            0 :             }
    1036              :         }
    1037              : 
    1038              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
    1039              :         // i.e. the LSN of the last commit record at or before `search_timestamp`.
    1040              :         // Remove one from `low` to get `t`.
    1041              :         //
    1042              :         // FIXME: it would be better to get the LSN of the previous commit.
    1043              :         // Otherwise, if you restore to the returned LSN, the database will
    1044              :         // include physical changes from later commits that will be marked
    1045              :         // as aborted, and will need to be vacuumed away.
    1046            0 :         let commit_lsn = Lsn((low - 1) * 8);
    1047            0 :         match (found_smaller, found_larger) {
    1048              :             (false, false) => {
    1049              :                 // This can happen if no commit records have been processed yet, e.g.
    1050              :                 // just after importing a cluster.
    1051            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
    1052              :             }
    1053              :             (false, true) => {
    1054              :                 // Didn't find any commit timestamps smaller than the request
    1055            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
    1056              :             }
    1057            0 :             (true, _) if commit_lsn < min_lsn => {
    1058              :                 // The search above set found_smaller to true but never increased the lsn.
    1059              :                 // Then low is still the old min_lsn, and the subtraction above gave a value
    1060              :                 // below min_lsn, which we must never return.
    1061            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
    1062              :             }
    1063              :             (true, false) => {
    1064              :                 // Only found commits with timestamps smaller than the request.
    1065              :                 // It's still a valid case for branch creation, return it.
    1066              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
    1067              :                 // case, anyway.
    1068            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
    1069              :             }
    1070            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
    1071              :         }
    1072            0 :     }
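
    // A minimal sketch (hypothetical helper, not from the original source) of the loop
    // above: a lower-bound binary search over the 8-byte-aligned LSN space, given a
    // monotone predicate ("does any commit at this LSN have a timestamp at or after
    // search_timestamp?"). It returns the smallest LSN at which the predicate holds.
    fn lower_bound_lsn_sketch(min_lsn: u64, max_lsn: u64, pred: impl Fn(u64) -> bool) -> u64 {
        // As in find_lsn_for_timestamp, low/high represent the LSN divided by 8.
        let mut low = min_lsn / 8;
        let mut high = max_lsn / 8 + 1;
        while low < high {
            let mid = (high + low) / 2;
            if pred(mid * 8) {
                high = mid; // predicate holds: the answer is at or below mid
            } else {
                low = mid + 1; // predicate fails: the answer is strictly above mid
            }
        }
        // low is now the smallest LSN/8 where the predicate holds (or max_lsn/8 + 1
        // if it never does), matching the post-loop reasoning above.
        low * 8
    }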
    1073              : 
    1074              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if any transaction
    1075              :     /// committed at or after 'search_timestamp', as observed at LSN 'probe_lsn'.
    1076              :     ///
    1077              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
    1078              :     /// with a smaller/larger timestamp.
    1079              :     ///
    1080            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
    1081            0 :         &self,
    1082            0 :         search_timestamp: TimestampTz,
    1083            0 :         probe_lsn: Lsn,
    1084            0 :         found_smaller: &mut bool,
    1085            0 :         found_larger: &mut bool,
    1086            0 :         ctx: &RequestContext,
    1087            0 :     ) -> Result<bool, PageReconstructError> {
    1088            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
    1089            0 :             if timestamp >= search_timestamp {
    1090            0 :                 *found_larger = true;
    1091            0 :                 return ControlFlow::Break(true);
    1092            0 :             } else {
    1093            0 :                 *found_smaller = true;
    1094            0 :             }
    1095            0 :             ControlFlow::Continue(())
    1096            0 :         })
    1097            0 :         .await
    1098            0 :     }
    1099              : 
    1100              :     /// Obtain the timestamp for the given lsn.
    1101              :     ///
    1102              :     /// If the lsn has no timestamps (e.g. no commits), returns None.
    1103            0 :     pub(crate) async fn get_timestamp_for_lsn(
    1104            0 :         &self,
    1105            0 :         probe_lsn: Lsn,
    1106            0 :         ctx: &RequestContext,
    1107            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
    1108            0 :         let mut max: Option<TimestampTz> = None;
    1109            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
    1110            0 :             if let Some(max_prev) = max {
    1111            0 :                 max = Some(max_prev.max(timestamp));
    1112            0 :             } else {
    1113            0 :                 max = Some(timestamp);
    1114            0 :             }
    1115            0 :             ControlFlow::Continue(())
    1116            0 :         })
    1117            0 :         .await?;
    1118              : 
    1119            0 :         Ok(max)
    1120            0 :     }
    1121              : 
    1122              :     /// Runs the given function on all the timestamps for a given lsn
    1123              :     ///
    1124              :     /// The return value is either given by the closure, or set to the `Default`
    1125              :     /// impl's output.
    1126            0 :     async fn map_all_timestamps<T: Default>(
    1127            0 :         &self,
    1128            0 :         probe_lsn: Lsn,
    1129            0 :         ctx: &RequestContext,
    1130            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
    1131            0 :     ) -> Result<T, PageReconstructError> {
    1132            0 :         for segno in self
    1133            0 :             .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
    1134            0 :             .await?
    1135              :         {
    1136            0 :             let nblocks = self
    1137            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
    1138            0 :                 .await?;
    1139              : 
    1140            0 :             let keyspace = KeySpace::single(
    1141            0 :                 slru_block_to_key(SlruKind::Clog, segno, 0)
    1142            0 :                     ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
    1143              :             );
    1144              : 
    1145            0 :             let batches = keyspace.partition(
    1146            0 :                 self.get_shard_identity(),
    1147            0 :                 self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
    1148            0 :                 BLCKSZ as u64,
    1149              :             );
    1150              : 
    1151            0 :             let io_concurrency = IoConcurrency::spawn_from_conf(
    1152            0 :                 self.conf.get_vectored_concurrent_io,
    1153            0 :                 self.gate
    1154            0 :                     .enter()
    1155            0 :                     .map_err(|_| PageReconstructError::Cancelled)?,
    1156              :             );
    1157              : 
    1158            0 :             for batch in batches.parts.into_iter().rev() {
    1159            0 :                 let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
    1160            0 :                 let blocks = self
    1161            0 :                     .get_vectored(query, io_concurrency.clone(), ctx)
    1162            0 :                     .await?;
    1163              : 
    1164            0 :                 for (_key, clog_page) in blocks.into_iter().rev() {
    1165            0 :                     let clog_page = clog_page?;
    1166              : 
    1167            0 :                     if clog_page.len() == BLCKSZ as usize + 8 {
    1168            0 :                         let mut timestamp_bytes = [0u8; 8];
    1169            0 :                         timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
    1170            0 :                         let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
    1171              : 
    1172            0 :                         match f(timestamp) {
    1173            0 :                             ControlFlow::Break(b) => return Ok(b),
    1174            0 :                             ControlFlow::Continue(()) => (),
    1175              :                         }
    1176            0 :                     }
    1177              :                 }
    1178              :             }
    1179              :         }
    1180            0 :         Ok(Default::default())
    1181            0 :     }
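
    // A minimal sketch (layout assumptions spelled out, not from the original source)
    // of the page decoding above: a stored CLOG page that carries the latest commit
    // timestamp is BLCKSZ bytes of page data plus 8 trailing big-endian timestamp bytes.
    fn decode_clog_page_timestamp_sketch(clog_page: &[u8]) -> Option<i64> {
        const BLCKSZ: usize = 8192; // standard Postgres block size, assumed here
        if clog_page.len() == BLCKSZ + 8 {
            let mut timestamp_bytes = [0u8; 8];
            timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ..]);
            Some(i64::from_be_bytes(timestamp_bytes))
        } else {
            None // a bare page without an appended timestamp
        }
    }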
    1182              : 
    1183            0 :     pub(crate) async fn get_slru_keyspace(
    1184            0 :         &self,
    1185            0 :         version: Version<'_>,
    1186            0 :         ctx: &RequestContext,
    1187            0 :     ) -> Result<KeySpace, PageReconstructError> {
    1188            0 :         let mut accum = KeySpaceAccum::new();
    1189              : 
    1190            0 :         for kind in SlruKind::iter() {
    1191            0 :             let mut segments: Vec<u32> = self
    1192            0 :                 .list_slru_segments(kind, version, ctx)
    1193            0 :                 .await?
    1194            0 :                 .into_iter()
    1195            0 :                 .collect();
    1196            0 :             segments.sort_unstable();
    1197              : 
    1198            0 :             for seg in segments {
    1199            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
    1200              : 
    1201            0 :                 accum.add_range(
    1202            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
    1203              :                 );
    1204              :             }
    1205              :         }
    1206              : 
    1207            0 :         Ok(accum.to_keyspace())
    1208            0 :     }
    1209              : 
    1210              :     /// Get a list of SLRU segments
    1211            0 :     pub(crate) async fn list_slru_segments(
    1212            0 :         &self,
    1213            0 :         kind: SlruKind,
    1214            0 :         version: Version<'_>,
    1215            0 :         ctx: &RequestContext,
    1216            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
    1217              :         // fetch directory entry
    1218            0 :         let key = slru_dir_to_key(kind);
    1219              : 
    1220            0 :         let buf = version.get(self, key, ctx).await?;
    1221            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
    1222            0 :     }
    1223              : 
    1224            0 :     pub(crate) async fn get_relmap_file(
    1225            0 :         &self,
    1226            0 :         spcnode: Oid,
    1227            0 :         dbnode: Oid,
    1228            0 :         version: Version<'_>,
    1229            0 :         ctx: &RequestContext,
    1230            0 :     ) -> Result<Bytes, PageReconstructError> {
    1231            0 :         let key = relmap_file_key(spcnode, dbnode);
    1232              : 
    1233            0 :         let buf = version.get(self, key, ctx).await?;
    1234            0 :         Ok(buf)
    1235            0 :     }
    1236              : 
    1237          169 :     pub(crate) async fn list_dbdirs(
    1238          169 :         &self,
    1239          169 :         lsn: Lsn,
    1240          169 :         ctx: &RequestContext,
    1241          169 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
    1242              :         // fetch directory entry
    1243          169 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
    1244              : 
    1245          169 :         Ok(DbDirectory::des(&buf)?.dbdirs)
    1246          169 :     }
    1247              : 
    1248            0 :     pub(crate) async fn get_twophase_file(
    1249            0 :         &self,
    1250            0 :         xid: u64,
    1251            0 :         lsn: Lsn,
    1252            0 :         ctx: &RequestContext,
    1253            0 :     ) -> Result<Bytes, PageReconstructError> {
    1254            0 :         let key = twophase_file_key(xid);
    1255            0 :         let buf = self.get(key, lsn, ctx).await?;
    1256            0 :         Ok(buf)
    1257            0 :     }
    1258              : 
    1259          170 :     pub(crate) async fn list_twophase_files(
    1260          170 :         &self,
    1261          170 :         lsn: Lsn,
    1262          170 :         ctx: &RequestContext,
    1263          170 :     ) -> Result<HashSet<u64>, PageReconstructError> {
    1264              :         // fetch directory entry
    1265          170 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
    1266              : 
    1267          170 :         if self.pg_version >= PgMajorVersion::PG17 {
    1268          157 :             Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
    1269              :         } else {
    1270           13 :             Ok(TwoPhaseDirectory::des(&buf)?
    1271              :                 .xids
    1272           13 :                 .iter()
    1273           13 :                 .map(|x| u64::from(*x))
    1274           13 :                 .collect())
    1275              :         }
    1276          170 :     }
    1277              : 
    1278            0 :     pub(crate) async fn get_control_file(
    1279            0 :         &self,
    1280            0 :         lsn: Lsn,
    1281            0 :         ctx: &RequestContext,
    1282            0 :     ) -> Result<Bytes, PageReconstructError> {
    1283            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
    1284            0 :     }
    1285              : 
    1286            6 :     pub(crate) async fn get_checkpoint(
    1287            6 :         &self,
    1288            6 :         lsn: Lsn,
    1289            6 :         ctx: &RequestContext,
    1290            6 :     ) -> Result<Bytes, PageReconstructError> {
    1291            6 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
    1292            6 :     }
    1293              : 
    1294            6 :     async fn list_aux_files_v2(
    1295            6 :         &self,
    1296            6 :         lsn: Lsn,
    1297            6 :         ctx: &RequestContext,
    1298            6 :         io_concurrency: IoConcurrency,
    1299            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
    1300            6 :         let kv = self
    1301            6 :             .scan(
    1302            6 :                 KeySpace::single(Key::metadata_aux_key_range()),
    1303            6 :                 lsn,
    1304            6 :                 ctx,
    1305            6 :                 io_concurrency,
    1306            6 :             )
    1307            6 :             .await?;
    1308            6 :         let mut result = HashMap::new();
    1309            6 :         let mut sz = 0;
    1310           15 :         for (_, v) in kv {
    1311            9 :             let v = v?;
    1312            9 :             let v = aux_file::decode_file_value_bytes(&v)
    1313            9 :                 .context("value decode")
    1314            9 :                 .map_err(PageReconstructError::Other)?;
    1315           17 :             for (fname, content) in v {
    1316            8 :                 sz += fname.len();
    1317            8 :                 sz += content.len();
    1318            8 :                 result.insert(fname, content);
    1319            8 :             }
    1320              :         }
    1321            6 :         self.aux_file_size_estimator.on_initial(sz);
    1322            6 :         Ok(result)
    1323            6 :     }
    1324              : 
    1325            0 :     pub(crate) async fn trigger_aux_file_size_computation(
    1326            0 :         &self,
    1327            0 :         lsn: Lsn,
    1328            0 :         ctx: &RequestContext,
    1329            0 :         io_concurrency: IoConcurrency,
    1330            0 :     ) -> Result<(), PageReconstructError> {
    1331            0 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
    1332            0 :         Ok(())
    1333            0 :     }
    1334              : 
    1335            6 :     pub(crate) async fn list_aux_files(
    1336            6 :         &self,
    1337            6 :         lsn: Lsn,
    1338            6 :         ctx: &RequestContext,
    1339            6 :         io_concurrency: IoConcurrency,
    1340            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
    1341            6 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await
    1342            6 :     }
    1343              : 
    1344            2 :     pub(crate) async fn get_replorigins(
    1345            2 :         &self,
    1346            2 :         lsn: Lsn,
    1347            2 :         ctx: &RequestContext,
    1348            2 :         io_concurrency: IoConcurrency,
    1349            2 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
    1350            2 :         let kv = self
    1351            2 :             .scan(
    1352            2 :                 KeySpace::single(repl_origin_key_range()),
    1353            2 :                 lsn,
    1354            2 :                 ctx,
    1355            2 :                 io_concurrency,
    1356            2 :             )
    1357            2 :             .await?;
    1358            2 :         let mut result = HashMap::new();
    1359            6 :         for (k, v) in kv {
    1360            5 :             let v = v?;
    1361            5 :             if v.is_empty() {
    1362              :                 // This is a tombstone -- we can skip it.
    1363              :                 // Originally, the replorigin code used `Lsn::INVALID` to represent a tombstone. However, as it is part of
    1364              :                 // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
    1365              :                 // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
    1366              :                 // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
    1367            2 :                 continue;
    1368            3 :             }
    1369            3 :             let origin_id = k.field6 as RepOriginId;
    1370            3 :             let origin_lsn = Lsn::des(&v)
    1371            3 :                 .with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
    1372            2 :             if origin_lsn != Lsn::INVALID {
    1373            2 :                 result.insert(origin_id, origin_lsn);
    1374            2 :             }
    1375              :         }
    1376            1 :         Ok(result)
    1377            2 :     }
    1378              : 
    1379              :     /// Does the same as get_current_logical_size, but computes the size on demand.
    1380              :     /// Used to initialize the logical size tracking on startup.
    1381              :     ///
    1382              :     /// Only relation blocks are counted currently. That excludes metadata,
    1383              :     /// SLRUs, twophase files etc.
    1384              :     ///
    1385              :     /// # Cancel-Safety
    1386              :     ///
    1387              :     /// This method is cancellation-safe.
    1388            7 :     pub(crate) async fn get_current_logical_size_non_incremental(
    1389            7 :         &self,
    1390            7 :         lsn: Lsn,
    1391            7 :         ctx: &RequestContext,
    1392            7 :     ) -> Result<u64, CalculateLogicalSizeError> {
    1393            7 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
    1394              : 
    1395            7 :         fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
    1396              : 
    1397              :         // Fetch list of database dirs and iterate them
    1398            7 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
    1399            7 :         let dbdir = DbDirectory::des(&buf)?;
    1400              : 
    1401            7 :         let mut total_size: u64 = 0;
    1402            7 :         let mut dbdir_cnt = 0;
    1403            7 :         let mut rel_cnt = 0;
    1404              : 
    1405            7 :         for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
    1406            0 :             dbdir_cnt += 1;
    1407            0 :             for rel in self
    1408            0 :                 .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
    1409            0 :                 .await?
    1410              :             {
    1411            0 :                 rel_cnt += 1;
    1412            0 :                 if self.cancel.is_cancelled() {
    1413            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
    1414            0 :                 }
    1415            0 :                 let relsize_key = rel_size_to_key(rel);
    1416            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1417            0 :                 let relsize = buf.get_u32_le();
    1418              : 
    1419            0 :                 total_size += relsize as u64;
    1420              :             }
    1421              :         }
    1422              : 
    1423            7 :         self.db_rel_count
    1424            7 :             .store(Some(Arc::new((dbdir_cnt, rel_cnt))));
    1425              : 
    1426            7 :         Ok(total_size * BLCKSZ as u64)
    1427            7 :     }
    1428              : 
    1429              :     /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
    1430              :     /// for gc-compaction.
    1431              :     ///
    1432              :     /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
    1433              :     /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
    1434              :     /// be kept only for a specific range of LSN.
    1435              :     ///
    1436              :     /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
    1437              :     /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
    1438              :     /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
    1439              :     /// determine which keys to retain/drop for gc-compaction.
    1440              :     ///
    1441              :     /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
    1442              :     /// to be retained for each branch LSN.
    1443              :     ///
    1444              :     /// The return value is (dense keyspace, sparse keyspace).
    1445           27 :     pub(crate) async fn collect_gc_compaction_keyspace(
    1446           27 :         &self,
    1447           27 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1448           27 :         let metadata_key_begin = Key::metadata_key_range().start;
    1449           27 :         let aux_v1_key = AUX_FILES_KEY;
    1450           27 :         let dense_keyspace = KeySpace {
    1451           27 :             ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
    1452           27 :         };
    1453           27 :         Ok((
    1454           27 :             dense_keyspace,
    1455           27 :             SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
    1456           27 :         ))
    1457           27 :     }
    1458              : 
    1459              :     ///
    1460              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
    1461              :     /// Anything that's not listed may be removed from the underlying storage (from
    1462              :     /// that LSN forwards).
    1463              :     ///
    1464              :     /// The return value is (dense keyspace, sparse keyspace).
    1465          169 :     pub(crate) async fn collect_keyspace(
    1466          169 :         &self,
    1467          169 :         lsn: Lsn,
    1468          169 :         ctx: &RequestContext,
    1469          169 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1470              :         // Iterate through key ranges, greedily packing them into partitions
    1471          169 :         let mut result = KeySpaceAccum::new();
    1472              : 
    1473              :         // The dbdir metadata always exists
    1474          169 :         result.add_key(DBDIR_KEY);
    1475              : 
    1476              :         // Fetch list of database dirs and iterate them
    1477          169 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
    1478          169 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
    1479              : 
    1480          169 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
    1481          169 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
    1482            0 :             if has_relmap_file {
    1483            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
    1484            0 :             }
    1485            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
    1486              : 
    1487            0 :             let mut rels: Vec<RelTag> = self
    1488            0 :                 .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
    1489            0 :                 .await?
    1490            0 :                 .into_iter()
    1491            0 :                 .collect();
    1492            0 :             rels.sort_unstable();
    1493            0 :             for rel in rels {
    1494            0 :                 let relsize_key = rel_size_to_key(rel);
    1495            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1496            0 :                 let relsize = buf.get_u32_le();
    1497              : 
    1498            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
    1499            0 :                 result.add_key(relsize_key);
    1500              :             }
    1501              :         }
    1502              : 
    1503              :         // Iterate SLRUs next
    1504          169 :         if self.tenant_shard_id.is_shard_zero() {
    1505          498 :             for kind in [
    1506          166 :                 SlruKind::Clog,
    1507          166 :                 SlruKind::MultiXactMembers,
    1508          166 :                 SlruKind::MultiXactOffsets,
    1509              :             ] {
    1510          498 :                 let slrudir_key = slru_dir_to_key(kind);
    1511          498 :                 result.add_key(slrudir_key);
    1512          498 :                 let buf = self.get(slrudir_key, lsn, ctx).await?;
    1513          498 :                 let dir = SlruSegmentDirectory::des(&buf)?;
    1514          498 :                 let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
    1515          498 :                 segments.sort_unstable();
    1516          498 :                 for segno in segments {
    1517            0 :                     let segsize_key = slru_segment_size_to_key(kind, segno);
    1518            0 :                     let mut buf = self.get(segsize_key, lsn, ctx).await?;
    1519            0 :                     let segsize = buf.get_u32_le();
    1520              : 
    1521            0 :                     result.add_range(
    1522            0 :                         slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
    1523              :                     );
    1524            0 :                     result.add_key(segsize_key);
    1525              :                 }
    1526              :             }
    1527            3 :         }
    1528              : 
    1529              :         // Then pg_twophase
    1530          169 :         result.add_key(TWOPHASEDIR_KEY);
    1531              : 
    1532          169 :         let mut xids: Vec<u64> = self
    1533          169 :             .list_twophase_files(lsn, ctx)
    1534          169 :             .await?
    1535          169 :             .iter()
    1536          169 :             .cloned()
    1537          169 :             .collect();
    1538          169 :         xids.sort_unstable();
    1539          169 :         for xid in xids {
    1540            0 :             result.add_key(twophase_file_key(xid));
    1541            0 :         }
    1542              : 
    1543          169 :         result.add_key(CONTROLFILE_KEY);
    1544          169 :         result.add_key(CHECKPOINT_KEY);
    1545              : 
    1546              :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
    1547              :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
    1548              :         // and the keys will not be garbage-collected.
    1549              :         #[cfg(test)]
    1550              :         {
    1551          169 :             let guard = self.extra_test_dense_keyspace.load();
    1552          169 :             for kr in &guard.ranges {
    1553            0 :                 result.add_range(kr.clone());
    1554            0 :             }
    1555              :         }
    1556              : 
    1557          169 :         let dense_keyspace = result.to_keyspace();
    1558          169 :         let sparse_keyspace = SparseKeySpace(KeySpace {
    1559          169 :             ranges: vec![
    1560          169 :                 Key::metadata_aux_key_range(),
    1561          169 :                 repl_origin_key_range(),
    1562          169 :                 Key::rel_dir_sparse_key_range(),
    1563          169 :             ],
    1564          169 :         });
    1565              : 
    1566          169 :         if cfg!(debug_assertions) {
    1567              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
    1568              : 
    1569              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
    1570              :             // category of sparse keys are split into their own image/delta files. If there
    1571              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
    1572              :             // and we want the developer to keep the keyspaces separated.
    1573              : 
    1574          169 :             let ranges = &sparse_keyspace.0.ranges;
    1575              : 
    1576              :             // TODO: use a single overlaps_with across the codebase
    1577          507 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    1578          507 :                 !(a.end <= b.start || b.end <= a.start)
    1579          507 :             }
    1580          507 :             for i in 0..ranges.len() {
    1581          507 :                 for j in 0..i {
    1582          507 :                     if overlaps_with(&ranges[i], &ranges[j]) {
    1583            0 :                         panic!(
    1584            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
    1585            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
    1586              :                         );
    1587          507 :                     }
    1588              :                 }
    1589              :             }
    1590          338 :             for i in 1..ranges.len() {
    1591          338 :                 assert!(
    1592          338 :                     ranges[i - 1].end <= ranges[i].start,
    1593            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
    1594            0 :                     ranges[i - 1].start,
    1595            0 :                     ranges[i - 1].end,
    1596            0 :                     ranges[i].start,
    1597            0 :                     ranges[i].end
    1598              :                 );
    1599              :             }
    1600            0 :         }
    1601              : 
    1602          169 :         Ok((dense_keyspace, sparse_keyspace))
    1603          169 :     }
    1604              : 
    1605              :     /// Get the cached size of a relation. There are two caches: the latest cache, updated by the primary,
    1606              :     /// which captures the latest state of the timeline, and the snapshot cache, whose key includes the LSN
    1607              :     /// and so can be used by replicas to get the relation size at a particular LSN (snapshot).
    1608       224270 :     pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
    1609       224270 :         let lsn = version.get_lsn();
    1610              :         {
    1611       224270 :             let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
    1612       224270 :             if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
    1613       224259 :                 if lsn >= *cached_lsn {
    1614       221686 :                     RELSIZE_LATEST_CACHE_HITS.inc();
    1615       221686 :                     return Some(*nblocks);
    1616         2573 :                 }
    1617         2573 :                 RELSIZE_CACHE_MISSES_OLD.inc();
    1618           11 :             }
    1619              :         }
    1620              :         {
    1621         2584 :             let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
    1622         2584 :             if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
    1623         2563 :                 RELSIZE_SNAPSHOT_CACHE_HITS.inc();
    1624         2563 :                 return Some(*nblock);
    1625           21 :             }
    1626              :         }
    1627           21 :         if version.is_latest() {
    1628           10 :             RELSIZE_LATEST_CACHE_MISSES.inc();
    1629           11 :         } else {
    1630           11 :             RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
    1631           11 :         }
    1632           21 :         None
    1633       224270 :     }
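
    // A minimal sketch (plain maps with simplified key types, not from the original
    // source) of the two-tier lookup above: the latest cache answers requests at or
    // above its per-relation watermark LSN, while older requests fall back to the
    // exact-LSN snapshot cache used by replicas.
    fn cached_rel_size_sketch(
        latest: &HashMap<u32, (Lsn, BlockNumber)>,   // rel -> (watermark LSN, nblocks)
        snapshot: &HashMap<(Lsn, u32), BlockNumber>, // (LSN, rel) -> nblocks
        rel: u32,
        lsn: Lsn,
    ) -> Option<BlockNumber> {
        if let Some(&(cached_lsn, nblocks)) = latest.get(&rel) {
            if lsn >= cached_lsn {
                return Some(nblocks); // the latest state is valid at this and all newer LSNs
            }
        }
        snapshot.get(&(lsn, rel)).copied() // snapshot entries only match their exact LSN
    }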
    1634              : 
    1635              :     /// Update cached relation size if there is no more recent update
    1636            5 :     pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
    1637            5 :         let lsn = version.get_lsn();
    1638            5 :         if version.is_latest() {
    1639            0 :             let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
    1640            0 :             match rel_size_cache.entry(tag) {
    1641            0 :                 hash_map::Entry::Occupied(mut entry) => {
    1642            0 :                     let cached_lsn = entry.get_mut();
    1643            0 :                     if lsn >= cached_lsn.0 {
    1644            0 :                         *cached_lsn = (lsn, nblocks);
    1645            0 :                     }
    1646              :                 }
    1647            0 :                 hash_map::Entry::Vacant(entry) => {
    1648            0 :                     entry.insert((lsn, nblocks));
    1649            0 :                     RELSIZE_LATEST_CACHE_ENTRIES.inc();
    1650            0 :                 }
    1651              :             }
    1652              :         } else {
    1653            5 :             let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
    1654            5 :             if rel_size_cache.capacity() != 0 {
    1655            5 :                 rel_size_cache.insert((lsn, tag), nblocks);
    1656            5 :                 RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
    1657            5 :             }
    1658              :         }
    1659            5 :     }
    1660              : 
    1661              :     /// Store cached relation size
    1662       141360 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1663       141360 :         let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
    1664       141360 :         if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
    1665          960 :             RELSIZE_LATEST_CACHE_ENTRIES.inc();
    1666       140400 :         }
    1667       141360 :     }
    1668              : 
    1669              :     /// Remove cached relation size
    1670            1 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1671            1 :         let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
    1672            1 :         if rel_size_cache.remove(tag).is_some() {
    1673            1 :             RELSIZE_LATEST_CACHE_ENTRIES.dec();
    1674            1 :         }
    1675            1 :     }
    1676              : }
    1677              : 
    1678              : /// DatadirModification represents an operation to ingest an atomic set of
    1679              : /// updates to the repository.
    1680              : ///
    1681              : /// It is created by the 'begin_record' function. It is called for each WAL
    1682              : /// record, so that all the modifications by one WAL record appear atomic.
    1683              : pub struct DatadirModification<'a> {
    1684              :     /// The timeline this modification applies to. You can access this to
    1685              :     /// read the state, but note that any pending updates are *not* reflected
    1686              :     /// in the state in 'tline' yet.
    1687              :     pub tline: &'a Timeline,
    1688              : 
    1689              :     /// Current LSN of the modification
    1690              :     lsn: Lsn,
    1691              : 
    1692              :     // The modifications are not applied directly to the underlying key-value store.
    1693              :     // The put-functions add the modifications here, and they are flushed to the
    1694              :     // underlying key-value store by the 'finish' function.
    1695              :     pending_lsns: Vec<Lsn>,
    1696              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1697              :     pending_nblocks: i64,
    1698              : 
    1699              :     /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    1700              :     /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    1701              :     pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
    1702              : 
    1703              :     /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    1704              :     /// which keys are stored here.
    1705              :     pending_data_batch: Option<SerializedValueBatch>,
    1706              : 
    1707              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1708              :     /// if it was updated in this modification.
    1709              :     pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
    1710              : 
    1711              :     /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
    1712              :     pending_metadata_bytes: usize,
    1713              : 
    1714              :     /// Whether we are importing a pgdata directory.
    1715              :     is_importing_pgdata: bool,
    1716              : }
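
// A minimal sketch (hypothetical types, not from the original source) of the
// write-buffering pattern described above: put-functions accumulate modifications in
// memory, and they reach the underlying key-value store only on 'finish'.
struct PendingWritesSketch {
    pending: Vec<(u64, Vec<u8>)>, // buffered (key, value) pairs
}

impl PendingWritesSketch {
    fn put(&mut self, key: u64, value: Vec<u8>) {
        // Buffer only; the store does not see this write yet.
        self.pending.push((key, value));
    }

    fn finish(&mut self, store: &mut HashMap<u64, Vec<u8>>) {
        // Flush the whole batch in one step, mirroring the 'finish' function above.
        for (key, value) in self.pending.drain(..) {
            store.insert(key, value);
        }
    }
}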
    1717              : 
    1718              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    1719              : pub enum MetricsUpdate {
    1720              :     /// Set the metrics to this value
    1721              :     Set(u64),
    1722              :     /// Increment the metrics by this value
    1723              :     Add(u64),
    1724              :     /// Decrement the metrics by this value
    1725              :     Sub(u64),
    1726              : }
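
// A minimal sketch (hypothetical helper, not from the original source) of how these
// three variants are meant to be applied to a directory-entry metric:
fn apply_metrics_update_sketch(current: u64, update: MetricsUpdate) -> u64 {
    match update {
        MetricsUpdate::Set(v) => v,
        MetricsUpdate::Add(v) => current.saturating_add(v),
        MetricsUpdate::Sub(v) => current.saturating_sub(v),
    }
}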
    1727              : 
    1728              : /// Controls the behavior of the reldir keyspace.
    1729              : pub struct RelDirMode {
    1730              :     // Whether we can read the v2 keyspace or not.
    1731              :     current_status: RelSizeMigration,
    1732              :     // Whether we should initialize the v2 keyspace or not.
    1733              :     initialize: bool,
    1734              :     // Whether we should disable v1 access starting at this LSN or not
    1735              :     disable_v1: bool,
    1736              : }
    1737              : 
    1738              : impl DatadirModification<'_> {
    1739              :     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    1740              :     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    1741              :     // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    1742              :     pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
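
    // A hypothetical ingest-side check (a sketch, not from the original source) of how
    // this bound is meant to be used together with approx_pending_bytes() below:
    //
    //     if modification.approx_pending_bytes() >= DatadirModification::MAX_PENDING_BYTES {
    //         // commit the modification and start a new one
    //     }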
    1743              : 
    1744              :     /// Get the current lsn
    1745            1 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1746            1 :         self.lsn
    1747            1 :     }
    1748              : 
    1749            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1750            0 :         self.pending_data_batch
    1751            0 :             .as_ref()
    1752            0 :             .map_or(0, |b| b.buffer_size())
    1753            0 :             + self.pending_metadata_bytes
    1754            0 :     }
    1755              : 
    1756            0 :     pub(crate) fn has_dirty_data(&self) -> bool {
    1757            0 :         self.pending_data_batch
    1758            0 :             .as_ref()
    1759            0 :             .is_some_and(|b| b.has_data())
    1760            0 :     }
    1761              : 
    1762              :     /// Returns statistics about the currently pending modifications.
    1763            0 :     pub(crate) fn stats(&self) -> DatadirModificationStats {
    1764            0 :         let mut stats = DatadirModificationStats::default();
    1765            0 :         for (_, _, value) in self.pending_metadata_pages.values().flatten() {
    1766            0 :             match value {
    1767            0 :                 Value::Image(_) => stats.metadata_images += 1,
    1768            0 :                 Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
    1769            0 :                 Value::WalRecord(_) => stats.metadata_deltas += 1,
    1770              :             }
    1771              :         }
    1772            0 :         for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
    1773            0 :             match valuemeta {
    1774            0 :                 ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
    1775            0 :                 ValueMeta::Serialized(_) => stats.data_deltas += 1,
    1776            0 :                 ValueMeta::Observed(_) => {}
    1777              :             }
    1778              :         }
    1779            0 :         stats
    1780            0 :     }
    1781              : 
    1782              :     /// Set the current lsn
    1783        72929 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
    1784        72929 :         ensure_walingest!(
    1785              :             lsn >= self.lsn,
    1786              :             "setting an older lsn {} than {} is not allowed",
    1787              :             lsn,
    1788              :             self.lsn
    1789              :         );
    1790              : 
    1791        72929 :         if lsn > self.lsn {
    1792        72929 :             self.pending_lsns.push(self.lsn);
    1793        72929 :             self.lsn = lsn;
    1794        72929 :         }
    1795        72929 :         Ok(())
    1796        72929 :     }
    1797              : 
    1798              :     /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
    1799              :     /// keys that represent literal blocks that postgres can read.  So data includes relation blocks and
    1800              :     /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
    1801              :     ///
    1802              :     /// The distinction is important because data keys are handled on a fast path where dirty writes are
    1803              :     /// not readable until this modification is committed, whereas metadata keys are visible for read
    1804              :     /// via [`Self::get`] as soon as their record has been ingested.
    1805       425371 :     fn is_data_key(key: &Key) -> bool {
    1806       425371 :         key.is_rel_block_key() || key.is_slru_block_key()
    1807       425371 :     }
    1808              : 
    1809              :     /// Initialize a completely new repository.
    1810              :     ///
    1811              :     /// This inserts the directory metadata entries that are assumed to
    1812              :     /// always exist.
    1813          112 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1814          112 :         let buf = DbDirectory::ser(&DbDirectory {
    1815          112 :             dbdirs: HashMap::new(),
    1816          112 :         })?;
    1817          112 :         self.pending_directory_entries
    1818          112 :             .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
    1819          112 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1820              : 
    1821          112 :         let buf = if self.tline.pg_version >= PgMajorVersion::PG17 {
    1822           99 :             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
    1823           99 :                 xids: HashSet::new(),
    1824           99 :             })
    1825              :         } else {
    1826           13 :             TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1827           13 :                 xids: HashSet::new(),
    1828           13 :             })
    1829            0 :         }?;
    1830          112 :         self.pending_directory_entries
    1831          112 :             .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
    1832          112 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1833              : 
    1834          112 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1835          112 :         let empty_dir = Value::Image(buf);
    1836              : 
    1837              :         // Initialize SLRUs on shard 0 only: creating these on other shards would be
    1838              :         // harmless but they'd just be dropped on later compaction.
    1839          112 :         if self.tline.tenant_shard_id.is_shard_zero() {
    1840          109 :             self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1841          109 :             self.pending_directory_entries.push((
    1842          109 :                 DirectoryKind::SlruSegment(SlruKind::Clog),
    1843          109 :                 MetricsUpdate::Set(0),
    1844          109 :             ));
    1845          109 :             self.put(
    1846          109 :                 slru_dir_to_key(SlruKind::MultiXactMembers),
    1847          109 :                 empty_dir.clone(),
    1848          109 :             );
    1849          109 :             self.pending_directory_entries.push((
    1850          109 :                 DirectoryKind::SlruSegment(SlruKind::Clog),
    1851          109 :                 MetricsUpdate::Set(0),
    1852          109 :             ));
    1853          109 :             self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1854          109 :             self.pending_directory_entries.push((
    1855          109 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
    1856          109 :                 MetricsUpdate::Set(0),
    1857          109 :             ));
    1858          109 :         }
    1859              : 
    1860          112 :         Ok(())
    1861          112 :     }
    1862              : 
    1863              :     #[cfg(test)]
    1864          111 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1865          111 :         self.init_empty()?;
    1866          111 :         self.put_control_file(bytes::Bytes::from_static(
    1867          111 :             b"control_file contents do not matter",
    1868              :         ))
    1869          111 :         .context("put_control_file")?;
    1870          111 :         self.put_checkpoint(bytes::Bytes::from_static(
    1871          111 :             b"checkpoint_file contents do not matter",
    1872              :         ))
    1873          111 :         .context("put_checkpoint_file")?;
    1874          111 :         Ok(())
    1875          111 :     }
    1876              : 
    1877              :     /// Creates a relation if it is not already present.
    1878              :     /// Returns the current size of the relation.
    1879       209028 :     pub(crate) async fn create_relation_if_required(
    1880       209028 :         &mut self,
    1881       209028 :         rel: RelTag,
    1882       209028 :         ctx: &RequestContext,
    1883       209028 :     ) -> Result<u32, WalIngestError> {
    1884              :         // Get current size and put rel creation if rel doesn't exist
    1885              :         //
    1886              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1887              :         //       check the cache too. This is because eagerly checking the cache results in
    1888              :         //       less work overall and 10% better performance. It's more work on cache miss
    1889              :         //       but cache misses are rare.
    1890       209028 :         if let Some(nblocks) = self
    1891       209028 :             .tline
    1892       209028 :             .get_cached_rel_size(&rel, Version::Modified(self))
    1893              :         {
    1894       209023 :             Ok(nblocks)
    1895            5 :         } else if !self
    1896            5 :             .tline
    1897            5 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1898            5 :             .await?
    1899              :         {
    1900              :             // create it with 0 size initially, the logic below will extend it
    1901            5 :             self.put_rel_creation(rel, 0, ctx).await?;
    1902            5 :             Ok(0)
    1903              :         } else {
    1904            0 :             Ok(self
    1905            0 :                 .tline
    1906            0 :                 .get_rel_size(rel, Version::Modified(self), ctx)
    1907            0 :                 .await?)
    1908              :         }
    1909       209028 :     }
    1910              : 
    1911              :     /// Given a block number for a relation (which represents a newly written block),
    1912              :     /// the previous block count of the relation, and the shard info, find the gaps,
    1913              :     /// if any, that the newly written block creates.
    1914        72835 :     fn find_gaps(
    1915        72835 :         rel: RelTag,
    1916        72835 :         blkno: u32,
    1917        72835 :         previous_nblocks: u32,
    1918        72835 :         shard: &ShardIdentity,
    1919        72835 :     ) -> Option<KeySpace> {
    1920        72835 :         let mut key = rel_block_to_key(rel, blkno);
    1921        72835 :         let mut gap_accum = None;
    1922              : 
    1923        72835 :         for gap_blkno in previous_nblocks..blkno {
    1924           16 :             key.field6 = gap_blkno;
    1925              : 
    1926           16 :             if shard.get_shard_number(&key) != shard.number {
    1927            4 :                 continue;
    1928           12 :             }
    1929              : 
    1930           12 :             gap_accum
    1931           12 :                 .get_or_insert_with(KeySpaceAccum::new)
    1932           12 :                 .add_key(key);
    1933              :         }
    1934              : 
    1935        72835 :         gap_accum.map(|accum| accum.to_keyspace())
    1936        72835 :     }
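                      : 
                      :     // Illustrative sketch (assumes `ShardIdentity::unsharded()`, under which
                      :     // every key is local to this shard): if a relation previously had 3
                      :     // blocks (0..=2) and block 6 is written next, blocks 3, 4 and 5 are the
                      :     // gaps that `ingest_batch` below will zero-fill.
                      :     #[allow(dead_code)]
                      :     fn find_gaps_example(rel: RelTag) -> Option<KeySpace> {
                      :         let shard = ShardIdentity::unsharded();
                      :         Self::find_gaps(rel, 6, 3, &shard)
                      :     }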
    1937              : 
    1938        72926 :     pub async fn ingest_batch(
    1939        72926 :         &mut self,
    1940        72926 :         mut batch: SerializedValueBatch,
    1941        72926 :         // TODO(vlad): remove this argument and replace the shard check with is_key_local
    1942        72926 :         shard: &ShardIdentity,
    1943        72926 :         ctx: &RequestContext,
    1944        72926 :     ) -> Result<(), WalIngestError> {
    1945        72926 :         let mut gaps_at_lsns = Vec::default();
    1946              : 
    1947        72926 :         for meta in batch.metadata.iter() {
    1948        72821 :             let key = Key::from_compact(meta.key());
    1949        72821 :             let (rel, blkno) = key
    1950        72821 :                 .to_rel_block()
    1951        72821 :                 .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
    1952        72821 :             let new_nblocks = blkno + 1;
    1953              : 
    1954        72821 :             let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
    1955        72821 :             if new_nblocks > old_nblocks {
    1956         1195 :                 self.put_rel_extend(rel, new_nblocks, ctx).await?;
    1957        71626 :             }
    1958              : 
    1959        72821 :             if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
    1960            0 :                 gaps_at_lsns.push((gaps, meta.lsn()));
    1961        72821 :             }
    1962              :         }
    1963              : 
    1964        72926 :         if !gaps_at_lsns.is_empty() {
    1965            0 :             batch.zero_gaps(gaps_at_lsns);
    1966        72926 :         }
    1967              : 
    1968        72926 :         match self.pending_data_batch.as_mut() {
    1969           10 :             Some(pending_batch) => {
    1970           10 :                 pending_batch.extend(batch);
    1971           10 :             }
    1972        72916 :             None if batch.has_data() => {
    1973        72815 :                 self.pending_data_batch = Some(batch);
    1974        72815 :             }
    1975          101 :             None => {
    1976          101 :                 // Nothing to initialize the batch with
    1977          101 :             }
    1978              :         }
    1979              : 
    1980        72926 :         Ok(())
    1981        72926 :     }
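                      : 
                      :     // In short, `ingest_batch` above does three things per batch: (1) make
                      :     // sure each target relation exists and is long enough
                      :     // (`create_relation_if_required` plus `put_rel_extend`); (2) zero-fill
                      :     // any gap blocks this shard owns (`find_gaps` feeding `zero_gaps`);
                      :     // (3) merge the batch into `pending_data_batch`, which is flushed when
                      :     // the modification commits.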
    1982              : 
    1983              :     /// Put a new page version that can be constructed from a WAL record
    1984              :     ///
    1985              :     /// NOTE: this will *not* implicitly extend the relation if the page is beyond the
    1986              :     /// current end-of-file. It's up to the caller to check that the relation size
    1987              :     /// matches the blocks inserted!
    1988            6 :     pub fn put_rel_wal_record(
    1989            6 :         &mut self,
    1990            6 :         rel: RelTag,
    1991            6 :         blknum: BlockNumber,
    1992            6 :         rec: NeonWalRecord,
    1993            6 :     ) -> Result<(), WalIngestError> {
    1994            6 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    1995            6 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1996            6 :         Ok(())
    1997            6 :     }
    1998              : 
    1999              :     // Same, but for an SLRU.
    2000            4 :     pub fn put_slru_wal_record(
    2001            4 :         &mut self,
    2002            4 :         kind: SlruKind,
    2003            4 :         segno: u32,
    2004            4 :         blknum: BlockNumber,
    2005            4 :         rec: NeonWalRecord,
    2006            4 :     ) -> Result<(), WalIngestError> {
    2007            4 :         if !self.tline.tenant_shard_id.is_shard_zero() {
    2008            0 :             return Ok(());
    2009            4 :         }
    2010              : 
    2011            4 :         self.put(
    2012            4 :             slru_block_to_key(kind, segno, blknum),
    2013            4 :             Value::WalRecord(rec),
    2014              :         );
    2015            4 :         Ok(())
    2016            4 :     }
    2017              : 
    2018              :     /// Like [`Self::put_rel_wal_record`], but with a ready-made image of the page.
    2019       138921 :     pub fn put_rel_page_image(
    2020       138921 :         &mut self,
    2021       138921 :         rel: RelTag,
    2022       138921 :         blknum: BlockNumber,
    2023       138921 :         img: Bytes,
    2024       138921 :     ) -> Result<(), WalIngestError> {
    2025       138921 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2026       138921 :         let key = rel_block_to_key(rel, blknum);
    2027       138921 :         if !key.is_valid_key_on_write_path() {
    2028            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    2029       138921 :         }
    2030       138921 :         self.put(key, Value::Image(img));
    2031       138921 :         Ok(())
    2032       138921 :     }
    2033              : 
    2034            3 :     pub fn put_slru_page_image(
    2035            3 :         &mut self,
    2036            3 :         kind: SlruKind,
    2037            3 :         segno: u32,
    2038            3 :         blknum: BlockNumber,
    2039            3 :         img: Bytes,
    2040            3 :     ) -> Result<(), WalIngestError> {
    2041            3 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2042              : 
    2043            3 :         let key = slru_block_to_key(kind, segno, blknum);
    2044            3 :         if !key.is_valid_key_on_write_path() {
    2045            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    2046            3 :         }
    2047            3 :         self.put(key, Value::Image(img));
    2048            3 :         Ok(())
    2049            3 :     }
    2050              : 
    2051         1499 :     pub(crate) fn put_rel_page_image_zero(
    2052         1499 :         &mut self,
    2053         1499 :         rel: RelTag,
    2054         1499 :         blknum: BlockNumber,
    2055         1499 :     ) -> Result<(), WalIngestError> {
    2056         1499 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2057         1499 :         let key = rel_block_to_key(rel, blknum);
    2058         1499 :         if !key.is_valid_key_on_write_path() {
    2059            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    2060         1499 :         }
    2061              : 
    2062         1499 :         let batch = self
    2063         1499 :             .pending_data_batch
    2064         1499 :             .get_or_insert_with(SerializedValueBatch::default);
    2065              : 
    2066         1499 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    2067              : 
    2068         1499 :         Ok(())
    2069         1499 :     }
    2070              : 
    2071            0 :     pub(crate) fn put_slru_page_image_zero(
    2072            0 :         &mut self,
    2073            0 :         kind: SlruKind,
    2074            0 :         segno: u32,
    2075            0 :         blknum: BlockNumber,
    2076            0 :     ) -> Result<(), WalIngestError> {
    2077            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2078            0 :         let key = slru_block_to_key(kind, segno, blknum);
    2079            0 :         if !key.is_valid_key_on_write_path() {
    2080            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    2081            0 :         }
    2082              : 
    2083            0 :         let batch = self
    2084            0 :             .pending_data_batch
    2085            0 :             .get_or_insert_with(SerializedValueBatch::default);
    2086              : 
    2087            0 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    2088              : 
    2089            0 :         Ok(())
    2090            0 :     }
    2091              : 
    2092              :     /// Returns the current rel_size_v2 write-path mode (a `RelDirMode`). The first time
    2093              :     /// we enable v2, we also need to persist that in `index_part.json` (`initialize` is true).
    2094              :     ///
    2095              :     /// As this function is only used on the write path, we do not need to read the migrated_at
    2096              :     /// field.
    2097          973 :     pub(crate) fn maybe_enable_rel_size_v2(
    2098          973 :         &mut self,
    2099          973 :         lsn: Lsn,
    2100              :         // migrated_at is the LSN at which we enabled v2 writes ("enable_v2"), without yet disabling v1 ("disable_v1")
    2101          973 :     ) -> anyhow::Result<RelDirMode> {
    2102              :         // TODO: define the behavior of the tenant-level config flag and use feature flag to enable this feature
    2103          973 :         let expected_status = self.tline.get_rel_size_v2_expected_state();
    2104          973 :         let (persistent_status, migrated_at) = self.tline.get_rel_size_v2_status();
    2105              : 
    2106              :         // migrated_at LSN means the LSN where we "enable_v2" (but not "disable_v1")
    2107          973 :         if let Some(migrated_at) = migrated_at
    2108              :         // TODO: define the behavior of the tenant-level config flag and use a feature flag to enable this feature
    2109            0 :             && persistent_status == RelSizeMigration::Migrating
    2110              :         {
    2111              :             // Revert the status to legacy if the write head LSN is before the migrating LSN.
    2112            0 :             self.tline
    2113            0 :                 .update_rel_size_v2_status(RelSizeMigration::Legacy, None)?;
    2114          973 :         }
    2115              :         // TODO: what if the migration is at "migrated" state but we need to revert?
    2116              : 
    2117              :         // Only initialize the v2 keyspace on new relation creation. No initialization
    2118              :         // during `timeline_create` (TODO: fix this; we should allow it, but currently it
    2119              :         // hits expected consistency issues and we would need to ignore the warnings).
    2120          973 :         let can_update = is_create && !self.is_importing_pgdata;
    2121              :         // TODO: what if the migration is in the "migrated" state but we need to revert?
    2122          973 :         match (expected_status, persistent_status) {
    2123              :             (RelSizeMigration::Legacy, RelSizeMigration::Legacy) => {
    2124              :                 // tenant config didn't enable it and we didn't write any reldir_v2 key yet
    2125          973 :                 Ok(RelDirMode {
    2126          973 :                     current_status: RelSizeMigration::Legacy,
    2127          973 :                     initialize: false,
    2128          973 :                     disable_v1: false,
    2129          973 :                 })
    2130              :             }
    2131              :             (
    2132              :                 RelSizeMigration::Legacy,
    2133            0 :                 current_status @ RelSizeMigration::Migrating
    2134            0 :                 | current_status @ RelSizeMigration::Migrated,
    2135              :             ) => {
    2136              :                 // already persisted that the timeline has enabled rel_size_v2, cannot rollback
    2137            0 :                 Ok(RelDirMode {
    2138            0 :                     current_status,
    2139            0 :                     initialize: false,
    2140            0 :                     disable_v1: false,
    2141            0 :                 })
    2142              :             }
    2143              :             (
    2144            0 :                 expected_status @ RelSizeMigration::Migrating
    2145            0 :                 | expected_status @ RelSizeMigration::Migrated,
    2146              :                 RelSizeMigration::Legacy,
    2147              :             ) => {
    2148              :                 // The first time we enable it, we need to persist it in `index_part.json`
    2149              :                 // The caller should update the reldir status once the initialization is done.
    2150              : 
    2151              :                 Ok(RelDirMode {
    2152            0 :                     current_status: RelSizeMigration::Legacy,
    2153            0 :                     initialize: can_update,
    2154            0 :                     disable_v1: can_update && expected_status == RelSizeMigration::Migrated,
    2155              :                 })
    2156              :             }
    2157            0 :             (RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrating)
    2158            0 :             | (RelSizeMigration::Migrated, current_status @ RelSizeMigration::Migrated)
    2159            0 :             | (RelSizeMigration::Migrating, current_status @ RelSizeMigration::Migrated) => {
    2160              :                 // Keep the current state
    2161            0 :                 Ok(RelDirMode {
    2162            0 :                     current_status,
    2163            0 :                     initialize: false,
    2164            0 :                     disable_v1: false,
    2165            0 :                 })
    2166              :             }
    2167              :             (RelSizeMigration::Migrated, RelSizeMigration::Migrating) => {
    2168              :                 // Switch to v2-only mode
    2169            0 :                 Ok(RelDirMode {
    2170            0 :                     current_status: RelSizeMigration::Migrating,
    2171            0 :                     initialize: false,
    2172            0 :                     disable_v1: can_update,
    2173            0 :                 })
    2174              :             }
    2175              :         }
    2176          973 :     }
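                      : 
                      :     // Summary of the match above, (expected, persistent) -> outcome:
                      :     //   (Legacy,    Legacy)              -> Legacy; no init, keep v1
                      :     //   (Legacy,    Migrating|Migrated)  -> keep persistent state (no rollback)
                      :     //   (Migrating|Migrated, Legacy)     -> Legacy for now; initialize v2 if
                      :     //                                       `can_update`, and also disable v1
                      :     //                                       if expected == Migrated
                      :     //   (Migrating, Migrating|Migrated),
                      :     //   (Migrated,  Migrated)            -> keep persistent state
                      :     //   (Migrated,  Migrating)           -> Migrating; disable v1 if `can_update`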
    2177              : 
    2178              :     /// Store a relmapper file (pg_filenode.map) in the repository
    2179            8 :     pub async fn put_relmap_file(
    2180            8 :         &mut self,
    2181            8 :         spcnode: Oid,
    2182            8 :         dbnode: Oid,
    2183            8 :         img: Bytes,
    2184            8 :         ctx: &RequestContext,
    2185            8 :     ) -> Result<(), WalIngestError> {
    2186            8 :         let v2_mode = self
    2187            8 :             .maybe_enable_rel_size_v2(self.lsn, false)
    2188            8 :             .map_err(WalIngestErrorKind::RelSizeV2Error)?;
    2189              : 
    2190              :         // Add it to the directory (if it doesn't exist already)
    2191            8 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    2192            8 :         let mut dbdir = DbDirectory::des(&buf)?;
    2193              : 
    2194            8 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    2195            8 :         if r.is_none() || r == Some(false) {
    2196              :             // The dbdir entry didn't exist, or it contained a
    2197              :             // 'false'. The 'insert' call already updated it with
    2198              :             // 'true'; now write the updated 'dbdirs' map back.
    2199            8 :             let buf = DbDirectory::ser(&dbdir)?;
    2200            8 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    2201            0 :         }
    2202            8 :         if r.is_none() {
    2203            4 :             if v2_mode.current_status != RelSizeMigration::Legacy {
    2204            0 :                 self.pending_directory_entries
    2205            0 :                     .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    2206            4 :             }
    2207              : 
    2208              :             // Create RelDirectory in v1 keyspace. TODO: if we have fully migrated to v2, no need to create this directory.
    2209              :             // Some code paths rely on this directory being present. We should remove it once we start to set tenants to
    2210              :             // the `RelSizeMigration::Migrated` state (currently we don't; all tenants stay in `RelSizeMigration::Migrating`).
    2211            4 :             let buf = RelDirectory::ser(&RelDirectory {
    2212            4 :                 rels: HashSet::new(),
    2213            4 :             })?;
    2214            4 :             self.pending_directory_entries
    2215            4 :                 .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
    2216            4 :             self.put(
    2217            4 :                 rel_dir_to_key(spcnode, dbnode),
    2218            4 :                 Value::Image(Bytes::from(buf)),
    2219              :             );
    2220            4 :         }
    2221              : 
    2222            8 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    2223            8 :         Ok(())
    2224            8 :     }
    2225              : 
    2226            0 :     pub async fn put_twophase_file(
    2227            0 :         &mut self,
    2228            0 :         xid: u64,
    2229            0 :         img: Bytes,
    2230            0 :         ctx: &RequestContext,
    2231            0 :     ) -> Result<(), WalIngestError> {
    2232              :         // Add it to the directory entry
    2233            0 :         let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    2234            0 :         let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
    2235            0 :             let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
    2236            0 :             if !dir.xids.insert(xid) {
    2237            0 :                 Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
    2238            0 :             }
    2239            0 :             self.pending_directory_entries.push((
    2240            0 :                 DirectoryKind::TwoPhase,
    2241            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2242            0 :             ));
    2243            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    2244              :         } else {
    2245            0 :             let xid = xid as u32;
    2246            0 :             let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
    2247            0 :             if !dir.xids.insert(xid) {
    2248            0 :                 Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
    2249            0 :             }
    2250            0 :             self.pending_directory_entries.push((
    2251            0 :                 DirectoryKind::TwoPhase,
    2252            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2253            0 :             ));
    2254            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    2255              :         };
    2256            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    2257              : 
    2258            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    2259            0 :         Ok(())
    2260            0 :     }
    2261              : 
    2262            1 :     pub async fn set_replorigin(
    2263            1 :         &mut self,
    2264            1 :         origin_id: RepOriginId,
    2265            1 :         origin_lsn: Lsn,
    2266            1 :     ) -> Result<(), WalIngestError> {
    2267            1 :         let key = repl_origin_key(origin_id);
    2268            1 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    2269            1 :         Ok(())
    2270            1 :     }
    2271              : 
    2272            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
    2273            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    2274            0 :     }
    2275              : 
    2276          112 :     pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
    2277          112 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    2278          112 :         Ok(())
    2279          112 :     }
    2280              : 
    2281          119 :     pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
    2282          119 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    2283          119 :         Ok(())
    2284          119 :     }
    2285              : 
    2286            0 :     pub async fn drop_dbdir(
    2287            0 :         &mut self,
    2288            0 :         spcnode: Oid,
    2289            0 :         dbnode: Oid,
    2290            0 :         ctx: &RequestContext,
    2291            0 :     ) -> Result<(), WalIngestError> {
    2292            0 :         let total_blocks = self
    2293            0 :             .tline
    2294            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    2295            0 :             .await?;
    2296              : 
    2297              :         // Remove entry from dbdir
    2298            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    2299            0 :         let mut dir = DbDirectory::des(&buf)?;
    2300            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    2301            0 :             let buf = DbDirectory::ser(&dir)?;
    2302            0 :             self.pending_directory_entries.push((
    2303            0 :                 DirectoryKind::Db,
    2304            0 :                 MetricsUpdate::Set(dir.dbdirs.len() as u64),
    2305            0 :             ));
    2306            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    2307              :         } else {
    2308            0 :             warn!(
    2309            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    2310              :                 spcnode, dbnode
    2311              :             );
    2312              :         }
    2313              : 
    2314              :         // Update logical database size.
    2315            0 :         self.pending_nblocks -= total_blocks as i64;
    2316              : 
    2317              :         // Delete all relations and metadata files for the spcnode/dbnode
    2318            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    2319            0 :         Ok(())
    2320            0 :     }
    2321              : 
    2322            0 :     async fn initialize_rel_size_v2_keyspace(
    2323            0 :         &mut self,
    2324            0 :         ctx: &RequestContext,
    2325            0 :         dbdir: &DbDirectory,
    2326            0 :     ) -> Result<(), WalIngestError> {
    2327              :         // Copy everything from relv1 to relv2. TODO: check whether any key already exists in the v2 keyspace and abort if so.
    2328            0 :         tracing::info!("initializing rel_size_v2 keyspace");
    2329            0 :         let mut rel_cnt = 0;
    2330              :         // relmap_exists (the value in the dbdirs hashmap) does not affect the migration: we need to copy things over anyway
    2331            0 :         for &(spcnode, dbnode) in dbdir.dbdirs.keys() {
    2332            0 :             let rel_dir_key = rel_dir_to_key(spcnode, dbnode);
    2333            0 :             let rel_dir = RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?;
    2334            0 :             for (relnode, forknum) in rel_dir.rels {
    2335            0 :                 let sparse_rel_dir_key = rel_tag_sparse_key(spcnode, dbnode, relnode, forknum);
    2336            0 :                 self.put(
    2337            0 :                     sparse_rel_dir_key,
    2338            0 :                     Value::Image(RelDirExists::Exists.encode()),
    2339            0 :                 );
    2340            0 :                 rel_cnt += 1;
    2341            0 :             }
    2342              :         }
    2343            0 :         tracing::info!(
    2344            0 :             "initialized rel_size_v2 keyspace at lsn {}: migrated {} relations",
    2345              :             self.lsn,
    2346              :             rel_cnt
    2347              :         );
    2348            0 :         Ok::<_, WalIngestError>(())
    2349            0 :     }
    2350              : 
    2351            0 :     async fn put_rel_creation_v1_placeholder(
    2352            0 :         &mut self,
    2353            0 :         rel: RelTag,
    2354            0 :         dbdir_exists: bool,
    2355            0 :         _ctx: &RequestContext,
    2356            0 :     ) -> Result<(), WalIngestError> {
    2357            0 :         if !dbdir_exists {
    2358            0 :             let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    2359            0 :             self.put(
    2360            0 :                 rel_dir_key,
    2361            0 :                 Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
    2362              :             );
    2363            0 :         }
    2364            0 :         Ok(())
    2365            0 :     }
    2366              : 
    2367          960 :     async fn put_rel_creation_v1(
    2368          960 :         &mut self,
    2369          960 :         rel: RelTag,
    2370          960 :         dbdir_exists: bool,
    2371          960 :         ctx: &RequestContext,
    2372          960 :     ) -> Result<(), WalIngestError> {
    2373              :         // Reldir v1 write path
    2374          960 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    2375          960 :         let mut rel_dir = if !dbdir_exists {
    2376              :             // Create the RelDirectory
    2377            4 :             RelDirectory::default()
    2378              :         } else {
    2379              :             // reldir already exists, fetch it
    2380          956 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
    2381              :         };
    2382              : 
    2383              :         // Add the new relation to the rel directory entry, and write it back
    2384          960 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    2385            0 :             Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    2386          960 :         }
    2387          960 :         if !dbdir_exists {
    2388            4 :             self.pending_directory_entries
    2389            4 :                 .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
    2390          956 :         }
    2391          960 :         self.pending_directory_entries
    2392          960 :             .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
    2393          960 :         self.put(
    2394          960 :             rel_dir_key,
    2395          960 :             Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
    2396              :         );
    2397          960 :         Ok(())
    2398          960 :     }
    2399              : 
    2400            0 :     async fn put_rel_creation_v2(
    2401            0 :         &mut self,
    2402            0 :         rel: RelTag,
    2403            0 :         dbdir_exists: bool,
    2404            0 :         ctx: &RequestContext,
    2405            0 :     ) -> Result<(), WalIngestError> {
    2406              :         // Reldir v2 write path
    2407            0 :         let sparse_rel_dir_key =
    2408            0 :             rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
    2409              :         // check if the rel_dir_key exists in v2
    2410            0 :         let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
    2411            0 :         let val = RelDirExists::decode_option(val)
    2412            0 :             .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
    2413            0 :         if val == RelDirExists::Exists {
    2414            0 :             Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    2415            0 :         }
    2416            0 :         self.put(
    2417            0 :             sparse_rel_dir_key,
    2418            0 :             Value::Image(RelDirExists::Exists.encode()),
    2419              :         );
    2420            0 :         if !dbdir_exists {
    2421            0 :             self.pending_directory_entries
    2422            0 :                 .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    2423            0 :         }
    2424            0 :         self.pending_directory_entries
    2425            0 :             .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
    2426            0 :         Ok(())
    2427            0 :     }
    2428              : 
    2429              :     /// Create a relation fork.
    2430              :     ///
    2431              :     /// 'nblocks' is the initial size.
    2432          960 :     pub async fn put_rel_creation(
    2433          960 :         &mut self,
    2434          960 :         rel: RelTag,
    2435          960 :         nblocks: BlockNumber,
    2436          960 :         ctx: &RequestContext,
    2437          960 :     ) -> Result<(), WalIngestError> {
    2438          960 :         if rel.relnode == 0 {
    2439            0 :             Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
    2440            0 :                 "invalid relnode"
    2441            0 :             )))?;
    2442          960 :         }
    2443              :         // It's possible that this is the first rel for this db in this
    2444              :         // tablespace.  Create the reldir entry for it if so.
    2445          960 :         let mut dbdir: DbDirectory = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
    2446          960 :         let mut is_dbdir_dirty = false;
    2447              : 
    2448          960 :         let dbdir_exists =
    2449          960 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    2450              :                 // Didn't exist. Update dbdir
    2451            4 :                 e.insert(false);
    2452            4 :                 self.pending_directory_entries.push((
    2453            4 :                     DirectoryKind::Db,
    2454            4 :                     MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
    2455            4 :                 ));
    2456            4 :                 is_dbdir_dirty = true;
    2457            4 :                 false
    2458              :             } else {
    2459          956 :                 true
    2460              :             };
    2461              : 
    2462          960 :         let mut v2_mode = self
    2463          960 :             .maybe_enable_rel_size_v2(self.lsn, true)
    2464          960 :             .map_err(WalIngestErrorKind::RelSizeV2Error)?;
    2465              : 
    2466          960 :         if v2_mode.initialize {
    2467            0 :             if let Err(e) = self.initialize_rel_size_v2_keyspace(ctx, &dbdir).await {
    2468            0 :                 tracing::warn!("error initializing rel_size_v2 keyspace: {}", e);
    2469              :                 // TODO: circuit breaker so that it won't retry forever
    2470              :             } else {
    2471            0 :                 v2_mode.current_status = RelSizeMigration::Migrating;
    2472            0 :                 self.tline
    2473            0 :                     .update_rel_size_v2_status(RelSizeMigration::Migrating, Some(self.lsn))
    2474            0 :                     .map_err(WalIngestErrorKind::RelSizeV2Error)?;
    2475              :             }
    2476          960 :         }
    2477              : 
    2478          960 :         if v2_mode.disable_v1 {
    2479            0 :             v2_mode.current_status = RelSizeMigration::Migrated;
    2480            0 :             self.tline
    2481            0 :                 .update_rel_size_v2_status(RelSizeMigration::Migrated, Some(self.lsn))
    2482            0 :                 .map_err(WalIngestErrorKind::RelSizeV2Error)?;
    2483          960 :         }
    2484              : 
    2485          960 :         if is_dbdir_dirty {
    2486            4 :             let buf = DbDirectory::ser(&dbdir)?;
    2487            4 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    2488          956 :         }
    2489              : 
    2490          960 :         if v2_mode.current_status != RelSizeMigration::Migrated {
    2491          960 :             self.put_rel_creation_v1(rel, dbdir_exists, ctx).await?;
    2492              :         } else {
    2493              :             // collect_keyspace depends on the rel_dir_to_key to be present, so we need to create a placeholder
    2494            0 :             self.put_rel_creation_v1_placeholder(rel, dbdir_exists, ctx)
    2495            0 :                 .await?;
    2496              :         }
    2497              : 
    2498          960 :         if v2_mode.current_status != RelSizeMigration::Legacy {
    2499            0 :             let write_v2_res = self.put_rel_creation_v2(rel, dbdir_exists, ctx).await;
    2500            0 :             if let Err(e) = write_v2_res {
    2501            0 :                 if v2_mode.current_status == RelSizeMigration::Migrated {
    2502            0 :                     return Err(e);
    2503            0 :                 }
    2504            0 :                 tracing::warn!("error writing rel_size_v2 keyspace: {}", e);
    2505            0 :             }
    2506          960 :         }
    2507              : 
    2508              :         // Put size
    2509          960 :         let size_key = rel_size_to_key(rel);
    2510          960 :         let buf = nblocks.to_le_bytes();
    2511          960 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2512              : 
    2513          960 :         self.pending_nblocks += nblocks as i64;
    2514              : 
    2515              :         // Update relation size cache
    2516          960 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2517              : 
    2518              :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    2519              :         // caller.
    2520          960 :         Ok(())
    2521          960 :     }
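                      : 
                      :     // Order of operations in `put_rel_creation` above: (1) ensure a dbdir
                      :     // entry exists for (spcnode, dbnode); (2) resolve the reldir v1/v2 mode,
                      :     // initializing or finalizing the v2 keyspace when requested; (3) write
                      :     // the rel directory entry in v1, v2, or both (v2 write errors are only
                      :     // logged while Migrating, but fatal once Migrated); (4) write the size
                      :     // key and update the relation size cache.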
    2522              : 
    2523              :     /// Truncate a relation.
    2524         3006 :     pub async fn put_rel_truncation(
    2525         3006 :         &mut self,
    2526         3006 :         rel: RelTag,
    2527         3006 :         nblocks: BlockNumber,
    2528         3006 :         ctx: &RequestContext,
    2529         3006 :     ) -> Result<(), WalIngestError> {
    2530         3006 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2531         3006 :         if self
    2532         3006 :             .tline
    2533         3006 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    2534         3006 :             .await?
    2535              :         {
    2536         3006 :             let size_key = rel_size_to_key(rel);
    2537              :             // Fetch the old size first
    2538         3006 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2539              : 
    2540              :             // Update the entry with the new size.
    2541         3006 :             let buf = nblocks.to_le_bytes();
    2542         3006 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2543              : 
    2544              :             // Update relation size cache
    2545         3006 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2546              : 
    2547              :             // Update logical database size.
    2548         3006 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    2549            0 :         }
    2550         3006 :         Ok(())
    2551         3006 :     }
    2552              : 
    2553              :     /// Extend relation
    2554              :     /// If new size is smaller, do nothing.
    2555       138340 :     pub async fn put_rel_extend(
    2556       138340 :         &mut self,
    2557       138340 :         rel: RelTag,
    2558       138340 :         nblocks: BlockNumber,
    2559       138340 :         ctx: &RequestContext,
    2560       138340 :     ) -> Result<(), WalIngestError> {
    2561       138340 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2562              : 
    2563              :         // Put size
    2564       138340 :         let size_key = rel_size_to_key(rel);
    2565       138340 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2566              : 
    2567              :         // only extend relation here. never decrease the size
    2568       138340 :         if nblocks > old_size {
    2569       137394 :             let buf = nblocks.to_le_bytes();
    2570       137394 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2571       137394 : 
    2572       137394 :             // Update relation size cache
    2573       137394 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2574       137394 : 
    2575       137394 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    2576       137394 :         }
    2577       138340 :         Ok(())
    2578       138340 :     }
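                      : 
                      :     // Illustrative sketch (not part of the original file) of the encoding
                      :     // used by the size key above: the block count is stored as a
                      :     // little-endian u32 image and read back with `get_u32_le`, exactly as
                      :     // `put_rel_truncation` and `put_rel_extend` do.
                      :     #[allow(dead_code)]
                      :     fn rel_size_roundtrip(nblocks: BlockNumber) -> u32 {
                      :         let mut img = Bytes::from(nblocks.to_le_bytes().to_vec());
                      :         img.get_u32_le()
                      :     }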
    2579              : 
    2580            5 :     async fn put_rel_drop_v1(
    2581            5 :         &mut self,
    2582            5 :         drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
    2583            5 :         ctx: &RequestContext,
    2584            5 :     ) -> Result<BTreeSet<RelTag>, WalIngestError> {
    2585            5 :         let mut dropped_rels = BTreeSet::new();
    2586            6 :         for ((spc_node, db_node), rel_tags) in drop_relations {
    2587            1 :             let dir_key = rel_dir_to_key(spc_node, db_node);
    2588            1 :             let buf = self.get(dir_key, ctx).await?;
    2589            1 :             let mut dir = RelDirectory::des(&buf)?;
    2590              : 
    2591            1 :             let mut dirty = false;
    2592            2 :             for rel_tag in rel_tags {
    2593            1 :                 let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
    2594            1 :                     self.pending_directory_entries
    2595            1 :                         .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
    2596            1 :                     dirty = true;
    2597            1 :                     dropped_rels.insert(rel_tag);
    2598            1 :                     true
    2599              :                 } else {
    2600            0 :                     false
    2601              :                 };
    2602              : 
    2603            1 :                 if found {
    2604              :                     // update logical size
    2605            1 :                     let size_key = rel_size_to_key(rel_tag);
    2606            1 :                     let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2607            1 :                     self.pending_nblocks -= old_size as i64;
    2608              : 
    2609              :                     // Remove entry from relation size cache
    2610            1 :                     self.tline.remove_cached_rel_size(&rel_tag);
    2611              : 
    2612              :                     // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
    2613            1 :                     self.delete(rel_key_range(rel_tag));
    2614            0 :                 }
    2615              :             }
    2616              : 
    2617            1 :             if dirty {
    2618            1 :                 self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    2619            0 :             }
    2620              :         }
    2621            5 :         Ok(dropped_rels)
    2622            5 :     }
    2623              : 
    2624            0 :     async fn put_rel_drop_v2(
    2625            0 :         &mut self,
    2626            0 :         drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
    2627            0 :         ctx: &RequestContext,
    2628            0 :     ) -> Result<BTreeSet<RelTag>, WalIngestError> {
    2629            0 :         let mut dropped_rels = BTreeSet::new();
    2630            0 :         for ((spc_node, db_node), rel_tags) in drop_relations {
    2631            0 :             for rel_tag in rel_tags {
    2632            0 :                 let key = rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
    2633            0 :                 let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
    2634            0 :                     .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    2635            0 :                 if val == RelDirExists::Exists {
    2636            0 :                     dropped_rels.insert(rel_tag);
    2637            0 :                     self.pending_directory_entries
    2638            0 :                         .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
    2639            0 :                     // put tombstone
    2640            0 :                     self.put(key, Value::Image(RelDirExists::Removed.encode()));
    2641            0 :                 }
    2642              :             }
    2643              :         }
    2644            0 :         Ok(dropped_rels)
    2645            0 :     }
    2646              : 
    2647              :     /// Drop some relations
    2648            5 :     pub(crate) async fn put_rel_drops(
    2649            5 :         &mut self,
    2650            5 :         drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
    2651            5 :         ctx: &RequestContext,
    2652            5 :     ) -> Result<(), WalIngestError> {
    2653            5 :         let v2_mode = self
    2654            5 :             .maybe_enable_rel_size_v2(self.lsn, false)
    2655            5 :             .map_err(WalIngestErrorKind::RelSizeV2Error)?;
    2656            5 :         match v2_mode.current_status {
    2657              :             RelSizeMigration::Legacy => {
    2658            5 :                 self.put_rel_drop_v1(drop_relations, ctx).await?;
    2659              :             }
    2660              :             RelSizeMigration::Migrating => {
    2661            0 :                 let dropped_rels_v1 = self.put_rel_drop_v1(drop_relations.clone(), ctx).await?;
    2662            0 :                 let dropped_rels_v2_res = self.put_rel_drop_v2(drop_relations, ctx).await;
    2663            0 :                 match dropped_rels_v2_res {
    2664            0 :                     Ok(dropped_rels_v2) => {
    2665            0 :                         if dropped_rels_v1 != dropped_rels_v2 {
    2666            0 :                             tracing::warn!(
    2667            0 :                                 "inconsistent v1/v2 rel drop: dropped_rels_v1.len()={}, dropped_rels_v2.len()={}",
    2668            0 :                                 dropped_rels_v1.len(),
    2669            0 :                                 dropped_rels_v2.len()
    2670              :                             );
    2671            0 :                         }
    2672              :                     }
    2673              :                     Err(WalIngestError {
    2674              :                         kind: WalIngestErrorKind::Cancelled,
    2675              :                         ..
    2676            0 :                     }) => {
    2677            0 :                         // Cancellation errors are fine to ignore, do not log.
    2678            0 :                     }
    2679            0 :                     Err(e) => {
    2680            0 :                         tracing::warn!("error dropping rels: {}", e);
    2681              :                     }
    2682              :                 }
    2683              :             }
    2684              :             RelSizeMigration::Migrated => {
    2685            0 :                 self.put_rel_drop_v2(drop_relations, ctx).await?;
    2686              :             }
    2687              :         }
    2688            5 :         Ok(())
    2689            5 :     }
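                      : 
                      :     // `put_rel_drops` above is the dual-write point for drops: in the
                      :     // Migrating state both keyspaces are updated and the two dropped sets
                      :     // are compared (a mismatch or a non-cancellation v2 error is logged,
                      :     // not fatal); in Legacy and Migrated only the corresponding keyspace
                      :     // is touched.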
    2690              : 
    2691            3 :     pub async fn put_slru_segment_creation(
    2692            3 :         &mut self,
    2693            3 :         kind: SlruKind,
    2694            3 :         segno: u32,
    2695            3 :         nblocks: BlockNumber,
    2696            3 :         ctx: &RequestContext,
    2697            3 :     ) -> Result<(), WalIngestError> {
    2698            3 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2699              : 
    2700              :         // Add it to the directory entry
    2701            3 :         let dir_key = slru_dir_to_key(kind);
    2702            3 :         let buf = self.get(dir_key, ctx).await?;
    2703            3 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2704              : 
    2705            3 :         if !dir.segments.insert(segno) {
    2706            0 :             Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
    2707            3 :         }
    2708            3 :         self.pending_directory_entries.push((
    2709            3 :             DirectoryKind::SlruSegment(kind),
    2710            3 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2711            3 :         ));
    2712            3 :         self.put(
    2713            3 :             dir_key,
    2714            3 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2715              :         );
    2716              : 
    2717              :         // Put size
    2718            3 :         let size_key = slru_segment_size_to_key(kind, segno);
    2719            3 :         let buf = nblocks.to_le_bytes();
    2720            3 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2721              : 
    2722              :         // even if nblocks > 0, we don't insert any actual blocks here
    2723              : 
    2724            3 :         Ok(())
    2725            3 :     }
    2726              : 
    2727              :     /// Extend SLRU segment
    2728            0 :     pub fn put_slru_extend(
    2729            0 :         &mut self,
    2730            0 :         kind: SlruKind,
    2731            0 :         segno: u32,
    2732            0 :         nblocks: BlockNumber,
    2733            0 :     ) -> Result<(), WalIngestError> {
    2734            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2735              : 
    2736              :         // Put size
    2737            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    2738            0 :         let buf = nblocks.to_le_bytes();
    2739            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2740            0 :         Ok(())
    2741            0 :     }
    2742              : 
    2743              :     /// This method is used for dropping SLRU segments that have been truncated away.
    2744            0 :     pub async fn drop_slru_segment(
    2745            0 :         &mut self,
    2746            0 :         kind: SlruKind,
    2747            0 :         segno: u32,
    2748            0 :         ctx: &RequestContext,
    2749            0 :     ) -> Result<(), WalIngestError> {
    2750              :         // Remove it from the directory entry
    2751            0 :         let dir_key = slru_dir_to_key(kind);
    2752            0 :         let buf = self.get(dir_key, ctx).await?;
    2753            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2754              : 
    2755            0 :         if !dir.segments.remove(&segno) {
    2756            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    2757            0 :         }
    2758            0 :         self.pending_directory_entries.push((
    2759            0 :             DirectoryKind::SlruSegment(kind),
    2760            0 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2761            0 :         ));
    2762            0 :         self.put(
    2763            0 :             dir_key,
    2764            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2765              :         );
    2766              : 
    2767              :         // Delete size entry, as well as all blocks
    2768            0 :         self.delete(slru_segment_key_range(kind, segno));
    2769              : 
    2770            0 :         Ok(())
    2771            0 :     }
    2772              : 
    2773              :     /// Drop a relmapper file (pg_filenode.map)
    2774            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
    2775              :         // TODO
    2776            0 :         Ok(())
    2777            0 :     }
    2778              : 
    2779              :     /// Drop a two-phase state file for the given transaction
    2780            0 :     pub async fn drop_twophase_file(
    2781            0 :         &mut self,
    2782            0 :         xid: u64,
    2783            0 :         ctx: &RequestContext,
    2784            0 :     ) -> Result<(), WalIngestError> {
    2785              :         // Remove it from the directory entry
    2786            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    2787            0 :         let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
    2788            0 :             let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
    2789              : 
    2790            0 :             if !dir.xids.remove(&xid) {
    2791            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2792            0 :             }
    2793            0 :             self.pending_directory_entries.push((
    2794            0 :                 DirectoryKind::TwoPhase,
    2795            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2796            0 :             ));
    2797            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    2798              :         } else {
    2799            0 :             let xid: u32 = u32::try_from(xid)
    2800            0 :                 .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
    2801            0 :             let mut dir = TwoPhaseDirectory::des(&buf)?;
    2802              : 
    2803            0 :             if !dir.xids.remove(&xid) {
    2804            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2805            0 :             }
    2806            0 :             self.pending_directory_entries.push((
    2807            0 :                 DirectoryKind::TwoPhase,
    2808            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2809            0 :             ));
    2810            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    2811              :         };
    2812            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    2813              : 
    2814              :         // Delete it
    2815            0 :         self.delete(twophase_key_range(xid));
    2816              : 
    2817            0 :         Ok(())
    2818            0 :     }
    2819              : 
    2820            8 :     pub async fn put_file(
    2821            8 :         &mut self,
    2822            8 :         path: &str,
    2823            8 :         content: &[u8],
    2824            8 :         ctx: &RequestContext,
    2825            8 :     ) -> Result<(), WalIngestError> {
    2826            8 :         let key = aux_file::encode_aux_file_key(path);
    2827              :         // retrieve the current value for this key from the engine
    2828            8 :         let old_val = match self.get(key, ctx).await {
    2829            2 :             Ok(val) => Some(val),
    2830            6 :             Err(PageReconstructError::MissingKey(_)) => None,
    2831            0 :             Err(e) => return Err(e.into()),
    2832              :         };
    2833            8 :         let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    2834            2 :             aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
    2835              :         } else {
    2836            6 :             Vec::new()
    2837              :         };
    2838            8 :         let mut other_files = Vec::with_capacity(files.len());
    2839            8 :         let mut modifying_file = None;
    2840           10 :         for file @ (p, content) in files {
    2841            2 :             if path == p {
    2842            2 :                 assert!(
    2843            2 :                     modifying_file.is_none(),
    2844            0 :                     "duplicated entries found for {path}"
    2845              :                 );
    2846            2 :                 modifying_file = Some(content);
    2847            0 :             } else {
    2848            0 :                 other_files.push(file);
    2849            0 :             }
    2850              :         }
    2851            8 :         let mut new_files = other_files;
    2852            8 :         match (modifying_file, content.is_empty()) {
    2853            1 :             (Some(old_content), false) => {
    2854            1 :                 self.tline
    2855            1 :                     .aux_file_size_estimator
    2856            1 :                     .on_update(old_content.len(), content.len());
    2857            1 :                 new_files.push((path, content));
    2858            1 :             }
    2859            1 :             (Some(old_content), true) => {
    2860            1 :                 self.tline
    2861            1 :                     .aux_file_size_estimator
    2862            1 :                     .on_remove(old_content.len());
    2863            1 :                 // not adding the file key to the final `new_files` vec.
    2864            1 :             }
    2865            6 :             (None, false) => {
    2866            6 :                 self.tline.aux_file_size_estimator.on_add(content.len());
    2867            6 :                 new_files.push((path, content));
    2868            6 :             }
    2869              :             // Compute may request deletion of the old version of a pgstat AUX file if the new one exceeds the size limit.
    2870              :             // Compute doesn't know whether a previous version of this file exists, so an
    2871              :             // attempt to delete a non-existent file can trigger this message.
    2872              :             // To avoid false alarms, log it as info rather than warning.
    2873            0 :             (None, true) if path.starts_with("pg_stat/") => {
    2874            0 :                 info!("removing non-existing pg_stat file: {}", path)
    2875              :             }
    2876            0 :             (None, true) => warn!("removing non-existing aux file: {}", path),
    2877              :         }
    2878            8 :         let new_val = aux_file::encode_file_value(&new_files)
    2879            8 :             .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
    2880            8 :         self.put(key, Value::Image(new_val.into()));
    2881              : 
    2882            8 :         Ok(())
    2883            8 :     }
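
// Standalone sketch of the read-modify-write rule above (not part of this
// file; simplified types, where the real code round-trips through
// aux_file::decode_file_value / aux_file::encode_file_value): all aux files
// sharing one key form a list, and the entry for `path` is replaced when the
// new content is non-empty and dropped when it is empty.
fn apply_put_file(
    files: Vec<(String, Vec<u8>)>,
    path: &str,
    content: &[u8],
) -> Vec<(String, Vec<u8>)> {
    // keep every other file as-is
    let mut new_files: Vec<(String, Vec<u8>)> =
        files.into_iter().filter(|(p, _)| p != path).collect();
    if !content.is_empty() {
        // empty content means "delete this file"; otherwise store the new version
        new_files.push((path.to_string(), content.to_vec()));
    }
    new_files
}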
    2884              : 
    2885              :     ///
    2886              :     /// Flush changes accumulated so far to the underlying repository.
    2887              :     ///
    2888              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    2889              :     /// you to flush them to the underlying repository before the final `commit`.
    2890              :     /// That frees up the memory used to hold the pending changes.
    2891              :     ///
    2892              :     /// Currently only used during bulk import of a data directory. In that
    2893              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    2894              :     /// whole import fails and the timeline will be deleted anyway.
    2895              :     /// (Or to be precise, it will be left behind for debugging purposes and
    2896              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    2897              :     ///
    2898              :     /// Note: A consequence of flushing the pending operations is that they
    2899              :     /// won't be visible to subsequent operations until `commit`. The function
    2900              :     /// retains all the metadata, but data pages are flushed. That's again OK
    2901              :     /// for bulk import, where you are just loading data pages and won't try to
    2902              :     /// modify the same pages twice.
    2903          965 :     pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2904              :         // Unless we have accumulated a decent amount of changes, it's not worth it
    2905              :         // to scan through the pending_updates list.
    2906          965 :         let pending_nblocks = self.pending_nblocks;
    2907          965 :         if pending_nblocks < 10000 {
    2908          965 :             return Ok(());
    2909            0 :         }
    2910              : 
    2911            0 :         let mut writer = self.tline.writer().await;
    2912              : 
    2913              :         // Flush relation and SLRU data blocks, keep metadata.
    2914            0 :         if let Some(batch) = self.pending_data_batch.take() {
    2915            0 :             tracing::debug!(
    2916            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2917              :                 batch.max_lsn,
    2918            0 :                 self.tline.get_last_record_lsn()
    2919              :             );
    2920              : 
    2921              :             // This bails out on first error without modifying pending_updates.
    2922              :             // That's Ok, cf this function's doc comment.
    2923            0 :             writer.put_batch(batch, ctx).await?;
    2924            0 :         }
    2925              : 
    2926            0 :         if pending_nblocks != 0 {
    2927            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2928            0 :             self.pending_nblocks = 0;
    2929            0 :         }
    2930              : 
    2931            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2932            0 :             writer.update_directory_entries_count(kind, count);
    2933            0 :         }
    2934              : 
    2935            0 :         Ok(())
    2936          965 :     }
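
// A minimal sketch of the intended call pattern during bulk import (names
// assumed in scope: a DatadirModification in `modification`, a RequestContext
// in `ctx`, and a hypothetical `pages` source of relation page images):
//
//     for (rel, blkno, image) in pages {
//         modification.put_rel_page_image(rel, blkno, image)?;
//         // Cheap to call in a loop: it only writes once enough blocks
//         // have accumulated, and metadata entries stay pending.
//         modification.flush(&ctx).await?;
//     }
//     modification.commit(&ctx).await?;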
    2937              : 
    2938              :     ///
    2939              :     /// Finish this atomic update, writing all the updated keys to the
    2940              :     /// underlying timeline.
    2941              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    2942              :     ///
    2943       371557 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2944       371557 :         let mut writer = self.tline.writer().await;
    2945              : 
    2946       371557 :         let pending_nblocks = self.pending_nblocks;
    2947       371557 :         self.pending_nblocks = 0;
    2948              : 
    2949              :         // Ordering: the items in this batch do not need to be in any global order, but values for
    2950              :         // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    2951              :         // this to do efficient updates to its index.  See [`wal_decoder::serialized_batch`] for
    2952              :         // more details.
    2953              : 
    2954       371557 :         let metadata_batch = {
    2955       371557 :             let pending_meta = self
    2956       371557 :                 .pending_metadata_pages
    2957       371557 :                 .drain()
    2958       371557 :                 .flat_map(|(key, values)| {
    2959       137068 :                     values
    2960       137068 :                         .into_iter()
    2961       137068 :                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
    2962       137068 :                 })
    2963       371557 :                 .collect::<Vec<_>>();
    2964              : 
    2965       371557 :             if pending_meta.is_empty() {
    2966       236139 :                 None
    2967              :             } else {
    2968       135418 :                 Some(SerializedValueBatch::from_values(pending_meta))
    2969              :             }
    2970              :         };
    2971              : 
    2972       371557 :         let data_batch = self.pending_data_batch.take();
    2973              : 
    2974       371557 :         let maybe_batch = match (data_batch, metadata_batch) {
    2975       132278 :             (Some(mut data), Some(metadata)) => {
    2976       132278 :                 data.extend(metadata);
    2977       132278 :                 Some(data)
    2978              :             }
    2979        71631 :             (Some(data), None) => Some(data),
    2980         3140 :             (None, Some(metadata)) => Some(metadata),
    2981       164508 :             (None, None) => None,
    2982              :         };
    2983              : 
    2984       371557 :         if let Some(batch) = maybe_batch {
    2985       207049 :             tracing::debug!(
    2986            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2987              :                 batch.max_lsn,
    2988            0 :                 self.tline.get_last_record_lsn()
    2989              :             );
    2990              : 
    2991              :             // This bails out on first error without modifying pending_updates.
    2992              :             // That's Ok, cf this function's doc comment.
    2993       207049 :             writer.put_batch(batch, ctx).await?;
    2994       164508 :         }
    2995              : 
    2996       371557 :         if !self.pending_deletions.is_empty() {
    2997            1 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    2998            1 :             self.pending_deletions.clear();
    2999       371556 :         }
    3000              : 
    3001       371557 :         self.pending_lsns.push(self.lsn);
    3002       444486 :         for pending_lsn in self.pending_lsns.drain(..) {
    3003       444486 :             // TODO(vlad): pretty sure the comment below is not valid anymore
    3004       444486 :             // and we can call finish write with the latest LSN
    3005       444486 :             //
    3006       444486 :             // Ideally, we should be able to call writer.finish_write() only once
    3007       444486 :             // with the highest LSN. However, the last_record_lsn variable in the
    3008       444486 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    3009       444486 :             // so we need to record every LSN to not leave a gap between them.
    3010       444486 :             writer.finish_write(pending_lsn);
    3011       444486 :         }
    3012              : 
    3013       371557 :         if pending_nblocks != 0 {
    3014       135285 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    3015       236272 :         }
    3016              : 
    3017       371557 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    3018         1527 :             writer.update_directory_entries_count(kind, count);
    3019         1527 :         }
    3020              : 
    3021       371557 :         self.pending_metadata_bytes = 0;
    3022              : 
    3023       371557 :         Ok(())
    3024       371557 :     }
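
// Standalone sketch of the batch-merging rule above (not part of this file):
// when both batches exist, the data batch absorbs the metadata batch;
// otherwise whichever one exists is flushed. Plain Vecs stand in for
// SerializedValueBatch here.
fn merge_batches<T>(data: Option<Vec<T>>, metadata: Option<Vec<T>>) -> Option<Vec<T>> {
    match (data, metadata) {
        (Some(mut data), Some(metadata)) => {
            data.extend(metadata);
            Some(data)
        }
        (Some(data), None) => Some(data),
        (None, Some(metadata)) => Some(metadata),
        (None, None) => None,
    }
}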
    3025              : 
    3026       145852 :     pub(crate) fn len(&self) -> usize {
    3027       145852 :         self.pending_metadata_pages.len()
    3028       145852 :             + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
    3029       145852 :             + self.pending_deletions.len()
    3030       145852 :     }
    3031              : 
    3032              :     /// Read a page from the Timeline we are writing to.  For metadata pages, this passes through
    3033              :     /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
    3034              :     /// in the modification.
    3035              :     ///
    3036              :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
    3037              :     /// page must ensure that the pages it reads are already committed in the Timeline; for example,
    3038              :     /// DB create operations are always preceded by a call to commit().  This is special cased because
    3039              :     /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
    3040              :     /// and not data pages.
    3041       143293 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    3042       143293 :         if !Self::is_data_key(&key) {
    3043              :             // Have we already updated the same key? Read the latest pending updated
    3044              :             // version in that case.
    3045              :             //
    3046              :             // Note: we don't check pending_deletions. It is an error to request a
    3047              :             // value that has been removed; deletion only avoids leaking storage.
    3048       143293 :             if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
    3049         7964 :                 if let Some((_, _, value)) = values.last() {
    3050         7964 :                     return if let Value::Image(img) = value {
    3051         7964 :                         Ok(img.clone())
    3052              :                     } else {
    3053              :                         // Currently, we never need to read back a WAL record that we
    3054              :                         // inserted in the same "transaction". All the metadata updates
    3055              :                         // work directly with Images, and we never need to read actual
    3056              :                         // data pages. We could handle this if we had to, by calling
    3057              :                         // the walredo manager, but let's keep it simple for now.
    3058            0 :                         Err(PageReconstructError::Other(anyhow::anyhow!(
    3059            0 :                             "unexpected pending WAL record"
    3060            0 :                         )))
    3061              :                     };
    3062            0 :                 }
    3063       135329 :             }
    3064              :         } else {
    3065              :             // This is an expensive check, so we only do it in debug mode. If reading a data key,
    3066              :             // this key should never be present in pending_data_batch. We ensure this by committing
    3067              :             // modifications before ingesting DB create operations, which are the only kind that reads
    3068              :             // data pages during ingest.
    3069            0 :             if cfg!(debug_assertions) {
    3070            0 :                 assert!(
    3071            0 :                     !self
    3072            0 :                         .pending_data_batch
    3073            0 :                         .as_ref()
    3074            0 :                         .is_some_and(|b| b.updates_key(&key))
    3075              :                 );
    3076            0 :             }
    3077              :         }
    3078              : 
    3079              :         // Metadata page cache miss, or we're reading a data page.
    3080       135329 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    3081       135329 :         self.tline.get(key, lsn, ctx).await
    3082       143293 :     }
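
// Standalone sketch of the metadata read path above (not part of this file):
// the newest pending value for a key shadows whatever the timeline has
// committed. u64 keys/LSNs and Vec<u8> images are simplified stand-ins.
fn read_metadata(
    pending: &std::collections::HashMap<u64, Vec<(u64, Vec<u8>)>>,
    committed: &std::collections::HashMap<u64, Vec<u8>>,
    key: u64,
) -> Option<Vec<u8>> {
    if let Some(versions) = pending.get(&key) {
        if let Some((_lsn, image)) = versions.last() {
            // read-your-writes: pending updates win over committed state
            return Some(image.clone());
        }
    }
    committed.get(&key).cloned()
}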
    3083              : 
    3084              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    3085              :     /// and the empty value into None.
    3086            0 :     async fn sparse_get(
    3087            0 :         &self,
    3088            0 :         key: Key,
    3089            0 :         ctx: &RequestContext,
    3090            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    3091            0 :         let val = self.get(key, ctx).await;
    3092            0 :         match val {
    3093            0 :             Ok(val) if val.is_empty() => Ok(None),
    3094            0 :             Ok(val) => Ok(Some(val)),
    3095            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    3096            0 :             Err(e) => Err(e),
    3097              :         }
    3098            0 :     }
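
// Behavior sketch for sparse_get (illustrative): "key missing" and "empty
// value" collapse into one notion of absence, so callers only ever see:
//
//     Ok(empty bytes)  -> Ok(None)
//     Err(MissingKey)  -> Ok(None)
//     Ok(bytes)        -> Ok(Some(bytes))
//     Err(other)       -> Err(other)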
    3099              : 
    3100              :     #[cfg(test)]
    3101            2 :     pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
    3102            2 :         self.put(key, val);
    3103            2 :     }
    3104              : 
    3105       282078 :     fn put(&mut self, key: Key, val: Value) {
    3106       282078 :         if Self::is_data_key(&key) {
    3107       138934 :             self.put_data(key.to_compact(), val)
    3108              :         } else {
    3109       143144 :             self.put_metadata(key.to_compact(), val)
    3110              :         }
    3111       282078 :     }
    3112              : 
    3113       138934 :     fn put_data(&mut self, key: CompactKey, val: Value) {
    3114       138934 :         let batch = self
    3115       138934 :             .pending_data_batch
    3116       138934 :             .get_or_insert_with(SerializedValueBatch::default);
    3117       138934 :         batch.put(key, val, self.lsn);
    3118       138934 :     }
    3119              : 
    3120       143144 :     fn put_metadata(&mut self, key: CompactKey, val: Value) {
    3121       143144 :         let values = self.pending_metadata_pages.entry(key).or_default();
    3122              :         // Replace the previous value if it exists at the same lsn
    3123       143144 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    3124         6076 :             if *last_lsn == self.lsn {
    3125              :                 // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
    3126         6076 :                 self.pending_metadata_bytes -= *last_value_ser_size;
    3127         6076 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    3128         6076 :                 self.pending_metadata_bytes += *last_value_ser_size;
    3129              : 
    3130              :                 // Use the latest value; this replaces any earlier write to the same (key, lsn), such as may
    3131              :                 // have been generated by synthesized zero page writes prior to the first real write to a page.
    3132         6076 :                 *last_value = val;
    3133         6076 :                 return;
    3134            0 :             }
    3135       137068 :         }
    3136              : 
    3137       137068 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    3138       137068 :         self.pending_metadata_bytes += val_serialized_size;
    3139       137068 :         values.push((self.lsn, val_serialized_size, val));
    3140              : 
    3141       137068 :         if key == CHECKPOINT_KEY.to_compact() {
    3142          119 :             tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
    3143       136949 :         }
    3144       143144 :     }
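
// Coalescing sketch for the same-LSN case above (not part of this file): a
// second write to the same key at the same LSN replaces the first value in
// place, with the running byte counter tracking the swap. u64 stands in for
// Lsn and Vec<u8> for the serialized Value.
fn put_coalescing(
    values: &mut Vec<(u64, Vec<u8>)>, // (lsn, serialized value)
    lsn: u64,
    val: Vec<u8>,
    pending_bytes: &mut usize,
) {
    if let Some((last_lsn, last_val)) = values.last_mut() {
        if *last_lsn == lsn {
            // same (key, lsn): replace in place and adjust the byte count
            *pending_bytes -= last_val.len();
            *pending_bytes += val.len();
            *last_val = val;
            return;
        }
    }
    // new lsn for this key: append a new version
    *pending_bytes += val.len();
    values.push((lsn, val));
}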
    3145              : 
    3146            1 :     fn delete(&mut self, key_range: Range<Key>) {
    3147            1 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    3148            1 :         self.pending_deletions.push((key_range, self.lsn));
    3149            1 :     }
    3150              : }
    3151              : 
    3152              : /// Statistics for a DatadirModification.
    3153              : #[derive(Default)]
    3154              : pub struct DatadirModificationStats {
    3155              :     pub metadata_images: u64,
    3156              :     pub metadata_deltas: u64,
    3157              :     pub data_images: u64,
    3158              :     pub data_deltas: u64,
    3159              : }
    3160              : 
    3161              : /// This enum facilitates accessing either a committed key from the timeline at a
    3162              : /// specific LSN, or the latest uncommitted key from a pending modification.
    3163              : ///
    3164              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    3165              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    3166              : /// need to look up the keys in the modification first before looking them up in the
    3167              : /// timeline, so as not to miss the latest updates.
    3168              : #[derive(Clone, Copy)]
    3169              : pub enum Version<'a> {
    3170              :     LsnRange(LsnRange),
    3171              :     Modified(&'a DatadirModification<'a>),
    3172              : }
    3173              : 
    3174              : impl Version<'_> {
    3175           25 :     async fn get(
    3176           25 :         &self,
    3177           25 :         timeline: &Timeline,
    3178           25 :         key: Key,
    3179           25 :         ctx: &RequestContext,
    3180           25 :     ) -> Result<Bytes, PageReconstructError> {
    3181           25 :         match self {
    3182           15 :             Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
    3183           10 :             Version::Modified(modification) => modification.get(key, ctx).await,
    3184              :         }
    3185           25 :     }
    3186              : 
    3187              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    3188              :     /// and the empty value into None.
    3189            0 :     async fn sparse_get(
    3190            0 :         &self,
    3191            0 :         timeline: &Timeline,
    3192            0 :         key: Key,
    3193            0 :         ctx: &RequestContext,
    3194            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    3195            0 :         let val = self.get(timeline, key, ctx).await;
    3196            0 :         match val {
    3197            0 :             Ok(val) if val.is_empty() => Ok(None),
    3198            0 :             Ok(val) => Ok(Some(val)),
    3199            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    3200            0 :             Err(e) => Err(e),
    3201              :         }
    3202            0 :     }
    3203              : 
    3204           26 :     pub fn is_latest(&self) -> bool {
    3205           26 :         match self {
    3206           16 :             Version::LsnRange(lsns) => lsns.is_latest(),
    3207           10 :             Version::Modified(_) => true,
    3208              :         }
    3209           26 :     }
    3210              : 
    3211       224275 :     pub fn get_lsn(&self) -> Lsn {
    3212       224275 :         match self {
    3213        12224 :             Version::LsnRange(lsns) => lsns.effective_lsn,
    3214       212051 :             Version::Modified(modification) => modification.lsn,
    3215              :         }
    3216       224275 :     }
    3217              : 
    3218        12219 :     pub fn at(lsn: Lsn) -> Self {
    3219        12219 :         Version::LsnRange(LsnRange {
    3220        12219 :             effective_lsn: lsn,
    3221        12219 :             request_lsn: lsn,
    3222        12219 :         })
    3223        12219 :     }
    3224              : }
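
// Usage sketch (names assumed in scope; not part of this file): both variants
// resolve the same key, but against different sources.
//
//     // committed state at a fixed LSN:
//     let img = Version::at(lsn).get(&timeline, key, &ctx).await?;
//     // latest pending state, falling back to the timeline:
//     let img = Version::Modified(&modification).get(&timeline, key, &ctx).await?;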
    3225              : 
    3226              : //--- Metadata structs stored in key-value pairs in the repository.
    3227              : 
    3228            0 : #[derive(Debug, Serialize, Deserialize)]
    3229              : pub(crate) struct DbDirectory {
    3230              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    3231              :     pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
    3232              : }
    3233              : 
    3234              : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
    3235              : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs.  Previously, the files
    3236              : // were named like "pg_twophase/000002E5", now they're like
    3237              : // "pg_twophase/0000000A000002E4".
    3238              : 
    3239            0 : #[derive(Debug, Serialize, Deserialize)]
    3240              : pub(crate) struct TwoPhaseDirectory {
    3241              :     pub(crate) xids: HashSet<TransactionId>,
    3242              : }
    3243              : 
    3244            0 : #[derive(Debug, Serialize, Deserialize)]
    3245              : struct TwoPhaseDirectoryV17 {
    3246              :     xids: HashSet<u64>,
    3247              : }
    3248              : 
    3249            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    3250              : pub(crate) struct RelDirectory {
    3251              :     // Set of relations that exist. (relfilenode, forknum)
    3252              :     //
    3253              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    3254              :     // key-value pairs, if you have a lot of relations
    3255              :     pub(crate) rels: HashSet<(Oid, u8)>,
    3256              : }
    3257              : 
    3258            0 : #[derive(Debug, Serialize, Deserialize)]
    3259              : struct RelSizeEntry {
    3260              :     nblocks: u32,
    3261              : }
    3262              : 
    3263            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    3264              : pub(crate) struct SlruSegmentDirectory {
    3265              :     // Set of SLRU segments that exist.
    3266              :     pub(crate) segments: HashSet<u32>,
    3267              : }
    3268              : 
    3269              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    3270              : #[repr(u8)]
    3271              : pub(crate) enum DirectoryKind {
    3272              :     Db,
    3273              :     TwoPhase,
    3274              :     Rel,
    3275              :     AuxFiles,
    3276              :     SlruSegment(SlruKind),
    3277              :     RelV2,
    3278              : }
    3279              : 
    3280              : impl DirectoryKind {
    3281              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    3282         4582 :     pub(crate) fn offset(&self) -> usize {
    3283         4582 :         self.into_usize()
    3284         4582 :     }
    3285              : }
    3286              : 
    3287              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    3288              : 
    3289              : #[allow(clippy::bool_assert_comparison)]
    3290              : #[cfg(test)]
    3291              : mod tests {
    3292              :     use hex_literal::hex;
    3293              :     use pageserver_api::models::ShardParameters;
    3294              :     use utils::id::TimelineId;
    3295              :     use utils::shard::{ShardCount, ShardNumber, ShardStripeSize};
    3296              : 
    3297              :     use super::*;
    3298              :     use crate::DEFAULT_PG_VERSION;
    3299              :     use crate::tenant::harness::TenantHarness;
    3300              : 
    3301              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    3302              :     #[tokio::test]
    3303            1 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    3304            1 :         let name = "aux_files_round_trip";
    3305            1 :         let harness = TenantHarness::create(name).await?;
    3306              : 
    3307              :         pub const TIMELINE_ID: TimelineId =
    3308              :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3309              : 
    3310            1 :         let (tenant, ctx) = harness.load().await;
    3311            1 :         let (tline, ctx) = tenant
    3312            1 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3313            1 :             .await?;
    3314            1 :         let tline = tline.raw_timeline().unwrap();
    3315              : 
    3316              :         // First modification: insert two keys
    3317            1 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    3318            1 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    3319            1 :         modification.set_lsn(Lsn(0x1008))?;
    3320            1 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    3321            1 :         modification.commit(&ctx).await?;
    3322            1 :         let expect_1008 = HashMap::from([
    3323            1 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    3324            1 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    3325            1 :         ]);
    3326              : 
    3327            1 :         let io_concurrency = IoConcurrency::spawn_for_test();
    3328              : 
    3329            1 :         let readback = tline
    3330            1 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    3331            1 :             .await?;
    3332            1 :         assert_eq!(readback, expect_1008);
    3333              : 
    3334              :         // Second modification: update one key, remove the other
    3335            1 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    3336            1 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    3337            1 :         modification.set_lsn(Lsn(0x2008))?;
    3338            1 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    3339            1 :         modification.commit(&ctx).await?;
    3340            1 :         let expect_2008 =
    3341            1 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    3342              : 
    3343            1 :         let readback = tline
    3344            1 :             .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
    3345            1 :             .await?;
    3346            1 :         assert_eq!(readback, expect_2008);
    3347              : 
    3348              :         // Reading back in time works
    3349            1 :         let readback = tline
    3350            1 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    3351            1 :             .await?;
    3352            1 :         assert_eq!(readback, expect_1008);
    3353              : 
    3354            2 :         Ok(())
    3355            1 :     }
    3356              : 
    3357              :     #[test]
    3358            1 :     fn gap_finding() {
    3359            1 :         let rel = RelTag {
    3360            1 :             spcnode: 1663,
    3361            1 :             dbnode: 208101,
    3362            1 :             relnode: 2620,
    3363            1 :             forknum: 0,
    3364            1 :         };
    3365            1 :         let base_blkno = 1;
    3366              : 
    3367            1 :         let base_key = rel_block_to_key(rel, base_blkno);
    3368            1 :         let before_base_key = rel_block_to_key(rel, base_blkno - 1);
    3369              : 
    3370            1 :         let shard = ShardIdentity::unsharded();
    3371              : 
    3372            1 :         let mut previous_nblocks = 0;
    3373           11 :         for i in 0..10 {
    3374           10 :             let crnt_blkno = base_blkno + i;
    3375           10 :             let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
    3376              : 
    3377           10 :             previous_nblocks = crnt_blkno + 1;
    3378              : 
    3379           10 :             if i == 0 {
    3380              :                 // The first block we write is 1, so we should find the gap.
    3381            1 :                 assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
    3382              :             } else {
    3383            9 :                 assert!(gaps.is_none());
    3384              :             }
    3385              :         }
    3386              : 
    3387              :         // This is an update to an already existing block. No gaps here.
    3388            1 :         let update_blkno = 5;
    3389            1 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    3390            1 :         assert!(gaps.is_none());
    3391              : 
    3392              :         // This is an update past the current end block.
    3393            1 :         let after_gap_blkno = 20;
    3394            1 :         let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
    3395              : 
    3396            1 :         let gap_start_key = rel_block_to_key(rel, previous_nblocks);
    3397            1 :         let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
    3398            1 :         assert_eq!(
    3399            1 :             gaps.unwrap(),
    3400            1 :             KeySpace::single(gap_start_key..after_gap_key)
    3401              :         );
    3402            1 :     }
    3403              : 
    3404              :     #[test]
    3405            1 :     fn sharded_gap_finding() {
    3406            1 :         let rel = RelTag {
    3407            1 :             spcnode: 1663,
    3408            1 :             dbnode: 208101,
    3409            1 :             relnode: 2620,
    3410            1 :             forknum: 0,
    3411            1 :         };
    3412              : 
    3413            1 :         let first_blkno = 6;
    3414              : 
    3415              :         // This shard will get the even blocks
    3416            1 :         let shard = ShardIdentity::from_params(
    3417            1 :             ShardNumber(0),
    3418            1 :             ShardParameters {
    3419            1 :                 count: ShardCount(2),
    3420            1 :                 stripe_size: ShardStripeSize(1),
    3421            1 :             },
    3422              :         );
    3423              : 
    3424              :         // Only keys belonging to this shard are considered as gaps.
    3425            1 :         let mut previous_nblocks = 0;
    3426            1 :         let gaps =
    3427            1 :             DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
    3428            1 :         assert!(!gaps.ranges.is_empty());
    3429            3 :         for gap_range in gaps.ranges {
    3430            2 :             let mut k = gap_range.start;
    3431            4 :             while k != gap_range.end {
    3432            2 :                 assert_eq!(shard.get_shard_number(&k), shard.number);
    3433            2 :                 k = k.next();
    3434              :             }
    3435              :         }
    3436              : 
    3437            1 :         previous_nblocks = first_blkno;
    3438              : 
    3439            1 :         let update_blkno = 2;
    3440            1 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    3441            1 :         assert!(gaps.is_none());
    3442            1 :     }
    3443              : 
    3444              :     /*
    3445              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    3446              :             let incremental = timeline.get_current_logical_size();
    3447              :             let non_incremental = timeline
    3448              :                 .get_current_logical_size_non_incremental(lsn)
    3449              :                 .unwrap();
    3450              :             assert_eq!(incremental, non_incremental);
    3451              :         }
    3452              :     */
    3453              : 
    3454              :     /*
    3455              :     ///
    3456              :     /// Test list_rels() function, with branches and dropped relations
    3457              :     ///
    3458              :     #[test]
    3459              :     fn test_list_rels_drop() -> Result<()> {
    3460              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    3461              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    3462              :         const TESTDB: u32 = 111;
    3463              : 
    3464              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    3465              :         // after branching fails below
    3466              :         let mut writer = tline.begin_record(Lsn(0x10));
    3467              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    3468              :         writer.finish()?;
    3469              : 
    3470              :         // Create a relation on the timeline
    3471              :         let mut writer = tline.begin_record(Lsn(0x20));
    3472              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    3473              :         writer.finish()?;
    3474              : 
    3475              :         let writer = tline.begin_record(Lsn(0x00));
    3476              :         writer.finish()?;
    3477              : 
    3478              :         // Check that list_rels() lists it at Lsn(0x20) and later, but not before it
    3479              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    3480              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    3481              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    3482              : 
    3483              :         // Create a branch, check that the relation is visible there
    3484              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    3485              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    3486              :             Some(timeline) => timeline,
    3487              :             None => panic!("Should have a local timeline"),
    3488              :         };
    3489              :         let newtline = DatadirTimelineImpl::new(newtline);
    3490              :         assert!(newtline
    3491              :             .list_rels(0, TESTDB, Lsn(0x30))?
    3492              :             .contains(&TESTREL_A));
    3493              : 
    3494              :         // Drop it on the branch
    3495              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    3496              :         new_writer.drop_relation(TESTREL_A)?;
    3497              :         new_writer.finish()?;
    3498              : 
    3499              :         // Check that it's no longer listed on the branch after the point where it was dropped
    3500              :         assert!(newtline
    3501              :             .list_rels(0, TESTDB, Lsn(0x30))?
    3502              :             .contains(&TESTREL_A));
    3503              :         assert!(!newtline
    3504              :             .list_rels(0, TESTDB, Lsn(0x40))?
    3505              :             .contains(&TESTREL_A));
    3506              : 
    3507              :         // Run checkpoint and garbage collection and check that it's still not visible
    3508              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    3509              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    3510              : 
    3511              :         assert!(!newtline
    3512              :             .list_rels(0, TESTDB, Lsn(0x40))?
    3513              :             .contains(&TESTREL_A));
    3514              : 
    3515              :         Ok(())
    3516              :     }
    3517              :      */
    3518              : 
    3519              :     /*
    3520              :     #[test]
    3521              :     fn test_read_beyond_eof() -> Result<()> {
    3522              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    3523              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    3524              : 
    3525              :         make_some_layers(&tline, Lsn(0x20))?;
    3526              :         let mut writer = tline.begin_record(Lsn(0x60));
    3527              :         walingest.put_rel_page_image(
    3528              :             &mut writer,
    3529              :             TESTREL_A,
    3530              :             0,
    3531              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    3532              :         )?;
    3533              :         writer.finish()?;
    3534              : 
    3535              :         // Test read before rel creation. Should error out.
    3536              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    3537              : 
    3538              :         // Read block beyond end of relation at different points in time.
    3539              :         // These reads should fall into different delta, image, and in-memory layers.
    3540              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    3541              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    3542              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    3543              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    3544              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    3545              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    3546              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    3547              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    3548              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    3549              : 
    3550              :         // Test on an in-memory layer with no preceding layer
    3551              :         let mut writer = tline.begin_record(Lsn(0x70));
    3552              :         walingest.put_rel_page_image(
    3553              :             &mut writer,
    3554              :             TESTREL_B,
    3555              :             0,
    3556              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    3557              :         )?;
    3558              :         writer.finish()?;
    3559              : 
    3560              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    3561              : 
    3562              :         Ok(())
    3563              :     }
    3564              :      */
    3565              : }
        

Generated by: LCOV version 2.1-beta