// pageserver/src/pgdatadir_mapping.rs
//!
//! This provides an abstraction to store PostgreSQL relations and other files
//! in the key-value store that implements the Repository interface.
//!
//! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use std::collections::{HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};

use crate::walingest::{WalIngestError, WalIngestErrorKind};
use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
    AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
    TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
    rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
    repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
    slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
};
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn};
use utils::bin_ser::{BeSer, DeserializeError};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};

use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
    RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
    RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
    RELSIZE_SNAPSHOT_CACHE_MISSES,
};
use crate::span::{
    debug_assert_current_span_has_tenant_and_timeline_id,
    debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;

/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;

#[derive(Debug)]
pub enum LsnForTimestamp {
    /// Found commits both before and after the given timestamp
    Present(Lsn),

    /// Found no commits after the given timestamp: the newest data
    /// in the branch is older than the given timestamp.
    ///
    /// All commits <= LSN happened before the given timestamp
    Future(Lsn),

    /// The queried timestamp is past the horizon we look back at (PITR)
    ///
    /// All commits > LSN happened after the given timestamp,
    /// but any commits < LSN might have happened before or after
    /// the given timestamp. We don't know because no data before
    /// the given lsn is available.
    Past(Lsn),

    /// We have found no commit with a timestamp,
    /// so we can't return anything meaningful.
    ///
    /// The associated LSN is the lower bound value we can safely
    /// create branches on, but no statement is made about whether it is
    /// older or newer than the timestamp.
    ///
    /// This variant can e.g. be returned right after a
    /// cluster import.
    NoData(Lsn),
}
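
// Illustrative sketch (not from the original source): a branch-creation caller
// might resolve a user-supplied timestamp like this, assuming `timeline`, `ts`,
// `cancel` and `ctx` are in scope and error handling is elided:
//
//     let branch_lsn = match timeline.find_lsn_for_timestamp(ts, &cancel, &ctx).await? {
//         LsnForTimestamp::Present(lsn) => lsn, // commits exist on both sides of `ts`
//         LsnForTimestamp::Future(lsn) => lsn,  // newest data is older than `ts`; branch at the tip
//         LsnForTimestamp::Past(lsn) | LsnForTimestamp::NoData(lsn) => lsn, // best-effort lower bound
//     };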

/// Each request to the page server contains an LSN range: `not_modified_since..request_lsn`.
/// See the comments in libs/pageserver_api/src/models.rs.
/// Based on this range and `last_record_lsn`, the pageserver calculates the `effective_lsn`.
/// But to distinguish requests from the primary and from replicas, we also need to pass `request_lsn`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LsnRange {
    pub effective_lsn: Lsn,
    pub request_lsn: Lsn,
}

impl LsnRange {
    pub fn at(lsn: Lsn) -> LsnRange {
        LsnRange {
            effective_lsn: lsn,
            request_lsn: lsn,
        }
    }
    pub fn is_latest(&self) -> bool {
        self.request_lsn == Lsn::MAX
    }
}
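
// Illustrative sketch (not from the original source): a read from the primary
// carries `request_lsn == Lsn::MAX` ("give me the latest"), while a replica pins
// an exact LSN, e.g.:
//
//     let primary = LsnRange { effective_lsn: last_record_lsn, request_lsn: Lsn::MAX };
//     let replica = LsnRange::at(Lsn(0x1234_5678));
//     assert!(primary.is_latest() && !replica.is_latest());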

#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
    #[error("cancelled")]
    Cancelled,

    /// Something went wrong while reading the metadata we use to calculate logical size.
    /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
    /// in the `From` implementation for this variant.
    #[error(transparent)]
    PageRead(PageReconstructError),

    /// Something went wrong deserializing metadata that we read to calculate logical size
    #[error("decode error: {0}")]
    Decode(#[from] DeserializeError),
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
    #[error(transparent)]
    Decode(#[from] DeserializeError),
    #[error(transparent)]
    PageRead(PageReconstructError),
    #[error("cancelled")]
    Cancelled,
}

impl From<PageReconstructError> for CollectKeySpaceError {
    fn from(err: PageReconstructError) -> Self {
        match err {
            PageReconstructError::Cancelled => Self::Cancelled,
            err => Self::PageRead(err),
        }
    }
}

impl From<PageReconstructError> for CalculateLogicalSizeError {
    fn from(pre: PageReconstructError) -> Self {
        match pre {
            PageReconstructError::Cancelled => Self::Cancelled,
            _ => Self::PageRead(pre),
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum RelationError {
    #[error("invalid relnode")]
    InvalidRelnode,
}

///
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
/// Timeline struct provides the key-value store.
///
/// This is a separate impl, so that we can easily include all these functions in a Timeline
/// implementation, and might be moved into a separate struct later.
impl Timeline {
    /// Start ingesting a WAL record, or other atomic modification of
    /// the timeline.
    ///
    /// This provides a transaction-like interface to perform a bunch
    /// of modifications atomically.
    ///
    /// To ingest a WAL record, call begin_modification(lsn) to get a
    /// DatadirModification object. Use the functions in the object to
    /// modify the repository state, updating all the pages and metadata
    /// that the WAL record affects. When you're done, call commit() to
    /// commit the changes.
    ///
    /// The LSN stored in the modification is advanced by `ingest_record` and
    /// is used by `commit()` to update `last_record_lsn`.
    ///
    /// Calling commit() will flush all the changes and reset the state,
    /// so the `DatadirModification` struct can be reused to perform the next modification.
    ///
    /// Note that any pending modifications you make through the
    /// modification object won't be visible to calls to the 'get' and list
    /// functions of the timeline until you finish! And if you update the
    /// same page twice, the last update wins.
    ///
    pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
    where
        Self: Sized,
    {
        DatadirModification {
            tline: self,
            pending_lsns: Vec::new(),
            pending_metadata_pages: HashMap::new(),
            pending_data_batch: None,
            pending_deletions: Vec::new(),
            pending_nblocks: 0,
            pending_directory_entries: Vec::new(),
            pending_metadata_bytes: 0,
            lsn,
        }
    }
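
    // Illustrative sketch (not from the original source): the WAL-ingest flow
    // described in the doc comment above looks roughly like this, assuming a
    // `timeline`, an `lsn` and a `ctx` in scope; the exact put/commit signatures
    // are elided:
    //
    //     let mut modification = timeline.begin_modification(lsn);
    //     // ... apply page and metadata updates for everything the record touches ...
    //     modification.commit(&ctx).await?; // flushes, advances last_record_lsn, resets state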
    //------------------------------------------------------------------------------
    // Public GET functions
    //------------------------------------------------------------------------------

    /// Look up given page version.
    pub(crate) async fn get_rel_page_at_lsn(
        &self,
        tag: RelTag,
        blknum: BlockNumber,
        version: Version<'_>,
        ctx: &RequestContext,
        io_concurrency: IoConcurrency,
    ) -> Result<Bytes, PageReconstructError> {
        match version {
            Version::LsnRange(lsns) => {
                let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
                let res = self
                    .get_rel_page_at_lsn_batched(
                        pages
                            .iter()
                            .map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
                        io_concurrency.clone(),
                        ctx,
                    )
                    .await;
                assert_eq!(res.len(), 1);
                res.into_iter().next().unwrap()
            }
            Version::Modified(modification) => {
                if tag.relnode == 0 {
                    return Err(PageReconstructError::Other(
                        RelationError::InvalidRelnode.into(),
                    ));
                }

                let nblocks = self.get_rel_size(tag, version, ctx).await?;
                if blknum >= nblocks {
                    debug!(
                        "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                        tag,
                        blknum,
                        version.get_lsn(),
                        nblocks
                    );
                    return Ok(ZERO_PAGE.clone());
                }

                let key = rel_block_to_key(tag, blknum);
                modification.get(key, ctx).await
            }
        }
    }

    /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
    ///
    /// The ordering of the returned vec corresponds to the ordering of `pages`.
    pub(crate) async fn get_rel_page_at_lsn_batched(
        &self,
        pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
        io_concurrency: IoConcurrency,
        ctx: &RequestContext,
    ) -> Vec<Result<Bytes, PageReconstructError>> {
        debug_assert_current_span_has_tenant_and_timeline_id();

        let mut slots_filled = 0;
        let page_count = pages.len();

        // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
        let mut result = Vec::with_capacity(pages.len());
        let result_slots = result.spare_capacity_mut();

        let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
            HashMap::with_capacity(pages.len());

        let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
            HashMap::with_capacity(pages.len());

        for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
            if tag.relnode == 0 {
                result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
                    RelationError::InvalidRelnode.into(),
                )));

                slots_filled += 1;
                continue;
            }
            let lsn = lsns.effective_lsn;
            let nblocks = {
                let ctx = RequestContextBuilder::from(&ctx)
                    .perf_span(|crnt_perf_span| {
                        info_span!(
                            target: PERF_TRACE_TARGET,
                            parent: crnt_perf_span,
                            "GET_REL_SIZE",
                            reltag=%tag,
                            lsn=%lsn,
                        )
                    })
                    .attached_child();

                match self
                    .get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
                    .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
                    .await
                {
                    Ok(nblocks) => nblocks,
                    Err(err) => {
                        result_slots[response_slot_idx].write(Err(err));
                        slots_filled += 1;
                        continue;
                    }
                }
            };

            if *blknum >= nblocks {
                debug!(
                    "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                    tag, blknum, lsn, nblocks
                );
                result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
                slots_filled += 1;
                continue;
            }

            let key = rel_block_to_key(*tag, *blknum);

            let ctx = RequestContextBuilder::from(&ctx)
                .perf_span(|crnt_perf_span| {
                    info_span!(
                        target: PERF_TRACE_TARGET,
                        parent: crnt_perf_span,
                        "GET_BATCH",
                        batch_size = %page_count,
                    )
                })
                .attached_child();

            let key_slots = keys_slots.entry(key).or_default();
            key_slots.push((response_slot_idx, ctx));

            let acc = req_keyspaces.entry(lsn).or_default();
            acc.add_key(key);
        }

        let query: Vec<(Lsn, KeySpace)> = req_keyspaces
            .into_iter()
            .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
            .collect();

        let query = VersionedKeySpaceQuery::scattered(query);
        let res = self
            .get_vectored(query, io_concurrency, ctx)
            .maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
            .await;

        match res {
            Ok(results) => {
                for (key, res) in results {
                    let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
                    let (first_slot, first_req_ctx) = key_slots.next().unwrap();

                    for (slot, req_ctx) in key_slots {
                        let clone = match &res {
                            Ok(buf) => Ok(buf.clone()),
                            Err(err) => Err(match err {
                                PageReconstructError::Cancelled => PageReconstructError::Cancelled,

                                x @ PageReconstructError::Other(_)
                                | x @ PageReconstructError::AncestorLsnTimeout(_)
                                | x @ PageReconstructError::WalRedo(_)
                                | x @ PageReconstructError::MissingKey(_) => {
                                    PageReconstructError::Other(anyhow::anyhow!(
                                        "there was more than one request for this key in the batch, error logged once: {x:?}"
                                    ))
                                }
                            }),
                        };

                        result_slots[slot].write(clone);
                        // There is no standardized way to express that the batched span followed from N request spans.
                        // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
                        // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
                        req_ctx.perf_follows_from(ctx);
                        slots_filled += 1;
                    }

                    result_slots[first_slot].write(res);
                    first_req_ctx.perf_follows_from(ctx);
                    slots_filled += 1;
                }
            }
            Err(err) => {
                // This cannot really happen, because get_vectored only errors globally on an invalid LSN or a too-large batch size.
                // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
                for (slot, req_ctx) in keys_slots.values().flatten() {
                    // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
                    // but without taking ownership of the GetVectoredError
                    let err = match &err {
                        GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
                        // TODO: restructure get_vectored API to make this error per-key
                        GetVectoredError::MissingKey(err) => {
                            Err(PageReconstructError::Other(anyhow::anyhow!(
                                "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
                            )))
                        }
                        // TODO: restructure get_vectored API to make this error per-key
                        GetVectoredError::GetReadyAncestorError(err) => {
                            Err(PageReconstructError::Other(anyhow::anyhow!(
                                "whole vectored get request failed because one or more keys required an ancestor that wasn't ready: {err:?}"
                            )))
                        }
                        // TODO: restructure get_vectored API to make this error per-key
                        GetVectoredError::Other(err) => Err(PageReconstructError::Other(
                            anyhow::anyhow!("whole vectored get request failed: {err:?}"),
                        )),
                        // TODO: we can prevent this error class by moving this check into the type system
                        GetVectoredError::InvalidLsn(e) => {
                            Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
                        }
                        // NB: this should never happen in practice because we limit batch size to be smaller than max_get_vectored_keys
                        // TODO: we can prevent this error class by moving this check into the type system
                        GetVectoredError::Oversized(err, max) => {
                            Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
                        }
                    };

                    req_ctx.perf_follows_from(ctx);
                    result_slots[*slot].write(err);
                }

                slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
            }
        };

        assert_eq!(slots_filled, page_count);
        // SAFETY:
        // 1. `result` and any of its uninit members are not read from until this point
        // 2. The length below is tracked at run-time and matches the number of requested pages.
        unsafe {
            result.set_len(page_count);
        }

        result
    }
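
    // Illustrative sketch (not from the original source): a caller with several
    // page requests at the same effective LSN could batch them like this, with
    // `timeline`, `reqs: Vec<(RelTag, BlockNumber)>`, `lsns`, `io_concurrency`
    // and `ctx` assumed to be in scope:
    //
    //     let results = timeline
    //         .get_rel_page_at_lsn_batched(
    //             reqs.iter().map(|(tag, blk)| (tag, blk, lsns, ctx.attached_child())),
    //             io_concurrency.clone(),
    //             &ctx,
    //         )
    //         .await;
    //     // results[i] corresponds to reqs[i], per the ordering guarantee above.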

    /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
    /// other shards, by only accounting for relations the shard has pages for, and only accounting
    /// for pages up to the highest page number it has stored.
    pub(crate) async fn get_db_size(
        &self,
        spcnode: Oid,
        dbnode: Oid,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<usize, PageReconstructError> {
        let mut total_blocks = 0;

        let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;

        if rels.is_empty() {
            return Ok(0);
        }

        // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
        let reldir_key = rel_dir_to_key(spcnode, dbnode);
        let buf = version.get(self, reldir_key, ctx).await?;
        let reldir = RelDirectory::des(&buf)?;

        for rel in rels {
            let n_blocks = self
                .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), ctx)
                .await?;
            total_blocks += n_blocks as usize;
        }
        Ok(total_blocks)
    }

    /// Get size of a relation file. The relation must exist, otherwise an error is returned.
    ///
    /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
    /// page number stored in the shard.
    pub(crate) async fn get_rel_size(
        &self,
        tag: RelTag,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<BlockNumber, PageReconstructError> {
        self.get_rel_size_in_reldir(tag, version, None, ctx).await
    }

    /// Get size of a relation file. The relation must exist, otherwise an error is returned.
    ///
    /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
    pub(crate) async fn get_rel_size_in_reldir(
        &self,
        tag: RelTag,
        version: Version<'_>,
        deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
        ctx: &RequestContext,
    ) -> Result<BlockNumber, PageReconstructError> {
        if tag.relnode == 0 {
            return Err(PageReconstructError::Other(
                RelationError::InvalidRelnode.into(),
            ));
        }

        if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
            return Ok(nblocks);
        }

        if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
            && !self
                .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
                .await?
        {
            // FIXME: Postgres sometimes calls smgrcreate() to create
            // FSM, and smgrnblocks() on it immediately afterwards,
            // without extending it.  Tolerate that by claiming that
            // any non-existent FSM fork has size 0.
            return Ok(0);
        }

        let key = rel_size_to_key(tag);
        let mut buf = version.get(self, key, ctx).await?;
        let nblocks = buf.get_u32_le();

        self.update_cached_rel_size(tag, version, nblocks);

        Ok(nblocks)
    }
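
    // Illustrative note (not from the original source): the relation size is
    // materialized under `rel_size_to_key(tag)` as a little-endian u32 block
    // count, which is why the read above is a plain `buf.get_u32_le()`. The
    // corresponding write side would encode it roughly like this:
    //
    //     let mut buf = BytesMut::with_capacity(4);
    //     buf.put_u32_le(nblocks); // inverse of the get_u32_le() read above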

    /// Does the relation exist?
    ///
    /// Only shard 0 has a full view of the relations. Other shards only know about relations that
    /// the shard stores pages for.
    ///
    pub(crate) async fn get_rel_exists(
        &self,
        tag: RelTag,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        self.get_rel_exists_in_reldir(tag, version, None, ctx).await
    }

    /// Does the relation exist? With a cached deserialized `RelDirectory`.
    ///
    /// There are some cases where the caller loops across all relations. In that specific case,
    /// the caller should obtain the deserialized `RelDirectory` first and then call this function
    /// to avoid the duplicated work of deserialization. This is a hack and should be removed by
    /// introducing a new API (e.g., `get_rel_exists_batched`).
    pub(crate) async fn get_rel_exists_in_reldir(
        &self,
        tag: RelTag,
        version: Version<'_>,
        deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        if tag.relnode == 0 {
            return Err(PageReconstructError::Other(
                RelationError::InvalidRelnode.into(),
            ));
        }

        // first try to lookup relation in cache
        if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
            return Ok(true);
        }
        // then check if the database was already initialized.
        // get_rel_exists can be called before dbdir is created.
        let buf = version.get(self, DBDIR_KEY, ctx).await?;
        let dbdirs = DbDirectory::des(&buf)?.dbdirs;
        if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
            return Ok(false);
        }

        // Read path: first read the new reldir keyspace. Early return if the relation exists.
        // Otherwise, read the old reldir keyspace.
        // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.

        if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
            self.get_rel_size_v2_status()
        {
            // fetch directory listing (new)
            let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
            let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
                .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
            let exists_v2 = buf == RelDirExists::Exists;
            // Fast path: if the relation exists in the new format, return true.
            // TODO: we should have a verification mode that checks both keyspaces
            // to ensure the relation only exists in one of them.
            if exists_v2 {
                return Ok(true);
            }
        }

        // fetch directory listing (old)

        let key = rel_dir_to_key(tag.spcnode, tag.dbnode);

        if let Some((cached_key, dir)) = deserialized_reldir_v1 {
            if cached_key == key {
                return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
            } else if cfg!(test) || cfg!(feature = "testing") {
                panic!("cached reldir key mismatch: {cached_key} != {key}");
            } else {
                warn!("cached reldir key mismatch: {cached_key} != {key}");
            }
            // Fallback to reading the directory from the datadir.
        }
        let buf = version.get(self, key, ctx).await?;

        let dir = RelDirectory::des(&buf)?;
        let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
        Ok(exists_v1)
    }
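
    // Illustrative note (not from the original source): the existence check above
    // consults, in order: (1) the rel-size cache, (2) the DbDirectory under
    // DBDIR_KEY, (3) the sparse v2 key from rel_tag_sparse_key() when the
    // migration status is Migrating/Migrated, and finally (4) the v1 RelDirectory
    // blob under rel_dir_to_key(). A hit at any earlier step short-circuits the rest.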

    /// Get a list of all existing relations in given tablespace and database.
    ///
    /// Only shard 0 has a full view of the relations. Other shards only know about relations that
    /// the shard stores pages for.
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
    pub(crate) async fn list_rels(
        &self,
        spcnode: Oid,
        dbnode: Oid,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<HashSet<RelTag>, PageReconstructError> {
        // fetch directory listing (old)
        let key = rel_dir_to_key(spcnode, dbnode);
        let buf = version.get(self, key, ctx).await?;

        let dir = RelDirectory::des(&buf)?;
        let rels_v1: HashSet<RelTag> =
            HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
                spcnode,
                dbnode,
                relnode: *relnode,
                forknum: *forknum,
            }));

        if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
            return Ok(rels_v1);
        }

        // scan directory listing (new), merge with the old results
        let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
        let io_concurrency = IoConcurrency::spawn_from_conf(
            self.conf.get_vectored_concurrent_io,
            self.gate
                .enter()
                .map_err(|_| PageReconstructError::Cancelled)?,
        );
        let results = self
            .scan(
                KeySpace::single(key_range),
                version.get_lsn(),
                ctx,
                io_concurrency,
            )
            .await?;
        let mut rels = rels_v1;
        for (key, val) in results {
            let val = RelDirExists::decode(&val?)
                .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
            assert_eq!(key.field6, 1);
            assert_eq!(key.field2, spcnode);
            assert_eq!(key.field3, dbnode);
            let tag = RelTag {
                spcnode,
                dbnode,
                relnode: key.field4,
                forknum: key.field5,
            };
            if val == RelDirExists::Removed {
                debug_assert!(!rels.contains(&tag), "removed reltag in v2");
                continue;
            }
            let did_not_contain = rels.insert(tag);
            debug_assert!(did_not_contain, "duplicate reltag in v2");
        }
        Ok(rels)
    }

    /// Get the whole SLRU segment
    pub(crate) async fn get_slru_segment(
        &self,
        kind: SlruKind,
        segno: u32,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<Bytes, PageReconstructError> {
        assert!(self.tenant_shard_id.is_shard_zero());
        let n_blocks = self
            .get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
            .await?;

        let keyspace = KeySpace::single(
            slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
        );

        let batches = keyspace.partition(
            self.get_shard_identity(),
            self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
        );

        let io_concurrency = IoConcurrency::spawn_from_conf(
            self.conf.get_vectored_concurrent_io,
            self.gate
                .enter()
                .map_err(|_| PageReconstructError::Cancelled)?,
        );

        let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
        for batch in batches.parts {
            let query = VersionedKeySpaceQuery::uniform(batch, lsn);
            let blocks = self
                .get_vectored(query, io_concurrency.clone(), ctx)
                .await?;

            for (_key, block) in blocks {
                let block = block?;
                segment.extend_from_slice(&block[..BLCKSZ as usize]);
            }
        }

        Ok(segment.freeze())
    }

    /// Get size of an SLRU segment
    pub(crate) async fn get_slru_segment_size(
        &self,
        kind: SlruKind,
        segno: u32,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<BlockNumber, PageReconstructError> {
        assert!(self.tenant_shard_id.is_shard_zero());
        let key = slru_segment_size_to_key(kind, segno);
        let mut buf = version.get(self, key, ctx).await?;
        Ok(buf.get_u32_le())
    }

    /// Does the slru segment exist?
    pub(crate) async fn get_slru_segment_exists(
        &self,
        kind: SlruKind,
        segno: u32,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<bool, PageReconstructError> {
        assert!(self.tenant_shard_id.is_shard_zero());
        // fetch directory listing
        let key = slru_dir_to_key(kind);
        let buf = version.get(self, key, ctx).await?;

        let dir = SlruSegmentDirectory::des(&buf)?;
        Ok(dir.segments.contains(&segno))
    }
     779              :     /// Locate LSN, such that all transactions that committed before
     780              :     /// 'search_timestamp' are visible, but nothing newer is.
     781              :     ///
     782              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     783              :     /// so it's not well defined which LSN you get if there were multiple commits
     784              :     /// "in flight" at that point in time.
     785              :     ///
     786            0 :     pub(crate) async fn find_lsn_for_timestamp(
     787            0 :         &self,
     788            0 :         search_timestamp: TimestampTz,
     789            0 :         cancel: &CancellationToken,
     790            0 :         ctx: &RequestContext,
     791            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     792            0 :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     793              : 
     794            0 :         let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
     795            0 :         let gc_cutoff_planned = {
     796            0 :             let gc_info = self.gc_info.read().unwrap();
     797            0 :             gc_info.min_cutoff()
     798            0 :         };
     799            0 :         // Usually the planned cutoff is newer than the cutoff of the last gc run,
     800            0 :         // but let's be defensive.
     801            0 :         let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
     802            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     803            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     804            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     805            0 :         // on the safe side.
     806            0 :         let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
     807            0 :         let max_lsn = self.get_last_record_lsn();
     808            0 : 
     809            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     810            0 :         // LSN divided by 8.
     811            0 :         let mut low = min_lsn.0 / 8;
     812            0 :         let mut high = max_lsn.0 / 8 + 1;
     813            0 : 
     814            0 :         let mut found_smaller = false;
     815            0 :         let mut found_larger = false;
     816              : 
     817            0 :         while low < high {
     818            0 :             if cancel.is_cancelled() {
     819            0 :                 return Err(PageReconstructError::Cancelled);
     820            0 :             }
     821            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     822            0 :             let mid = (high + low) / 2;
     823              : 
     824            0 :             let cmp = match self
     825            0 :                 .is_latest_commit_timestamp_ge_than(
     826            0 :                     search_timestamp,
     827            0 :                     Lsn(mid * 8),
     828            0 :                     &mut found_smaller,
     829            0 :                     &mut found_larger,
     830            0 :                     ctx,
     831            0 :                 )
     832            0 :                 .await
     833              :             {
     834            0 :                 Ok(res) => res,
     835            0 :                 Err(PageReconstructError::MissingKey(e)) => {
     836            0 :                     warn!(
     837            0 :                         "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
     838              :                         e
     839              :                     );
     840              :                     // Return that we didn't find any requests smaller than the LSN, and logging the error.
     841            0 :                     return Ok(LsnForTimestamp::Past(min_lsn));
     842              :                 }
     843            0 :                 Err(e) => return Err(e),
     844              :             };
     845              : 
     846            0 :             if cmp {
     847            0 :                 high = mid;
     848            0 :             } else {
     849            0 :                 low = mid + 1;
     850            0 :             }
     851              :         }
     852              : 
     853              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     854              :         // so the LSN of the last commit record before or at `search_timestamp`.
     855              :         // Remove one from `low` to get `t`.
     856              :         //
     857              :         // FIXME: it would be better to get the LSN of the previous commit.
     858              :         // Otherwise, if you restore to the returned LSN, the database will
     859              :         // include physical changes from later commits that will be marked
     860              :         // as aborted, and will need to be vacuumed away.
     861            0 :         let commit_lsn = Lsn((low - 1) * 8);
     862            0 :         match (found_smaller, found_larger) {
     863              :             (false, false) => {
     864              :                 // This can happen if no commit records have been processed yet, e.g.
     865              :                 // just after importing a cluster.
     866            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     867              :             }
     868              :             (false, true) => {
     869              :                 // Didn't find any commit timestamps smaller than the request
     870            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     871              :             }
     872            0 :             (true, _) if commit_lsn < min_lsn => {
     873            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     874            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     875            0 :                 // below the min_lsn. We should never do that.
     876            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     877              :             }
     878              :             (true, false) => {
     879              :                 // Only found commits with timestamps smaller than the request.
      880              :                 // It's still a valid case for branch creation, so return it.
      881              :                 // `update_gc_info()` ignores the LSN for the `LsnForTimestamp::Future`
      882              :                 // case anyway.
     883            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     884              :             }
     885            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     886              :         }
     887            0 :     }
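                      :
                      :     // Editorial sketch (not part of the measured source): the bisection above,
                      :     // reduced to its core. `low`/`high` count in units of 8 bytes, and `probe`
                      :     // stands in for is_latest_commit_timestamp_ge_than(), i.e. "does any commit
                      :     // at or before this LSN have a timestamp >= search_timestamp?".
                      :     //
                      :     //     fn bisect_lsn(mut low: u64, mut high: u64, probe: impl Fn(Lsn) -> bool) -> Lsn {
                      :     //         while low < high {
                      :     //             let mid = (low + high) / 2;
                      :     //             if probe(Lsn(mid * 8)) { high = mid } else { low = mid + 1 }
                      :     //         }
                      :     //         // `low` is now the first probe-true position; one step earlier is the
                      :     //         // last position whose commits all have timestamps below the target.
                      :     //         Lsn((low - 1) * 8)
                      :     //     }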
     888              : 
     889              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if any commit
     890              :     /// at LSN 'probe_lsn' has a timestamp at or after 'search_timestamp'.
     891              :     ///
     892              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any
     893              :     /// commits with a smaller/larger timestamp.
     894              :     ///
     895            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     896            0 :         &self,
     897            0 :         search_timestamp: TimestampTz,
     898            0 :         probe_lsn: Lsn,
     899            0 :         found_smaller: &mut bool,
     900            0 :         found_larger: &mut bool,
     901            0 :         ctx: &RequestContext,
     902            0 :     ) -> Result<bool, PageReconstructError> {
     903            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     904            0 :             if timestamp >= search_timestamp {
     905            0 :                 *found_larger = true;
     906            0 :                 return ControlFlow::Break(true);
     907            0 :             } else {
     908            0 :                 *found_smaller = true;
     909            0 :             }
     910            0 :             ControlFlow::Continue(())
     911            0 :         })
     912            0 :         .await
     913            0 :     }
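                      :
                      :     // Illustrative call (editorial; assumes a Timeline `tline`, a RequestContext
                      :     // `ctx`, and a `search_ts`/`probe_lsn` in scope):
                      :     //
                      :     //     let (mut found_smaller, mut found_larger) = (false, false);
                      :     //     let ge = tline
                      :     //         .is_latest_commit_timestamp_ge_than(search_ts, probe_lsn, &mut found_smaller, &mut found_larger, &ctx)
                      :     //         .await?;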
     914              : 
     915              :     /// Obtain the latest commit timestamp at the given LSN.
     916              :     ///
     917              :     /// If the LSN has no timestamps (e.g. no commits), returns None.
     918            0 :     pub(crate) async fn get_timestamp_for_lsn(
     919            0 :         &self,
     920            0 :         probe_lsn: Lsn,
     921            0 :         ctx: &RequestContext,
     922            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     923            0 :         let mut max: Option<TimestampTz> = None;
     924            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
     925            0 :             if let Some(max_prev) = max {
     926            0 :                 max = Some(max_prev.max(timestamp));
     927            0 :             } else {
     928            0 :                 max = Some(timestamp);
     929            0 :             }
     930            0 :             ControlFlow::Continue(())
     931            0 :         })
     932            0 :         .await?;
     933              : 
     934            0 :         Ok(max)
     935            0 :     }
     936              : 
     937              :     /// Runs the given function on all commit timestamps found at the given LSN.
     938              :     ///
     939              :     /// The return value is the one the closure breaks with, or the `Default`
     940              :     /// impl's output if the closure never breaks.
     941            0 :     async fn map_all_timestamps<T: Default>(
     942            0 :         &self,
     943            0 :         probe_lsn: Lsn,
     944            0 :         ctx: &RequestContext,
     945            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     946            0 :     ) -> Result<T, PageReconstructError> {
     947            0 :         for segno in self
     948            0 :             .list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
     949            0 :             .await?
     950              :         {
     951            0 :             let nblocks = self
     952            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
     953            0 :                 .await?;
     954              : 
     955            0 :             let keyspace = KeySpace::single(
     956            0 :                 slru_block_to_key(SlruKind::Clog, segno, 0)
     957            0 :                     ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
     958            0 :             );
     959            0 : 
     960            0 :             let batches = keyspace.partition(
     961            0 :                 self.get_shard_identity(),
     962            0 :                 self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
     963            0 :             );
     964              : 
     965            0 :             let io_concurrency = IoConcurrency::spawn_from_conf(
     966            0 :                 self.conf.get_vectored_concurrent_io,
     967            0 :                 self.gate
     968            0 :                     .enter()
     969            0 :                     .map_err(|_| PageReconstructError::Cancelled)?,
     970              :             );
     971              : 
     972            0 :             for batch in batches.parts.into_iter().rev() {
     973            0 :                 let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
     974            0 :                 let blocks = self
     975            0 :                     .get_vectored(query, io_concurrency.clone(), ctx)
     976            0 :                     .await?;
     977              : 
     978            0 :                 for (_key, clog_page) in blocks.into_iter().rev() {
     979            0 :                     let clog_page = clog_page?;
     980              : 
     981            0 :                     if clog_page.len() == BLCKSZ as usize + 8 {
     982            0 :                         let mut timestamp_bytes = [0u8; 8];
     983            0 :                         timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     984            0 :                         let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     985            0 : 
     986            0 :                         match f(timestamp) {
     987            0 :                             ControlFlow::Break(b) => return Ok(b),
     988            0 :                             ControlFlow::Continue(()) => (),
     989              :                         }
     990            0 :                     }
     991              :                 }
     992              :             }
     993              :         }
     994            0 :         Ok(Default::default())
     995            0 :     }
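                      :
                      :     // Editorial sketch of the trailing-timestamp convention decoded above: CLOG
                      :     // pages of length BLCKSZ + 8 carry a big-endian TimestampTz (an i64) in
                      :     // their last 8 bytes. As a standalone helper (illustrative only):
                      :     //
                      :     //     fn trailing_timestamp(clog_page: &[u8]) -> Option<TimestampTz> {
                      :     //         if clog_page.len() != BLCKSZ as usize + 8 {
                      :     //             return None; // plain page without an appended timestamp
                      :     //         }
                      :     //         let mut ts = [0u8; 8];
                      :     //         ts.copy_from_slice(&clog_page[BLCKSZ as usize..]);
                      :     //         Some(TimestampTz::from_be_bytes(ts))
                      :     //     }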
     996              : 
     997            0 :     pub(crate) async fn get_slru_keyspace(
     998            0 :         &self,
     999            0 :         version: Version<'_>,
    1000            0 :         ctx: &RequestContext,
    1001            0 :     ) -> Result<KeySpace, PageReconstructError> {
    1002            0 :         let mut accum = KeySpaceAccum::new();
    1003              : 
    1004            0 :         for kind in SlruKind::iter() {
    1005            0 :             let mut segments: Vec<u32> = self
    1006            0 :                 .list_slru_segments(kind, version, ctx)
    1007            0 :                 .await?
    1008            0 :                 .into_iter()
    1009            0 :                 .collect();
    1010            0 :             segments.sort_unstable();
    1011              : 
    1012            0 :             for seg in segments {
    1013            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
    1014              : 
    1015            0 :                 accum.add_range(
    1016            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
    1017            0 :                 );
    1018              :             }
    1019              :         }
    1020              : 
    1021            0 :         Ok(accum.to_keyspace())
    1022            0 :     }
    1023              : 
    1024              :     /// Get a list of SLRU segments
    1025            0 :     pub(crate) async fn list_slru_segments(
    1026            0 :         &self,
    1027            0 :         kind: SlruKind,
    1028            0 :         version: Version<'_>,
    1029            0 :         ctx: &RequestContext,
    1030            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
    1031            0 :         // fetch directory entry
    1032            0 :         let key = slru_dir_to_key(kind);
    1033              : 
    1034            0 :         let buf = version.get(self, key, ctx).await?;
    1035            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
    1036            0 :     }
    1037              : 
    1038            0 :     pub(crate) async fn get_relmap_file(
    1039            0 :         &self,
    1040            0 :         spcnode: Oid,
    1041            0 :         dbnode: Oid,
    1042            0 :         version: Version<'_>,
    1043            0 :         ctx: &RequestContext,
    1044            0 :     ) -> Result<Bytes, PageReconstructError> {
    1045            0 :         let key = relmap_file_key(spcnode, dbnode);
    1046              : 
    1047            0 :         let buf = version.get(self, key, ctx).await?;
    1048            0 :         Ok(buf)
    1049            0 :     }
    1050              : 
    1051          168 :     pub(crate) async fn list_dbdirs(
    1052          168 :         &self,
    1053          168 :         lsn: Lsn,
    1054          168 :         ctx: &RequestContext,
    1055          168 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
    1056              :         // fetch directory entry
    1057          168 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
    1058              : 
    1059          168 :         Ok(DbDirectory::des(&buf)?.dbdirs)
    1060          168 :     }
    1061              : 
    1062            0 :     pub(crate) async fn get_twophase_file(
    1063            0 :         &self,
    1064            0 :         xid: u64,
    1065            0 :         lsn: Lsn,
    1066            0 :         ctx: &RequestContext,
    1067            0 :     ) -> Result<Bytes, PageReconstructError> {
    1068            0 :         let key = twophase_file_key(xid);
    1069            0 :         let buf = self.get(key, lsn, ctx).await?;
    1070            0 :         Ok(buf)
    1071            0 :     }
    1072              : 
    1073          169 :     pub(crate) async fn list_twophase_files(
    1074          169 :         &self,
    1075          169 :         lsn: Lsn,
    1076          169 :         ctx: &RequestContext,
    1077          169 :     ) -> Result<HashSet<u64>, PageReconstructError> {
    1078              :         // fetch directory entry
    1079          169 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
    1080              : 
    1081          169 :         if self.pg_version >= 17 {
    1082          156 :             Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
    1083              :         } else {
    1084           13 :             Ok(TwoPhaseDirectory::des(&buf)?
    1085              :                 .xids
    1086           13 :                 .iter()
    1087           13 :                 .map(|x| u64::from(*x))
    1088           13 :                 .collect())
    1089              :         }
    1090          169 :     }
    1091              : 
    1092            0 :     pub(crate) async fn get_control_file(
    1093            0 :         &self,
    1094            0 :         lsn: Lsn,
    1095            0 :         ctx: &RequestContext,
    1096            0 :     ) -> Result<Bytes, PageReconstructError> {
    1097            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
    1098            0 :     }
    1099              : 
    1100            6 :     pub(crate) async fn get_checkpoint(
    1101            6 :         &self,
    1102            6 :         lsn: Lsn,
    1103            6 :         ctx: &RequestContext,
    1104            6 :     ) -> Result<Bytes, PageReconstructError> {
    1105            6 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
    1106            6 :     }
    1107              : 
    1108            6 :     async fn list_aux_files_v2(
    1109            6 :         &self,
    1110            6 :         lsn: Lsn,
    1111            6 :         ctx: &RequestContext,
    1112            6 :         io_concurrency: IoConcurrency,
    1113            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
    1114            6 :         let kv = self
    1115            6 :             .scan(
    1116            6 :                 KeySpace::single(Key::metadata_aux_key_range()),
    1117            6 :                 lsn,
    1118            6 :                 ctx,
    1119            6 :                 io_concurrency,
    1120            6 :             )
    1121            6 :             .await?;
    1122            6 :         let mut result = HashMap::new();
    1123            6 :         let mut sz = 0;
    1124           15 :         for (_, v) in kv {
    1125            9 :             let v = v?;
    1126            9 :             let v = aux_file::decode_file_value_bytes(&v)
    1127            9 :                 .context("value decode")
    1128            9 :                 .map_err(PageReconstructError::Other)?;
    1129           17 :             for (fname, content) in v {
    1130            8 :                 sz += fname.len();
    1131            8 :                 sz += content.len();
    1132            8 :                 result.insert(fname, content);
    1133            8 :             }
    1134              :         }
    1135            6 :         self.aux_file_size_estimator.on_initial(sz);
    1136            6 :         Ok(result)
    1137            6 :     }
    1138              : 
    1139            0 :     pub(crate) async fn trigger_aux_file_size_computation(
    1140            0 :         &self,
    1141            0 :         lsn: Lsn,
    1142            0 :         ctx: &RequestContext,
    1143            0 :         io_concurrency: IoConcurrency,
    1144            0 :     ) -> Result<(), PageReconstructError> {
    1145            0 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
    1146            0 :         Ok(())
    1147            0 :     }
    1148              : 
    1149            6 :     pub(crate) async fn list_aux_files(
    1150            6 :         &self,
    1151            6 :         lsn: Lsn,
    1152            6 :         ctx: &RequestContext,
    1153            6 :         io_concurrency: IoConcurrency,
    1154            6 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
    1155            6 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await
    1156            6 :     }
    1157              : 
    1158            2 :     pub(crate) async fn get_replorigins(
    1159            2 :         &self,
    1160            2 :         lsn: Lsn,
    1161            2 :         ctx: &RequestContext,
    1162            2 :         io_concurrency: IoConcurrency,
    1163            2 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
    1164            2 :         let kv = self
    1165            2 :             .scan(
    1166            2 :                 KeySpace::single(repl_origin_key_range()),
    1167            2 :                 lsn,
    1168            2 :                 ctx,
    1169            2 :                 io_concurrency,
    1170            2 :             )
    1171            2 :             .await?;
    1172            2 :         let mut result = HashMap::new();
    1173            6 :         for (k, v) in kv {
    1174            5 :             let v = v?;
    1175            5 :             if v.is_empty() {
    1176              :                 // This is a tombstone -- we can skip it.
    1177              :                 // Originally, the replorigin code used `Lsn::INVALID` to represent a tombstone. However, as it is part of
    1178              :                 // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
    1179              :                 // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
    1180              :                 // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
    1181            2 :                 continue;
    1182            3 :             }
    1183            3 :             let origin_id = k.field6 as RepOriginId;
    1184            3 :             let origin_lsn = Lsn::des(&v)
    1185            3 :                 .with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
    1186            2 :             if origin_lsn != Lsn::INVALID {
    1187            2 :                 result.insert(origin_id, origin_lsn);
    1188            2 :             }
    1189              :         }
    1190            1 :         Ok(result)
    1191            2 :     }
    1192              : 
    1193              :     /// Does the same as get_current_logical_size, but computes the size on demand.
    1194              :     /// Used to initialize the logical size tracking on startup.
    1195              :     ///
    1196              :     /// Only relation blocks are counted currently. That excludes metadata,
    1197              :     /// SLRUs, twophase files etc.
    1198              :     ///
    1199              :     /// # Cancel-Safety
    1200              :     ///
    1201              :     /// This method is cancellation-safe.
    1202            7 :     pub(crate) async fn get_current_logical_size_non_incremental(
    1203            7 :         &self,
    1204            7 :         lsn: Lsn,
    1205            7 :         ctx: &RequestContext,
    1206            7 :     ) -> Result<u64, CalculateLogicalSizeError> {
    1207            7 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
    1208            7 : 
    1209            7 :         fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
    1210              : 
    1211              :         // Fetch list of database dirs and iterate them
    1212            7 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
    1213            7 :         let dbdir = DbDirectory::des(&buf)?;
    1214              : 
    1215            7 :         let mut total_size: u64 = 0;
    1216            7 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
    1217            0 :             for rel in self
    1218            0 :                 .list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
    1219            0 :                 .await?
    1220              :             {
    1221            0 :                 if self.cancel.is_cancelled() {
    1222            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
    1223            0 :                 }
    1224            0 :                 let relsize_key = rel_size_to_key(rel);
    1225            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1226            0 :                 let relsize = buf.get_u32_le();
    1227            0 : 
    1228            0 :                 total_size += relsize as u64;
    1229              :             }
    1230              :         }
    1231            7 :         Ok(total_size * BLCKSZ as u64)
    1232            7 :     }
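                      :
                      :     // Worked example (editorial): only relation blocks are counted, so a
                      :     // timeline whose relations total 1,000 blocks reports a logical size of
                      :     // 1,000 * BLCKSZ = 1,000 * 8192 = 8,192,000 bytes.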
    1233              : 
    1234              :     /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
    1235              :     /// for gc-compaction.
    1236              :     ///
    1237              :     /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
    1238              :     /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
    1239              :     /// be kept only for a specific range of LSN.
    1240              :     ///
    1241              :     /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
    1242              :     /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
    1243              :     /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
    1244              :     /// determine which keys to retain/drop for gc-compaction.
    1245              :     ///
    1246              :     /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
    1247              :     /// to be retained for each of the branch LSNs.
    1248              :     ///
    1249              :     /// The return value is (dense keyspace, sparse keyspace).
    1250           27 :     pub(crate) async fn collect_gc_compaction_keyspace(
    1251           27 :         &self,
    1252           27 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1253           27 :         let metadata_key_begin = Key::metadata_key_range().start;
    1254           27 :         let aux_v1_key = AUX_FILES_KEY;
    1255           27 :         let dense_keyspace = KeySpace {
    1256           27 :             ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
    1257           27 :         };
    1258           27 :         Ok((
    1259           27 :             dense_keyspace,
    1260           27 :             SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
    1261           27 :         ))
    1262           27 :     }
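                      :
                      :     // Editorial note: the two dense ranges above are the whole non-metadata
                      :     // space Key::MIN..metadata_key_begin with the single legacy AUX_FILES_KEY
                      :     // punched out, which implements the "drop AUX-v1 keys" behavior described
                      :     // in the doc comment.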
    1263              : 
    1264              :     ///
    1265              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
    1266              :     /// Anything that's not listed may be removed from the underlying storage (from
    1267              :     /// that LSN forwards).
    1268              :     ///
    1269              :     /// The return value is (dense keyspace, sparse keyspace).
    1270          168 :     pub(crate) async fn collect_keyspace(
    1271          168 :         &self,
    1272          168 :         lsn: Lsn,
    1273          168 :         ctx: &RequestContext,
    1274          168 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1275          168 :         // Iterate through key ranges, greedily packing them into partitions
    1276          168 :         let mut result = KeySpaceAccum::new();
    1277          168 : 
    1278          168 :         // The dbdir metadata always exists
    1279          168 :         result.add_key(DBDIR_KEY);
    1280              : 
    1281              :         // Fetch list of database dirs and iterate them
    1282          168 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
    1283          168 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
    1284          168 : 
    1285          168 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
    1286          168 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
    1287            0 :             if has_relmap_file {
    1288            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
    1289            0 :             }
    1290            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
    1291              : 
    1292            0 :             let mut rels: Vec<RelTag> = self
    1293            0 :                 .list_rels(spcnode, dbnode, Version::at(lsn), ctx)
    1294            0 :                 .await?
    1295            0 :                 .into_iter()
    1296            0 :                 .collect();
    1297            0 :             rels.sort_unstable();
    1298            0 :             for rel in rels {
    1299            0 :                 let relsize_key = rel_size_to_key(rel);
    1300            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1301            0 :                 let relsize = buf.get_u32_le();
    1302            0 : 
    1303            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
    1304            0 :                 result.add_key(relsize_key);
    1305              :             }
    1306              :         }
    1307              : 
    1308              :         // Iterate SLRUs next
    1309          168 :         if self.tenant_shard_id.is_shard_zero() {
    1310          495 :             for kind in [
    1311          165 :                 SlruKind::Clog,
    1312          165 :                 SlruKind::MultiXactMembers,
    1313          165 :                 SlruKind::MultiXactOffsets,
    1314              :             ] {
    1315          495 :                 let slrudir_key = slru_dir_to_key(kind);
    1316          495 :                 result.add_key(slrudir_key);
    1317          495 :                 let buf = self.get(slrudir_key, lsn, ctx).await?;
    1318          495 :                 let dir = SlruSegmentDirectory::des(&buf)?;
    1319          495 :                 let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
    1320          495 :                 segments.sort_unstable();
    1321          495 :                 for segno in segments {
    1322            0 :                     let segsize_key = slru_segment_size_to_key(kind, segno);
    1323            0 :                     let mut buf = self.get(segsize_key, lsn, ctx).await?;
    1324            0 :                     let segsize = buf.get_u32_le();
    1325            0 : 
    1326            0 :                     result.add_range(
    1327            0 :                         slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
    1328            0 :                     );
    1329            0 :                     result.add_key(segsize_key);
    1330              :                 }
    1331              :             }
    1332            3 :         }
    1333              : 
    1334              :         // Then pg_twophase
    1335          168 :         result.add_key(TWOPHASEDIR_KEY);
    1336              : 
    1337          168 :         let mut xids: Vec<u64> = self
    1338          168 :             .list_twophase_files(lsn, ctx)
    1339          168 :             .await?
    1340          168 :             .iter()
    1341          168 :             .cloned()
    1342          168 :             .collect();
    1343          168 :         xids.sort_unstable();
    1344          168 :         for xid in xids {
    1345            0 :             result.add_key(twophase_file_key(xid));
    1346            0 :         }
    1347              : 
    1348          168 :         result.add_key(CONTROLFILE_KEY);
    1349          168 :         result.add_key(CHECKPOINT_KEY);
    1350          168 : 
    1351          168 :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
    1352          168 :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
    1353          168 :         // and the keys will not be garbage-collected.
    1354          168 :         #[cfg(test)]
    1355          168 :         {
    1356          168 :             let guard = self.extra_test_dense_keyspace.load();
    1357          168 :             for kr in &guard.ranges {
    1358            0 :                 result.add_range(kr.clone());
    1359            0 :             }
    1360            0 :         }
    1361            0 : 
    1362          168 :         let dense_keyspace = result.to_keyspace();
    1363          168 :         let sparse_keyspace = SparseKeySpace(KeySpace {
    1364          168 :             ranges: vec![
    1365          168 :                 Key::metadata_aux_key_range(),
    1366          168 :                 repl_origin_key_range(),
    1367          168 :                 Key::rel_dir_sparse_key_range(),
    1368          168 :             ],
    1369          168 :         });
    1370          168 : 
    1371          168 :         if cfg!(debug_assertions) {
    1372              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
    1373              : 
    1374              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
    1375              :             // category of sparse keys are split into their own image/delta files. If there
    1376              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
    1377              :             // and we want the developer to keep the keyspaces separated.
    1378              : 
    1379          168 :             let ranges = &sparse_keyspace.0.ranges;
    1380              : 
    1381              :             // TODO: use a single overlaps_with across the codebase
    1382          504 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    1383          504 :                 !(a.end <= b.start || b.end <= a.start)
    1384          504 :             }
    1385          504 :             for i in 0..ranges.len() {
    1386          504 :                 for j in 0..i {
    1387          504 :                     if overlaps_with(&ranges[i], &ranges[j]) {
    1388            0 :                         panic!(
    1389            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
    1390            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
    1391            0 :                         );
    1392          504 :                     }
    1393              :                 }
    1394              :             }
    1395          336 :             for i in 1..ranges.len() {
    1396          336 :                 assert!(
    1397          336 :                     ranges[i - 1].end <= ranges[i].start,
    1398            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
    1399            0 :                     ranges[i - 1].start,
    1400            0 :                     ranges[i - 1].end,
    1401            0 :                     ranges[i].start,
    1402            0 :                     ranges[i].end
    1403              :                 );
    1404              :             }
    1405            0 :         }
    1406              : 
    1407          168 :         Ok((dense_keyspace, sparse_keyspace))
    1408          168 :     }
    1409              : 
    1410              :     /// Get the cached size of a relation. There are two caches: the "latest" cache, used for primary
    1411              :     /// updates, captures the latest state of the timeline; the snapshot cache, whose keys include the
    1412              :     /// LSN, can be used by replicas to get the relation size at a particular LSN (snapshot).
    1413       224270 :     pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
    1414       224270 :         let lsn = version.get_lsn();
    1415       224270 :         {
    1416       224270 :             let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
    1417       224270 :             if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
    1418       224259 :                 if lsn >= *cached_lsn {
    1419       221686 :                     RELSIZE_LATEST_CACHE_HITS.inc();
    1420       221686 :                     return Some(*nblocks);
    1421         2573 :                 }
    1422         2573 :                 RELSIZE_CACHE_MISSES_OLD.inc();
    1423           11 :             }
    1424              :         }
    1425              :         {
    1426         2584 :             let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
    1427         2584 :             if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
    1428         2563 :                 RELSIZE_SNAPSHOT_CACHE_HITS.inc();
    1429         2563 :                 return Some(*nblock);
    1430           21 :             }
    1431           21 :         }
    1432           21 :         if version.is_latest() {
    1433           10 :             RELSIZE_LATEST_CACHE_MISSES.inc();
    1434           11 :         } else {
    1435           11 :             RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
    1436           11 :         }
    1437           21 :         None
    1438       224270 :     }
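                      :
                      :     // Editorial note on the two caches consulted above (structure inferred from
                      :     // their usage here, not a definitive declaration): rel_size_latest_cache
                      :     // maps RelTag -> (Lsn, nblocks) and answers reads at any LSN >= the stored
                      :     // one; rel_size_snapshot_cache is keyed by (Lsn, RelTag) and answers only
                      :     // exact-LSN reads, which is what replicas pinned to a snapshot LSN need.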
    1439              : 
    1440              :     /// Update cached relation size if there is no more recent update
    1441            5 :     pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
    1442            5 :         let lsn = version.get_lsn();
    1443            5 :         if version.is_latest() {
    1444            0 :             let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
    1445            0 :             match rel_size_cache.entry(tag) {
    1446            0 :                 hash_map::Entry::Occupied(mut entry) => {
    1447            0 :                     let cached_lsn = entry.get_mut();
    1448            0 :                     if lsn >= cached_lsn.0 {
    1449            0 :                         *cached_lsn = (lsn, nblocks);
    1450            0 :                     }
    1451              :                 }
    1452            0 :                 hash_map::Entry::Vacant(entry) => {
    1453            0 :                     entry.insert((lsn, nblocks));
    1454            0 :                     RELSIZE_LATEST_CACHE_ENTRIES.inc();
    1455            0 :                 }
    1456              :             }
    1457              :         } else {
    1458            5 :             let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
    1459            5 :             if rel_size_cache.capacity() != 0 {
    1460            5 :                 rel_size_cache.insert((lsn, tag), nblocks);
    1461            5 :                 RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
    1462            5 :             }
    1463              :         }
    1464            5 :     }
    1465              : 
    1466              :     /// Store cached relation size
    1467       141360 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1468       141360 :         let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
    1469       141360 :         if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
    1470          960 :             RELSIZE_LATEST_CACHE_ENTRIES.inc();
    1471       140400 :         }
    1472       141360 :     }
    1473              : 
    1474              :     /// Remove cached relation size
    1475            1 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1476            1 :         let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
    1477            1 :         if rel_size_cache.remove(tag).is_some() {
    1478            1 :             RELSIZE_LATEST_CACHE_ENTRIES.dec();
    1479            1 :         }
    1480            1 :     }
    1481              : }
    1482              : 
    1483              : /// DatadirModification represents an operation to ingest an atomic set of
    1484              : /// updates to the repository.
    1485              : ///
    1486              : /// It is created by the 'begin_modification' function. One is created for each
    1487              : /// WAL record, so that all the modifications by a single WAL record appear atomic.
    1488              : pub struct DatadirModification<'a> {
    1489              :     /// The timeline this modification applies to. You can access this to
    1490              :     /// read the state, but note that any pending updates are *not* reflected
    1491              :     /// in the state in 'tline' yet.
    1492              :     pub tline: &'a Timeline,
    1493              : 
    1494              :     /// Current LSN of the modification
    1495              :     lsn: Lsn,
    1496              : 
    1497              :     // The modifications are not applied directly to the underlying key-value store.
    1498              :     // The put-functions add the modifications here, and they are flushed to the
    1499              :     // underlying key-value store by the 'commit' function.
    1500              :     pending_lsns: Vec<Lsn>,
    1501              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1502              :     pending_nblocks: i64,
    1503              : 
    1504              :     /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    1505              :     /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    1506              :     pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
    1507              : 
    1508              :     /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    1509              :     /// which keys are stored here.
    1510              :     pending_data_batch: Option<SerializedValueBatch>,
    1511              : 
    1512              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1513              :     /// if it was updated in this modification.
    1514              :     pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
    1515              : 
    1516              :     /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
    1517              :     pending_metadata_bytes: usize,
    1518              : }
    1519              : 
    1520              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    1521              : pub enum MetricsUpdate {
    1522              :     /// Set the metrics to this value
    1523              :     Set(u64),
    1524              :     /// Increment the metrics by this value
    1525              :     Add(u64),
    1526              :     /// Decrement the metrics by this value
    1527              :     Sub(u64),
    1528              : }
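                      :
                      : // Illustrative lifecycle (editorial; `begin_modification` on Timeline and
                      : // `commit` on DatadirModification are defined elsewhere in this file and are
                      : // not shown in this excerpt):
                      : //
                      : //     let mut modification = tline.begin_modification(lsn);
                      : //     modification.put_rel_page_image(rel, blkno, img)?;
                      : //     modification.commit(&ctx).await?;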
    1529              : 
    1530              : impl DatadirModification<'_> {
    1531              :     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    1532              :     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    1533              :     // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    1534              :     pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
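                      :
                      :     // Editorial sketch of the intended caller behavior (assumed, not shown in
                      :     // this excerpt): commit early once the pending payload exceeds the cap.
                      :     //
                      :     //     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
                      :     //         modification.commit(&ctx).await?; // flush, then begin a fresh modification
                      :     //     }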
    1535              : 
    1536              :     /// Get the current lsn
    1537            1 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1538            1 :         self.lsn
    1539            1 :     }
    1540              : 
    1541            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1542            0 :         self.pending_data_batch
    1543            0 :             .as_ref()
    1544            0 :             .map_or(0, |b| b.buffer_size())
    1545            0 :             + self.pending_metadata_bytes
    1546            0 :     }
    1547              : 
    1548            0 :     pub(crate) fn has_dirty_data(&self) -> bool {
    1549            0 :         self.pending_data_batch
    1550            0 :             .as_ref()
    1551            0 :             .is_some_and(|b| b.has_data())
    1552            0 :     }
    1553              : 
    1554              :     /// Returns statistics about the currently pending modifications.
    1555            0 :     pub(crate) fn stats(&self) -> DatadirModificationStats {
    1556            0 :         let mut stats = DatadirModificationStats::default();
    1557            0 :         for (_, _, value) in self.pending_metadata_pages.values().flatten() {
    1558            0 :             match value {
    1559            0 :                 Value::Image(_) => stats.metadata_images += 1,
    1560            0 :                 Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
    1561            0 :                 Value::WalRecord(_) => stats.metadata_deltas += 1,
    1562              :             }
    1563              :         }
    1564            0 :         for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
    1565            0 :             match valuemeta {
    1566            0 :                 ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
    1567            0 :                 ValueMeta::Serialized(_) => stats.data_deltas += 1,
    1568            0 :                 ValueMeta::Observed(_) => {}
    1569              :             }
    1570              :         }
    1571            0 :         stats
    1572            0 :     }
    1573              : 
    1574              :     /// Set the current lsn
    1575        72929 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
    1576        72929 :         ensure_walingest!(
    1577        72929 :             lsn >= self.lsn,
    1578        72929 :             "setting an older lsn {} than {} is not allowed",
    1579        72929 :             lsn,
    1580        72929 :             self.lsn
    1581        72929 :         );
    1582              : 
    1583        72929 :         if lsn > self.lsn {
    1584        72929 :             self.pending_lsns.push(self.lsn);
    1585        72929 :             self.lsn = lsn;
    1586        72929 :         }
    1587        72929 :         Ok(())
    1588        72929 :     }
    1589              : 
    1590              :     /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
    1591              :     /// keys that represent literal blocks that postgres can read.  So data includes relation blocks and
    1592              :     /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
    1593              :     ///
    1594              :     /// The distinction is important because data keys are handled on a fast path where dirty writes are
    1595              :     /// not readable until this modification is committed, whereas metadata keys are visible for read
    1596              :     /// via [`Self::get`] as soon as their record has been ingested.
    1597       425364 :     fn is_data_key(key: &Key) -> bool {
    1598       425364 :         key.is_rel_block_key() || key.is_slru_block_key()
    1599       425364 :     }
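                      :
                      :     // Examples (editorial): rel_block_to_key(rel, blkno) and
                      :     // slru_block_to_key(kind, segno, blkno) produce "data" keys, which take the
                      :     // fast path and become readable only at commit; DBDIR_KEY,
                      :     // relmap_file_key(..), rel_size_to_key(..) and similar produce "metadata"
                      :     // keys, readable via Self::get() as soon as they are put.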
    1600              : 
    1601              :     /// Initialize a completely new repository.
    1602              :     ///
    1603              :     /// This inserts the directory metadata entries that are assumed to
    1604              :     /// always exist.
    1605          111 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1606          111 :         let buf = DbDirectory::ser(&DbDirectory {
    1607          111 :             dbdirs: HashMap::new(),
    1608          111 :         })?;
    1609          111 :         self.pending_directory_entries
    1610          111 :             .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
    1611          111 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1612              : 
    1613          111 :         let buf = if self.tline.pg_version >= 17 {
    1614           98 :             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
    1615           98 :                 xids: HashSet::new(),
    1616           98 :             })
    1617              :         } else {
    1618           13 :             TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1619           13 :                 xids: HashSet::new(),
    1620           13 :             })
    1621            0 :         }?;
    1622          111 :         self.pending_directory_entries
    1623          111 :             .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
    1624          111 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1625              : 
    1626          111 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1627          111 :         let empty_dir = Value::Image(buf);
    1628          111 : 
    1629          111 :         // Initialize SLRUs on shard 0 only: creating these on other shards would be
    1630          111 :         // harmless but they'd just be dropped on later compaction.
    1631          111 :         if self.tline.tenant_shard_id.is_shard_zero() {
    1632          108 :             self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1633          108 :             self.pending_directory_entries.push((
    1634          108 :                 DirectoryKind::SlruSegment(SlruKind::Clog),
    1635          108 :                 MetricsUpdate::Set(0),
    1636          108 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
    1637          108 :             self.put(
    1638          108 :                 slru_dir_to_key(SlruKind::MultiXactMembers),
    1639          108 :                 empty_dir.clone(),
    1640          108 :             );
    1641          108 :             self.pending_directory_entries.push((
    1642          108 :                 DirectoryKind::SlruSegment(SlruKind::Clog),
    1643          108 :                 MetricsUpdate::Set(0),
    1644          108 :             ));
    1645          108 :             self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1646          108 :             self.pending_directory_entries.push((
    1647          108 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
    1648          108 :                 MetricsUpdate::Set(0),
    1649          108 :             ));
    1650          108 :         }
    1651              : 
    1652          111 :         Ok(())
    1653          111 :     }
    1654              : 
    1655              :     #[cfg(test)]
    1656          110 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1657          110 :         self.init_empty()?;
    1658          110 :         self.put_control_file(bytes::Bytes::from_static(
    1659          110 :             b"control_file contents do not matter",
    1660          110 :         ))
    1661          110 :         .context("put_control_file")?;
    1662          110 :         self.put_checkpoint(bytes::Bytes::from_static(
    1663          110 :             b"checkpoint_file contents do not matter",
    1664          110 :         ))
    1665          110 :         .context("put_checkpoint_file")?;
    1666          110 :         Ok(())
    1667          110 :     }
    1668              : 
    1669              :     /// Creates a relation if it is not already present.
    1670              :     /// Returns the current size of the relation
    1671       209028 :     pub(crate) async fn create_relation_if_required(
    1672       209028 :         &mut self,
    1673       209028 :         rel: RelTag,
    1674       209028 :         ctx: &RequestContext,
    1675       209028 :     ) -> Result<u32, WalIngestError> {
    1676              :         // Get current size and put rel creation if rel doesn't exist
    1677              :         //
    1678              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1679              :         //       check the cache too. This is because eagerly checking the cache results in
    1680              :         //       less work overall and 10% better performance. It's more work on a cache
    1681              :         //       miss, but cache misses are rare.
    1682       209028 :         if let Some(nblocks) = self
    1683       209028 :             .tline
    1684       209028 :             .get_cached_rel_size(&rel, Version::Modified(self))
    1685              :         {
    1686       209023 :             Ok(nblocks)
    1687            5 :         } else if !self
    1688            5 :             .tline
    1689            5 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1690            5 :             .await?
    1691              :         {
    1692              :             // create it with 0 size initially, the logic below will extend it
    1693            5 :             self.put_rel_creation(rel, 0, ctx).await?;
    1694            5 :             Ok(0)
    1695              :         } else {
    1696            0 :             Ok(self
    1697            0 :                 .tline
    1698            0 :                 .get_rel_size(rel, Version::Modified(self), ctx)
    1699            0 :                 .await?)
    1700              :         }
    1701       209028 :     }
    1702              : 
    1703              :     /// Given a block number for a relation (which represents a newly written block),
    1704              :     /// the previous block count of the relation, and the shard info, find any gaps
    1705              :     /// that were created by the newly written block.
    1706        72835 :     fn find_gaps(
    1707        72835 :         rel: RelTag,
    1708        72835 :         blkno: u32,
    1709        72835 :         previous_nblocks: u32,
    1710        72835 :         shard: &ShardIdentity,
    1711        72835 :     ) -> Option<KeySpace> {
    1712        72835 :         let mut key = rel_block_to_key(rel, blkno);
    1713        72835 :         let mut gap_accum = None;
    1714              : 
    1715        72835 :         for gap_blkno in previous_nblocks..blkno {
    1716           16 :             key.field6 = gap_blkno;
    1717           16 : 
    1718           16 :             if shard.get_shard_number(&key) != shard.number {
    1719            4 :                 continue;
    1720           12 :             }
    1721           12 : 
    1722           12 :             gap_accum
    1723           12 :                 .get_or_insert_with(KeySpaceAccum::new)
    1724           12 :                 .add_key(key);
    1725              :         }
    1726              : 
    1727        72835 :         gap_accum.map(|accum| accum.to_keyspace())
    1728        72835 :     }
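                      :
                      :     // Worked example (editorial): with previous_nblocks = 3 and a new write at
                      :     // blkno = 6, blocks 3, 4 and 5 are gaps; only the gap blocks owned by this
                      :     // shard are returned, and ingest_batch() below zero-fills them via
                      :     // batch.zero_gaps().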
    1729              : 
    1730        72926 :     pub async fn ingest_batch(
    1731        72926 :         &mut self,
    1732        72926 :         mut batch: SerializedValueBatch,
    1733        72926 :         // TODO(vlad): remove this argument and replace the shard check with is_key_local
    1734        72926 :         shard: &ShardIdentity,
    1735        72926 :         ctx: &RequestContext,
    1736        72926 :     ) -> Result<(), WalIngestError> {
    1737        72926 :         let mut gaps_at_lsns = Vec::default();
    1738              : 
    1739        72926 :         for meta in batch.metadata.iter() {
    1740        72821 :             let key = Key::from_compact(meta.key());
    1741        72821 :             let (rel, blkno) = key
    1742        72821 :                 .to_rel_block()
    1743        72821 :                 .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
    1744        72821 :             let new_nblocks = blkno + 1;
    1745              : 
    1746        72821 :             let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
    1747        72821 :             if new_nblocks > old_nblocks {
    1748         1195 :                 self.put_rel_extend(rel, new_nblocks, ctx).await?;
    1749        71626 :             }
    1750              : 
    1751        72821 :             if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
    1752            0 :                 gaps_at_lsns.push((gaps, meta.lsn()));
    1753        72821 :             }
    1754              :         }
    1755              : 
    1756        72926 :         if !gaps_at_lsns.is_empty() {
    1757            0 :             batch.zero_gaps(gaps_at_lsns);
    1758        72926 :         }
    1759              : 
    1760        72926 :         match self.pending_data_batch.as_mut() {
    1761           10 :             Some(pending_batch) => {
    1762           10 :                 pending_batch.extend(batch);
    1763           10 :             }
    1764        72916 :             None if batch.has_data() => {
    1765        72815 :                 self.pending_data_batch = Some(batch);
    1766        72815 :             }
    1767          101 :             None => {
    1768          101 :                 // Nothing to initialize the batch with
    1769          101 :             }
    1770              :         }
    1771              : 
    1772        72926 :         Ok(())
    1773        72926 :     }
    1774              : 
    1775              :     /// Put a new page version that can be constructed from a WAL record
    1776              :     ///
    1777              :     /// NOTE: this will *not* implicitly extend the relation if the page is beyond the
    1778              :     /// current end-of-file. It's up to the caller to check that the relation size
    1779              :     /// matches the blocks inserted!
    1780            6 :     pub fn put_rel_wal_record(
    1781            6 :         &mut self,
    1782            6 :         rel: RelTag,
    1783            6 :         blknum: BlockNumber,
    1784            6 :         rec: NeonWalRecord,
    1785            6 :     ) -> Result<(), WalIngestError> {
    1786            6 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    1787            6 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1788            6 :         Ok(())
    1789            6 :     }
    1790              : 
    1791              :     // Same, but for an SLRU.
    1792            4 :     pub fn put_slru_wal_record(
    1793            4 :         &mut self,
    1794            4 :         kind: SlruKind,
    1795            4 :         segno: u32,
    1796            4 :         blknum: BlockNumber,
    1797            4 :         rec: NeonWalRecord,
    1798            4 :     ) -> Result<(), WalIngestError> {
    1799            4 :         if !self.tline.tenant_shard_id.is_shard_zero() {
    1800            0 :             return Ok(());
    1801            4 :         }
    1802            4 : 
    1803            4 :         self.put(
    1804            4 :             slru_block_to_key(kind, segno, blknum),
    1805            4 :             Value::WalRecord(rec),
    1806            4 :         );
    1807            4 :         Ok(())
    1808            4 :     }
    1809              : 
    1810              :     /// Like put_wal_record, but with a ready-made image of the page.
    1811       138921 :     pub fn put_rel_page_image(
    1812       138921 :         &mut self,
    1813       138921 :         rel: RelTag,
    1814       138921 :         blknum: BlockNumber,
    1815       138921 :         img: Bytes,
    1816       138921 :     ) -> Result<(), WalIngestError> {
    1817       138921 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    1818       138921 :         let key = rel_block_to_key(rel, blknum);
    1819       138921 :         if !key.is_valid_key_on_write_path() {
    1820            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1821       138921 :         }
    1822       138921 :         self.put(key, Value::Image(img));
    1823       138921 :         Ok(())
    1824       138921 :     }
    1825              : 
    1826            3 :     pub fn put_slru_page_image(
    1827            3 :         &mut self,
    1828            3 :         kind: SlruKind,
    1829            3 :         segno: u32,
    1830            3 :         blknum: BlockNumber,
    1831            3 :         img: Bytes,
    1832            3 :     ) -> Result<(), WalIngestError> {
    1833            3 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1834              : 
    1835            3 :         let key = slru_block_to_key(kind, segno, blknum);
    1836            3 :         if !key.is_valid_key_on_write_path() {
    1837            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1838            3 :         }
    1839            3 :         self.put(key, Value::Image(img));
    1840            3 :         Ok(())
    1841            3 :     }
    1842              : 
    1843         1499 :     pub(crate) fn put_rel_page_image_zero(
    1844         1499 :         &mut self,
    1845         1499 :         rel: RelTag,
    1846         1499 :         blknum: BlockNumber,
    1847         1499 :     ) -> Result<(), WalIngestError> {
    1848         1499 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    1849         1499 :         let key = rel_block_to_key(rel, blknum);
    1850         1499 :         if !key.is_valid_key_on_write_path() {
    1851            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1852         1499 :         }
    1853              : 
    1854         1499 :         let batch = self
    1855         1499 :             .pending_data_batch
    1856         1499 :             .get_or_insert_with(SerializedValueBatch::default);
    1857         1499 : 
    1858         1499 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1859         1499 : 
    1860         1499 :         Ok(())
    1861         1499 :     }
    1862              : 
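                      :     // Both zero-page helpers above bypass put() and write into
                      :     // `pending_data_batch` directly, creating it lazily on first use. The idiom
                      :     // is plain Option::get_or_insert_with; a minimal standalone sketch:
                      :     //
                      :     //     let mut pending: Option<Vec<u32>> = None;
                      :     //     let batch = pending.get_or_insert_with(Vec::new);
                      :     //     batch.push(1);                      // first use creates the container
                      :     //     assert_eq!(pending, Some(vec![1]));
                      : 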
    1863            0 :     pub(crate) fn put_slru_page_image_zero(
    1864            0 :         &mut self,
    1865            0 :         kind: SlruKind,
    1866            0 :         segno: u32,
    1867            0 :         blknum: BlockNumber,
    1868            0 :     ) -> Result<(), WalIngestError> {
    1869            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1870            0 :         let key = slru_block_to_key(kind, segno, blknum);
    1871            0 :         if !key.is_valid_key_on_write_path() {
    1872            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1873            0 :         }
    1874              : 
    1875            0 :         let batch = self
    1876            0 :             .pending_data_batch
    1877            0 :             .get_or_insert_with(SerializedValueBatch::default);
    1878            0 : 
    1879            0 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1880            0 : 
    1881            0 :         Ok(())
    1882            0 :     }
    1883              : 
    1884              :     /// Returns `true` if the rel_size_v2 write path is enabled. The first time it
    1885              :     /// is enabled, we also persist the new status in `index_part.json`.
    1886          973 :     pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
    1887          973 :         let status = self.tline.get_rel_size_v2_status();
    1888          973 :         let config = self.tline.get_rel_size_v2_enabled();
    1889          973 :         match (config, status) {
    1890              :             (false, RelSizeMigration::Legacy) => {
    1891              :                 // tenant config didn't enable it and we didn't write any reldir_v2 key yet
    1892          973 :                 Ok(false)
    1893              :             }
    1894              :             (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
    1895              :                 // index_part already persisted that the timeline has enabled rel_size_v2
    1896            0 :                 Ok(true)
    1897              :             }
    1898              :             (true, RelSizeMigration::Legacy) => {
    1899              :                 // The first time we enable it, we need to persist it in `index_part.json`
    1900            0 :                 self.tline
    1901            0 :                     .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
    1902            0 :                 tracing::info!("enabled rel_size_v2");
    1903            0 :                 Ok(true)
    1904              :             }
    1905              :             (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
    1906              :                 // index_part already persisted that the timeline has enabled rel_size_v2
    1907              :                 // and we don't need to do anything
    1908            0 :                 Ok(true)
    1909              :             }
    1910              :         }
    1911          973 :     }
    1912              : 
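                      :     // Summary of the (config, status) match above:
                      :     //
                      :     //     tenant config | persisted status     | result
                      :     //     --------------+----------------------+----------------------------------
                      :     //     false         | Legacy               | v1 path (false)
                      :     //     false         | Migrating / Migrated | v2 path (true), already persisted
                      :     //     true          | Legacy               | v2 path (true), persist Migrating
                      :     //     true          | Migrating / Migrated | v2 path (true), nothing to persist
                      : 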
    1913              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1914            8 :     pub async fn put_relmap_file(
    1915            8 :         &mut self,
    1916            8 :         spcnode: Oid,
    1917            8 :         dbnode: Oid,
    1918            8 :         img: Bytes,
    1919            8 :         ctx: &RequestContext,
    1920            8 :     ) -> Result<(), WalIngestError> {
    1921            8 :         let v2_enabled = self
    1922            8 :             .maybe_enable_rel_size_v2()
    1923            8 :             .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
    1924              : 
    1925              :         // Add it to the directory (if it doesn't exist already)
    1926            8 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1927            8 :         let mut dbdir = DbDirectory::des(&buf)?;
    1928              : 
    1929            8 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1930            8 :         if r.is_none() || r == Some(false) {
    1931              :             // The dbdir entry didn't exist, or it contained a
    1932              :             // 'false'. The 'insert' call already updated it with
    1933              :             // 'true'; now write the updated 'dbdirs' map back.
    1934            8 :             let buf = DbDirectory::ser(&dbdir)?;
    1935            8 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1936            0 :         }
    1937            8 :         if r.is_none() {
    1938              :             // Create RelDirectory
    1939              :             // TODO: if we have fully migrated to v2, no need to create this directory
    1940            4 :             let buf = RelDirectory::ser(&RelDirectory {
    1941            4 :                 rels: HashSet::new(),
    1942            4 :             })?;
    1943            4 :             self.pending_directory_entries
    1944            4 :                 .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
    1945            4 :             if v2_enabled {
    1946            0 :                 self.pending_directory_entries
    1947            0 :                     .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    1948            4 :             }
    1949            4 :             self.put(
    1950            4 :                 rel_dir_to_key(spcnode, dbnode),
    1951            4 :                 Value::Image(Bytes::from(buf)),
    1952            4 :             );
    1953            4 :         }
    1954              : 
    1955            8 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1956            8 :         Ok(())
    1957            8 :     }
    1958              : 
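                      :     // The dbdirs update above leans on HashMap::insert returning the previous
                      :     // value, so the directory is only serialized back when something changed.
                      :     // A standalone sketch of the same idiom:
                      :     //
                      :     //     use std::collections::HashMap;
                      :     //     let mut m: HashMap<u32, bool> = HashMap::new();
                      :     //     assert_eq!(m.insert(1, true), None);        // new entry: write back
                      :     //     assert_eq!(m.insert(1, true), Some(true));  // unchanged: skip write-back
                      : 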
    1959            0 :     pub async fn put_twophase_file(
    1960            0 :         &mut self,
    1961            0 :         xid: u64,
    1962            0 :         img: Bytes,
    1963            0 :         ctx: &RequestContext,
    1964            0 :     ) -> Result<(), WalIngestError> {
    1965              :         // Add it to the directory entry
    1966            0 :         let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1967            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1968            0 :             let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
    1969            0 :             if !dir.xids.insert(xid) {
    1970            0 :                 Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
    1971            0 :             }
    1972            0 :             self.pending_directory_entries.push((
    1973            0 :                 DirectoryKind::TwoPhase,
    1974            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    1975            0 :             ));
    1976            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1977              :         } else {
    1978            0 :             let xid = xid as u32;
    1979            0 :             let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
    1980            0 :             if !dir.xids.insert(xid) {
    1981            0 :                 Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
    1982            0 :             }
    1983            0 :             self.pending_directory_entries.push((
    1984            0 :                 DirectoryKind::TwoPhase,
    1985            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    1986            0 :             ));
    1987            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1988              :         };
    1989            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1990            0 : 
    1991            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1992            0 :         Ok(())
    1993            0 :     }
    1994              : 
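                      :     // Two directory formats exist above because two-phase transaction IDs are
                      :     // 64-bit from PostgreSQL 17 onwards, while earlier versions use 32-bit xids;
                      :     // the pre-17 path simply narrows the id before use:
                      :     //
                      :     //     let xid: u64 = 12345;
                      :     //     let legacy_xid = xid as u32; // pre-17 directories store u32 xids
                      : 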
    1995            1 :     pub async fn set_replorigin(
    1996            1 :         &mut self,
    1997            1 :         origin_id: RepOriginId,
    1998            1 :         origin_lsn: Lsn,
    1999            1 :     ) -> Result<(), WalIngestError> {
    2000            1 :         let key = repl_origin_key(origin_id);
    2001            1 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    2002            1 :         Ok(())
    2003            1 :     }
    2004              : 
    2005            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
    2006            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    2007            0 :     }
    2008              : 
    2009          111 :     pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
    2010          111 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    2011          111 :         Ok(())
    2012          111 :     }
    2013              : 
    2014          118 :     pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
    2015          118 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    2016          118 :         Ok(())
    2017          118 :     }
    2018              : 
    2019            0 :     pub async fn drop_dbdir(
    2020            0 :         &mut self,
    2021            0 :         spcnode: Oid,
    2022            0 :         dbnode: Oid,
    2023            0 :         ctx: &RequestContext,
    2024            0 :     ) -> Result<(), WalIngestError> {
    2025            0 :         let total_blocks = self
    2026            0 :             .tline
    2027            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    2028            0 :             .await?;
    2029              : 
    2030              :         // Remove entry from dbdir
    2031            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    2032            0 :         let mut dir = DbDirectory::des(&buf)?;
    2033            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    2034            0 :             let buf = DbDirectory::ser(&dir)?;
    2035            0 :             self.pending_directory_entries.push((
    2036            0 :                 DirectoryKind::Db,
    2037            0 :                 MetricsUpdate::Set(dir.dbdirs.len() as u64),
    2038            0 :             ));
    2039            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    2040              :         } else {
    2041            0 :             warn!(
    2042            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    2043              :                 spcnode, dbnode
    2044              :             );
    2045              :         }
    2046              : 
    2047              :         // Update logical database size.
    2048            0 :         self.pending_nblocks -= total_blocks as i64;
    2049            0 : 
    2050            0 :         // Delete all relations and metadata files for the spcnode/dnode
    2051            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    2052            0 :         Ok(())
    2053            0 :     }
    2054              : 
    2055              :     /// Create a relation fork.
    2056              :     ///
    2057              :     /// 'nblocks' is the initial size.
    2058          960 :     pub async fn put_rel_creation(
    2059          960 :         &mut self,
    2060          960 :         rel: RelTag,
    2061          960 :         nblocks: BlockNumber,
    2062          960 :         ctx: &RequestContext,
    2063          960 :     ) -> Result<(), WalIngestError> {
    2064          960 :         if rel.relnode == 0 {
    2065            0 :             Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
    2066            0 :                 "invalid relnode"
    2067            0 :             )))?;
    2068          960 :         }
    2069              :         // It's possible that this is the first rel for this db in this
    2070              :         // tablespace.  Create the reldir entry for it if so.
    2071          960 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
    2072              : 
    2073          960 :         let dbdir_exists =
    2074          960 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    2075              :                 // Didn't exist. Update dbdir
    2076            4 :                 e.insert(false);
    2077            4 :                 let buf = DbDirectory::ser(&dbdir)?;
    2078            4 :                 self.pending_directory_entries.push((
    2079            4 :                     DirectoryKind::Db,
    2080            4 :                     MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
    2081            4 :                 ));
    2082            4 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    2083            4 :                 false
    2084              :             } else {
    2085          956 :                 true
    2086              :             };
    2087              : 
    2088          960 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    2089          960 :         let mut rel_dir = if !dbdir_exists {
    2090              :             // Create the RelDirectory
    2091            4 :             RelDirectory::default()
    2092              :         } else {
    2093              :             // reldir already exists, fetch it
    2094          956 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
    2095              :         };
    2096              : 
    2097          960 :         let v2_enabled = self
    2098          960 :             .maybe_enable_rel_size_v2()
    2099          960 :             .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
    2100              : 
    2101          960 :         if v2_enabled {
    2102            0 :             if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
    2103            0 :                 Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    2104            0 :             }
    2105            0 :             let sparse_rel_dir_key =
    2106            0 :                 rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
    2107              :             // check if the rel_dir_key exists in v2
    2108            0 :             let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
    2109            0 :             let val = RelDirExists::decode_option(val)
    2110            0 :                 .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
    2111            0 :             if val == RelDirExists::Exists {
    2112            0 :                 Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    2113            0 :             }
    2114            0 :             self.put(
    2115            0 :                 sparse_rel_dir_key,
    2116            0 :                 Value::Image(RelDirExists::Exists.encode()),
    2117            0 :             );
    2118            0 :             if !dbdir_exists {
    2119            0 :                 self.pending_directory_entries
    2120            0 :                     .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
    2121            0 :                 self.pending_directory_entries
    2122            0 :                     .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    2123            0 :                 // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
    2124            0 :                 // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
    2125            0 :                 // will be key not found errors if we don't create an empty one for rel_size_v2.
    2126            0 :                 self.put(
    2127            0 :                     rel_dir_key,
    2128            0 :                     Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
    2129              :                 );
    2130            0 :             }
    2131            0 :             self.pending_directory_entries
    2132            0 :                 .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
    2133              :         } else {
    2134              :             // Add the new relation to the rel directory entry, and write it back
    2135          960 :             if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    2136            0 :                 Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    2137          960 :             }
    2138          960 :             if !dbdir_exists {
    2139            4 :                 self.pending_directory_entries
    2140            4 :                     .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
    2141          956 :             }
    2142          960 :             self.pending_directory_entries
    2143          960 :                 .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
    2144          960 :             self.put(
    2145          960 :                 rel_dir_key,
    2146          960 :                 Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
    2147              :             );
    2148              :         }
    2149              : 
    2150              :         // Put size
    2151          960 :         let size_key = rel_size_to_key(rel);
    2152          960 :         let buf = nblocks.to_le_bytes();
    2153          960 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2154          960 : 
    2155          960 :         self.pending_nblocks += nblocks as i64;
    2156          960 : 
    2157          960 :         // Update relation size cache
    2158          960 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2159          960 : 
    2160          960 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    2161          960 :         // caller.
    2162          960 :         Ok(())
    2163          960 :     }
    2164              : 
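                      :     // Reldir bookkeeping above, schematically:
                      :     //
                      :     //     v1: one dense key per database holds the whole relation set:
                      :     //         rel_dir_to_key(spc, db) -> RelDirectory { rels: {(relnode, forknum), ..} }
                      :     //     v2: one sparse key per relation, written once on creation and
                      :     //         tombstoned on drop:
                      :     //         rel_tag_sparse_key(spc, db, rel, fork) -> RelDirExists::Exists / Removed
                      : 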
    2165              :     /// Truncate a relation.
    2166         3006 :     pub async fn put_rel_truncation(
    2167         3006 :         &mut self,
    2168         3006 :         rel: RelTag,
    2169         3006 :         nblocks: BlockNumber,
    2170         3006 :         ctx: &RequestContext,
    2171         3006 :     ) -> Result<(), WalIngestError> {
    2172         3006 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2173         3006 :         if self
    2174         3006 :             .tline
    2175         3006 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    2176         3006 :             .await?
    2177              :         {
    2178         3006 :             let size_key = rel_size_to_key(rel);
    2179              :             // Fetch the old size first
    2180         3006 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2181         3006 : 
    2182         3006 :             // Update the entry with the new size.
    2183         3006 :             let buf = nblocks.to_le_bytes();
    2184         3006 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2185         3006 : 
    2186         3006 :             // Update relation size cache
    2187         3006 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2188         3006 : 
    2189         3006 :             // Update logical database size.
    2190         3006 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    2191            0 :         }
    2192         3006 :         Ok(())
    2193         3006 :     }
    2194              : 
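                      :     // Relation sizes are stored as little-endian u32 block counts; the round
                      :     // trip used above is simply:
                      :     //
                      :     //     let nblocks: u32 = 42;
                      :     //     let buf = nblocks.to_le_bytes();         // written as Value::Image
                      :     //     assert_eq!(u32::from_le_bytes(buf), 42); // get_u32_le() on the read side
                      : 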
    2195              :     /// Extend a relation.
    2196              :     /// If the new size is smaller than the current size, do nothing.
    2197       138340 :     pub async fn put_rel_extend(
    2198       138340 :         &mut self,
    2199       138340 :         rel: RelTag,
    2200       138340 :         nblocks: BlockNumber,
    2201       138340 :         ctx: &RequestContext,
    2202       138340 :     ) -> Result<(), WalIngestError> {
    2203       138340 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2204              : 
    2205              :         // Put size
    2206       138340 :         let size_key = rel_size_to_key(rel);
    2207       138340 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2208       138340 : 
    2209       138340 :         // only extend relation here. never decrease the size
    2210       138340 :         if nblocks > old_size {
    2211       137394 :             let buf = nblocks.to_le_bytes();
    2212       137394 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2213       137394 : 
    2214       137394 :             // Update relation size cache
    2215       137394 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2216       137394 : 
    2217       137394 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    2218       137394 :         }
    2219       138340 :         Ok(())
    2220       138340 :     }
    2221              : 
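                      :     // `pending_nblocks` accumulates the logical-size delta of this
                      :     // modification; extend and truncate adjust it symmetrically:
                      :     //
                      :     //     extend:   old_size = 10, nblocks = 25  =>  pending_nblocks += 15
                      :     //     truncate: old_size = 25, nblocks = 10  =>  pending_nblocks -= 15
                      : 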
    2222              :     /// Drop some relations
    2223            5 :     pub(crate) async fn put_rel_drops(
    2224            5 :         &mut self,
    2225            5 :         drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
    2226            5 :         ctx: &RequestContext,
    2227            5 :     ) -> Result<(), WalIngestError> {
    2228            5 :         let v2_enabled = self
    2229            5 :             .maybe_enable_rel_size_v2()
    2230            5 :             .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
    2231            6 :         for ((spc_node, db_node), rel_tags) in drop_relations {
    2232            1 :             let dir_key = rel_dir_to_key(spc_node, db_node);
    2233            1 :             let buf = self.get(dir_key, ctx).await?;
    2234            1 :             let mut dir = RelDirectory::des(&buf)?;
    2235              : 
    2236            1 :             let mut dirty = false;
    2237            2 :             for rel_tag in rel_tags {
    2238            1 :                 let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
    2239            1 :                     self.pending_directory_entries
    2240            1 :                         .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
    2241            1 :                     dirty = true;
    2242            1 :                     true
    2243            0 :                 } else if v2_enabled {
    2244              :                     // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
    2245              :                     // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
    2246              :                     // logic).
    2247            0 :                     let key =
    2248            0 :                         rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
    2249            0 :                     let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
    2250            0 :                         .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    2251            0 :                     if val == RelDirExists::Exists {
    2252            0 :                         self.pending_directory_entries
    2253            0 :                             .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
    2254            0 :                         // put tombstone
    2255            0 :                         self.put(key, Value::Image(RelDirExists::Removed.encode()));
    2256            0 :                         // no need to set dirty to true
    2257            0 :                         true
    2258              :                     } else {
    2259            0 :                         false
    2260              :                     }
    2261              :                 } else {
    2262            0 :                     false
    2263              :                 };
    2264              : 
    2265            1 :                 if found {
    2266              :                     // update logical size
    2267            1 :                     let size_key = rel_size_to_key(rel_tag);
    2268            1 :                     let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2269            1 :                     self.pending_nblocks -= old_size as i64;
    2270            1 : 
    2271            1 :                     // Remove entry from relation size cache
    2272            1 :                     self.tline.remove_cached_rel_size(&rel_tag);
    2273            1 : 
    2274            1 :                     // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
    2275            1 :                     self.delete(rel_key_range(rel_tag));
    2276            0 :                 }
    2277              :             }
    2278              : 
    2279            1 :             if dirty {
    2280            1 :                 self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    2281            0 :             }
    2282              :         }
    2283              : 
    2284            5 :         Ok(())
    2285            5 :     }
    2286              : 
    2287            3 :     pub async fn put_slru_segment_creation(
    2288            3 :         &mut self,
    2289            3 :         kind: SlruKind,
    2290            3 :         segno: u32,
    2291            3 :         nblocks: BlockNumber,
    2292            3 :         ctx: &RequestContext,
    2293            3 :     ) -> Result<(), WalIngestError> {
    2294            3 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2295              : 
    2296              :         // Add it to the directory entry
    2297            3 :         let dir_key = slru_dir_to_key(kind);
    2298            3 :         let buf = self.get(dir_key, ctx).await?;
    2299            3 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2300              : 
    2301            3 :         if !dir.segments.insert(segno) {
    2302            0 :             Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
    2303            3 :         }
    2304            3 :         self.pending_directory_entries.push((
    2305            3 :             DirectoryKind::SlruSegment(kind),
    2306            3 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2307            3 :         ));
    2308            3 :         self.put(
    2309            3 :             dir_key,
    2310            3 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2311              :         );
    2312              : 
    2313              :         // Put size
    2314            3 :         let size_key = slru_segment_size_to_key(kind, segno);
    2315            3 :         let buf = nblocks.to_le_bytes();
    2316            3 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2317            3 : 
    2318            3 :         // even if nblocks > 0, we don't insert any actual blocks here
    2319            3 : 
    2320            3 :         Ok(())
    2321            3 :     }
    2322              : 
    2323              :     /// Extend SLRU segment
    2324            0 :     pub fn put_slru_extend(
    2325            0 :         &mut self,
    2326            0 :         kind: SlruKind,
    2327            0 :         segno: u32,
    2328            0 :         nblocks: BlockNumber,
    2329            0 :     ) -> Result<(), WalIngestError> {
    2330            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2331              : 
    2332              :         // Put size
    2333            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    2334            0 :         let buf = nblocks.to_le_bytes();
    2335            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2336            0 :         Ok(())
    2337            0 :     }
    2338              : 
    2339              :     /// This method is used for marking truncated SLRU segments
    2340            0 :     pub async fn drop_slru_segment(
    2341            0 :         &mut self,
    2342            0 :         kind: SlruKind,
    2343            0 :         segno: u32,
    2344            0 :         ctx: &RequestContext,
    2345            0 :     ) -> Result<(), WalIngestError> {
    2346            0 :         // Remove it from the directory entry
    2347            0 :         let dir_key = slru_dir_to_key(kind);
    2348            0 :         let buf = self.get(dir_key, ctx).await?;
    2349            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2350              : 
    2351            0 :         if !dir.segments.remove(&segno) {
    2352            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    2353            0 :         }
    2354            0 :         self.pending_directory_entries.push((
    2355            0 :             DirectoryKind::SlruSegment(kind),
    2356            0 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2357            0 :         ));
    2358            0 :         self.put(
    2359            0 :             dir_key,
    2360            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2361              :         );
    2362              : 
    2363              :         // Delete size entry, as well as all blocks
    2364            0 :         self.delete(slru_segment_key_range(kind, segno));
    2365            0 : 
    2366            0 :         Ok(())
    2367            0 :     }
    2368              : 
    2369              :     /// Drop a relmapper file (pg_filenode.map)
    2370            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
    2371            0 :         // TODO
    2372            0 :         Ok(())
    2373            0 :     }
    2374              : 
    2375              :     /// Drop a twophase file when a prepared transaction completes
    2376            0 :     pub async fn drop_twophase_file(
    2377            0 :         &mut self,
    2378            0 :         xid: u64,
    2379            0 :         ctx: &RequestContext,
    2380            0 :     ) -> Result<(), WalIngestError> {
    2381              :         // Remove it from the directory entry
    2382            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    2383            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    2384            0 :             let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
    2385              : 
    2386            0 :             if !dir.xids.remove(&xid) {
    2387            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2388            0 :             }
    2389            0 :             self.pending_directory_entries.push((
    2390            0 :                 DirectoryKind::TwoPhase,
    2391            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2392            0 :             ));
    2393            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    2394              :         } else {
    2395            0 :             let xid: u32 = u32::try_from(xid)
    2396            0 :                 .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
    2397            0 :             let mut dir = TwoPhaseDirectory::des(&buf)?;
    2398              : 
    2399            0 :             if !dir.xids.remove(&xid) {
    2400            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2401            0 :             }
    2402            0 :             self.pending_directory_entries.push((
    2403            0 :                 DirectoryKind::TwoPhase,
    2404            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2405            0 :             ));
    2406            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    2407              :         };
    2408            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    2409            0 : 
    2410            0 :         // Delete it
    2411            0 :         self.delete(twophase_key_range(xid));
    2412            0 : 
    2413            0 :         Ok(())
    2414            0 :     }
    2415              : 
    2416            8 :     pub async fn put_file(
    2417            8 :         &mut self,
    2418            8 :         path: &str,
    2419            8 :         content: &[u8],
    2420            8 :         ctx: &RequestContext,
    2421            8 :     ) -> Result<(), WalIngestError> {
    2422            8 :         let key = aux_file::encode_aux_file_key(path);
    2423              :         // retrieve the key from the engine
    2424            8 :         let old_val = match self.get(key, ctx).await {
    2425            2 :             Ok(val) => Some(val),
    2426            6 :             Err(PageReconstructError::MissingKey(_)) => None,
    2427            0 :             Err(e) => return Err(e.into()),
    2428              :         };
    2429            8 :         let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    2430            2 :             aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
    2431              :         } else {
    2432            6 :             Vec::new()
    2433              :         };
    2434            8 :         let mut other_files = Vec::with_capacity(files.len());
    2435            8 :         let mut modifying_file = None;
    2436           10 :         for file @ (p, content) in files {
    2437            2 :             if path == p {
    2438            2 :                 assert!(
    2439            2 :                     modifying_file.is_none(),
    2440            0 :                     "duplicated entries found for {}",
    2441              :                     path
    2442              :                 );
    2443            2 :                 modifying_file = Some(content);
    2444            0 :             } else {
    2445            0 :                 other_files.push(file);
    2446            0 :             }
    2447              :         }
    2448            8 :         let mut new_files = other_files;
    2449            8 :         match (modifying_file, content.is_empty()) {
    2450            1 :             (Some(old_content), false) => {
    2451            1 :                 self.tline
    2452            1 :                     .aux_file_size_estimator
    2453            1 :                     .on_update(old_content.len(), content.len());
    2454            1 :                 new_files.push((path, content));
    2455            1 :             }
    2456            1 :             (Some(old_content), true) => {
    2457            1 :                 self.tline
    2458            1 :                     .aux_file_size_estimator
    2459            1 :                     .on_remove(old_content.len());
    2460            1 :                 // not adding the file key to the final `new_files` vec.
    2461            1 :             }
    2462            6 :             (None, false) => {
    2463            6 :                 self.tline.aux_file_size_estimator.on_add(content.len());
    2464            6 :                 new_files.push((path, content));
    2465            6 :             }
    2466              :             // Compute may request deletion of an old version of a pgstat AUX file if the
    2467              :             // new one exceeds the size limit. Compute doesn't know whether a previous
    2468              :             // version of the file exists, so attempting to delete a non-existent file
    2469              :             // can end up here. To avoid false alarms, log it at info rather than warning.
    2470            0 :             (None, true) if path.starts_with("pg_stat/") => {
    2471            0 :                 info!("removing non-existing pg_stat file: {}", path)
    2472              :             }
    2473            0 :             (None, true) => warn!("removing non-existing aux file: {}", path),
    2474              :         }
    2475            8 :         let new_val = aux_file::encode_file_value(&new_files)
    2476            8 :             .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
    2477            8 :         self.put(key, Value::Image(new_val.into()));
    2478            8 : 
    2479            8 :         Ok(())
    2480            8 :     }
    2481              : 
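                      :     // Aux files are multiplexed: several (path, content) pairs may share one
                      :     // hashed key, so put_file is a read-modify-write of the whole list.
                      :     // Schematically (hypothetical paths and values):
                      :     //
                      :     //     key = encode_aux_file_key("pg_logical/mappings/x")
                      :     //     old: [("pg_logical/mappings/x", b"v1"), ("pg_logical/mappings/y", b"..")]
                      :     //     new: the entry for the written path is replaced, or dropped entirely
                      :     //          when `content` is empty (which signals deletion).
                      : 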
    2482              :     ///
    2483              :     /// Flush changes accumulated so far to the underlying repository.
    2484              :     ///
    2485              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    2486              :     /// you to flush them to the underlying repository before the final `commit`.
    2487              :     /// That allows freeing up the memory used to hold the pending changes.
    2488              :     ///
    2489              :     /// Currently only used during bulk import of a data directory. In that
    2490              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    2491              :     /// whole import fails and the timeline will be deleted anyway.
    2492              :     /// (Or to be precise, it will be left behind for debugging purposes and
    2493              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    2494              :     ///
    2495              :     /// Note: A consequence of flushing the pending operations is that they
    2496              :     /// won't be visible to subsequent operations until `commit`. The function
    2497              :     /// retains all the metadata, but data pages are flushed. That's again OK
    2498              :     /// for bulk import, where you are just loading data pages and won't try to
    2499              :     /// modify the same pages twice.
    2500          965 :     pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2501          965 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    2502          965 :         // to scan through the pending_updates list.
    2503          965 :         let pending_nblocks = self.pending_nblocks;
    2504          965 :         if pending_nblocks < 10000 {
    2505          965 :             return Ok(());
    2506            0 :         }
    2507              : 
    2508            0 :         let mut writer = self.tline.writer().await;
    2509              : 
    2510              :         // Flush relation and SLRU data blocks, keep metadata.
    2511            0 :         if let Some(batch) = self.pending_data_batch.take() {
    2512            0 :             tracing::debug!(
    2513            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2514            0 :                 batch.max_lsn,
    2515            0 :                 self.tline.get_last_record_lsn()
    2516              :             );
    2517              : 
    2518              :             // This bails out on first error without modifying pending_updates.
    2519              :             // That's Ok, cf this function's doc comment.
    2520            0 :             writer.put_batch(batch, ctx).await?;
    2521            0 :         }
    2522              : 
    2523            0 :         if pending_nblocks != 0 {
    2524            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2525            0 :             self.pending_nblocks = 0;
    2526            0 :         }
    2527              : 
    2528            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2529            0 :             writer.update_directory_entries_count(kind, count);
    2530            0 :         }
    2531              : 
    2532            0 :         Ok(())
    2533          965 :     }
    2534              : 
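                      :     // flush() is effectively a no-op for small modifications: below the
                      :     // 10000-block threshold (10000 * 8 KB blocks, roughly 80 MB of pending
                      :     // data) it returns immediately and everything stays buffered until commit().
                      : 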
    2535              :     ///
    2536              :     /// Finish this atomic update, writing all the updated keys to the
    2537              :     /// underlying timeline.
    2538              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    2539              :     ///
    2540       371556 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2541       371556 :         let mut writer = self.tline.writer().await;
    2542              : 
    2543       371556 :         let pending_nblocks = self.pending_nblocks;
    2544       371556 :         self.pending_nblocks = 0;
    2545              : 
    2546              :         // Ordering: the items in this batch do not need to be in any global order, but values for
    2547              :         // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    2548              :         // this to do efficient updates to its index.  See [`wal_decoder::serialized_batch`] for
    2549              :         // more details.
    2550              : 
    2551       371556 :         let metadata_batch = {
    2552       371556 :             let pending_meta = self
    2553       371556 :                 .pending_metadata_pages
    2554       371556 :                 .drain()
    2555       371556 :                 .flat_map(|(key, values)| {
    2556       137061 :                     values
    2557       137061 :                         .into_iter()
    2558       137061 :                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
    2559       371556 :                 })
    2560       371556 :                 .collect::<Vec<_>>();
    2561       371556 : 
    2562       371556 :             if pending_meta.is_empty() {
    2563       236139 :                 None
    2564              :             } else {
    2565       135417 :                 Some(SerializedValueBatch::from_values(pending_meta))
    2566              :             }
    2567              :         };
    2568              : 
    2569       371556 :         let data_batch = self.pending_data_batch.take();
    2570              : 
    2571       371556 :         let maybe_batch = match (data_batch, metadata_batch) {
    2572       132278 :             (Some(mut data), Some(metadata)) => {
    2573       132278 :                 data.extend(metadata);
    2574       132278 :                 Some(data)
    2575              :             }
    2576        71631 :             (Some(data), None) => Some(data),
    2577         3139 :             (None, Some(metadata)) => Some(metadata),
    2578       164508 :             (None, None) => None,
    2579              :         };
    2580              : 
    2581       371556 :         if let Some(batch) = maybe_batch {
    2582       207048 :             tracing::debug!(
    2583            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2584            0 :                 batch.max_lsn,
    2585            0 :                 self.tline.get_last_record_lsn()
    2586              :             );
    2587              : 
    2588              :             // This bails out on first error without modifying pending_updates.
    2589              :             // That's Ok, cf this function's doc comment.
    2590       207048 :             writer.put_batch(batch, ctx).await?;
    2591       164508 :         }
    2592              : 
    2593       371556 :         if !self.pending_deletions.is_empty() {
    2594            1 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    2595            1 :             self.pending_deletions.clear();
    2596       371555 :         }
    2597              : 
    2598       371556 :         self.pending_lsns.push(self.lsn);
    2599       444485 :         for pending_lsn in self.pending_lsns.drain(..) {
    2600       444485 :             // TODO(vlad): pretty sure the comment below is not valid anymore
    2601       444485 :             // and we can call finish write with the latest LSN
    2602       444485 :             //
    2603       444485 :             // Ideally, we should be able to call writer.finish_write() only once
    2604       444485 :             // with the highest LSN. However, the last_record_lsn variable in the
    2605       444485 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    2606       444485 :             // so we need to record every LSN to not leave a gap between them.
    2607       444485 :             writer.finish_write(pending_lsn);
    2608       444485 :         }
    2609              : 
    2610       371556 :         if pending_nblocks != 0 {
    2611       135285 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2612       236271 :         }
    2613              : 
    2614       371556 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2615         1522 :             writer.update_directory_entries_count(kind, count);
    2616         1522 :         }
    2617              : 
    2618       371556 :         self.pending_metadata_bytes = 0;
    2619       371556 : 
    2620       371556 :         Ok(())
    2621       371556 :     }
    2622              : 
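                      :     // Batch ordering contract (see the comment inside commit() above): values
                      :     // for the same key must appear in nondecreasing LSN order, while distinct
                      :     // keys may interleave freely. For example, this sequence is valid:
                      :     //
                      :     //     (key A, lsn 10), (key B, lsn 12), (key A, lsn 20), (key B, lsn 15)
                      :     //
                      :     // but (key A, lsn 20) followed by (key A, lsn 10) is not.
                      : 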
    2623       145852 :     pub(crate) fn len(&self) -> usize {
    2624       145852 :         self.pending_metadata_pages.len()
    2625       145852 :             + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
    2626       145852 :             + self.pending_deletions.len()
    2627       145852 :     }
    2628              : 
    2629              :     /// Read a page from the Timeline we are writing to.  For metadata pages, this passes through
    2630              :     /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
    2631              :     /// in the modification.
    2632              :     ///
    2633              :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
    2634              :     /// page must ensure that the pages it reads are already committed in the Timeline; for example,
    2635              :     /// DB create operations are always preceded by a call to commit().  This is special-cased because
    2636              :     /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
    2637              :     /// and not data pages.
    2638       143293 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    2639       143293 :         if !Self::is_data_key(&key) {
    2640              :             // Have we already updated the same key? Read the latest pending
    2641              :             // version in that case.
    2642              :             //
    2643              :             // Note: we don't check pending_deletions. It is an error to request a
    2644              :             // value that has been removed, deletion only avoids leaking storage.
    2645       143293 :             if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
    2646         7964 :                 if let Some((_, _, value)) = values.last() {
    2647         7964 :                     return if let Value::Image(img) = value {
    2648         7964 :                         Ok(img.clone())
    2649              :                     } else {
    2650              :                         // Currently, we never need to read back a WAL record that we
    2651              :                         // inserted in the same "transaction". All the metadata updates
    2652              :                         // work directly with Images, and we never need to read actual
    2653              :                         // data pages. We could handle this if we had to, by calling
    2654              :                         // the walredo manager, but let's keep it simple for now.
    2655            0 :                         Err(PageReconstructError::Other(anyhow::anyhow!(
    2656            0 :                             "unexpected pending WAL record"
    2657            0 :                         )))
    2658              :                     };
    2659            0 :                 }
    2660       135329 :             }
    2661              :         } else {
    2662              :             // This is an expensive check, so we only do it in debug mode. If reading a data key,
    2663              :             // this key should never be present in pending_data_batch. We ensure this by committing
    2664              :             // modifications before ingesting DB create operations, which are the only kind that read
    2665              :             // data pages during ingest.
    2666            0 :             if cfg!(debug_assertions) {
    2667            0 :                 assert!(
    2668            0 :                     !self
    2669            0 :                         .pending_data_batch
    2670            0 :                         .as_ref()
    2671            0 :                         .is_some_and(|b| b.updates_key(&key))
    2672            0 :                 );
    2673            0 :             }
    2674              :         }
    2675              : 
    2676              :         // Metadata page cache miss, or we're reading a data page.
    2677       135329 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    2678       135329 :         self.tline.get(key, lsn, ctx).await
    2679       143293 :     }
    2680              : 
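                      :     // Read-your-writes for metadata keys, schematically (SOME_METADATA_KEY is
                      :     // a placeholder, not a real key constant):
                      :     //
                      :     //     modification.put(SOME_METADATA_KEY, Value::Image(img_v2)); // pending only
                      :     //     modification.get(SOME_METADATA_KEY, ctx).await?;           // sees img_v2,
                      :     //         // not the older committed value in the timeline
                      : 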
    2681              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    2682              :     /// and the empty value into None.
    2683            0 :     async fn sparse_get(
    2684            0 :         &self,
    2685            0 :         key: Key,
    2686            0 :         ctx: &RequestContext,
    2687            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    2688            0 :         let val = self.get(key, ctx).await;
    2689            0 :         match val {
    2690            0 :             Ok(val) if val.is_empty() => Ok(None),
    2691            0 :             Ok(val) => Ok(Some(val)),
    2692            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    2693            0 :             Err(e) => Err(e),
    2694              :         }
    2695            0 :     }
    2696              : 
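                      :     // sparse_get collapses "missing" and "empty" into None:
                      :     //
                      :     //     Err(MissingKey)   -> Ok(None)
                      :     //     Ok(b"") (empty)   -> Ok(None)    // e.g. a tombstoned entry
                      :     //     Ok(non-empty val) -> Ok(Some(val))
                      : 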
    2697              :     #[cfg(test)]
    2698            2 :     pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
    2699            2 :         self.put(key, val);
    2700            2 :     }
    2701              : 
    2702       282071 :     fn put(&mut self, key: Key, val: Value) {
    2703       282071 :         if Self::is_data_key(&key) {
    2704       138934 :             self.put_data(key.to_compact(), val)
    2705              :         } else {
    2706       143137 :             self.put_metadata(key.to_compact(), val)
    2707              :         }
    2708       282071 :     }
    2709              : 
    2710       138934 :     fn put_data(&mut self, key: CompactKey, val: Value) {
    2711       138934 :         let batch = self
    2712       138934 :             .pending_data_batch
    2713       138934 :             .get_or_insert_with(SerializedValueBatch::default);
    2714       138934 :         batch.put(key, val, self.lsn);
    2715       138934 :     }
    2716              : 
    2717       143137 :     fn put_metadata(&mut self, key: CompactKey, val: Value) {
    2718       143137 :         let values = self.pending_metadata_pages.entry(key).or_default();
    2719              :         // Replace the previous value if it exists at the same lsn
    2720       143137 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    2721         6076 :             if *last_lsn == self.lsn {
    2722              :                 // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
    2723         6076 :                 self.pending_metadata_bytes -= *last_value_ser_size;
    2724         6076 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    2725         6076 :                 self.pending_metadata_bytes += *last_value_ser_size;
    2726         6076 : 
    2727         6076 :                 // Use the latest value; this replaces any earlier write to the same (key, lsn), such as might
    2728         6076 :                 // have been generated by synthesized zero page writes prior to the first real write to a page.
    2729         6076 :                 *last_value = val;
    2730         6076 :                 return;
    2731            0 :             }
    2732       137061 :         }
    2733              : 
    2734       137061 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    2735       137061 :         self.pending_metadata_bytes += val_serialized_size;
    2736       137061 :         values.push((self.lsn, val_serialized_size, val));
    2737       137061 : 
    2738       137061 :         if key == CHECKPOINT_KEY.to_compact() {
    2739          118 :             tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
    2740       136943 :         }
    2741       143137 :     }
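                      : // Sketch of the replacement rule above (hypothetical values, not part of the
                      : // measured source): two metadata writes to the same key at the same LSN keep
                      : // only the second value, and pending_metadata_bytes is adjusted to the size
                      : // of the survivor.
                      : //
                      : //     modification.put_metadata(k, v1); // values == [(lsn, size(v1), v1)]
                      : //     modification.put_metadata(k, v2); // values == [(lsn, size(v2), v2)]
                      : //
                      : // Advancing the LSN between the two writes would instead push a second entry,
                      : // preserving both versions for the eventual flush.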
    2742              : 
    2743            1 :     fn delete(&mut self, key_range: Range<Key>) {
    2744            1 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    2745            1 :         self.pending_deletions.push((key_range, self.lsn));
    2746            1 :     }
    2747              : }
    2748              : 
    2749              : /// Statistics for a DatadirModification.
    2750              : #[derive(Default)]
    2751              : pub struct DatadirModificationStats {
    2752              :     pub metadata_images: u64,
    2753              :     pub metadata_deltas: u64,
    2754              :     pub data_images: u64,
    2755              :     pub data_deltas: u64,
    2756              : }
    2757              : 
     2758              : /// This enum facilitates accessing either a committed key from the timeline at a
    2759              : /// specific LSN, or the latest uncommitted key from a pending modification.
    2760              : ///
    2761              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    2762              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
     2763              : /// need to look up the keys in the modification first, before looking them up in the
     2764              : /// timeline, so that they do not miss the latest updates.
    2765              : #[derive(Clone, Copy)]
    2766              : pub enum Version<'a> {
    2767              :     LsnRange(LsnRange),
    2768              :     Modified(&'a DatadirModification<'a>),
    2769              : }
    2770              : 
    2771              : impl Version<'_> {
    2772           25 :     async fn get(
    2773           25 :         &self,
    2774           25 :         timeline: &Timeline,
    2775           25 :         key: Key,
    2776           25 :         ctx: &RequestContext,
    2777           25 :     ) -> Result<Bytes, PageReconstructError> {
    2778           25 :         match self {
    2779           15 :             Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
    2780           10 :             Version::Modified(modification) => modification.get(key, ctx).await,
    2781              :         }
    2782           25 :     }
    2783              : 
    2784              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    2785              :     /// and the empty value into None.
    2786            0 :     async fn sparse_get(
    2787            0 :         &self,
    2788            0 :         timeline: &Timeline,
    2789            0 :         key: Key,
    2790            0 :         ctx: &RequestContext,
    2791            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    2792            0 :         let val = self.get(timeline, key, ctx).await;
    2793            0 :         match val {
    2794            0 :             Ok(val) if val.is_empty() => Ok(None),
    2795            0 :             Ok(val) => Ok(Some(val)),
    2796            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    2797            0 :             Err(e) => Err(e),
    2798              :         }
    2799            0 :     }
    2800              : 
    2801           26 :     pub fn is_latest(&self) -> bool {
    2802           26 :         match self {
    2803           16 :             Version::LsnRange(lsns) => lsns.is_latest(),
    2804           10 :             Version::Modified(_) => true,
    2805              :         }
    2806           26 :     }
    2807              : 
    2808       224275 :     pub fn get_lsn(&self) -> Lsn {
    2809       224275 :         match self {
    2810        12224 :             Version::LsnRange(lsns) => lsns.effective_lsn,
    2811       212051 :             Version::Modified(modification) => modification.lsn,
    2812              :         }
    2813       224275 :     }
    2814              : 
    2815        12219 :     pub fn at(lsn: Lsn) -> Self {
    2816        12219 :         Version::LsnRange(LsnRange {
    2817        12219 :             effective_lsn: lsn,
    2818        12219 :             request_lsn: lsn,
    2819        12219 :         })
    2820        12219 :     }
    2821              : }
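                      : // Construction sketch for the enum documented above (hypothetical caller code,
                      : // not part of the measured source): plain readers wrap an LSN, while WAL
                      : // ingestion wraps the in-progress modification so that batched, uncommitted
                      : // writes remain visible.
                      : //
                      : //     let committed = Version::at(lsn);               // reads from the timeline
                      : //     let pending = Version::Modified(&modification); // checks the batch first
                      : //     let page = pending.get(timeline, key, ctx).await?;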
    2822              : 
    2823              : //--- Metadata structs stored in key-value pairs in the repository.
    2824              : 
    2825            0 : #[derive(Debug, Serialize, Deserialize)]
    2826              : pub(crate) struct DbDirectory {
    2827              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    2828              :     pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
    2829              : }
    2830              : 
    2831              : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
     2832              : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs.  Previously, the files
     2833              : // were named like "pg_twophase/000002E5", now they're like
     2834              : // "pg_twophase/0000000A000002E4".
    2835              : 
    2836            0 : #[derive(Debug, Serialize, Deserialize)]
    2837              : pub(crate) struct TwoPhaseDirectory {
    2838              :     pub(crate) xids: HashSet<TransactionId>,
    2839              : }
    2840              : 
    2841            0 : #[derive(Debug, Serialize, Deserialize)]
    2842              : struct TwoPhaseDirectoryV17 {
    2843              :     xids: HashSet<u64>,
    2844              : }
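                      : // Illustrative formatting of the two naming schemes described above (a sketch,
                      : // assuming PostgreSQL's usual zero-padded upper-case hex file names):
                      : //
                      : //     let v16_name = format!("{:08X}", 0x000002E5_u32);           // "000002E5"
                      : //     let v17_name = format!("{:016X}", 0x0000000A000002E4_u64);  // "0000000A000002E4"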
    2845              : 
    2846            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    2847              : pub(crate) struct RelDirectory {
    2848              :     // Set of relations that exist. (relfilenode, forknum)
    2849              :     //
    2850              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    2851              :     // key-value pairs, if you have a lot of relations
    2852              :     pub(crate) rels: HashSet<(Oid, u8)>,
    2853              : }
    2854              : 
    2855            0 : #[derive(Debug, Serialize, Deserialize)]
    2856              : struct RelSizeEntry {
    2857              :     nblocks: u32,
    2858              : }
    2859              : 
    2860            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    2861              : pub(crate) struct SlruSegmentDirectory {
    2862              :     // Set of SLRU segments that exist.
    2863              :     pub(crate) segments: HashSet<u32>,
    2864              : }
    2865              : 
    2866              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2867              : #[repr(u8)]
    2868              : pub(crate) enum DirectoryKind {
    2869              :     Db,
    2870              :     TwoPhase,
    2871              :     Rel,
    2872              :     AuxFiles,
    2873              :     SlruSegment(SlruKind),
    2874              :     RelV2,
    2875              : }
    2876              : 
    2877              : impl DirectoryKind {
    2878              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2879         4567 :     pub(crate) fn offset(&self) -> usize {
    2880         4567 :         self.into_usize()
    2881         4567 :     }
    2882              : }
    2883              : 
    2884              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2885              : 
    2886              : #[allow(clippy::bool_assert_comparison)]
    2887              : #[cfg(test)]
    2888              : mod tests {
    2889              :     use hex_literal::hex;
    2890              :     use pageserver_api::models::ShardParameters;
    2891              :     use pageserver_api::shard::ShardStripeSize;
    2892              :     use utils::id::TimelineId;
    2893              :     use utils::shard::{ShardCount, ShardNumber};
    2894              : 
    2895              :     use super::*;
    2896              :     use crate::DEFAULT_PG_VERSION;
    2897              :     use crate::tenant::harness::TenantHarness;
    2898              : 
    2899              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2900              :     #[tokio::test]
    2901            1 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2902            1 :         let name = "aux_files_round_trip";
    2903            1 :         let harness = TenantHarness::create(name).await?;
    2904            1 : 
    2905            1 :         pub const TIMELINE_ID: TimelineId =
    2906            1 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2907            1 : 
    2908            1 :         let (tenant, ctx) = harness.load().await;
    2909            1 :         let (tline, ctx) = tenant
    2910            1 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2911            1 :             .await?;
    2912            1 :         let tline = tline.raw_timeline().unwrap();
    2913            1 : 
    2914            1 :         // First modification: insert two keys
    2915            1 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2916            1 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2917            1 :         modification.set_lsn(Lsn(0x1008))?;
    2918            1 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2919            1 :         modification.commit(&ctx).await?;
    2920            1 :         let expect_1008 = HashMap::from([
    2921            1 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2922            1 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2923            1 :         ]);
    2924            1 : 
    2925            1 :         let io_concurrency = IoConcurrency::spawn_for_test();
    2926            1 : 
    2927            1 :         let readback = tline
    2928            1 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    2929            1 :             .await?;
    2930            1 :         assert_eq!(readback, expect_1008);
    2931            1 : 
    2932            1 :         // Second modification: update one key, remove the other
    2933            1 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2934            1 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2935            1 :         modification.set_lsn(Lsn(0x2008))?;
    2936            1 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2937            1 :         modification.commit(&ctx).await?;
    2938            1 :         let expect_2008 =
    2939            1 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2940            1 : 
    2941            1 :         let readback = tline
    2942            1 :             .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
    2943            1 :             .await?;
    2944            1 :         assert_eq!(readback, expect_2008);
    2945            1 : 
    2946            1 :         // Reading back in time works
    2947            1 :         let readback = tline
    2948            1 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    2949            1 :             .await?;
    2950            1 :         assert_eq!(readback, expect_1008);
    2951            1 : 
    2952            1 :         Ok(())
    2953            1 :     }
    2954              : 
    2955              :     #[test]
    2956            1 :     fn gap_finding() {
    2957            1 :         let rel = RelTag {
    2958            1 :             spcnode: 1663,
    2959            1 :             dbnode: 208101,
    2960            1 :             relnode: 2620,
    2961            1 :             forknum: 0,
    2962            1 :         };
    2963            1 :         let base_blkno = 1;
    2964            1 : 
    2965            1 :         let base_key = rel_block_to_key(rel, base_blkno);
    2966            1 :         let before_base_key = rel_block_to_key(rel, base_blkno - 1);
    2967            1 : 
    2968            1 :         let shard = ShardIdentity::unsharded();
    2969            1 : 
    2970            1 :         let mut previous_nblocks = 0;
    2971           11 :         for i in 0..10 {
    2972           10 :             let crnt_blkno = base_blkno + i;
    2973           10 :             let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
    2974           10 : 
    2975           10 :             previous_nblocks = crnt_blkno + 1;
    2976           10 : 
    2977           10 :             if i == 0 {
    2978              :                 // The first block we write is 1, so we should find the gap.
    2979            1 :                 assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
    2980              :             } else {
    2981            9 :                 assert!(gaps.is_none());
    2982              :             }
    2983              :         }
    2984              : 
    2985              :         // This is an update to an already existing block. No gaps here.
    2986            1 :         let update_blkno = 5;
    2987            1 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    2988            1 :         assert!(gaps.is_none());
    2989              : 
    2990              :         // This is an update past the current end block.
    2991            1 :         let after_gap_blkno = 20;
    2992            1 :         let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
    2993            1 : 
    2994            1 :         let gap_start_key = rel_block_to_key(rel, previous_nblocks);
    2995            1 :         let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
    2996            1 :         assert_eq!(
    2997            1 :             gaps.unwrap(),
    2998            1 :             KeySpace::single(gap_start_key..after_gap_key)
    2999            1 :         );
    3000            1 :     }
    3001              : 
    3002              :     #[test]
    3003            1 :     fn sharded_gap_finding() {
    3004            1 :         let rel = RelTag {
    3005            1 :             spcnode: 1663,
    3006            1 :             dbnode: 208101,
    3007            1 :             relnode: 2620,
    3008            1 :             forknum: 0,
    3009            1 :         };
    3010            1 : 
    3011            1 :         let first_blkno = 6;
    3012            1 : 
    3013            1 :         // This shard will get the even blocks
    3014            1 :         let shard = ShardIdentity::from_params(
    3015            1 :             ShardNumber(0),
    3016            1 :             &ShardParameters {
    3017            1 :                 count: ShardCount(2),
    3018            1 :                 stripe_size: ShardStripeSize(1),
    3019            1 :             },
    3020            1 :         );
    3021            1 : 
    3022            1 :         // Only keys belonging to this shard are considered as gaps.
    3023            1 :         let mut previous_nblocks = 0;
    3024            1 :         let gaps =
    3025            1 :             DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
    3026            1 :         assert!(!gaps.ranges.is_empty());
    3027            3 :         for gap_range in gaps.ranges {
    3028            2 :             let mut k = gap_range.start;
    3029            4 :             while k != gap_range.end {
    3030            2 :                 assert_eq!(shard.get_shard_number(&k), shard.number);
    3031            2 :                 k = k.next();
    3032              :             }
    3033              :         }
    3034              : 
    3035            1 :         previous_nblocks = first_blkno;
    3036            1 : 
    3037            1 :         let update_blkno = 2;
    3038            1 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    3039            1 :         assert!(gaps.is_none());
    3040            1 :     }
    3041              : 
    3042              :     /*
    3043              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    3044              :             let incremental = timeline.get_current_logical_size();
    3045              :             let non_incremental = timeline
    3046              :                 .get_current_logical_size_non_incremental(lsn)
    3047              :                 .unwrap();
    3048              :             assert_eq!(incremental, non_incremental);
    3049              :         }
    3050              :     */
    3051              : 
    3052              :     /*
    3053              :     ///
    3054              :     /// Test list_rels() function, with branches and dropped relations
    3055              :     ///
    3056              :     #[test]
    3057              :     fn test_list_rels_drop() -> Result<()> {
    3058              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    3059              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    3060              :         const TESTDB: u32 = 111;
    3061              : 
    3062              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    3063              :         // after branching fails below
    3064              :         let mut writer = tline.begin_record(Lsn(0x10));
    3065              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    3066              :         writer.finish()?;
    3067              : 
    3068              :         // Create a relation on the timeline
    3069              :         let mut writer = tline.begin_record(Lsn(0x20));
    3070              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    3071              :         writer.finish()?;
    3072              : 
    3073              :         let writer = tline.begin_record(Lsn(0x00));
    3074              :         writer.finish()?;
    3075              : 
     3076              :         // Check that list_rels() lists it after LSN 2, but not before it
    3077              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    3078              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    3079              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    3080              : 
    3081              :         // Create a branch, check that the relation is visible there
    3082              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    3083              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    3084              :             Some(timeline) => timeline,
    3085              :             None => panic!("Should have a local timeline"),
    3086              :         };
    3087              :         let newtline = DatadirTimelineImpl::new(newtline);
    3088              :         assert!(newtline
    3089              :             .list_rels(0, TESTDB, Lsn(0x30))?
    3090              :             .contains(&TESTREL_A));
    3091              : 
    3092              :         // Drop it on the branch
    3093              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    3094              :         new_writer.drop_relation(TESTREL_A)?;
    3095              :         new_writer.finish()?;
    3096              : 
    3097              :         // Check that it's no longer listed on the branch after the point where it was dropped
    3098              :         assert!(newtline
    3099              :             .list_rels(0, TESTDB, Lsn(0x30))?
    3100              :             .contains(&TESTREL_A));
    3101              :         assert!(!newtline
    3102              :             .list_rels(0, TESTDB, Lsn(0x40))?
    3103              :             .contains(&TESTREL_A));
    3104              : 
    3105              :         // Run checkpoint and garbage collection and check that it's still not visible
    3106              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    3107              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    3108              : 
    3109              :         assert!(!newtline
    3110              :             .list_rels(0, TESTDB, Lsn(0x40))?
    3111              :             .contains(&TESTREL_A));
    3112              : 
    3113              :         Ok(())
    3114              :     }
    3115              :      */
    3116              : 
    3117              :     /*
    3118              :     #[test]
    3119              :     fn test_read_beyond_eof() -> Result<()> {
    3120              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    3121              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    3122              : 
    3123              :         make_some_layers(&tline, Lsn(0x20))?;
    3124              :         let mut writer = tline.begin_record(Lsn(0x60));
    3125              :         walingest.put_rel_page_image(
    3126              :             &mut writer,
    3127              :             TESTREL_A,
    3128              :             0,
    3129              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    3130              :         )?;
    3131              :         writer.finish()?;
    3132              : 
    3133              :         // Test read before rel creation. Should error out.
    3134              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    3135              : 
    3136              :         // Read block beyond end of relation at different points in time.
    3137              :         // These reads should fall into different delta, image, and in-memory layers.
    3138              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    3139              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    3140              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    3141              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    3142              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    3143              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    3144              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    3145              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    3146              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    3147              : 
    3148              :         // Test on an in-memory layer with no preceding layer
    3149              :         let mut writer = tline.begin_record(Lsn(0x70));
    3150              :         walingest.put_rel_page_image(
    3151              :             &mut writer,
    3152              :             TESTREL_B,
    3153              :             0,
    3154              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    3155              :         )?;
    3156              :         writer.finish()?;
    3157              : 
     3158              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    3159              : 
    3160              :         Ok(())
    3161              :     }
    3162              :      */
    3163              : }
        

Generated by: LCOV version 2.1-beta