LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions)
Test:       2620485e474b48c32427149a5d91ef8fc2cd649e.info
Test Date:  2025-05-01 22:50:11
Coverage:   Lines:     57.2 % (1114 of 1949 hit)
            Functions: 45.1 % (93 of 206 hit)

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use std::collections::{HashMap, HashSet, hash_map};
      10              : use std::ops::{ControlFlow, Range};
      11              : 
      12              : use crate::walingest::{WalIngestError, WalIngestErrorKind};
      13              : use crate::{PERF_TRACE_TARGET, ensure_walingest};
      14              : use anyhow::Context;
      15              : use bytes::{Buf, Bytes, BytesMut};
      16              : use enum_map::Enum;
      17              : use pageserver_api::key::{
      18              :     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
      19              :     TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
      20              :     rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
      21              :     repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
      22              :     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
      23              : };
      24              : use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
      25              : use pageserver_api::models::RelSizeMigration;
      26              : use pageserver_api::record::NeonWalRecord;
      27              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      28              : use pageserver_api::shard::ShardIdentity;
      29              : use pageserver_api::value::Value;
      30              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      31              : use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
      32              : use serde::{Deserialize, Serialize};
      33              : use strum::IntoEnumIterator;
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::{debug, info, info_span, trace, warn};
      36              : use utils::bin_ser::{BeSer, DeserializeError};
      37              : use utils::lsn::Lsn;
      38              : use utils::pausable_failpoint;
      39              : use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
      40              : 
      41              : use super::tenant::{PageReconstructError, Timeline};
      42              : use crate::aux_file;
      43              : use crate::context::{PerfInstrumentFutureExt, RequestContext};
      44              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      45              : use crate::metrics::{
      46              :     RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
      47              : };
      48              : use crate::span::{
      49              :     debug_assert_current_span_has_tenant_and_timeline_id,
      50              :     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
      51              : };
      52              : use crate::tenant::storage_layer::IoConcurrency;
      53              : use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
      54              : 
      55              : /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
      56              : pub const MAX_AUX_FILE_DELTAS: usize = 1024;
      57              : 
      58              : /// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
      59              : pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
      60              : 
      61              : #[derive(Debug)]
      62              : pub enum LsnForTimestamp {
      63              :     /// Found commits both before and after the given timestamp
      64              :     Present(Lsn),
      65              : 
       66              :     /// Found no commits after the given timestamp; this means
      67              :     /// that the newest data in the branch is older than the given
      68              :     /// timestamp.
      69              :     ///
      70              :     /// All commits <= LSN happened before the given timestamp
      71              :     Future(Lsn),
      72              : 
       73              :     /// The queried timestamp is past the horizon we look back at (the PITR window)
      74              :     ///
      75              :     /// All commits > LSN happened after the given timestamp,
      76              :     /// but any commits < LSN might have happened before or after
      77              :     /// the given timestamp. We don't know because no data before
       78              :     /// the given LSN is available.
      79              :     Past(Lsn),
      80              : 
      81              :     /// We have found no commit with a timestamp,
      82              :     /// so we can't return anything meaningful.
      83              :     ///
      84              :     /// The associated LSN is the lower bound value we can safely
       85              :     /// create branches on, but no statement is made about whether it is
      86              :     /// older or newer than the timestamp.
      87              :     ///
      88              :     /// This variant can e.g. be returned right after a
      89              :     /// cluster import.
      90              :     NoData(Lsn),
      91              : }
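
A minimal sketch (not part of the file above) of how a caller might act on these variants when resolving a user-supplied timestamp to a branch point; the `resolve_branch_point` helper and its error string are hypothetical:

    fn resolve_branch_point(result: LsnForTimestamp) -> Result<Lsn, &'static str> {
        match result {
            // Commits exist on both sides of the timestamp: branch at the last
            // commit at or before it.
            LsnForTimestamp::Present(lsn) => Ok(lsn),
            // Nothing committed after the timestamp: branching at this LSN still
            // captures everything older than the timestamp.
            LsnForTimestamp::Future(lsn) => Ok(lsn),
            // The timestamp is older than the PITR horizon: the data needed to
            // answer precisely is gone, so refuse.
            LsnForTimestamp::Past(_) => Err("timestamp predates the PITR horizon"),
            // No commit timestamps at all (e.g. right after an import): the LSN
            // is only a safe lower bound for branch creation.
            LsnForTimestamp::NoData(lsn) => Ok(lsn),
        }
    }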
      92              : 
      93              : #[derive(Debug, thiserror::Error)]
      94              : pub(crate) enum CalculateLogicalSizeError {
      95              :     #[error("cancelled")]
      96              :     Cancelled,
      97              : 
      98              :     /// Something went wrong while reading the metadata we use to calculate logical size
      99              :     /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
     100              :     /// in the `From` implementation for this variant.
     101              :     #[error(transparent)]
     102              :     PageRead(PageReconstructError),
     103              : 
     104              :     /// Something went wrong deserializing metadata that we read to calculate logical size
     105              :     #[error("decode error: {0}")]
     106              :     Decode(#[from] DeserializeError),
     107              : }
     108              : 
     109              : #[derive(Debug, thiserror::Error)]
     110              : pub(crate) enum CollectKeySpaceError {
     111              :     #[error(transparent)]
     112              :     Decode(#[from] DeserializeError),
     113              :     #[error(transparent)]
     114              :     PageRead(PageReconstructError),
     115              :     #[error("cancelled")]
     116              :     Cancelled,
     117              : }
     118              : 
     119              : impl From<PageReconstructError> for CollectKeySpaceError {
     120            0 :     fn from(err: PageReconstructError) -> Self {
     121            0 :         match err {
     122            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     123            0 :             err => Self::PageRead(err),
     124              :         }
     125            0 :     }
     126              : }
     127              : 
     128              : impl From<PageReconstructError> for CalculateLogicalSizeError {
     129            0 :     fn from(pre: PageReconstructError) -> Self {
     130            0 :         match pre {
     131            0 :             PageReconstructError::Cancelled => Self::Cancelled,
     132            0 :             _ => Self::PageRead(pre),
     133              :         }
     134            0 :     }
     135              : }
     136              : 
     137              : #[derive(Debug, thiserror::Error)]
     138              : pub enum RelationError {
     139              :     #[error("invalid relnode")]
     140              :     InvalidRelnode,
     141              : }
     142              : 
     143              : ///
     144              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
     145              : /// and other special kinds of files, in a versioned key-value store. The
     146              : /// Timeline struct provides the key-value store.
     147              : ///
      148              : /// This is a separate impl so that we can easily include all these functions in a Timeline
      149              : /// implementation; it might be moved into a separate struct later.
     150              : impl Timeline {
     151              :     /// Start ingesting a WAL record, or other atomic modification of
     152              :     /// the timeline.
     153              :     ///
     154              :     /// This provides a transaction-like interface to perform a bunch
     155              :     /// of modifications atomically.
     156              :     ///
     157              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
     158              :     /// DatadirModification object. Use the functions in the object to
     159              :     /// modify the repository state, updating all the pages and metadata
     160              :     /// that the WAL record affects. When you're done, call commit() to
     161              :     /// commit the changes.
     162              :     ///
      163              :     /// The LSN stored in the modification is advanced by `ingest_record` and
     164              :     /// is used by `commit()` to update `last_record_lsn`.
     165              :     ///
     166              :     /// Calling commit() will flush all the changes and reset the state,
     167              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
     168              :     ///
     169              :     /// Note that any pending modifications you make through the
     170              :     /// modification object won't be visible to calls to the 'get' and list
     171              :     /// functions of the timeline until you finish! And if you update the
     172              :     /// same page twice, the last update wins.
     173              :     ///
     174      1610556 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
     175      1610556 :     where
     176      1610556 :         Self: Sized,
     177      1610556 :     {
     178      1610556 :         DatadirModification {
     179      1610556 :             tline: self,
     180      1610556 :             pending_lsns: Vec::new(),
     181      1610556 :             pending_metadata_pages: HashMap::new(),
     182      1610556 :             pending_data_batch: None,
     183      1610556 :             pending_deletions: Vec::new(),
     184      1610556 :             pending_nblocks: 0,
     185      1610556 :             pending_directory_entries: Vec::new(),
     186      1610556 :             pending_metadata_bytes: 0,
     187      1610556 :             lsn,
     188      1610556 :         }
     189      1610556 :     }
     190              : 
     191              :     //------------------------------------------------------------------------------
     192              :     // Public GET functions
     193              :     //------------------------------------------------------------------------------
     194              : 
     195              :     /// Look up given page version.
     196       110304 :     pub(crate) async fn get_rel_page_at_lsn(
     197       110304 :         &self,
     198       110304 :         tag: RelTag,
     199       110304 :         blknum: BlockNumber,
     200       110304 :         version: Version<'_>,
     201       110304 :         ctx: &RequestContext,
     202       110304 :         io_concurrency: IoConcurrency,
     203       110304 :     ) -> Result<Bytes, PageReconstructError> {
     204       110304 :         match version {
     205       110304 :             Version::Lsn(effective_lsn) => {
     206       110304 :                 let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
     207       110304 :                 let res = self
     208       110304 :                     .get_rel_page_at_lsn_batched(
     209       110304 :                         pages.iter().map(|(tag, blknum)| {
     210       110304 :                             (tag, blknum, effective_lsn, ctx.attached_child())
     211       110304 :                         }),
     212       110304 :                         io_concurrency.clone(),
     213       110304 :                         ctx,
     214       110304 :                     )
     215       110304 :                     .await;
     216       110304 :                 assert_eq!(res.len(), 1);
     217       110304 :                 res.into_iter().next().unwrap()
     218              :             }
     219            0 :             Version::Modified(modification) => {
     220            0 :                 if tag.relnode == 0 {
     221            0 :                     return Err(PageReconstructError::Other(
     222            0 :                         RelationError::InvalidRelnode.into(),
     223            0 :                     ));
     224            0 :                 }
     225              : 
     226            0 :                 let nblocks = self.get_rel_size(tag, version, ctx).await?;
     227            0 :                 if blknum >= nblocks {
     228            0 :                     debug!(
     229            0 :                         "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     230            0 :                         tag,
     231            0 :                         blknum,
     232            0 :                         version.get_lsn(),
     233              :                         nblocks
     234              :                     );
     235            0 :                     return Ok(ZERO_PAGE.clone());
     236            0 :                 }
     237            0 : 
     238            0 :                 let key = rel_block_to_key(tag, blknum);
     239            0 :                 modification.get(key, ctx).await
     240              :             }
     241              :         }
     242       110304 :     }
     243              : 
     244              :     /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
     245              :     ///
     246              :     /// The ordering of the returned vec corresponds to the ordering of `pages`.
     247       110304 :     pub(crate) async fn get_rel_page_at_lsn_batched(
     248       110304 :         &self,
     249       110304 :         pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
     250       110304 :         io_concurrency: IoConcurrency,
     251       110304 :         ctx: &RequestContext,
     252       110304 :     ) -> Vec<Result<Bytes, PageReconstructError>> {
     253       110304 :         debug_assert_current_span_has_tenant_and_timeline_id();
     254       110304 : 
     255       110304 :         let mut slots_filled = 0;
     256       110304 :         let page_count = pages.len();
     257       110304 : 
     258       110304 :         // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
     259       110304 :         let mut result = Vec::with_capacity(pages.len());
     260       110304 :         let result_slots = result.spare_capacity_mut();
     261       110304 : 
     262       110304 :         let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
     263       110304 :             HashMap::with_capacity(pages.len());
     264       110304 : 
     265       110304 :         let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
     266       110304 :             HashMap::with_capacity(pages.len());
     267              : 
     268       110304 :         for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
     269       110304 :             if tag.relnode == 0 {
     270            0 :                 result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
     271            0 :                     RelationError::InvalidRelnode.into(),
     272            0 :                 )));
     273            0 : 
     274            0 :                 slots_filled += 1;
     275            0 :                 continue;
     276       110304 :             }
     277              : 
     278       110304 :             let nblocks = match self
     279       110304 :                 .get_rel_size(*tag, Version::Lsn(lsn), &ctx)
     280       110304 :                 .maybe_perf_instrument(&ctx, |crnt_perf_span| {
     281            0 :                     info_span!(
     282              :                         target: PERF_TRACE_TARGET,
     283            0 :                         parent: crnt_perf_span,
     284              :                         "GET_REL_SIZE",
     285              :                         reltag=%tag,
     286              :                         lsn=%lsn,
     287              :                     )
     288       110304 :                 })
     289       110304 :                 .await
     290              :             {
     291       110304 :                 Ok(nblocks) => nblocks,
     292            0 :                 Err(err) => {
     293            0 :                     result_slots[response_slot_idx].write(Err(err));
     294            0 :                     slots_filled += 1;
     295            0 :                     continue;
     296              :                 }
     297              :             };
     298              : 
     299       110304 :             if *blknum >= nblocks {
     300            0 :                 debug!(
     301            0 :                     "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     302              :                     tag, blknum, lsn, nblocks
     303              :                 );
     304            0 :                 result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
     305            0 :                 slots_filled += 1;
     306            0 :                 continue;
     307       110304 :             }
     308       110304 : 
     309       110304 :             let key = rel_block_to_key(*tag, *blknum);
     310       110304 : 
     311       110304 :             let key_slots = keys_slots.entry(key).or_default();
     312       110304 :             key_slots.push((response_slot_idx, ctx));
     313       110304 : 
     314       110304 :             let acc = req_keyspaces.entry(lsn).or_default();
     315       110304 :             acc.add_key(key);
     316              :         }
     317              : 
     318       110304 :         let query: Vec<(Lsn, KeySpace)> = req_keyspaces
     319       110304 :             .into_iter()
     320       110304 :             .map(|(lsn, acc)| (lsn, acc.to_keyspace()))
     321       110304 :             .collect();
     322       110304 : 
     323       110304 :         let query = VersionedKeySpaceQuery::scattered(query);
     324       110304 :         let res = self
     325       110304 :             .get_vectored(query, io_concurrency, ctx)
     326       110304 :             .maybe_perf_instrument(ctx, |current_perf_span| {
     327            0 :                 info_span!(
     328              :                     target: PERF_TRACE_TARGET,
     329            0 :                     parent: current_perf_span,
     330              :                     "GET_BATCH",
     331              :                     batch_size = %page_count,
     332              :                 )
     333       110304 :             })
     334       110304 :             .await;
     335              : 
     336       110304 :         match res {
     337       110304 :             Ok(results) => {
     338       220608 :                 for (key, res) in results {
     339       110304 :                     let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
     340       110304 :                     let (first_slot, first_req_ctx) = key_slots.next().unwrap();
     341              : 
     342       110304 :                     for (slot, req_ctx) in key_slots {
     343            0 :                         let clone = match &res {
     344            0 :                             Ok(buf) => Ok(buf.clone()),
     345            0 :                             Err(err) => Err(match err {
     346            0 :                                 PageReconstructError::Cancelled => PageReconstructError::Cancelled,
     347              : 
     348            0 :                                 x @ PageReconstructError::Other(_)
     349            0 :                                 | x @ PageReconstructError::AncestorLsnTimeout(_)
     350            0 :                                 | x @ PageReconstructError::WalRedo(_)
     351            0 :                                 | x @ PageReconstructError::MissingKey(_) => {
     352            0 :                                     PageReconstructError::Other(anyhow::anyhow!(
     353            0 :                                         "there was more than one request for this key in the batch, error logged once: {x:?}"
     354            0 :                                     ))
     355              :                                 }
     356              :                             }),
     357              :                         };
     358              : 
     359            0 :                         result_slots[slot].write(clone);
     360            0 :                         // There is no standardized way to express that the batched span followed from N request spans.
     361            0 :                         // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
     362            0 :                         // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
     363            0 :                         req_ctx.perf_follows_from(ctx);
     364            0 :                         slots_filled += 1;
     365              :                     }
     366              : 
     367       110304 :                     result_slots[first_slot].write(res);
     368       110304 :                     first_req_ctx.perf_follows_from(ctx);
     369       110304 :                     slots_filled += 1;
     370              :                 }
     371              :             }
     372            0 :             Err(err) => {
     373              :                 // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
     374              :                 // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
     375            0 :                 for (slot, req_ctx) in keys_slots.values().flatten() {
     376              :                     // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
     377              :                     // but without taking ownership of the GetVectoredError
     378            0 :                     let err = match &err {
     379            0 :                         GetVectoredError::Cancelled => Err(PageReconstructError::Cancelled),
     380              :                         // TODO: restructure get_vectored API to make this error per-key
     381            0 :                         GetVectoredError::MissingKey(err) => {
     382            0 :                             Err(PageReconstructError::Other(anyhow::anyhow!(
     383            0 :                                 "whole vectored get request failed because one or more of the requested keys were missing: {err:?}"
     384            0 :                             )))
     385              :                         }
     386              :                         // TODO: restructure get_vectored API to make this error per-key
     387            0 :                         GetVectoredError::GetReadyAncestorError(err) => {
     388            0 :                             Err(PageReconstructError::Other(anyhow::anyhow!(
     389            0 :                                 "whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"
     390            0 :                             )))
     391              :                         }
     392              :                         // TODO: restructure get_vectored API to make this error per-key
     393            0 :                         GetVectoredError::Other(err) => Err(PageReconstructError::Other(
     394            0 :                             anyhow::anyhow!("whole vectored get request failed: {err:?}"),
     395            0 :                         )),
     396              :                         // TODO: we can prevent this error class by moving this check into the type system
     397            0 :                         GetVectoredError::InvalidLsn(e) => {
     398            0 :                             Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
     399              :                         }
     400              :                         // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
     401              :                         // TODO: we can prevent this error class by moving this check into the type system
     402            0 :                         GetVectoredError::Oversized(err) => {
     403            0 :                             Err(anyhow::anyhow!("batching oversized: {err:?}").into())
     404              :                         }
     405              :                     };
     406              : 
     407            0 :                     req_ctx.perf_follows_from(ctx);
     408            0 :                     result_slots[*slot].write(err);
     409              :                 }
     410              : 
     411            0 :                 slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
     412            0 :             }
     413              :         };
     414              : 
     415       110304 :         assert_eq!(slots_filled, page_count);
     416              :         // SAFETY:
      417              :         // 1. `result` and any of its uninit members are not read from until this point
     418              :         // 2. The length below is tracked at run-time and matches the number of requested pages.
     419       110304 :         unsafe {
     420       110304 :             result.set_len(page_count);
     421       110304 :         }
     422       110304 : 
     423       110304 :         result
     424       110304 :     }
     425              : 
     426              :     /// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
     427              :     /// other shards, by only accounting for relations the shard has pages for, and only accounting
     428              :     /// for pages up to the highest page number it has stored.
     429            0 :     pub(crate) async fn get_db_size(
     430            0 :         &self,
     431            0 :         spcnode: Oid,
     432            0 :         dbnode: Oid,
     433            0 :         version: Version<'_>,
     434            0 :         ctx: &RequestContext,
     435            0 :     ) -> Result<usize, PageReconstructError> {
     436            0 :         let mut total_blocks = 0;
     437              : 
     438            0 :         let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
     439              : 
     440            0 :         for rel in rels {
     441            0 :             let n_blocks = self.get_rel_size(rel, version, ctx).await?;
     442            0 :             total_blocks += n_blocks as usize;
     443              :         }
     444            0 :         Ok(total_blocks)
     445            0 :     }
     446              : 
      447              :     /// Get size of a relation file. The relation must exist; otherwise an error is returned.
     448              :     ///
     449              :     /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
     450              :     /// page number stored in the shard.
     451       146604 :     pub(crate) async fn get_rel_size(
     452       146604 :         &self,
     453       146604 :         tag: RelTag,
     454       146604 :         version: Version<'_>,
     455       146604 :         ctx: &RequestContext,
     456       146604 :     ) -> Result<BlockNumber, PageReconstructError> {
     457       146604 :         if tag.relnode == 0 {
     458            0 :             return Err(PageReconstructError::Other(
     459            0 :                 RelationError::InvalidRelnode.into(),
     460            0 :             ));
     461       146604 :         }
     462              : 
     463       146604 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     464       115764 :             return Ok(nblocks);
     465        30840 :         }
     466        30840 : 
     467        30840 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     468            0 :             && !self.get_rel_exists(tag, version, ctx).await?
     469              :         {
     470              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     471              :             // FSM, and smgrnblocks() on it immediately afterwards,
     472              :             // without extending it.  Tolerate that by claiming that
     473              :             // any non-existent FSM fork has size 0.
     474            0 :             return Ok(0);
     475        30840 :         }
     476        30840 : 
     477        30840 :         let key = rel_size_to_key(tag);
     478        30840 :         let mut buf = version.get(self, key, ctx).await?;
     479        30816 :         let nblocks = buf.get_u32_le();
     480        30816 : 
     481        30816 :         self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
     482        30816 : 
     483        30816 :         Ok(nblocks)
     484       146604 :     }
     485              : 
     486              :     /// Does the relation exist?
     487              :     ///
     488              :     /// Only shard 0 has a full view of the relations. Other shards only know about relations that
     489              :     /// the shard stores pages for.
     490        36300 :     pub(crate) async fn get_rel_exists(
     491        36300 :         &self,
     492        36300 :         tag: RelTag,
     493        36300 :         version: Version<'_>,
     494        36300 :         ctx: &RequestContext,
     495        36300 :     ) -> Result<bool, PageReconstructError> {
     496        36300 :         if tag.relnode == 0 {
     497            0 :             return Err(PageReconstructError::Other(
     498            0 :                 RelationError::InvalidRelnode.into(),
     499            0 :             ));
     500        36300 :         }
     501              : 
     502              :         // first try to lookup relation in cache
     503        36300 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
     504        36192 :             return Ok(true);
     505          108 :         }
     506              :         // then check if the database was already initialized.
     507              :         // get_rel_exists can be called before dbdir is created.
     508          108 :         let buf = version.get(self, DBDIR_KEY, ctx).await?;
     509          108 :         let dbdirs = DbDirectory::des(&buf)?.dbdirs;
     510          108 :         if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
     511            0 :             return Ok(false);
     512          108 :         }
     513          108 : 
     514          108 :         // Read path: first read the new reldir keyspace. Early return if the relation exists.
     515          108 :         // Otherwise, read the old reldir keyspace.
     516          108 :         // TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
     517          108 : 
     518          108 :         if let RelSizeMigration::Migrated | RelSizeMigration::Migrating =
     519          108 :             self.get_rel_size_v2_status()
     520              :         {
     521              :             // fetch directory listing (new)
     522            0 :             let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
     523            0 :             let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
     524            0 :                 .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
     525            0 :             let exists_v2 = buf == RelDirExists::Exists;
     526            0 :             // Fast path: if the relation exists in the new format, return true.
     527            0 :             // TODO: we should have a verification mode that checks both keyspaces
     528            0 :             // to ensure the relation only exists in one of them.
     529            0 :             if exists_v2 {
     530            0 :                 return Ok(true);
     531            0 :             }
     532          108 :         }
     533              : 
     534              :         // fetch directory listing (old)
     535              : 
     536          108 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     537          108 :         let buf = version.get(self, key, ctx).await?;
     538              : 
     539          108 :         let dir = RelDirectory::des(&buf)?;
     540          108 :         let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
     541          108 :         Ok(exists_v1)
     542        36300 :     }
     543              : 
     544              :     /// Get a list of all existing relations in given tablespace and database.
     545              :     ///
     546              :     /// Only shard 0 has a full view of the relations. Other shards only know about relations that
     547              :     /// the shard stores pages for.
     548              :     ///
     549              :     /// # Cancel-Safety
     550              :     ///
     551              :     /// This method is cancellation-safe.
     552            0 :     pub(crate) async fn list_rels(
     553            0 :         &self,
     554            0 :         spcnode: Oid,
     555            0 :         dbnode: Oid,
     556            0 :         version: Version<'_>,
     557            0 :         ctx: &RequestContext,
     558            0 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     559            0 :         // fetch directory listing (old)
     560            0 :         let key = rel_dir_to_key(spcnode, dbnode);
     561            0 :         let buf = version.get(self, key, ctx).await?;
     562              : 
     563            0 :         let dir = RelDirectory::des(&buf)?;
     564            0 :         let rels_v1: HashSet<RelTag> =
     565            0 :             HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     566            0 :                 spcnode,
     567            0 :                 dbnode,
     568            0 :                 relnode: *relnode,
     569            0 :                 forknum: *forknum,
     570            0 :             }));
     571            0 : 
     572            0 :         if let RelSizeMigration::Legacy = self.get_rel_size_v2_status() {
     573            0 :             return Ok(rels_v1);
     574            0 :         }
     575            0 : 
     576            0 :         // scan directory listing (new), merge with the old results
     577            0 :         let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
     578            0 :         let io_concurrency = IoConcurrency::spawn_from_conf(
     579            0 :             self.conf,
     580            0 :             self.gate
     581            0 :                 .enter()
     582            0 :                 .map_err(|_| PageReconstructError::Cancelled)?,
     583              :         );
     584            0 :         let results = self
     585            0 :             .scan(
     586            0 :                 KeySpace::single(key_range),
     587            0 :                 version.get_lsn(),
     588            0 :                 ctx,
     589            0 :                 io_concurrency,
     590            0 :             )
     591            0 :             .await?;
     592            0 :         let mut rels = rels_v1;
     593            0 :         for (key, val) in results {
     594            0 :             let val = RelDirExists::decode(&val?)
     595            0 :                 .map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
     596            0 :             assert_eq!(key.field6, 1);
     597            0 :             assert_eq!(key.field2, spcnode);
     598            0 :             assert_eq!(key.field3, dbnode);
     599            0 :             let tag = RelTag {
     600            0 :                 spcnode,
     601            0 :                 dbnode,
     602            0 :                 relnode: key.field4,
     603            0 :                 forknum: key.field5,
     604            0 :             };
     605            0 :             if val == RelDirExists::Removed {
     606            0 :                 debug_assert!(!rels.contains(&tag), "removed reltag in v2");
     607            0 :                 continue;
     608            0 :             }
     609            0 :             let did_not_contain = rels.insert(tag);
     610            0 :             debug_assert!(did_not_contain, "duplicate reltag in v2");
     611              :         }
     612            0 :         Ok(rels)
     613            0 :     }
     614              : 
     615              :     /// Get the whole SLRU segment
     616            0 :     pub(crate) async fn get_slru_segment(
     617            0 :         &self,
     618            0 :         kind: SlruKind,
     619            0 :         segno: u32,
     620            0 :         lsn: Lsn,
     621            0 :         ctx: &RequestContext,
     622            0 :     ) -> Result<Bytes, PageReconstructError> {
     623            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     624            0 :         let n_blocks = self
     625            0 :             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
     626            0 :             .await?;
     627              : 
     628            0 :         let keyspace = KeySpace::single(
     629            0 :             slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks),
     630            0 :         );
     631            0 : 
     632            0 :         let batches = keyspace.partition(
     633            0 :             self.get_shard_identity(),
     634            0 :             Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
     635            0 :         );
     636              : 
     637            0 :         let io_concurrency = IoConcurrency::spawn_from_conf(
     638            0 :             self.conf,
     639            0 :             self.gate
     640            0 :                 .enter()
     641            0 :                 .map_err(|_| PageReconstructError::Cancelled)?,
     642              :         );
     643              : 
     644            0 :         let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
     645            0 :         for batch in batches.parts {
     646            0 :             let query = VersionedKeySpaceQuery::uniform(batch, lsn);
     647            0 :             let blocks = self
     648            0 :                 .get_vectored(query, io_concurrency.clone(), ctx)
     649            0 :                 .await?;
     650              : 
     651            0 :             for (_key, block) in blocks {
     652            0 :                 let block = block?;
     653            0 :                 segment.extend_from_slice(&block[..BLCKSZ as usize]);
     654              :             }
     655              :         }
     656              : 
     657            0 :         Ok(segment.freeze())
     658            0 :     }
     659              : 
     660              :     /// Get size of an SLRU segment
     661            0 :     pub(crate) async fn get_slru_segment_size(
     662            0 :         &self,
     663            0 :         kind: SlruKind,
     664            0 :         segno: u32,
     665            0 :         version: Version<'_>,
     666            0 :         ctx: &RequestContext,
     667            0 :     ) -> Result<BlockNumber, PageReconstructError> {
     668            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     669            0 :         let key = slru_segment_size_to_key(kind, segno);
     670            0 :         let mut buf = version.get(self, key, ctx).await?;
     671            0 :         Ok(buf.get_u32_le())
     672            0 :     }
     673              : 
      674              :     /// Does the SLRU segment exist?
     675            0 :     pub(crate) async fn get_slru_segment_exists(
     676            0 :         &self,
     677            0 :         kind: SlruKind,
     678            0 :         segno: u32,
     679            0 :         version: Version<'_>,
     680            0 :         ctx: &RequestContext,
     681            0 :     ) -> Result<bool, PageReconstructError> {
     682            0 :         assert!(self.tenant_shard_id.is_shard_zero());
     683              :         // fetch directory listing
     684            0 :         let key = slru_dir_to_key(kind);
     685            0 :         let buf = version.get(self, key, ctx).await?;
     686              : 
     687            0 :         let dir = SlruSegmentDirectory::des(&buf)?;
     688            0 :         Ok(dir.segments.contains(&segno))
     689            0 :     }
     690              : 
      691              :     /// Locate the LSN such that all transactions that committed before
     692              :     /// 'search_timestamp' are visible, but nothing newer is.
     693              :     ///
     694              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     695              :     /// so it's not well defined which LSN you get if there were multiple commits
     696              :     /// "in flight" at that point in time.
     697              :     ///
     698            0 :     pub(crate) async fn find_lsn_for_timestamp(
     699            0 :         &self,
     700            0 :         search_timestamp: TimestampTz,
     701            0 :         cancel: &CancellationToken,
     702            0 :         ctx: &RequestContext,
     703            0 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     704            0 :         pausable_failpoint!("find-lsn-for-timestamp-pausable");
     705              : 
     706            0 :         let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
     707            0 :         let gc_cutoff_planned = {
     708            0 :             let gc_info = self.gc_info.read().unwrap();
     709            0 :             gc_info.min_cutoff()
     710            0 :         };
     711            0 :         // Usually the planned cutoff is newer than the cutoff of the last gc run,
     712            0 :         // but let's be defensive.
     713            0 :         let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
     714            0 :         // We use this method to figure out the branching LSN for the new branch, but the
     715            0 :         // GC cutoff could be before the branching point and we cannot create a new branch
     716            0 :         // with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
     717            0 :         // on the safe side.
     718            0 :         let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
     719            0 :         let max_lsn = self.get_last_record_lsn();
     720            0 : 
     721            0 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     722            0 :         // LSN divided by 8.
     723            0 :         let mut low = min_lsn.0 / 8;
     724            0 :         let mut high = max_lsn.0 / 8 + 1;
     725            0 : 
     726            0 :         let mut found_smaller = false;
     727            0 :         let mut found_larger = false;
     728              : 
     729            0 :         while low < high {
     730            0 :             if cancel.is_cancelled() {
     731            0 :                 return Err(PageReconstructError::Cancelled);
     732            0 :             }
     733            0 :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     734            0 :             let mid = (high + low) / 2;
     735              : 
     736            0 :             let cmp = match self
     737            0 :                 .is_latest_commit_timestamp_ge_than(
     738            0 :                     search_timestamp,
     739            0 :                     Lsn(mid * 8),
     740            0 :                     &mut found_smaller,
     741            0 :                     &mut found_larger,
     742            0 :                     ctx,
     743            0 :                 )
     744            0 :                 .await
     745              :             {
     746            0 :                 Ok(res) => res,
     747            0 :                 Err(PageReconstructError::MissingKey(e)) => {
     748            0 :                     warn!(
     749            0 :                         "Missing key while find_lsn_for_timestamp. Either we might have already garbage-collected that data or the key is really missing. Last error: {:#}",
     750              :                         e
     751              :                     );
     752              :                     // Return that we didn't find any requests smaller than the LSN, and logging the error.
     753            0 :                     return Ok(LsnForTimestamp::Past(min_lsn));
     754              :                 }
     755            0 :                 Err(e) => return Err(e),
     756              :             };
     757              : 
     758            0 :             if cmp {
     759            0 :                 high = mid;
     760            0 :             } else {
     761            0 :                 low = mid + 1;
     762            0 :             }
     763              :         }
     764              : 
     765              :         // If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
     766              :         // so the LSN of the last commit record before or at `search_timestamp`.
     767              :         // Remove one from `low` to get `t`.
     768              :         //
     769              :         // FIXME: it would be better to get the LSN of the previous commit.
     770              :         // Otherwise, if you restore to the returned LSN, the database will
     771              :         // include physical changes from later commits that will be marked
     772              :         // as aborted, and will need to be vacuumed away.
     773            0 :         let commit_lsn = Lsn((low - 1) * 8);
     774            0 :         match (found_smaller, found_larger) {
     775              :             (false, false) => {
     776              :                 // This can happen if no commit records have been processed yet, e.g.
     777              :                 // just after importing a cluster.
     778            0 :                 Ok(LsnForTimestamp::NoData(min_lsn))
     779              :             }
     780              :             (false, true) => {
     781              :                 // Didn't find any commit timestamps smaller than the request
     782            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     783              :             }
     784            0 :             (true, _) if commit_lsn < min_lsn => {
     785            0 :                 // the search above did set found_smaller to true but it never increased the lsn.
     786            0 :                 // Then, low is still the old min_lsn, and the subtraction above gave a value
     787            0 :                 // below the min_lsn. We should never do that.
     788            0 :                 Ok(LsnForTimestamp::Past(min_lsn))
     789              :             }
     790              :             (true, false) => {
     791              :                 // Only found commits with timestamps smaller than the request.
     792              :                 // It's still a valid case for branch creation, return it.
     793              :                 // And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
     794              :                 // case, anyway.
     795            0 :                 Ok(LsnForTimestamp::Future(commit_lsn))
     796              :             }
     797            0 :             (true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
     798              :         }
     799            0 :     }
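
The loop above is a standard lower-bound binary search over 8-byte-aligned LSNs. A self-contained sketch of just that invariant, with a generic predicate standing in for `is_latest_commit_timestamp_ge_than` (the helper below is illustrative, not part of the file):

    /// Returns the smallest 8-byte-aligned LSN in `[min_lsn, max_lsn]` for which
    /// `pred` is true, or the first aligned LSN past `max_lsn` if it never holds.
    fn lower_bound_lsn(min_lsn: Lsn, max_lsn: Lsn, mut pred: impl FnMut(Lsn) -> bool) -> Lsn {
        // Work in units of 8 bytes, exactly as find_lsn_for_timestamp does.
        let mut low = min_lsn.0 / 8;
        let mut high = max_lsn.0 / 8 + 1;
        while low < high {
            let mid = (high + low) / 2;
            if pred(Lsn(mid * 8)) {
                // Predicate holds at mid: the answer is at or below mid.
                high = mid;
            } else {
                // Predicate fails at mid: the answer is above mid.
                low = mid + 1;
            }
        }
        Lsn(low * 8)
    }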
     800              : 
      801              :     /// Subroutine of find_lsn_for_timestamp(). Returns true if there are any
     802              :     /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
     803              :     ///
      804              :     /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits
     805              :     /// with a smaller/larger timestamp.
     806              :     ///
     807            0 :     pub(crate) async fn is_latest_commit_timestamp_ge_than(
     808            0 :         &self,
     809            0 :         search_timestamp: TimestampTz,
     810            0 :         probe_lsn: Lsn,
     811            0 :         found_smaller: &mut bool,
     812            0 :         found_larger: &mut bool,
     813            0 :         ctx: &RequestContext,
     814            0 :     ) -> Result<bool, PageReconstructError> {
     815            0 :         self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
     816            0 :             if timestamp >= search_timestamp {
     817            0 :                 *found_larger = true;
     818            0 :                 return ControlFlow::Break(true);
     819            0 :             } else {
     820            0 :                 *found_smaller = true;
     821            0 :             }
     822            0 :             ControlFlow::Continue(())
     823            0 :         })
     824            0 :         .await
     825            0 :     }
     826              : 
     827              :     /// Obtain the timestamp for the given lsn.
     828              :     ///
     829              :     /// If the lsn has no timestamps (e.g. no commits), returns None.
     830            0 :     pub(crate) async fn get_timestamp_for_lsn(
     831            0 :         &self,
     832            0 :         probe_lsn: Lsn,
     833            0 :         ctx: &RequestContext,
     834            0 :     ) -> Result<Option<TimestampTz>, PageReconstructError> {
     835            0 :         let mut max: Option<TimestampTz> = None;
     836            0 :         self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
     837            0 :             if let Some(max_prev) = max {
     838            0 :                 max = Some(max_prev.max(timestamp));
     839            0 :             } else {
     840            0 :                 max = Some(timestamp);
     841            0 :             }
     842            0 :             ControlFlow::Continue(())
     843            0 :         })
     844            0 :         .await?;
     845              : 
     846            0 :         Ok(max)
     847            0 :     }
     848              : 
     849              :     /// Runs the given function on all the timestamps for a given lsn
     850              :     ///
     851              :     /// The return value is either given by the closure, or set to the `Default`
     852              :     /// impl's output.
     853            0 :     async fn map_all_timestamps<T: Default>(
     854            0 :         &self,
     855            0 :         probe_lsn: Lsn,
     856            0 :         ctx: &RequestContext,
     857            0 :         mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
     858            0 :     ) -> Result<T, PageReconstructError> {
     859            0 :         for segno in self
     860            0 :             .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
     861            0 :             .await?
     862              :         {
     863            0 :             let nblocks = self
     864            0 :                 .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
     865            0 :                 .await?;
     866              : 
     867            0 :             let keyspace = KeySpace::single(
     868            0 :                 slru_block_to_key(SlruKind::Clog, segno, 0)
     869            0 :                     ..slru_block_to_key(SlruKind::Clog, segno, nblocks),
     870            0 :             );
     871            0 : 
     872            0 :             let batches = keyspace.partition(
     873            0 :                 self.get_shard_identity(),
     874            0 :                 Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
     875            0 :             );
     876              : 
     877            0 :             let io_concurrency = IoConcurrency::spawn_from_conf(
     878            0 :                 self.conf,
     879            0 :                 self.gate
     880            0 :                     .enter()
     881            0 :                     .map_err(|_| PageReconstructError::Cancelled)?,
     882              :             );
     883              : 
     884            0 :             for batch in batches.parts.into_iter().rev() {
     885            0 :                 let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
     886            0 :                 let blocks = self
     887            0 :                     .get_vectored(query, io_concurrency.clone(), ctx)
     888            0 :                     .await?;
     889              : 
     890            0 :                 for (_key, clog_page) in blocks.into_iter().rev() {
     891            0 :                     let clog_page = clog_page?;
     892              : 
     893            0 :                     if clog_page.len() == BLCKSZ as usize + 8 {
     894            0 :                         let mut timestamp_bytes = [0u8; 8];
     895            0 :                         timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     896            0 :                         let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     897            0 : 
     898            0 :                         match f(timestamp) {
     899            0 :                             ControlFlow::Break(b) => return Ok(b),
     900            0 :                             ControlFlow::Continue(()) => (),
     901              :                         }
     902            0 :                     }
     903              :                 }
     904              :             }
     905              :         }
     906            0 :         Ok(Default::default())
     907            0 :     }
     908              : 
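    // Illustrative sketch (not part of the measured source): `map_all_timestamps` feeds the
    // closure every commit timestamp stored in the CLOG pages at `probe_lsn`, and returning
    // `ControlFlow::Break` stops the scan early. A hypothetical caller that checks whether any
    // commit happened at or after some `cutoff` timestamp (`cutoff` and the surrounding method
    // are assumptions, not code from this file) could look roughly like this:
    //
    //     let found = self
    //         .map_all_timestamps::<bool>(probe_lsn, ctx, |timestamp| {
    //             if timestamp >= cutoff {
    //                 ControlFlow::Break(true)
    //             } else {
    //                 ControlFlow::Continue(())
    //             }
    //         })
    //         .await?;
    //
    // When the scan finishes without a break, the `Default` value (`false` here) is returned.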
     909            0 :     pub(crate) async fn get_slru_keyspace(
     910            0 :         &self,
     911            0 :         version: Version<'_>,
     912            0 :         ctx: &RequestContext,
     913            0 :     ) -> Result<KeySpace, PageReconstructError> {
     914            0 :         let mut accum = KeySpaceAccum::new();
     915              : 
     916            0 :         for kind in SlruKind::iter() {
     917            0 :             let mut segments: Vec<u32> = self
     918            0 :                 .list_slru_segments(kind, version, ctx)
     919            0 :                 .await?
     920            0 :                 .into_iter()
     921            0 :                 .collect();
     922            0 :             segments.sort_unstable();
     923              : 
     924            0 :             for seg in segments {
     925            0 :                 let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
     926              : 
     927            0 :                 accum.add_range(
     928            0 :                     slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
     929            0 :                 );
     930              :             }
     931              :         }
     932              : 
     933            0 :         Ok(accum.to_keyspace())
     934            0 :     }
     935              : 
     936              :     /// Get a list of SLRU segments
     937            0 :     pub(crate) async fn list_slru_segments(
     938            0 :         &self,
     939            0 :         kind: SlruKind,
     940            0 :         version: Version<'_>,
     941            0 :         ctx: &RequestContext,
     942            0 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     943            0 :         // fetch directory entry
     944            0 :         let key = slru_dir_to_key(kind);
     945              : 
     946            0 :         let buf = version.get(self, key, ctx).await?;
     947            0 :         Ok(SlruSegmentDirectory::des(&buf)?.segments)
     948            0 :     }
     949              : 
     950            0 :     pub(crate) async fn get_relmap_file(
     951            0 :         &self,
     952            0 :         spcnode: Oid,
     953            0 :         dbnode: Oid,
     954            0 :         version: Version<'_>,
     955            0 :         ctx: &RequestContext,
     956            0 :     ) -> Result<Bytes, PageReconstructError> {
     957            0 :         let key = relmap_file_key(spcnode, dbnode);
     958              : 
     959            0 :         let buf = version.get(self, key, ctx).await?;
     960            0 :         Ok(buf)
     961            0 :     }
     962              : 
     963         2004 :     pub(crate) async fn list_dbdirs(
     964         2004 :         &self,
     965         2004 :         lsn: Lsn,
     966         2004 :         ctx: &RequestContext,
     967         2004 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     968              :         // fetch directory entry
     969         2004 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     970              : 
     971         2004 :         Ok(DbDirectory::des(&buf)?.dbdirs)
     972         2004 :     }
     973              : 
     974            0 :     pub(crate) async fn get_twophase_file(
     975            0 :         &self,
     976            0 :         xid: u64,
     977            0 :         lsn: Lsn,
     978            0 :         ctx: &RequestContext,
     979            0 :     ) -> Result<Bytes, PageReconstructError> {
     980            0 :         let key = twophase_file_key(xid);
     981            0 :         let buf = self.get(key, lsn, ctx).await?;
     982            0 :         Ok(buf)
     983            0 :     }
     984              : 
     985         2016 :     pub(crate) async fn list_twophase_files(
     986         2016 :         &self,
     987         2016 :         lsn: Lsn,
     988         2016 :         ctx: &RequestContext,
     989         2016 :     ) -> Result<HashSet<u64>, PageReconstructError> {
     990              :         // fetch directory entry
     991         2016 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     992              : 
     993         2016 :         if self.pg_version >= 17 {
     994         1860 :             Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
     995              :         } else {
     996          156 :             Ok(TwoPhaseDirectory::des(&buf)?
     997              :                 .xids
     998          156 :                 .iter()
     999          156 :                 .map(|x| u64::from(*x))
    1000          156 :                 .collect())
    1001              :         }
    1002         2016 :     }
    1003              : 
    1004            0 :     pub(crate) async fn get_control_file(
    1005            0 :         &self,
    1006            0 :         lsn: Lsn,
    1007            0 :         ctx: &RequestContext,
    1008            0 :     ) -> Result<Bytes, PageReconstructError> {
    1009            0 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
    1010            0 :     }
    1011              : 
    1012           72 :     pub(crate) async fn get_checkpoint(
    1013           72 :         &self,
    1014           72 :         lsn: Lsn,
    1015           72 :         ctx: &RequestContext,
    1016           72 :     ) -> Result<Bytes, PageReconstructError> {
    1017           72 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
    1018           72 :     }
    1019              : 
    1020           72 :     async fn list_aux_files_v2(
    1021           72 :         &self,
    1022           72 :         lsn: Lsn,
    1023           72 :         ctx: &RequestContext,
    1024           72 :         io_concurrency: IoConcurrency,
    1025           72 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
    1026           72 :         let kv = self
    1027           72 :             .scan(
    1028           72 :                 KeySpace::single(Key::metadata_aux_key_range()),
    1029           72 :                 lsn,
    1030           72 :                 ctx,
    1031           72 :                 io_concurrency,
    1032           72 :             )
    1033           72 :             .await?;
    1034           72 :         let mut result = HashMap::new();
    1035           72 :         let mut sz = 0;
    1036          180 :         for (_, v) in kv {
    1037          108 :             let v = v?;
    1038          108 :             let v = aux_file::decode_file_value_bytes(&v)
    1039          108 :                 .context("value decode")
    1040          108 :                 .map_err(PageReconstructError::Other)?;
    1041          204 :             for (fname, content) in v {
    1042           96 :                 sz += fname.len();
    1043           96 :                 sz += content.len();
    1044           96 :                 result.insert(fname, content);
    1045           96 :             }
    1046              :         }
    1047           72 :         self.aux_file_size_estimator.on_initial(sz);
    1048           72 :         Ok(result)
    1049           72 :     }
    1050              : 
    1051            0 :     pub(crate) async fn trigger_aux_file_size_computation(
    1052            0 :         &self,
    1053            0 :         lsn: Lsn,
    1054            0 :         ctx: &RequestContext,
    1055            0 :         io_concurrency: IoConcurrency,
    1056            0 :     ) -> Result<(), PageReconstructError> {
    1057            0 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
    1058            0 :         Ok(())
    1059            0 :     }
    1060              : 
    1061           72 :     pub(crate) async fn list_aux_files(
    1062           72 :         &self,
    1063           72 :         lsn: Lsn,
    1064           72 :         ctx: &RequestContext,
    1065           72 :         io_concurrency: IoConcurrency,
    1066           72 :     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
    1067           72 :         self.list_aux_files_v2(lsn, ctx, io_concurrency).await
    1068           72 :     }
    1069              : 
    1070           24 :     pub(crate) async fn get_replorigins(
    1071           24 :         &self,
    1072           24 :         lsn: Lsn,
    1073           24 :         ctx: &RequestContext,
    1074           24 :         io_concurrency: IoConcurrency,
    1075           24 :     ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
    1076           24 :         let kv = self
    1077           24 :             .scan(
    1078           24 :                 KeySpace::single(repl_origin_key_range()),
    1079           24 :                 lsn,
    1080           24 :                 ctx,
    1081           24 :                 io_concurrency,
    1082           24 :             )
    1083           24 :             .await?;
    1084           24 :         let mut result = HashMap::new();
    1085           72 :         for (k, v) in kv {
    1086           60 :             let v = v?;
    1087           60 :             if v.is_empty() {
    1088              :                 // This is a tombstone -- we can skip it.
     1089              :                 // Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it is part of
    1090              :                 // the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
    1091              :                 // we also need to consider that. Such tombstones might be written on the detach ancestor code path to
    1092              :                 // avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
    1093           24 :                 continue;
    1094           36 :             }
    1095           36 :             let origin_id = k.field6 as RepOriginId;
    1096           36 :             let origin_lsn = Lsn::des(&v)
    1097           36 :                 .with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
    1098           24 :             if origin_lsn != Lsn::INVALID {
    1099           24 :                 result.insert(origin_id, origin_lsn);
    1100           24 :             }
    1101              :         }
    1102           12 :         Ok(result)
    1103           24 :     }
    1104              : 
    1105              :     /// Does the same as get_current_logical_size but counted on demand.
    1106              :     /// Used to initialize the logical size tracking on startup.
    1107              :     ///
    1108              :     /// Only relation blocks are counted currently. That excludes metadata,
    1109              :     /// SLRUs, twophase files etc.
    1110              :     ///
    1111              :     /// # Cancel-Safety
    1112              :     ///
    1113              :     /// This method is cancellation-safe.
    1114           84 :     pub(crate) async fn get_current_logical_size_non_incremental(
    1115           84 :         &self,
    1116           84 :         lsn: Lsn,
    1117           84 :         ctx: &RequestContext,
    1118           84 :     ) -> Result<u64, CalculateLogicalSizeError> {
    1119           84 :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
    1120           84 : 
    1121           84 :         fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) });
    1122              : 
    1123              :         // Fetch list of database dirs and iterate them
    1124           84 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
    1125           84 :         let dbdir = DbDirectory::des(&buf)?;
    1126              : 
    1127           84 :         let mut total_size: u64 = 0;
    1128           84 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
    1129            0 :             for rel in self
    1130            0 :                 .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
    1131            0 :                 .await?
    1132              :             {
    1133            0 :                 if self.cancel.is_cancelled() {
    1134            0 :                     return Err(CalculateLogicalSizeError::Cancelled);
    1135            0 :                 }
    1136            0 :                 let relsize_key = rel_size_to_key(rel);
    1137            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1138            0 :                 let relsize = buf.get_u32_le();
    1139            0 : 
    1140            0 :                 total_size += relsize as u64;
    1141              :             }
    1142              :         }
    1143           84 :         Ok(total_size * BLCKSZ as u64)
    1144           84 :     }
    1145              : 
    1146              :     /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
    1147              :     /// for gc-compaction.
    1148              :     ///
    1149              :     /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
    1150              :     /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
     1151              :     /// be kept only for a specific range of LSNs.
    1152              :     ///
     1153              :     /// Consider the case where the user created branches at LSN 10 and 20, created a table A at
     1154              :     /// LSN 10, and dropped it at LSN 20. `collect_keyspace` at LSN 10 will return the key range
     1155              :     /// corresponding to that table, while at LSN 20 it won't. The keyspace info at a single LSN is not enough to
    1156              :     /// determine which keys to retain/drop for gc-compaction.
    1157              :     ///
    1158              :     /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
     1159              :     /// to be retained for each branch LSN.
    1160              :     ///
    1161              :     /// The return value is (dense keyspace, sparse keyspace).
    1162          324 :     pub(crate) async fn collect_gc_compaction_keyspace(
    1163          324 :         &self,
    1164          324 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1165          324 :         let metadata_key_begin = Key::metadata_key_range().start;
    1166          324 :         let aux_v1_key = AUX_FILES_KEY;
    1167          324 :         let dense_keyspace = KeySpace {
    1168          324 :             ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
    1169          324 :         };
    1170          324 :         Ok((
    1171          324 :             dense_keyspace,
    1172          324 :             SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
    1173          324 :         ))
    1174          324 :     }
    1175              : 
    1176              :     ///
    1177              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     1178              :     /// Anything that's not listed may be removed from the underlying storage (from
    1179              :     /// that LSN forwards).
    1180              :     ///
    1181              :     /// The return value is (dense keyspace, sparse keyspace).
    1182         2004 :     pub(crate) async fn collect_keyspace(
    1183         2004 :         &self,
    1184         2004 :         lsn: Lsn,
    1185         2004 :         ctx: &RequestContext,
    1186         2004 :     ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
    1187         2004 :         // Iterate through key ranges, greedily packing them into partitions
    1188         2004 :         let mut result = KeySpaceAccum::new();
    1189         2004 : 
    1190         2004 :         // The dbdir metadata always exists
    1191         2004 :         result.add_key(DBDIR_KEY);
    1192              : 
    1193              :         // Fetch list of database dirs and iterate them
    1194         2004 :         let dbdir = self.list_dbdirs(lsn, ctx).await?;
    1195         2004 :         let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
    1196         2004 : 
    1197         2004 :         dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
    1198         2004 :         for ((spcnode, dbnode), has_relmap_file) in dbs {
    1199            0 :             if has_relmap_file {
    1200            0 :                 result.add_key(relmap_file_key(spcnode, dbnode));
    1201            0 :             }
    1202            0 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
    1203              : 
    1204            0 :             let mut rels: Vec<RelTag> = self
    1205            0 :                 .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
    1206            0 :                 .await?
    1207            0 :                 .into_iter()
    1208            0 :                 .collect();
    1209            0 :             rels.sort_unstable();
    1210            0 :             for rel in rels {
    1211            0 :                 let relsize_key = rel_size_to_key(rel);
    1212            0 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
    1213            0 :                 let relsize = buf.get_u32_le();
    1214            0 : 
    1215            0 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
    1216            0 :                 result.add_key(relsize_key);
    1217              :             }
    1218              :         }
    1219              : 
    1220              :         // Iterate SLRUs next
    1221         2004 :         if self.tenant_shard_id.is_shard_zero() {
    1222         5904 :             for kind in [
    1223         1968 :                 SlruKind::Clog,
    1224         1968 :                 SlruKind::MultiXactMembers,
    1225         1968 :                 SlruKind::MultiXactOffsets,
    1226              :             ] {
    1227         5904 :                 let slrudir_key = slru_dir_to_key(kind);
    1228         5904 :                 result.add_key(slrudir_key);
    1229         5904 :                 let buf = self.get(slrudir_key, lsn, ctx).await?;
    1230         5904 :                 let dir = SlruSegmentDirectory::des(&buf)?;
    1231         5904 :                 let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
    1232         5904 :                 segments.sort_unstable();
    1233         5904 :                 for segno in segments {
    1234            0 :                     let segsize_key = slru_segment_size_to_key(kind, segno);
    1235            0 :                     let mut buf = self.get(segsize_key, lsn, ctx).await?;
    1236            0 :                     let segsize = buf.get_u32_le();
    1237            0 : 
    1238            0 :                     result.add_range(
    1239            0 :                         slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
    1240            0 :                     );
    1241            0 :                     result.add_key(segsize_key);
    1242              :                 }
    1243              :             }
    1244           36 :         }
    1245              : 
    1246              :         // Then pg_twophase
    1247         2004 :         result.add_key(TWOPHASEDIR_KEY);
    1248              : 
    1249         2004 :         let mut xids: Vec<u64> = self
    1250         2004 :             .list_twophase_files(lsn, ctx)
    1251         2004 :             .await?
    1252         2004 :             .iter()
    1253         2004 :             .cloned()
    1254         2004 :             .collect();
    1255         2004 :         xids.sort_unstable();
    1256         2004 :         for xid in xids {
    1257            0 :             result.add_key(twophase_file_key(xid));
    1258            0 :         }
    1259              : 
    1260         2004 :         result.add_key(CONTROLFILE_KEY);
    1261         2004 :         result.add_key(CHECKPOINT_KEY);
    1262         2004 : 
    1263         2004 :         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
    1264         2004 :         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
     1265         2004 :         // and the keys will not be garbage-collected.
    1266         2004 :         #[cfg(test)]
    1267         2004 :         {
    1268         2004 :             let guard = self.extra_test_dense_keyspace.load();
    1269         2004 :             for kr in &guard.ranges {
    1270            0 :                 result.add_range(kr.clone());
    1271            0 :             }
    1272            0 :         }
    1273            0 : 
    1274         2004 :         let dense_keyspace = result.to_keyspace();
    1275         2004 :         let sparse_keyspace = SparseKeySpace(KeySpace {
    1276         2004 :             ranges: vec![
    1277         2004 :                 Key::metadata_aux_key_range(),
    1278         2004 :                 repl_origin_key_range(),
    1279         2004 :                 Key::rel_dir_sparse_key_range(),
    1280         2004 :             ],
    1281         2004 :         });
    1282         2004 : 
    1283         2004 :         if cfg!(debug_assertions) {
    1284              :             // Verify if the sparse keyspaces are ordered and non-overlapping.
    1285              : 
    1286              :             // We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
     1287              :             // category of sparse keys is split into its own image/delta files. If there
    1288              :             // are overlapping keyspaces, they will be automatically merged by keyspace accum,
    1289              :             // and we want the developer to keep the keyspaces separated.
    1290              : 
    1291         2004 :             let ranges = &sparse_keyspace.0.ranges;
    1292              : 
    1293              :             // TODO: use a single overlaps_with across the codebase
    1294         6012 :             fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
    1295         6012 :                 !(a.end <= b.start || b.end <= a.start)
    1296         6012 :             }
    1297         6012 :             for i in 0..ranges.len() {
    1298         6012 :                 for j in 0..i {
    1299         6012 :                     if overlaps_with(&ranges[i], &ranges[j]) {
    1300            0 :                         panic!(
    1301            0 :                             "overlapping sparse keyspace: {}..{} and {}..{}",
    1302            0 :                             ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
    1303            0 :                         );
    1304         6012 :                     }
    1305              :                 }
    1306              :             }
    1307         4008 :             for i in 1..ranges.len() {
    1308         4008 :                 assert!(
    1309         4008 :                     ranges[i - 1].end <= ranges[i].start,
    1310            0 :                     "unordered sparse keyspace: {}..{} and {}..{}",
    1311            0 :                     ranges[i - 1].start,
    1312            0 :                     ranges[i - 1].end,
    1313            0 :                     ranges[i].start,
    1314            0 :                     ranges[i].end
    1315              :                 );
    1316              :             }
    1317            0 :         }
    1318              : 
    1319         2004 :         Ok((dense_keyspace, sparse_keyspace))
    1320         2004 :     }
    1321              : 
     1322              :     /// Get the cached size of a relation if it was not updated after the specified LSN
    1323      2691240 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
    1324      2691240 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
    1325      2691240 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
    1326      2691108 :             if lsn >= *cached_lsn {
    1327      2660232 :                 RELSIZE_CACHE_HITS.inc();
    1328      2660232 :                 return Some(*nblocks);
    1329        30876 :             }
    1330        30876 :             RELSIZE_CACHE_MISSES_OLD.inc();
    1331          132 :         }
    1332        31008 :         RELSIZE_CACHE_MISSES.inc();
    1333        31008 :         None
    1334      2691240 :     }
    1335              : 
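    // Illustrative sketch (not part of the measured source): the relation size cache is meant
    // to be consulted before touching the key-value store on the size read path. A hypothetical
    // caller combining the cache methods in this impl could look roughly like this (the store
    // read itself is elided):
    //
    //     if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
    //         return Ok(nblocks);                            // cache hit
    //     }
    //     let nblocks = /* read rel_size_to_key(tag) at `lsn` from the store */;
    //     self.update_cached_rel_size(tag, lsn, nblocks);    // only cached if not older
    //     Ok(nblocks)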
    1336              :     /// Update cached relation size if there is no more recent update
    1337        30816 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1338        30816 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1339        30816 : 
    1340        30816 :         if lsn < rel_size_cache.complete_as_of {
    1341              :             // Do not cache old values. It's safe to cache the size on read, as long as
    1342              :             // the read was at an LSN since we started the WAL ingestion. Reasoning: we
    1343              :             // never evict values from the cache, so if the relation size changed after
    1344              :             // 'lsn', the new value is already in the cache.
    1345            0 :             return;
    1346        30816 :         }
    1347        30816 : 
    1348        30816 :         match rel_size_cache.map.entry(tag) {
    1349        30816 :             hash_map::Entry::Occupied(mut entry) => {
    1350        30816 :                 let cached_lsn = entry.get_mut();
    1351        30816 :                 if lsn >= cached_lsn.0 {
    1352            0 :                     *cached_lsn = (lsn, nblocks);
    1353        30816 :                 }
    1354              :             }
    1355            0 :             hash_map::Entry::Vacant(entry) => {
    1356            0 :                 entry.insert((lsn, nblocks));
    1357            0 :                 RELSIZE_CACHE_ENTRIES.inc();
    1358            0 :             }
    1359              :         }
    1360        30816 :     }
    1361              : 
    1362              :     /// Store cached relation size
    1363      1696320 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
    1364      1696320 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1365      1696320 :         if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
    1366        11520 :             RELSIZE_CACHE_ENTRIES.inc();
    1367      1684800 :         }
    1368      1696320 :     }
    1369              : 
    1370              :     /// Remove cached relation size
    1371           12 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
    1372           12 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
    1373           12 :         if rel_size_cache.map.remove(tag).is_some() {
    1374           12 :             RELSIZE_CACHE_ENTRIES.dec();
    1375           12 :         }
    1376           12 :     }
    1377              : }
    1378              : 
    1379              : /// DatadirModification represents an operation to ingest an atomic set of
    1380              : /// updates to the repository.
    1381              : ///
    1382              : /// It is created by the 'begin_record' function. It is called for each WAL
     1383              : /// record, so that all the modifications made by one WAL record appear atomic.
    1384              : pub struct DatadirModification<'a> {
    1385              :     /// The timeline this modification applies to. You can access this to
    1386              :     /// read the state, but note that any pending updates are *not* reflected
    1387              :     /// in the state in 'tline' yet.
    1388              :     pub tline: &'a Timeline,
    1389              : 
    1390              :     /// Current LSN of the modification
    1391              :     lsn: Lsn,
    1392              : 
    1393              :     // The modifications are not applied directly to the underlying key-value store.
    1394              :     // The put-functions add the modifications here, and they are flushed to the
    1395              :     // underlying key-value store by the 'finish' function.
    1396              :     pending_lsns: Vec<Lsn>,
    1397              :     pending_deletions: Vec<(Range<Key>, Lsn)>,
    1398              :     pending_nblocks: i64,
    1399              : 
    1400              :     /// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
    1401              :     /// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
    1402              :     pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,
    1403              : 
    1404              :     /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
    1405              :     /// which keys are stored here.
    1406              :     pending_data_batch: Option<SerializedValueBatch>,
    1407              : 
    1408              :     /// For special "directory" keys that store key-value maps, track the size of the map
    1409              :     /// if it was updated in this modification.
    1410              :     pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
    1411              : 
    1412              :     /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
    1413              :     pending_metadata_bytes: usize,
    1414              : }
    1415              : 
    1416              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    1417              : pub enum MetricsUpdate {
    1418              :     /// Set the metrics to this value
    1419              :     Set(u64),
    1420              :     /// Increment the metrics by this value
    1421              :     Add(u64),
    1422              :     /// Decrement the metrics by this value
    1423              :     Sub(u64),
    1424              : }
    1425              : 
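// Illustrative sketch (not part of the measured source): a hypothetical consumer of
// `pending_directory_entries` would fold each `MetricsUpdate` into a directory-size gauge
// roughly as follows (`gauge` and `update` are assumptions, not identifiers from this file):
//
//     match update {
//         MetricsUpdate::Set(v) => gauge.set(v),
//         MetricsUpdate::Add(v) => gauge.add(v),
//         MetricsUpdate::Sub(v) => gauge.sub(v),
//     }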
    1426              : impl DatadirModification<'_> {
    1427              :     // When a DatadirModification is committed, we do a monolithic serialization of all its contents.  WAL records can
    1428              :     // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
    1429              :     // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
    1430              :     pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
    1431              : 
    1432              :     /// Get the current lsn
    1433      2508348 :     pub(crate) fn get_lsn(&self) -> Lsn {
    1434      2508348 :         self.lsn
    1435      2508348 :     }
    1436              : 
    1437            0 :     pub(crate) fn approx_pending_bytes(&self) -> usize {
    1438            0 :         self.pending_data_batch
    1439            0 :             .as_ref()
    1440            0 :             .map_or(0, |b| b.buffer_size())
    1441            0 :             + self.pending_metadata_bytes
    1442            0 :     }
    1443              : 
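    // Illustrative sketch (not part of the measured source): MAX_PENDING_BYTES bounds the
    // monolithic serialization performed when a DatadirModification is committed, so the ingest
    // side is expected to flush once the pending payload approaches it. A hypothetical check
    // (the surrounding loop and the `commit` call site are stand-ins, not the exact ingest code):
    //
    //     if modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES {
    //         modification.commit(ctx).await?;
    //     }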
    1444            0 :     pub(crate) fn has_dirty_data(&self) -> bool {
    1445            0 :         self.pending_data_batch
    1446            0 :             .as_ref()
    1447            0 :             .is_some_and(|b| b.has_data())
    1448            0 :     }
    1449              : 
    1450              :     /// Returns statistics about the currently pending modifications.
    1451            0 :     pub(crate) fn stats(&self) -> DatadirModificationStats {
    1452            0 :         let mut stats = DatadirModificationStats::default();
    1453            0 :         for (_, _, value) in self.pending_metadata_pages.values().flatten() {
    1454            0 :             match value {
    1455            0 :                 Value::Image(_) => stats.metadata_images += 1,
    1456            0 :                 Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
    1457            0 :                 Value::WalRecord(_) => stats.metadata_deltas += 1,
    1458              :             }
    1459              :         }
    1460            0 :         for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
    1461            0 :             match valuemeta {
    1462            0 :                 ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
    1463            0 :                 ValueMeta::Serialized(_) => stats.data_deltas += 1,
    1464            0 :                 ValueMeta::Observed(_) => {}
    1465              :             }
    1466              :         }
    1467            0 :         stats
    1468            0 :     }
    1469              : 
    1470              :     /// Set the current lsn
    1471       875148 :     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
    1472       875148 :         ensure_walingest!(
    1473       875148 :             lsn >= self.lsn,
    1474       875148 :             "setting an older lsn {} than {} is not allowed",
    1475       875148 :             lsn,
    1476       875148 :             self.lsn
    1477       875148 :         );
    1478              : 
    1479       875148 :         if lsn > self.lsn {
    1480       875148 :             self.pending_lsns.push(self.lsn);
    1481       875148 :             self.lsn = lsn;
    1482       875148 :         }
    1483       875148 :         Ok(())
    1484       875148 :     }
    1485              : 
    1486              :     /// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
    1487              :     /// keys that represent literal blocks that postgres can read.  So data includes relation blocks and
    1488              :     /// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
    1489              :     ///
    1490              :     /// The distinction is important because data keys are handled on a fast path where dirty writes are
    1491              :     /// not readable until this modification is committed, whereas metadata keys are visible for read
    1492              :     /// via [`Self::get`] as soon as their record has been ingested.
    1493      5104284 :     fn is_data_key(key: &Key) -> bool {
    1494      5104284 :         key.is_rel_block_key() || key.is_slru_block_key()
    1495      5104284 :     }
    1496              : 
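    // Illustrative sketch (not part of the measured source): with the definition above,
    // `rel_block_to_key(rel, blkno)` and `slru_block_to_key(kind, segno, blkno)` are "data"
    // keys, buffered in `pending_data_batch` and not readable until the modification commits,
    // while keys such as DBDIR_KEY, rel_size_to_key(rel) or relmap_file_key(spcnode, dbnode)
    // are "metadata" keys, kept in `pending_metadata_pages` and readable via `Self::get`
    // before commit.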
    1497              :     /// Initialize a completely new repository.
    1498              :     ///
    1499              :     /// This inserts the directory metadata entries that are assumed to
    1500              :     /// always exist.
    1501         1320 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
    1502         1320 :         let buf = DbDirectory::ser(&DbDirectory {
    1503         1320 :             dbdirs: HashMap::new(),
    1504         1320 :         })?;
    1505         1320 :         self.pending_directory_entries
    1506         1320 :             .push((DirectoryKind::Db, MetricsUpdate::Set(0)));
    1507         1320 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
    1508              : 
    1509         1320 :         let buf = if self.tline.pg_version >= 17 {
    1510         1164 :             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
    1511         1164 :                 xids: HashSet::new(),
    1512         1164 :             })
    1513              :         } else {
    1514          156 :             TwoPhaseDirectory::ser(&TwoPhaseDirectory {
    1515          156 :                 xids: HashSet::new(),
    1516          156 :             })
    1517            0 :         }?;
    1518         1320 :         self.pending_directory_entries
    1519         1320 :             .push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
    1520         1320 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
    1521              : 
    1522         1320 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
    1523         1320 :         let empty_dir = Value::Image(buf);
    1524         1320 : 
    1525         1320 :         // Initialize SLRUs on shard 0 only: creating these on other shards would be
    1526         1320 :         // harmless but they'd just be dropped on later compaction.
    1527         1320 :         if self.tline.tenant_shard_id.is_shard_zero() {
    1528         1284 :             self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
    1529         1284 :             self.pending_directory_entries.push((
    1530         1284 :                 DirectoryKind::SlruSegment(SlruKind::Clog),
    1531         1284 :                 MetricsUpdate::Set(0),
    1532         1284 :             ));
    1533         1284 :             self.put(
    1534         1284 :                 slru_dir_to_key(SlruKind::MultiXactMembers),
    1535         1284 :                 empty_dir.clone(),
    1536         1284 :             );
    1537         1284 :             self.pending_directory_entries.push((
     1538         1284 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactMembers),
    1539         1284 :                 MetricsUpdate::Set(0),
    1540         1284 :             ));
    1541         1284 :             self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
    1542         1284 :             self.pending_directory_entries.push((
    1543         1284 :                 DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
    1544         1284 :                 MetricsUpdate::Set(0),
    1545         1284 :             ));
    1546         1284 :         }
    1547              : 
    1548         1320 :         Ok(())
    1549         1320 :     }
    1550              : 
    1551              :     #[cfg(test)]
    1552         1308 :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
    1553         1308 :         self.init_empty()?;
    1554         1308 :         self.put_control_file(bytes::Bytes::from_static(
    1555         1308 :             b"control_file contents do not matter",
    1556         1308 :         ))
    1557         1308 :         .context("put_control_file")?;
    1558         1308 :         self.put_checkpoint(bytes::Bytes::from_static(
    1559         1308 :             b"checkpoint_file contents do not matter",
    1560         1308 :         ))
    1561         1308 :         .context("put_checkpoint_file")?;
    1562         1308 :         Ok(())
    1563         1308 :     }
    1564              : 
    1565              :     /// Creates a relation if it is not already present.
    1566              :     /// Returns the current size of the relation
    1567      2508336 :     pub(crate) async fn create_relation_if_required(
    1568      2508336 :         &mut self,
    1569      2508336 :         rel: RelTag,
    1570      2508336 :         ctx: &RequestContext,
    1571      2508336 :     ) -> Result<u32, WalIngestError> {
    1572              :         // Get current size and put rel creation if rel doesn't exist
    1573              :         //
    1574              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1575              :         //       check the cache too. This is because eagerly checking the cache results in
    1576              :         //       less work overall and 10% better performance. It's more work on cache miss
    1577              :         //       but cache miss is rare.
    1578      2508336 :         if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
    1579      2508276 :             Ok(nblocks)
    1580           60 :         } else if !self
    1581           60 :             .tline
    1582           60 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    1583           60 :             .await?
    1584              :         {
    1585              :             // create it with 0 size initially, the logic below will extend it
    1586           60 :             self.put_rel_creation(rel, 0, ctx).await?;
    1587           60 :             Ok(0)
    1588              :         } else {
    1589            0 :             Ok(self
    1590            0 :                 .tline
    1591            0 :                 .get_rel_size(rel, Version::Modified(self), ctx)
    1592            0 :                 .await?)
    1593              :         }
    1594      2508336 :     }
    1595              : 
    1596              :     /// Given a block number for a relation (which represents a newly written block),
    1597              :     /// the previous block count of the relation, and the shard info, find the gaps
     1598              :     /// that were created by the newly written block, if any.
    1599       874020 :     fn find_gaps(
    1600       874020 :         rel: RelTag,
    1601       874020 :         blkno: u32,
    1602       874020 :         previous_nblocks: u32,
    1603       874020 :         shard: &ShardIdentity,
    1604       874020 :     ) -> Option<KeySpace> {
    1605       874020 :         let mut key = rel_block_to_key(rel, blkno);
    1606       874020 :         let mut gap_accum = None;
    1607              : 
    1608       874020 :         for gap_blkno in previous_nblocks..blkno {
    1609          192 :             key.field6 = gap_blkno;
    1610          192 : 
    1611          192 :             if shard.get_shard_number(&key) != shard.number {
    1612           48 :                 continue;
    1613          144 :             }
    1614          144 : 
    1615          144 :             gap_accum
    1616          144 :                 .get_or_insert_with(KeySpaceAccum::new)
    1617          144 :                 .add_key(key);
    1618              :         }
    1619              : 
    1620       874020 :         gap_accum.map(|accum| accum.to_keyspace())
    1621       874020 :     }
    1622              : 
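    // Illustrative sketch (not part of the measured source): if a relation previously had 3
    // blocks and block 5 is now written, blocks 3 and 4 form the gap and must be zero-filled
    // on whichever shards own them; `find_gaps` keeps only the keys whose shard number matches
    // `shard.number`. Stripped of the key and shard types, the candidate gap range is simply:
    //
    //     let gap_blocks: Vec<u32> = (previous_nblocks..blkno).collect();
    //     // previous_nblocks = 3, blkno = 5  =>  gap_blocks == [3, 4]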
    1623       875112 :     pub async fn ingest_batch(
    1624       875112 :         &mut self,
    1625       875112 :         mut batch: SerializedValueBatch,
    1626       875112 :         // TODO(vlad): remove this argument and replace the shard check with is_key_local
    1627       875112 :         shard: &ShardIdentity,
    1628       875112 :         ctx: &RequestContext,
    1629       875112 :     ) -> Result<(), WalIngestError> {
    1630       875112 :         let mut gaps_at_lsns = Vec::default();
    1631              : 
    1632       875112 :         for meta in batch.metadata.iter() {
    1633       873852 :             let key = Key::from_compact(meta.key());
    1634       873852 :             let (rel, blkno) = key
    1635       873852 :                 .to_rel_block()
    1636       873852 :                 .map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
    1637       873852 :             let new_nblocks = blkno + 1;
    1638              : 
    1639       873852 :             let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
    1640       873852 :             if new_nblocks > old_nblocks {
    1641        14340 :                 self.put_rel_extend(rel, new_nblocks, ctx).await?;
    1642       859512 :             }
    1643              : 
    1644       873852 :             if let Some(gaps) = Self::find_gaps(rel, blkno, old_nblocks, shard) {
    1645            0 :                 gaps_at_lsns.push((gaps, meta.lsn()));
    1646       873852 :             }
    1647              :         }
    1648              : 
    1649       875112 :         if !gaps_at_lsns.is_empty() {
    1650            0 :             batch.zero_gaps(gaps_at_lsns);
    1651       875112 :         }
    1652              : 
    1653       875112 :         match self.pending_data_batch.as_mut() {
    1654          120 :             Some(pending_batch) => {
    1655          120 :                 pending_batch.extend(batch);
    1656          120 :             }
    1657       874992 :             None if batch.has_data() => {
    1658       873780 :                 self.pending_data_batch = Some(batch);
    1659       873780 :             }
    1660         1212 :             None => {
    1661         1212 :                 // Nothing to initialize the batch with
    1662         1212 :             }
    1663              :         }
    1664              : 
    1665       875112 :         Ok(())
    1666       875112 :     }
    1667              : 
    1668              :     /// Put a new page version that can be constructed from a WAL record
    1669              :     ///
    1670              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
    1671              :     /// current end-of-file. It's up to the caller to check that the relation size
    1672              :     /// matches the blocks inserted!
    1673           72 :     pub fn put_rel_wal_record(
    1674           72 :         &mut self,
    1675           72 :         rel: RelTag,
    1676           72 :         blknum: BlockNumber,
    1677           72 :         rec: NeonWalRecord,
    1678           72 :     ) -> Result<(), WalIngestError> {
    1679           72 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    1680           72 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
    1681           72 :         Ok(())
    1682           72 :     }
    1683              : 
    1684              :     // Same, but for an SLRU.
    1685           48 :     pub fn put_slru_wal_record(
    1686           48 :         &mut self,
    1687           48 :         kind: SlruKind,
    1688           48 :         segno: u32,
    1689           48 :         blknum: BlockNumber,
    1690           48 :         rec: NeonWalRecord,
    1691           48 :     ) -> Result<(), WalIngestError> {
    1692           48 :         if !self.tline.tenant_shard_id.is_shard_zero() {
    1693            0 :             return Ok(());
    1694           48 :         }
    1695           48 : 
    1696           48 :         self.put(
    1697           48 :             slru_block_to_key(kind, segno, blknum),
    1698           48 :             Value::WalRecord(rec),
    1699           48 :         );
    1700           48 :         Ok(())
    1701           48 :     }
    1702              : 
    1703              :     /// Like put_wal_record, but with ready-made image of the page.
    1704      1667052 :     pub fn put_rel_page_image(
    1705      1667052 :         &mut self,
    1706      1667052 :         rel: RelTag,
    1707      1667052 :         blknum: BlockNumber,
    1708      1667052 :         img: Bytes,
    1709      1667052 :     ) -> Result<(), WalIngestError> {
    1710      1667052 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    1711      1667052 :         let key = rel_block_to_key(rel, blknum);
    1712      1667052 :         if !key.is_valid_key_on_write_path() {
    1713            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1714      1667052 :         }
    1715      1667052 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
    1716      1667052 :         Ok(())
    1717      1667052 :     }
    1718              : 
    1719           36 :     pub fn put_slru_page_image(
    1720           36 :         &mut self,
    1721           36 :         kind: SlruKind,
    1722           36 :         segno: u32,
    1723           36 :         blknum: BlockNumber,
    1724           36 :         img: Bytes,
    1725           36 :     ) -> Result<(), WalIngestError> {
    1726           36 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1727              : 
    1728           36 :         let key = slru_block_to_key(kind, segno, blknum);
    1729           36 :         if !key.is_valid_key_on_write_path() {
    1730            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1731           36 :         }
    1732           36 :         self.put(key, Value::Image(img));
    1733           36 :         Ok(())
    1734           36 :     }
    1735              : 
    1736        17988 :     pub(crate) fn put_rel_page_image_zero(
    1737        17988 :         &mut self,
    1738        17988 :         rel: RelTag,
    1739        17988 :         blknum: BlockNumber,
    1740        17988 :     ) -> Result<(), WalIngestError> {
    1741        17988 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    1742        17988 :         let key = rel_block_to_key(rel, blknum);
    1743        17988 :         if !key.is_valid_key_on_write_path() {
    1744            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1745        17988 :         }
    1746              : 
    1747        17988 :         let batch = self
    1748        17988 :             .pending_data_batch
    1749        17988 :             .get_or_insert_with(SerializedValueBatch::default);
    1750        17988 : 
    1751        17988 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1752        17988 : 
    1753        17988 :         Ok(())
    1754        17988 :     }
    1755              : 
    1756            0 :     pub(crate) fn put_slru_page_image_zero(
    1757            0 :         &mut self,
    1758            0 :         kind: SlruKind,
    1759            0 :         segno: u32,
    1760            0 :         blknum: BlockNumber,
    1761            0 :     ) -> Result<(), WalIngestError> {
    1762            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    1763            0 :         let key = slru_block_to_key(kind, segno, blknum);
    1764            0 :         if !key.is_valid_key_on_write_path() {
    1765            0 :             Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    1766            0 :         }
    1767              : 
    1768            0 :         let batch = self
    1769            0 :             .pending_data_batch
    1770            0 :             .get_or_insert_with(SerializedValueBatch::default);
    1771            0 : 
    1772            0 :         batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
    1773            0 : 
    1774            0 :         Ok(())
    1775            0 :     }
    1776              : 
    1777              :     /// Returns `true` if the rel_size_v2 write path is enabled. If it is the first time that
    1778              :     /// we enable it, we also need to persist it in `index_part.json`.
    1779        11676 :     pub fn maybe_enable_rel_size_v2(&mut self) -> anyhow::Result<bool> {
    1780        11676 :         let status = self.tline.get_rel_size_v2_status();
    1781        11676 :         let config = self.tline.get_rel_size_v2_enabled();
    1782        11676 :         match (config, status) {
    1783              :             (false, RelSizeMigration::Legacy) => {
    1784              :                 // tenant config didn't enable it and we didn't write any reldir_v2 key yet
    1785        11676 :                 Ok(false)
    1786              :             }
    1787              :             (false, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
    1788              :                 // index_part already persisted that the timeline has enabled rel_size_v2
    1789            0 :                 Ok(true)
    1790              :             }
    1791              :             (true, RelSizeMigration::Legacy) => {
    1792              :                 // The first time we enable it, we need to persist it in `index_part.json`
    1793            0 :                 self.tline
    1794            0 :                     .update_rel_size_v2_status(RelSizeMigration::Migrating)?;
    1795            0 :                 tracing::info!("enabled rel_size_v2");
    1796            0 :                 Ok(true)
    1797              :             }
    1798              :             (true, RelSizeMigration::Migrating | RelSizeMigration::Migrated) => {
    1799              :                 // index_part already persisted that the timeline has enabled rel_size_v2
    1800              :                 // and we don't need to do anything
    1801            0 :                 Ok(true)
    1802              :             }
    1803              :         }
    1804        11676 :     }
    1805              : 
    1806              :     /// Store a relmapper file (pg_filenode.map) in the repository
    1807           96 :     pub async fn put_relmap_file(
    1808           96 :         &mut self,
    1809           96 :         spcnode: Oid,
    1810           96 :         dbnode: Oid,
    1811           96 :         img: Bytes,
    1812           96 :         ctx: &RequestContext,
    1813           96 :     ) -> Result<(), WalIngestError> {
    1814           96 :         let v2_enabled = self
    1815           96 :             .maybe_enable_rel_size_v2()
    1816           96 :             .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
    1817              : 
    1818              :         // Add it to the directory (if it doesn't exist already)
    1819           96 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1820           96 :         let mut dbdir = DbDirectory::des(&buf)?;
    1821              : 
    1822           96 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
    1823           96 :         if r.is_none() || r == Some(false) {
    1824              :             // The dbdir entry didn't exist, or it contained a
    1825              :             // 'false'. The 'insert' call already updated it with
    1826              :             // 'true', now write the updated 'dbdirs' map back.
    1827           96 :             let buf = DbDirectory::ser(&dbdir)?;
    1828           96 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1829            0 :         }
    1830           96 :         if r.is_none() {
    1831              :             // Create RelDirectory
    1832              :             // TODO: if we have fully migrated to v2, no need to create this directory
    1833           48 :             let buf = RelDirectory::ser(&RelDirectory {
    1834           48 :                 rels: HashSet::new(),
    1835           48 :             })?;
    1836           48 :             self.pending_directory_entries
    1837           48 :                 .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
    1838           48 :             if v2_enabled {
    1839            0 :                 self.pending_directory_entries
    1840            0 :                     .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    1841           48 :             }
    1842           48 :             self.put(
    1843           48 :                 rel_dir_to_key(spcnode, dbnode),
    1844           48 :                 Value::Image(Bytes::from(buf)),
    1845           48 :             );
    1846           48 :         }
    1847              : 
    1848           96 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
    1849           96 :         Ok(())
    1850           96 :     }
    1851              : 
    1852            0 :     pub async fn put_twophase_file(
    1853            0 :         &mut self,
    1854            0 :         xid: u64,
    1855            0 :         img: Bytes,
    1856            0 :         ctx: &RequestContext,
    1857            0 :     ) -> Result<(), WalIngestError> {
    1858              :         // Add it to the directory entry
    1859            0 :         let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1860            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    1861            0 :             let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
    1862            0 :             if !dir.xids.insert(xid) {
    1863            0 :                 Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
    1864            0 :             }
    1865            0 :             self.pending_directory_entries.push((
    1866            0 :                 DirectoryKind::TwoPhase,
    1867            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    1868            0 :             ));
    1869            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    1870              :         } else {
    1871            0 :             let xid = xid as u32;
    1872            0 :             let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
    1873            0 :             if !dir.xids.insert(xid) {
    1874            0 :                 Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
    1875            0 :             }
    1876            0 :             self.pending_directory_entries.push((
    1877            0 :                 DirectoryKind::TwoPhase,
    1878            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    1879            0 :             ));
    1880            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    1881              :         };
    1882            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    1883            0 : 
    1884            0 :         self.put(twophase_file_key(xid), Value::Image(img));
    1885            0 :         Ok(())
    1886            0 :     }
    1887              : 
    1888           12 :     pub async fn set_replorigin(
    1889           12 :         &mut self,
    1890           12 :         origin_id: RepOriginId,
    1891           12 :         origin_lsn: Lsn,
    1892           12 :     ) -> Result<(), WalIngestError> {
    1893           12 :         let key = repl_origin_key(origin_id);
    1894           12 :         self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
    1895           12 :         Ok(())
    1896           12 :     }
    1897              : 
    1898            0 :     pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
    1899            0 :         self.set_replorigin(origin_id, Lsn::INVALID).await
    1900            0 :     }
    1901              : 
    1902         1320 :     pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
    1903         1320 :         self.put(CONTROLFILE_KEY, Value::Image(img));
    1904         1320 :         Ok(())
    1905         1320 :     }
    1906              : 
    1907         1404 :     pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
    1908         1404 :         self.put(CHECKPOINT_KEY, Value::Image(img));
    1909         1404 :         Ok(())
    1910         1404 :     }
    1911              : 
    1912            0 :     pub async fn drop_dbdir(
    1913            0 :         &mut self,
    1914            0 :         spcnode: Oid,
    1915            0 :         dbnode: Oid,
    1916            0 :         ctx: &RequestContext,
    1917            0 :     ) -> Result<(), WalIngestError> {
    1918            0 :         let total_blocks = self
    1919            0 :             .tline
    1920            0 :             .get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
    1921            0 :             .await?;
    1922              : 
    1923              :         // Remove entry from dbdir
    1924            0 :         let buf = self.get(DBDIR_KEY, ctx).await?;
    1925            0 :         let mut dir = DbDirectory::des(&buf)?;
    1926            0 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
    1927            0 :             let buf = DbDirectory::ser(&dir)?;
    1928            0 :             self.pending_directory_entries.push((
    1929            0 :                 DirectoryKind::Db,
    1930            0 :                 MetricsUpdate::Set(dir.dbdirs.len() as u64),
    1931            0 :             ));
    1932            0 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
    1933              :         } else {
    1934            0 :             warn!(
    1935            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
    1936              :                 spcnode, dbnode
    1937              :             );
    1938              :         }
    1939              : 
    1940              :         // Update logical database size.
    1941            0 :         self.pending_nblocks -= total_blocks as i64;
    1942            0 : 
    1943            0 :         // Delete all relations and metadata files for the spcnode/dbnode
    1944            0 :         self.delete(dbdir_key_range(spcnode, dbnode));
    1945            0 :         Ok(())
    1946            0 :     }
    1947              : 
    1948              :     /// Create a relation fork.
    1949              :     ///
    1950              :     /// 'nblocks' is the initial size.
    1951        11520 :     pub async fn put_rel_creation(
    1952        11520 :         &mut self,
    1953        11520 :         rel: RelTag,
    1954        11520 :         nblocks: BlockNumber,
    1955        11520 :         ctx: &RequestContext,
    1956        11520 :     ) -> Result<(), WalIngestError> {
    1957        11520 :         if rel.relnode == 0 {
    1958            0 :             Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
    1959            0 :                 "invalid relnode"
    1960            0 :             )))?;
    1961        11520 :         }
    1962              :         // It's possible that this is the first rel for this db in this
    1963              :         // tablespace.  Create the reldir entry for it if so.
    1964        11520 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
    1965              : 
    1966        11520 :         let dbdir_exists =
    1967        11520 :             if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
    1968              :                 // Didn't exist. Update dbdir
    1969           48 :                 e.insert(false);
    1970           48 :                 let buf = DbDirectory::ser(&dbdir)?;
    1971           48 :                 self.pending_directory_entries.push((
    1972           48 :                     DirectoryKind::Db,
    1973           48 :                     MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
    1974           48 :                 ));
    1975           48 :                 self.put(DBDIR_KEY, Value::Image(buf.into()));
    1976           48 :                 false
    1977              :             } else {
    1978        11472 :                 true
    1979              :             };
    1980              : 
    1981        11520 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1982        11520 :         let mut rel_dir = if !dbdir_exists {
    1983              :             // Create the RelDirectory
    1984           48 :             RelDirectory::default()
    1985              :         } else {
    1986              :             // reldir already exists, fetch it
    1987        11472 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
    1988              :         };
    1989              : 
    1990        11520 :         let v2_enabled = self
    1991        11520 :             .maybe_enable_rel_size_v2()
    1992        11520 :             .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
    1993              : 
    1994        11520 :         if v2_enabled {
    1995            0 :             if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
    1996            0 :                 Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    1997            0 :             }
    1998            0 :             let sparse_rel_dir_key =
    1999            0 :                 rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
    2000              :             // check whether this relation already exists in the v2 sparse keyspace
    2001            0 :             let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
    2002            0 :             let val = RelDirExists::decode_option(val)
    2003            0 :                 .map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
    2004            0 :             if val == RelDirExists::Exists {
    2005            0 :                 Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    2006            0 :             }
    2007            0 :             self.put(
    2008            0 :                 sparse_rel_dir_key,
    2009            0 :                 Value::Image(RelDirExists::Exists.encode()),
    2010            0 :             );
    2011            0 :             if !dbdir_exists {
    2012            0 :                 self.pending_directory_entries
    2013            0 :                     .push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
    2014            0 :                 self.pending_directory_entries
    2015            0 :                     .push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
    2016            0 :                 // We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
    2017            0 :                 // TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
    2018            0 :                 // will be key not found errors if we don't create an empty one for rel_size_v2.
    2019            0 :                 self.put(
    2020            0 :                     rel_dir_key,
    2021            0 :                     Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
    2022              :                 );
    2023            0 :             }
    2024            0 :             self.pending_directory_entries
    2025            0 :                 .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
    2026              :         } else {
    2027              :             // Add the new relation to the rel directory entry, and write it back
    2028        11520 :             if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
    2029            0 :                 Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
    2030        11520 :             }
    2031        11520 :             if !dbdir_exists {
    2032           48 :                 self.pending_directory_entries
    2033           48 :                     .push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
    2034        11472 :             }
    2035        11520 :             self.pending_directory_entries
    2036        11520 :                 .push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
    2037        11520 :             self.put(
    2038        11520 :                 rel_dir_key,
    2039        11520 :                 Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
    2040              :             );
    2041              :         }
    2042              : 
    2043              :         // Put size
    2044        11520 :         let size_key = rel_size_to_key(rel);
    2045        11520 :         let buf = nblocks.to_le_bytes();
    2046        11520 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2047        11520 : 
    2048        11520 :         self.pending_nblocks += nblocks as i64;
    2049        11520 : 
    2050        11520 :         // Update relation size cache
    2051        11520 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2052        11520 : 
    2053        11520 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
    2054        11520 :         // caller.
    2055        11520 :         Ok(())
    2056        11520 :     }
    2057              : 
    2058              :     /// Truncate relation
    2059        36072 :     pub async fn put_rel_truncation(
    2060        36072 :         &mut self,
    2061        36072 :         rel: RelTag,
    2062        36072 :         nblocks: BlockNumber,
    2063        36072 :         ctx: &RequestContext,
    2064        36072 :     ) -> Result<(), WalIngestError> {
    2065        36072 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2066        36072 :         if self
    2067        36072 :             .tline
    2068        36072 :             .get_rel_exists(rel, Version::Modified(self), ctx)
    2069        36072 :             .await?
    2070              :         {
    2071        36072 :             let size_key = rel_size_to_key(rel);
    2072              :             // Fetch the old size first
    2073        36072 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2074        36072 : 
    2075        36072 :             // Update the entry with the new size.
    2076        36072 :             let buf = nblocks.to_le_bytes();
    2077        36072 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2078        36072 : 
    2079        36072 :             // Update relation size cache
    2080        36072 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2081        36072 : 
    2082        36072 :             // Update logical database size.
    2083        36072 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
    2084            0 :         }
    2085        36072 :         Ok(())
    2086        36072 :     }
    2087              : 
    2088              :     /// Extend relation
    2089              :     /// If the new size is smaller than the current size, do nothing.
    2090      1660080 :     pub async fn put_rel_extend(
    2091      1660080 :         &mut self,
    2092      1660080 :         rel: RelTag,
    2093      1660080 :         nblocks: BlockNumber,
    2094      1660080 :         ctx: &RequestContext,
    2095      1660080 :     ) -> Result<(), WalIngestError> {
    2096      1660080 :         ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
    2097              : 
    2098              :         // Put size
    2099      1660080 :         let size_key = rel_size_to_key(rel);
    2100      1660080 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2101      1660080 : 
    2102      1660080 :         // only extend relation here. never decrease the size
    2103      1660080 :         if nblocks > old_size {
    2104      1648728 :             let buf = nblocks.to_le_bytes();
    2105      1648728 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2106      1648728 : 
    2107      1648728 :             // Update relation size cache
    2108      1648728 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
    2109      1648728 : 
    2110      1648728 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
    2111      1648728 :         }
    2112      1660080 :         Ok(())
    2113      1660080 :     }
    2114              : 
    2115              :     /// Drop some relations
    2116           60 :     pub(crate) async fn put_rel_drops(
    2117           60 :         &mut self,
    2118           60 :         drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
    2119           60 :         ctx: &RequestContext,
    2120           60 :     ) -> Result<(), WalIngestError> {
    2121           60 :         let v2_enabled = self
    2122           60 :             .maybe_enable_rel_size_v2()
    2123           60 :             .map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
    2124           72 :         for ((spc_node, db_node), rel_tags) in drop_relations {
    2125           12 :             let dir_key = rel_dir_to_key(spc_node, db_node);
    2126           12 :             let buf = self.get(dir_key, ctx).await?;
    2127           12 :             let mut dir = RelDirectory::des(&buf)?;
    2128              : 
    2129           12 :             let mut dirty = false;
    2130           24 :             for rel_tag in rel_tags {
    2131           12 :                 let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
    2132           12 :                     self.pending_directory_entries
    2133           12 :                         .push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
    2134           12 :                     dirty = true;
    2135           12 :                     true
    2136            0 :                 } else if v2_enabled {
    2137              :                     // The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
    2138              :                     // Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
    2139              :                     // logic).
    2140            0 :                     let key =
    2141            0 :                         rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
    2142            0 :                     let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
    2143            0 :                         .map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
    2144            0 :                     if val == RelDirExists::Exists {
    2145            0 :                         self.pending_directory_entries
    2146            0 :                             .push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
    2147            0 :                         // put tombstone
    2148            0 :                         self.put(key, Value::Image(RelDirExists::Removed.encode()));
    2149            0 :                         // no need to set dirty to true
    2150            0 :                         true
    2151              :                     } else {
    2152            0 :                         false
    2153              :                     }
    2154              :                 } else {
    2155            0 :                     false
    2156              :                 };
    2157              : 
    2158           12 :                 if found {
    2159              :                     // update logical size
    2160           12 :                     let size_key = rel_size_to_key(rel_tag);
    2161           12 :                     let old_size = self.get(size_key, ctx).await?.get_u32_le();
    2162           12 :                     self.pending_nblocks -= old_size as i64;
    2163           12 : 
    2164           12 :                     // Remove entry from relation size cache
    2165           12 :                     self.tline.remove_cached_rel_size(&rel_tag);
    2166           12 : 
    2167           12 :                     // Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
    2168           12 :                     self.delete(rel_key_range(rel_tag));
    2169            0 :                 }
    2170              :             }
    2171              : 
    2172           12 :             if dirty {
    2173           12 :                 self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    2174            0 :             }
    2175              :         }
    2176              : 
    2177           60 :         Ok(())
    2178           60 :     }
    2179              : 
    2180           36 :     pub async fn put_slru_segment_creation(
    2181           36 :         &mut self,
    2182           36 :         kind: SlruKind,
    2183           36 :         segno: u32,
    2184           36 :         nblocks: BlockNumber,
    2185           36 :         ctx: &RequestContext,
    2186           36 :     ) -> Result<(), WalIngestError> {
    2187           36 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2188              : 
    2189              :         // Add it to the directory entry
    2190           36 :         let dir_key = slru_dir_to_key(kind);
    2191           36 :         let buf = self.get(dir_key, ctx).await?;
    2192           36 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2193              : 
    2194           36 :         if !dir.segments.insert(segno) {
    2195            0 :             Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
    2196           36 :         }
    2197           36 :         self.pending_directory_entries.push((
    2198           36 :             DirectoryKind::SlruSegment(kind),
    2199           36 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2200           36 :         ));
    2201           36 :         self.put(
    2202           36 :             dir_key,
    2203           36 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2204              :         );
    2205              : 
    2206              :         // Put size
    2207           36 :         let size_key = slru_segment_size_to_key(kind, segno);
    2208           36 :         let buf = nblocks.to_le_bytes();
    2209           36 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2210           36 : 
    2211           36 :         // even if nblocks > 0, we don't insert any actual blocks here
    2212           36 : 
    2213           36 :         Ok(())
    2214           36 :     }
    2215              : 
    2216              :     /// Extend SLRU segment
    2217            0 :     pub fn put_slru_extend(
    2218            0 :         &mut self,
    2219            0 :         kind: SlruKind,
    2220            0 :         segno: u32,
    2221            0 :         nblocks: BlockNumber,
    2222            0 :     ) -> Result<(), WalIngestError> {
    2223            0 :         assert!(self.tline.tenant_shard_id.is_shard_zero());
    2224              : 
    2225              :         // Put size
    2226            0 :         let size_key = slru_segment_size_to_key(kind, segno);
    2227            0 :         let buf = nblocks.to_le_bytes();
    2228            0 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    2229            0 :         Ok(())
    2230            0 :     }
    2231              : 
    2232              :     /// This method is used for marking truncated SLRU files
    2233            0 :     pub async fn drop_slru_segment(
    2234            0 :         &mut self,
    2235            0 :         kind: SlruKind,
    2236            0 :         segno: u32,
    2237            0 :         ctx: &RequestContext,
    2238            0 :     ) -> Result<(), WalIngestError> {
    2239            0 :         // Remove it from the directory entry
    2240            0 :         let dir_key = slru_dir_to_key(kind);
    2241            0 :         let buf = self.get(dir_key, ctx).await?;
    2242            0 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    2243              : 
    2244            0 :         if !dir.segments.remove(&segno) {
    2245            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    2246            0 :         }
    2247            0 :         self.pending_directory_entries.push((
    2248            0 :             DirectoryKind::SlruSegment(kind),
    2249            0 :             MetricsUpdate::Set(dir.segments.len() as u64),
    2250            0 :         ));
    2251            0 :         self.put(
    2252            0 :             dir_key,
    2253            0 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    2254              :         );
    2255              : 
    2256              :         // Delete size entry, as well as all blocks
    2257            0 :         self.delete(slru_segment_key_range(kind, segno));
    2258            0 : 
    2259            0 :         Ok(())
    2260            0 :     }
    2261              : 
    2262              :     /// Drop a relmapper file (pg_filenode.map)
    2263            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
    2264            0 :         // TODO
    2265            0 :         Ok(())
    2266            0 :     }
    2267              : 
    2268              :     /// This method is used for removing twophase state files
    2269            0 :     pub async fn drop_twophase_file(
    2270            0 :         &mut self,
    2271            0 :         xid: u64,
    2272            0 :         ctx: &RequestContext,
    2273            0 :     ) -> Result<(), WalIngestError> {
    2274              :         // Remove it from the directory entry
    2275            0 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    2276            0 :         let newdirbuf = if self.tline.pg_version >= 17 {
    2277            0 :             let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
    2278              : 
    2279            0 :             if !dir.xids.remove(&xid) {
    2280            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2281            0 :             }
    2282            0 :             self.pending_directory_entries.push((
    2283            0 :                 DirectoryKind::TwoPhase,
    2284            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2285            0 :             ));
    2286            0 :             Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
    2287              :         } else {
    2288            0 :             let xid: u32 = u32::try_from(xid)
    2289            0 :                 .map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
    2290            0 :             let mut dir = TwoPhaseDirectory::des(&buf)?;
    2291              : 
    2292            0 :             if !dir.xids.remove(&xid) {
    2293            0 :                 warn!("twophase file for xid {} does not exist", xid);
    2294            0 :             }
    2295            0 :             self.pending_directory_entries.push((
    2296            0 :                 DirectoryKind::TwoPhase,
    2297            0 :                 MetricsUpdate::Set(dir.xids.len() as u64),
    2298            0 :             ));
    2299            0 :             Bytes::from(TwoPhaseDirectory::ser(&dir)?)
    2300              :         };
    2301            0 :         self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
    2302            0 : 
    2303            0 :         // Delete it
    2304            0 :         self.delete(twophase_key_range(xid));
    2305            0 : 
    2306            0 :         Ok(())
    2307            0 :     }
    2308              : 
    2309           96 :     pub async fn put_file(
    2310           96 :         &mut self,
    2311           96 :         path: &str,
    2312           96 :         content: &[u8],
    2313           96 :         ctx: &RequestContext,
    2314           96 :     ) -> Result<(), WalIngestError> {
    2315           96 :         let key = aux_file::encode_aux_file_key(path);
    2316              :         // retrieve the key from the engine
    2317           96 :         let old_val = match self.get(key, ctx).await {
    2318           24 :             Ok(val) => Some(val),
    2319           72 :             Err(PageReconstructError::MissingKey(_)) => None,
    2320            0 :             Err(e) => return Err(e.into()),
    2321              :         };
    2322           96 :         let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
    2323           24 :             aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
    2324              :         } else {
    2325           72 :             Vec::new()
    2326              :         };
    2327           96 :         let mut other_files = Vec::with_capacity(files.len());
    2328           96 :         let mut modifying_file = None;
    2329          120 :         for file @ (p, content) in files {
    2330           24 :             if path == p {
    2331           24 :                 assert!(
    2332           24 :                     modifying_file.is_none(),
    2333            0 :                     "duplicated entries found for {}",
    2334              :                     path
    2335              :                 );
    2336           24 :                 modifying_file = Some(content);
    2337            0 :             } else {
    2338            0 :                 other_files.push(file);
    2339            0 :             }
    2340              :         }
    2341           96 :         let mut new_files = other_files;
    2342           96 :         match (modifying_file, content.is_empty()) {
    2343           12 :             (Some(old_content), false) => {
    2344           12 :                 self.tline
    2345           12 :                     .aux_file_size_estimator
    2346           12 :                     .on_update(old_content.len(), content.len());
    2347           12 :                 new_files.push((path, content));
    2348           12 :             }
    2349           12 :             (Some(old_content), true) => {
    2350           12 :                 self.tline
    2351           12 :                     .aux_file_size_estimator
    2352           12 :                     .on_remove(old_content.len());
    2353           12 :                 // not adding the file key to the final `new_files` vec.
    2354           12 :             }
    2355           72 :             (None, false) => {
    2356           72 :                 self.tline.aux_file_size_estimator.on_add(content.len());
    2357           72 :                 new_files.push((path, content));
    2358           72 :             }
    2359              :             // Compute may request deletion of the old version of a pgstat AUX file if the new one exceeds the size limit.
    2360              :             // Compute doesn't know whether a previous version of this file exists, so an
    2361              :             // attempt to delete a non-existent file can end up here.
    2362              :             // To avoid false alarms, log it as info rather than warning.
    2363            0 :             (None, true) if path.starts_with("pg_stat/") => {
    2364            0 :                 info!("removing non-existing pg_stat file: {}", path)
    2365              :             }
    2366            0 :             (None, true) => warn!("removing non-existing aux file: {}", path),
    2367              :         }
    2368           96 :         let new_val = aux_file::encode_file_value(&new_files)
    2369           96 :             .map_err(WalIngestErrorKind::EncodeAuxFileError)?;
    2370           96 :         self.put(key, Value::Image(new_val.into()));
    2371           96 : 
    2372           96 :         Ok(())
    2373           96 :     }
    2374              : 
    2375              :     ///
    2376              :     /// Flush changes accumulated so far to the underlying repository.
    2377              :     ///
    2378              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    2379              :     /// you to flush them to the underlying repository before the final `commit`.
    2380              :     /// That allows freeing up the memory used to hold the pending changes.
    2381              :     ///
    2382              :     /// Currently only used during bulk import of a data directory. In that
    2383              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    2384              :     /// whole import fails and the timeline will be deleted anyway.
    2385              :     /// (Or to be precise, it will be left behind for debugging purposes and
    2386              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    2387              :     ///
    2388              :     /// Note: A consequence of flushing the pending operations is that they
    2389              :     /// won't be visible to subsequent operations until `commit`. The function
    2390              :     /// retains all the metadata, but data pages are flushed. That's again OK
    2391              :     /// for bulk import, where you are just loading data pages and won't try to
    2392              :     /// modify the same pages twice.
    2393        11580 :     pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2394        11580 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    2395        11580 :         // to scan through the pending_updates list.
    2396        11580 :         let pending_nblocks = self.pending_nblocks;
    2397        11580 :         if pending_nblocks < 10000 {
    2398        11580 :             return Ok(());
    2399            0 :         }
    2400              : 
    2401            0 :         let mut writer = self.tline.writer().await;
    2402              : 
    2403              :         // Flush relation and SLRU data blocks, keep metadata.
    2404            0 :         if let Some(batch) = self.pending_data_batch.take() {
    2405            0 :             tracing::debug!(
    2406            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2407            0 :                 batch.max_lsn,
    2408            0 :                 self.tline.get_last_record_lsn()
    2409              :             );
    2410              : 
    2411              :             // This bails out on first error without modifying pending_updates.
    2412              :             // That's Ok, cf this function's doc comment.
    2413            0 :             writer.put_batch(batch, ctx).await?;
    2414            0 :         }
    2415              : 
    2416            0 :         if pending_nblocks != 0 {
    2417            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2418            0 :             self.pending_nblocks = 0;
    2419            0 :         }
    2420              : 
    2421            0 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2422            0 :             writer.update_directory_entries_count(kind, count);
    2423            0 :         }
    2424              : 
    2425            0 :         Ok(())
    2426        11580 :     }
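                      : 
                      :     // Minimal bulk-import sketch (hedged): `chunks` and `ingest_chunk` are illustrative
                      :     // names, not part of this file. Flushing inside the loop bounds memory by writing out
                      :     // pending data pages, while the final commit() makes everything visible.
                      :     //
                      :     //     for chunk in chunks {
                      :     //         ingest_chunk(&mut modification, chunk, &ctx).await?;
                      :     //         modification.flush(&ctx).await?;
                      :     //     }
                      :     //     modification.commit(&ctx).await?;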
    2427              : 
    2428              :     ///
    2429              :     /// Finish this atomic update, writing all the updated keys to the
    2430              :     /// underlying timeline.
    2431              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    2432              :     ///
    2433      4458660 :     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
    2434      4458660 :         let mut writer = self.tline.writer().await;
    2435              : 
    2436      4458660 :         let pending_nblocks = self.pending_nblocks;
    2437      4458660 :         self.pending_nblocks = 0;
    2438              : 
    2439              :         // Ordering: the items in this batch do not need to be in any global order, but values for
    2440              :         // a particular Key must be in Lsn order relative to one another.  InMemoryLayer relies on
    2441              :         // this to do efficient updates to its index.  See [`wal_decoder::serialized_batch`] for
    2442              :         // more details.
    2443              : 
    2444      4458660 :         let metadata_batch = {
    2445      4458660 :             let pending_meta = self
    2446      4458660 :                 .pending_metadata_pages
    2447      4458660 :                 .drain()
    2448      4458660 :                 .flat_map(|(key, values)| {
    2449      1644648 :                     values
    2450      1644648 :                         .into_iter()
    2451      1644648 :                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
    2452      4458660 :                 })
    2453      4458660 :                 .collect::<Vec<_>>();
    2454      4458660 : 
    2455      4458660 :             if pending_meta.is_empty() {
    2456      2833668 :                 None
    2457              :             } else {
    2458      1624992 :                 Some(SerializedValueBatch::from_values(pending_meta))
    2459              :             }
    2460              :         };
    2461              : 
    2462      4458660 :         let data_batch = self.pending_data_batch.take();
    2463              : 
    2464      4458660 :         let maybe_batch = match (data_batch, metadata_batch) {
    2465      1587336 :             (Some(mut data), Some(metadata)) => {
    2466      1587336 :                 data.extend(metadata);
    2467      1587336 :                 Some(data)
    2468              :             }
    2469       859572 :             (Some(data), None) => Some(data),
    2470        37656 :             (None, Some(metadata)) => Some(metadata),
    2471      1974096 :             (None, None) => None,
    2472              :         };
    2473              : 
    2474      4458660 :         if let Some(batch) = maybe_batch {
    2475      2484564 :             tracing::debug!(
    2476            0 :                 "Flushing batch with max_lsn={}. Last record LSN is {}",
    2477            0 :                 batch.max_lsn,
    2478            0 :                 self.tline.get_last_record_lsn()
    2479              :             );
    2480              : 
    2481              :             // This bails out on first error without modifying pending_updates.
    2482              :             // That's Ok, cf this function's doc comment.
    2483      2484564 :             writer.put_batch(batch, ctx).await?;
    2484      1974096 :         }
    2485              : 
    2486      4458660 :         if !self.pending_deletions.is_empty() {
    2487           12 :             writer.delete_batch(&self.pending_deletions, ctx).await?;
    2488           12 :             self.pending_deletions.clear();
    2489      4458648 :         }
    2490              : 
    2491      4458660 :         self.pending_lsns.push(self.lsn);
    2492      5333808 :         for pending_lsn in self.pending_lsns.drain(..) {
    2493      5333808 :             // TODO(vlad): pretty sure the comment below is not valid anymore
    2494      5333808 :             // and we can call finish write with the latest LSN
    2495      5333808 :             //
    2496      5333808 :             // Ideally, we should be able to call writer.finish_write() only once
    2497      5333808 :             // with the highest LSN. However, the last_record_lsn variable in the
    2498      5333808 :             // timeline keeps track of the latest LSN and the immediate previous LSN
    2499      5333808 :             // so we need to record every LSN to not leave a gap between them.
    2500      5333808 :             writer.finish_write(pending_lsn);
    2501      5333808 :         }
    2502              : 
    2503      4458660 :         if pending_nblocks != 0 {
    2504      1623420 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    2505      2835240 :         }
    2506              : 
    2507      4458660 :         for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
    2508        18204 :             writer.update_directory_entries_count(kind, count);
    2509        18204 :         }
    2510              : 
    2511      4458660 :         self.pending_metadata_bytes = 0;
    2512      4458660 : 
    2513      4458660 :         Ok(())
    2514      4458660 :     }
    2515              : 
    2516      1750224 :     pub(crate) fn len(&self) -> usize {
    2517      1750224 :         self.pending_metadata_pages.len()
    2518      1750224 :             + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
    2519      1750224 :             + self.pending_deletions.len()
    2520      1750224 :     }
    2521              : 
    2522              :     /// Read a page from the Timeline we are writing to.  For metadata pages, this passes through
    2523              :     /// a cache in Self, which makes writes earlier in this modification visible to WAL records later
    2524              :     /// in the modification.
    2525              :     ///
    2526              :     /// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
    2527              :     /// page must ensure that the pages they read are already committed in Timeline, for example
    2528              :     /// DB create operations are always preceded by a call to commit().  This is special cased because
    2529              :     /// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
    2530              :     /// and not data pages.
    2531      1719516 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    2532      1719516 :         if !Self::is_data_key(&key) {
    2533              :             // Have we already updated the same key? Read the latest pending updated
    2534              :             // version in that case.
    2535              :             //
    2536              :             // Note: we don't check pending_deletions. It is an error to request a
    2537              :             // value that has been removed, deletion only avoids leaking storage.
    2538      1719516 :             if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
    2539        95568 :                 if let Some((_, _, value)) = values.last() {
    2540        95568 :                     return if let Value::Image(img) = value {
    2541        95568 :                         Ok(img.clone())
    2542              :                     } else {
    2543              :                         // Currently, we never need to read back a WAL record that we
    2544              :                         // inserted in the same "transaction". All the metadata updates
    2545              :                         // work directly with Images, and we never need to read actual
    2546              :                         // data pages. We could handle this if we had to, by calling
    2547              :                         // the walredo manager, but let's keep it simple for now.
    2548            0 :                         Err(PageReconstructError::Other(anyhow::anyhow!(
    2549            0 :                             "unexpected pending WAL record"
    2550            0 :                         )))
    2551              :                     };
    2552            0 :                 }
    2553      1623948 :             }
    2554              :         } else {
    2555              :             // This is an expensive check, so we only do it in debug mode. If reading a data key,
    2556              :             // this key should never be present in the pending data batch. We ensure this by committing
    2557              :             // modifications before ingesting DB create operations, which are the only kind that reads
    2558              :             // data pages during ingest.
    2559            0 :             if cfg!(debug_assertions) {
    2560            0 :                 assert!(
    2561            0 :                     !self
    2562            0 :                         .pending_data_batch
    2563            0 :                         .as_ref()
    2564            0 :                         .is_some_and(|b| b.updates_key(&key))
    2565            0 :                 );
    2566            0 :             }
    2567              :         }
    2568              : 
    2569              :         // Metadata page cache miss, or we're reading a data page.
    2570      1623948 :         let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    2571      1623948 :         self.tline.get(key, lsn, ctx).await
    2572      1719516 :     }
    2573              : 
    2574              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    2575              :     /// and the empty value into None.
    2576            0 :     async fn sparse_get(
    2577            0 :         &self,
    2578            0 :         key: Key,
    2579            0 :         ctx: &RequestContext,
    2580            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    2581            0 :         let val = self.get(key, ctx).await;
    2582            0 :         match val {
    2583            0 :             Ok(val) if val.is_empty() => Ok(None),
    2584            0 :             Ok(val) => Ok(Some(val)),
    2585            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    2586            0 :             Err(e) => Err(e),
    2587              :         }
    2588            0 :     }
    2589              : 
    2590              :     #[cfg(test)]
    2591           24 :     pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
    2592           24 :         self.put(key, val);
    2593           24 :     }
    2594              : 
    2595      3384768 :     fn put(&mut self, key: Key, val: Value) {
    2596      3384768 :         if Self::is_data_key(&key) {
    2597      1667208 :             self.put_data(key.to_compact(), val)
    2598              :         } else {
    2599      1717560 :             self.put_metadata(key.to_compact(), val)
    2600              :         }
    2601      3384768 :     }
    2602              : 
    2603      1667208 :     fn put_data(&mut self, key: CompactKey, val: Value) {
    2604      1667208 :         let batch = self
    2605      1667208 :             .pending_data_batch
    2606      1667208 :             .get_or_insert_with(SerializedValueBatch::default);
    2607      1667208 :         batch.put(key, val, self.lsn);
    2608      1667208 :     }
    2609              : 
    2610      1717560 :     fn put_metadata(&mut self, key: CompactKey, val: Value) {
    2611      1717560 :         let values = self.pending_metadata_pages.entry(key).or_default();
    2612              :         // Replace the previous value if it exists at the same lsn
    2613      1717560 :         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
    2614        72912 :             if *last_lsn == self.lsn {
    2615              :                 // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
    2616        72912 :                 self.pending_metadata_bytes -= *last_value_ser_size;
    2617        72912 :                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
    2618        72912 :                 self.pending_metadata_bytes += *last_value_ser_size;
    2619        72912 : 
    2620        72912 :                 // Use the latest value; this replaces any earlier write to the same (key,lsn), such as might
    2621        72912 :                 // have been generated by synthesized zero page writes prior to the first real write to a page.
    2622        72912 :                 *last_value = val;
    2623        72912 :                 return;
    2624            0 :             }
    2625      1644648 :         }
    2626              : 
    2627      1644648 :         let val_serialized_size = val.serialized_size().unwrap() as usize;
    2628      1644648 :         self.pending_metadata_bytes += val_serialized_size;
    2629      1644648 :         values.push((self.lsn, val_serialized_size, val));
    2630      1644648 : 
    2631      1644648 :         if key == CHECKPOINT_KEY.to_compact() {
    2632         1404 :             tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
    2633      1643244 :         }
    2634      1717560 :     }
    2635              : 
    2636           12 :     fn delete(&mut self, key_range: Range<Key>) {
    2637           12 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    2638           12 :         self.pending_deletions.push((key_range, self.lsn));
    2639           12 :     }
    2640              : }
    2641              : 
    2642              : /// Statistics for a DatadirModification.
    2643              : #[derive(Default)]
    2644              : pub struct DatadirModificationStats {
    2645              :     pub metadata_images: u64,
    2646              :     pub metadata_deltas: u64,
    2647              :     pub data_images: u64,
    2648              :     pub data_deltas: u64,
    2649              : }
    2650              : 
    2651              : /// This struct facilitates accessing either a committed key from the timeline at a
    2652              : /// specific LSN, or the latest uncommitted key from a pending modification.
    2653              : ///
    2654              : /// During WAL ingestion, the records from multiple LSNs may be batched in the same
    2655              : /// modification before being flushed to the timeline. Hence, the routines in WalIngest
    2656              : /// need to look up the keys in the modification first before looking them up in the
    2657              : /// timeline to not miss the latest updates.
    2658              : #[derive(Clone, Copy)]
    2659              : pub enum Version<'a> {
    2660              :     Lsn(Lsn),
    2661              :     Modified(&'a DatadirModification<'a>),
    2662              : }
    2663              : 
    2664              : impl Version<'_> {
    2665        31056 :     async fn get(
    2666        31056 :         &self,
    2667        31056 :         timeline: &Timeline,
    2668        31056 :         key: Key,
    2669        31056 :         ctx: &RequestContext,
    2670        31056 :     ) -> Result<Bytes, PageReconstructError> {
    2671        31056 :         match self {
    2672        30936 :             Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
    2673          120 :             Version::Modified(modification) => modification.get(key, ctx).await,
    2674              :         }
    2675        31056 :     }
    2676              : 
    2677              :     /// Get a key from the sparse keyspace. Automatically converts the missing key error
    2678              :     /// and the empty value into None.
    2679            0 :     async fn sparse_get(
    2680            0 :         &self,
    2681            0 :         timeline: &Timeline,
    2682            0 :         key: Key,
    2683            0 :         ctx: &RequestContext,
    2684            0 :     ) -> Result<Option<Bytes>, PageReconstructError> {
    2685            0 :         let val = self.get(timeline, key, ctx).await;
    2686            0 :         match val {
    2687            0 :             Ok(val) if val.is_empty() => Ok(None),
    2688            0 :             Ok(val) => Ok(Some(val)),
    2689            0 :             Err(PageReconstructError::MissingKey(_)) => Ok(None),
    2690            0 :             Err(e) => Err(e),
    2691              :         }
    2692            0 :     }
    2693              : 
    2694       213720 :     fn get_lsn(&self) -> Lsn {
    2695       213720 :         match self {
    2696       177444 :             Version::Lsn(lsn) => *lsn,
    2697        36276 :             Version::Modified(modification) => modification.lsn,
    2698              :         }
    2699       213720 :     }
    2700              : }
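                      : 
                      : // Hedged usage sketch: WAL ingest passes the in-flight modification so lookups observe
                      : // not-yet-committed metadata, while ordinary read paths pin a concrete LSN. This mirrors
                      : // the `get_rel_exists` call in `put_rel_truncation` above; `timeline`, `rel`, `lsn`,
                      : // `modification` and `ctx` are illustrative bindings.
                      : //
                      : //     let seen_during_ingest = timeline
                      : //         .get_rel_exists(rel, Version::Modified(&modification), &ctx)
                      : //         .await?;
                      : //     let seen_at_lsn = timeline
                      : //         .get_rel_exists(rel, Version::Lsn(lsn), &ctx)
                      : //         .await?;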
    2701              : 
    2702              : //--- Metadata structs stored in key-value pairs in the repository.
    2703              : 
    2704            0 : #[derive(Debug, Serialize, Deserialize)]
    2705              : pub(crate) struct DbDirectory {
    2706              :     // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    2707              :     pub(crate) dbdirs: HashMap<(Oid, Oid), bool>,
    2708              : }
    2709              : 
    2710              : // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
    2711              : // pg_twophase files were expanded from 32-bit XIDs to 64-bit XIDs.  Previously, the files
    2712              : // were named like "pg_twophase/000002E5", now they're like
    2713              : // "pg_twophase/0000000A000002E4".
    2714              : 
    2715            0 : #[derive(Debug, Serialize, Deserialize)]
    2716              : pub(crate) struct TwoPhaseDirectory {
    2717              :     pub(crate) xids: HashSet<TransactionId>,
    2718              : }
    2719              : 
    2720            0 : #[derive(Debug, Serialize, Deserialize)]
    2721              : struct TwoPhaseDirectoryV17 {
    2722              :     xids: HashSet<u64>,
    2723              : }
    2724              : 
    2725            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    2726              : pub(crate) struct RelDirectory {
    2727              :     // Set of relations that exist. (relfilenode, forknum)
    2728              :     //
    2729              :     // TODO: Store it as a btree or radix tree or something else that spans multiple
    2730              :     // key-value pairs, if you have a lot of relations
    2731              :     pub(crate) rels: HashSet<(Oid, u8)>,
    2732              : }
    2733              : 
    2734            0 : #[derive(Debug, Serialize, Deserialize)]
    2735              : struct RelSizeEntry {
    2736              :     nblocks: u32,
    2737              : }
    2738              : 
    2739            0 : #[derive(Debug, Serialize, Deserialize, Default)]
    2740              : pub(crate) struct SlruSegmentDirectory {
    2741              :     // Set of SLRU segments that exist.
    2742              :     pub(crate) segments: HashSet<u32>,
    2743              : }
    2744              : 
    2745              : #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
    2746              : #[repr(u8)]
    2747              : pub(crate) enum DirectoryKind {
    2748              :     Db,
    2749              :     TwoPhase,
    2750              :     Rel,
    2751              :     AuxFiles,
    2752              :     SlruSegment(SlruKind),
    2753              :     RelV2,
    2754              : }
    2755              : 
    2756              : impl DirectoryKind {
    2757              :     pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
    2758        54624 :     pub(crate) fn offset(&self) -> usize {
    2759        54624 :         self.into_usize()
    2760        54624 :     }
    2761              : }
    2762              : 
    2763              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    2764              : 
    2765              : #[allow(clippy::bool_assert_comparison)]
    2766              : #[cfg(test)]
    2767              : mod tests {
    2768              :     use hex_literal::hex;
    2769              :     use pageserver_api::models::ShardParameters;
    2770              :     use pageserver_api::shard::ShardStripeSize;
    2771              :     use utils::id::TimelineId;
    2772              :     use utils::shard::{ShardCount, ShardNumber};
    2773              : 
    2774              :     use super::*;
    2775              :     use crate::DEFAULT_PG_VERSION;
    2776              :     use crate::tenant::harness::TenantHarness;
    2777              : 
    2778              :     /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
    2779              :     #[tokio::test]
    2780           12 :     async fn aux_files_round_trip() -> anyhow::Result<()> {
    2781           12 :         let name = "aux_files_round_trip";
    2782           12 :         let harness = TenantHarness::create(name).await?;
    2783           12 : 
    2784           12 :         pub const TIMELINE_ID: TimelineId =
    2785           12 :             TimelineId::from_array(hex!("11223344556677881122334455667788"));
    2786           12 : 
    2787           12 :         let (tenant, ctx) = harness.load().await;
    2788           12 :         let (tline, ctx) = tenant
    2789           12 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    2790           12 :             .await?;
    2791           12 :         let tline = tline.raw_timeline().unwrap();
    2792           12 : 
    2793           12 :         // First modification: insert two keys
    2794           12 :         let mut modification = tline.begin_modification(Lsn(0x1000));
    2795           12 :         modification.put_file("foo/bar1", b"content1", &ctx).await?;
    2796           12 :         modification.set_lsn(Lsn(0x1008))?;
    2797           12 :         modification.put_file("foo/bar2", b"content2", &ctx).await?;
    2798           12 :         modification.commit(&ctx).await?;
    2799           12 :         let expect_1008 = HashMap::from([
    2800           12 :             ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
    2801           12 :             ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
    2802           12 :         ]);
    2803           12 : 
    2804           12 :         let io_concurrency = IoConcurrency::spawn_for_test();
    2805           12 : 
    2806           12 :         let readback = tline
    2807           12 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    2808           12 :             .await?;
    2809           12 :         assert_eq!(readback, expect_1008);
    2810           12 : 
    2811           12 :         // Second modification: update one key, remove the other
    2812           12 :         let mut modification = tline.begin_modification(Lsn(0x2000));
    2813           12 :         modification.put_file("foo/bar1", b"content3", &ctx).await?;
    2814           12 :         modification.set_lsn(Lsn(0x2008))?;
    2815           12 :         modification.put_file("foo/bar2", b"", &ctx).await?;
    2816           12 :         modification.commit(&ctx).await?;
    2817           12 :         let expect_2008 =
    2818           12 :             HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
    2819           12 : 
    2820           12 :         let readback = tline
    2821           12 :             .list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
    2822           12 :             .await?;
    2823           12 :         assert_eq!(readback, expect_2008);
    2824           12 : 
    2825           12 :         // Reading back in time works
    2826           12 :         let readback = tline
    2827           12 :             .list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
    2828           12 :             .await?;
    2829           12 :         assert_eq!(readback, expect_1008);
    2830           12 : 
    2831           12 :         Ok(())
    2832           12 :     }
    2833              : 
    2834              :     #[test]
    2835           12 :     fn gap_finding() {
    2836           12 :         let rel = RelTag {
    2837           12 :             spcnode: 1663,
    2838           12 :             dbnode: 208101,
    2839           12 :             relnode: 2620,
    2840           12 :             forknum: 0,
    2841           12 :         };
    2842           12 :         let base_blkno = 1;
    2843           12 : 
    2844           12 :         let base_key = rel_block_to_key(rel, base_blkno);
    2845           12 :         let before_base_key = rel_block_to_key(rel, base_blkno - 1);
    2846           12 : 
    2847           12 :         let shard = ShardIdentity::unsharded();
    2848           12 : 
    2849           12 :         let mut previous_nblocks = 0;
    2850          132 :         for i in 0..10 {
    2851          120 :             let crnt_blkno = base_blkno + i;
    2852          120 :             let gaps = DatadirModification::find_gaps(rel, crnt_blkno, previous_nblocks, &shard);
    2853          120 : 
    2854          120 :             previous_nblocks = crnt_blkno + 1;
    2855          120 : 
    2856          120 :             if i == 0 {
    2857              :                 // The first block we write is 1, so we should find the gap.
    2858           12 :                 assert_eq!(gaps.unwrap(), KeySpace::single(before_base_key..base_key));
    2859              :             } else {
    2860          108 :                 assert!(gaps.is_none());
    2861              :             }
    2862              :         }
    2863              : 
    2864              :         // This is an update to an already existing block. No gaps here.
    2865           12 :         let update_blkno = 5;
    2866           12 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    2867           12 :         assert!(gaps.is_none());
    2868              : 
    2869              :         // This is an update past the current end block.
    2870           12 :         let after_gap_blkno = 20;
    2871           12 :         let gaps = DatadirModification::find_gaps(rel, after_gap_blkno, previous_nblocks, &shard);
    2872           12 : 
    2873           12 :         let gap_start_key = rel_block_to_key(rel, previous_nblocks);
    2874           12 :         let after_gap_key = rel_block_to_key(rel, after_gap_blkno);
    2875           12 :         assert_eq!(
    2876           12 :             gaps.unwrap(),
    2877           12 :             KeySpace::single(gap_start_key..after_gap_key)
    2878           12 :         );
    2879           12 :     }
    2880              : 
    2881              :     #[test]
    2882           12 :     fn sharded_gap_finding() {
    2883           12 :         let rel = RelTag {
    2884           12 :             spcnode: 1663,
    2885           12 :             dbnode: 208101,
    2886           12 :             relnode: 2620,
    2887           12 :             forknum: 0,
    2888           12 :         };
    2889           12 : 
    2890           12 :         let first_blkno = 6;
    2891           12 : 
    2892           12 :         // This shard will get the even blocks
    2893           12 :         let shard = ShardIdentity::from_params(
    2894           12 :             ShardNumber(0),
    2895           12 :             &ShardParameters {
    2896           12 :                 count: ShardCount(2),
    2897           12 :                 stripe_size: ShardStripeSize(1),
    2898           12 :             },
    2899           12 :         );
    2900           12 : 
    2901           12 :         // Only keys belonging to this shard are considered as gaps.
    2902           12 :         let mut previous_nblocks = 0;
    2903           12 :         let gaps =
    2904           12 :             DatadirModification::find_gaps(rel, first_blkno, previous_nblocks, &shard).unwrap();
    2905           12 :         assert!(!gaps.ranges.is_empty());
    2906           36 :         for gap_range in gaps.ranges {
    2907           24 :             let mut k = gap_range.start;
    2908           48 :             while k != gap_range.end {
    2909           24 :                 assert_eq!(shard.get_shard_number(&k), shard.number);
    2910           24 :                 k = k.next();
    2911              :             }
    2912              :         }
    2913              : 
    2914           12 :         previous_nblocks = first_blkno;
    2915           12 : 
    2916           12 :         let update_blkno = 2;
    2917           12 :         let gaps = DatadirModification::find_gaps(rel, update_blkno, previous_nblocks, &shard);
    2918           12 :         assert!(gaps.is_none());
    2919           12 :     }
    2920              : 
    2921              :     /*
    2922              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    2923              :             let incremental = timeline.get_current_logical_size();
    2924              :             let non_incremental = timeline
    2925              :                 .get_current_logical_size_non_incremental(lsn)
    2926              :                 .unwrap();
    2927              :             assert_eq!(incremental, non_incremental);
    2928              :         }
    2929              :     */
    2930              : 
    2931              :     /*
    2932              :     ///
    2933              :     /// Test list_rels() function, with branches and dropped relations
    2934              :     ///
    2935              :     #[test]
    2936              :     fn test_list_rels_drop() -> Result<()> {
    2937              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    2938              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    2939              :         const TESTDB: u32 = 111;
    2940              : 
    2941              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    2942              :         // after branching fails below
    2943              :         let mut writer = tline.begin_record(Lsn(0x10));
    2944              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    2945              :         writer.finish()?;
    2946              : 
    2947              :         // Create a relation on the timeline
    2948              :         let mut writer = tline.begin_record(Lsn(0x20));
    2949              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    2950              :         writer.finish()?;
    2951              : 
    2952              :         let writer = tline.begin_record(Lsn(0x00));
    2953              :         writer.finish()?;
    2954              : 
    2955              :         // Check that list_rels() lists it at Lsn(0x20) and later, but not before it
    2956              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    2957              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    2958              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    2959              : 
    2960              :         // Create a branch, check that the relation is visible there
    2961              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    2962              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    2963              :             Some(timeline) => timeline,
    2964              :             None => panic!("Should have a local timeline"),
    2965              :         };
    2966              :         let newtline = DatadirTimelineImpl::new(newtline);
    2967              :         assert!(newtline
    2968              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2969              :             .contains(&TESTREL_A));
    2970              : 
    2971              :         // Drop it on the branch
    2972              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    2973              :         new_writer.drop_relation(TESTREL_A)?;
    2974              :         new_writer.finish()?;
    2975              : 
    2976              :         // Check that it's no longer listed on the branch after the point where it was dropped
    2977              :         assert!(newtline
    2978              :             .list_rels(0, TESTDB, Lsn(0x30))?
    2979              :             .contains(&TESTREL_A));
    2980              :         assert!(!newtline
    2981              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2982              :             .contains(&TESTREL_A));
    2983              : 
    2984              :         // Run checkpoint and garbage collection and check that it's still not visible
    2985              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    2986              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    2987              : 
    2988              :         assert!(!newtline
    2989              :             .list_rels(0, TESTDB, Lsn(0x40))?
    2990              :             .contains(&TESTREL_A));
    2991              : 
    2992              :         Ok(())
    2993              :     }
    2994              :      */
    2995              : 
    2996              :     /*
    2997              :     #[test]
    2998              :     fn test_read_beyond_eof() -> Result<()> {
    2999              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    3000              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    3001              : 
    3002              :         make_some_layers(&tline, Lsn(0x20))?;
    3003              :         let mut writer = tline.begin_record(Lsn(0x60));
    3004              :         walingest.put_rel_page_image(
    3005              :             &mut writer,
    3006              :             TESTREL_A,
    3007              :             0,
    3008              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    3009              :         )?;
    3010              :         writer.finish()?;
    3011              : 
    3012              :         // Test read before rel creation. Should error out.
    3013              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    3014              : 
    3015              :         // Read block beyond end of relation at different points in time.
    3016              :         // These reads should fall into different delta, image, and in-memory layers.
    3017              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    3018              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    3019              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    3020              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    3021              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    3022              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    3023              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    3024              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    3025              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    3026              : 
    3027              :         // Test on an in-memory layer with no preceding layer
    3028              :         let mut writer = tline.begin_record(Lsn(0x70));
    3029              :         walingest.put_rel_page_image(
    3030              :             &mut writer,
    3031              :             TESTREL_B,
    3032              :             0,
    3033              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    3034              :         )?;
    3035              :         writer.finish()?;
    3036              : 
    3037              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?, ZERO_PAGE);
    3038              : 
    3039              :         Ok(())
    3040              :     }
    3041              :      */
    3042              : }
        

Generated by: LCOV version 2.1-beta