LCOV - code coverage report
Current view: top level - pageserver/src - pgdatadir_mapping.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 92.0 % 1034 951
Test Date: 2023-09-06 10:18:01 Functions: 64.1 % 181 116

            Line data    Source code
       1              : //!
       2              : //! This provides an abstraction to store PostgreSQL relations and other files
       3              : //! in the key-value store that implements the Repository interface.
       4              : //!
       5              : //! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
       6              : //! walingest.rs handles a few things like implicit relation creation and extension.
       7              : //! Clarify that)
       8              : //!
       9              : use super::tenant::{PageReconstructError, Timeline};
      10              : use crate::context::RequestContext;
      11              : use crate::keyspace::{KeySpace, KeySpaceAccum};
      12              : use crate::repository::*;
      13              : use crate::walrecord::NeonWalRecord;
      14              : use anyhow::Context;
      15              : use bytes::{Buf, Bytes};
      16              : use pageserver_api::reltag::{RelTag, SlruKind};
      17              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
      18              : use postgres_ffi::BLCKSZ;
      19              : use postgres_ffi::{Oid, TimestampTz, TransactionId};
      20              : use serde::{Deserialize, Serialize};
      21              : use std::collections::{hash_map, HashMap, HashSet};
      22              : use std::ops::Range;
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::{debug, trace, warn};
      25              : use utils::{bin_ser::BeSer, lsn::Lsn};
      26              : 
      27              : /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
      28              : pub type BlockNumber = u32;
      29              : 
      30            0 : #[derive(Debug)]
      31              : pub enum LsnForTimestamp {
      32              :     Present(Lsn),
      33              :     Future(Lsn),
      34              :     Past(Lsn),
      35              :     NoData(Lsn),
      36              : }
      37              : 
      38            3 : #[derive(Debug, thiserror::Error)]
      39              : pub enum CalculateLogicalSizeError {
      40              :     #[error("cancelled")]
      41              :     Cancelled,
      42              :     #[error(transparent)]
      43              :     Other(#[from] anyhow::Error),
      44              : }
      45              : 
      46            0 : #[derive(Debug, thiserror::Error)]
      47              : pub enum RelationError {
      48              :     #[error("Relation Already Exists")]
      49              :     AlreadyExists,
      50              :     #[error("invalid relnode")]
      51              :     InvalidRelnode,
      52              :     #[error(transparent)]
      53              :     Other(#[from] anyhow::Error),
      54              : }
      55              : 
      56              : ///
      57              : /// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
      58              : /// and other special kinds of files, in a versioned key-value store. The
      59              : /// Timeline struct provides the key-value store.
      60              : ///
      61              : /// This is a separate impl, so that we can easily include all these functions in a Timeline
      62              : /// implementation, and might be moved into a separate struct later.
      63              : impl Timeline {
      64              :     /// Start ingesting a WAL record, or other atomic modification of
      65              :     /// the timeline.
      66              :     ///
      67              :     /// This provides a transaction-like interface to perform a bunch
      68              :     /// of modifications atomically.
      69              :     ///
      70              :     /// To ingest a WAL record, call begin_modification(lsn) to get a
      71              :     /// DatadirModification object. Use the functions in the object to
      72              :     /// modify the repository state, updating all the pages and metadata
      73              :     /// that the WAL record affects. When you're done, call commit() to
      74              :     /// commit the changes.
      75              :     ///
      76              :     /// Lsn stored in modification is advanced by `ingest_record` and
      77              :     /// is used by `commit()` to update `last_record_lsn`.
      78              :     ///
      79              :     /// Calling commit() will flush all the changes and reset the state,
      80              :     /// so the `DatadirModification` struct can be reused to perform the next modification.
      81              :     ///
      82              :     /// Note that any pending modifications you make through the
      83              :     /// modification object won't be visible to calls to the 'get' and list
      84              :     /// functions of the timeline until you finish! And if you update the
      85              :     /// same page twice, the last update wins.
      86              :     ///
      87       867789 :     pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
      88       867789 :     where
      89       867789 :         Self: Sized,
      90       867789 :     {
      91       867789 :         DatadirModification {
      92       867789 :             tline: self,
      93       867789 :             pending_updates: HashMap::new(),
      94       867789 :             pending_deletions: Vec::new(),
      95       867789 :             pending_nblocks: 0,
      96       867789 :             lsn,
      97       867789 :         }
      98       867789 :     }
      99              : 
     100              :     //------------------------------------------------------------------------------
     101              :     // Public GET functions
     102              :     //------------------------------------------------------------------------------
     103              : 
     104              :     /// Look up given page version.
     105      4582770 :     pub async fn get_rel_page_at_lsn(
     106      4582770 :         &self,
     107      4582770 :         tag: RelTag,
     108      4582770 :         blknum: BlockNumber,
     109      4582770 :         lsn: Lsn,
     110      4582770 :         latest: bool,
     111      4582770 :         ctx: &RequestContext,
     112      4582770 :     ) -> Result<Bytes, PageReconstructError> {
     113      4582770 :         if tag.relnode == 0 {
     114            0 :             return Err(PageReconstructError::Other(
     115            0 :                 RelationError::InvalidRelnode.into(),
     116            0 :             ));
     117      4582770 :         }
     118              : 
     119      4582770 :         let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
     120      4582770 :         if blknum >= nblocks {
     121            0 :             debug!(
     122            0 :                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
     123            0 :                 tag, blknum, lsn, nblocks
     124            0 :             );
     125       159409 :             return Ok(ZERO_PAGE.clone());
     126      4423361 :         }
     127      4423361 : 
     128      4423361 :         let key = rel_block_to_key(tag, blknum);
     129      8429505 :         self.get(key, lsn, ctx).await
     130      4582739 :     }
     131              : 
     132              :     // Get size of a database in blocks
     133            7 :     pub async fn get_db_size(
     134            7 :         &self,
     135            7 :         spcnode: Oid,
     136            7 :         dbnode: Oid,
     137            7 :         lsn: Lsn,
     138            7 :         latest: bool,
     139            7 :         ctx: &RequestContext,
     140            7 :     ) -> Result<usize, PageReconstructError> {
     141            7 :         let mut total_blocks = 0;
     142              : 
     143            7 :         let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?;
     144              : 
     145         2058 :         for rel in rels {
     146         2051 :             let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?;
     147         2051 :             total_blocks += n_blocks as usize;
     148              :         }
     149            7 :         Ok(total_blocks)
     150            7 :     }
     151              : 
     152              :     /// Get size of a relation file
     153     80326639 :     pub async fn get_rel_size(
     154     80326639 :         &self,
     155     80326639 :         tag: RelTag,
     156     80326639 :         lsn: Lsn,
     157     80326639 :         latest: bool,
     158     80326639 :         ctx: &RequestContext,
     159     80326760 :     ) -> Result<BlockNumber, PageReconstructError> {
     160     80326760 :         if tag.relnode == 0 {
     161            0 :             return Err(PageReconstructError::Other(
     162            0 :                 RelationError::InvalidRelnode.into(),
     163            0 :             ));
     164     80326760 :         }
     165              : 
     166     80326760 :         if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
     167     80035393 :             return Ok(nblocks);
     168       291367 :         }
     169       291367 : 
     170       291367 :         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
     171         7343 :             && !self.get_rel_exists(tag, lsn, latest, ctx).await?
     172              :         {
     173              :             // FIXME: Postgres sometimes calls smgrcreate() to create
     174              :             // FSM, and smgrnblocks() on it immediately afterwards,
     175              :             // without extending it.  Tolerate that by claiming that
     176              :             // any non-existent FSM fork has size 0.
     177            0 :             return Ok(0);
     178       291367 :         }
     179       291367 : 
     180       291367 :         let key = rel_size_to_key(tag);
     181       291367 :         let mut buf = self.get(key, lsn, ctx).await?;
     182       291365 :         let nblocks = buf.get_u32_le();
     183       291365 : 
     184       291365 :         if latest {
     185       168447 :             // Update relation size cache only if "latest" flag is set.
     186       168447 :             // This flag is set by compute when it is working with most recent version of relation.
     187       168447 :             // Typically master compute node always set latest=true.
     188       168447 :             // Please notice, that even if compute node "by mistake" specifies old LSN but set
     189       168447 :             // latest=true, then it can not cause cache corruption, because with latest=true
     190       168447 :             // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
     191       168447 :             // associated with most recent value of LSN.
     192       168447 :             self.update_cached_rel_size(tag, lsn, nblocks);
     193       168447 :         }
     194       291365 :         Ok(nblocks)
     195     80326760 :     }
     196              : 
     197              :     /// Does relation exist?
     198     75836851 :     pub async fn get_rel_exists(
     199     75836851 :         &self,
     200     75836851 :         tag: RelTag,
     201     75836851 :         lsn: Lsn,
     202     75836851 :         _latest: bool,
     203     75836851 :         ctx: &RequestContext,
     204     75836972 :     ) -> Result<bool, PageReconstructError> {
     205     75836972 :         if tag.relnode == 0 {
     206            0 :             return Err(PageReconstructError::Other(
     207            0 :                 RelationError::InvalidRelnode.into(),
     208            0 :             ));
     209     75836972 :         }
     210              : 
     211              :         // first try to lookup relation in cache
     212     75836972 :         if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
     213     75732261 :             return Ok(true);
     214       104711 :         }
     215       104711 :         // fetch directory listing
     216       104711 :         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
     217       104711 :         let buf = self.get(key, lsn, ctx).await?;
     218              : 
     219       104711 :         match RelDirectory::des(&buf).context("deserialization failure") {
     220       104711 :             Ok(dir) => {
     221       104711 :                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
     222       104711 :                 Ok(exists)
     223              :             }
     224            0 :             Err(e) => Err(PageReconstructError::from(e)),
     225              :         }
     226     75836972 :     }
     227              : 
     228              :     /// Get a list of all existing relations in given tablespace and database.
     229         6669 :     pub async fn list_rels(
     230         6669 :         &self,
     231         6669 :         spcnode: Oid,
     232         6669 :         dbnode: Oid,
     233         6669 :         lsn: Lsn,
     234         6669 :         ctx: &RequestContext,
     235         6670 :     ) -> Result<HashSet<RelTag>, PageReconstructError> {
     236         6670 :         // fetch directory listing
     237         6670 :         let key = rel_dir_to_key(spcnode, dbnode);
     238         6670 :         let buf = self.get(key, lsn, ctx).await?;
     239              : 
     240         6670 :         match RelDirectory::des(&buf).context("deserialization failure") {
     241         6670 :             Ok(dir) => {
     242         6670 :                 let rels: HashSet<RelTag> =
     243      1567419 :                     HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
     244      1567419 :                         spcnode,
     245      1567419 :                         dbnode,
     246      1567419 :                         relnode: *relnode,
     247      1567419 :                         forknum: *forknum,
     248      1567419 :                     }));
     249         6670 : 
     250         6670 :                 Ok(rels)
     251              :             }
     252            0 :             Err(e) => Err(PageReconstructError::from(e)),
     253              :         }
     254         6670 :     }
     255              : 
     256              :     /// Look up given SLRU page version.
     257         6189 :     pub async fn get_slru_page_at_lsn(
     258         6189 :         &self,
     259         6189 :         kind: SlruKind,
     260         6189 :         segno: u32,
     261         6189 :         blknum: BlockNumber,
     262         6189 :         lsn: Lsn,
     263         6189 :         ctx: &RequestContext,
     264         6189 :     ) -> Result<Bytes, PageReconstructError> {
     265         6189 :         let key = slru_block_to_key(kind, segno, blknum);
     266       223121 :         self.get(key, lsn, ctx).await
     267         6189 :     }
     268              : 
     269              :     /// Get size of an SLRU segment
     270         6036 :     pub async fn get_slru_segment_size(
     271         6036 :         &self,
     272         6036 :         kind: SlruKind,
     273         6036 :         segno: u32,
     274         6036 :         lsn: Lsn,
     275         6036 :         ctx: &RequestContext,
     276         6036 :     ) -> Result<BlockNumber, PageReconstructError> {
     277         6036 :         let key = slru_segment_size_to_key(kind, segno);
     278         6036 :         let mut buf = self.get(key, lsn, ctx).await?;
     279         6036 :         Ok(buf.get_u32_le())
     280         6036 :     }
     281              : 
     282              :     /// Get size of an SLRU segment
     283          674 :     pub async fn get_slru_segment_exists(
     284          674 :         &self,
     285          674 :         kind: SlruKind,
     286          674 :         segno: u32,
     287          674 :         lsn: Lsn,
     288          674 :         ctx: &RequestContext,
     289          674 :     ) -> Result<bool, PageReconstructError> {
     290          674 :         // fetch directory listing
     291          674 :         let key = slru_dir_to_key(kind);
     292          674 :         let buf = self.get(key, lsn, ctx).await?;
     293              : 
     294          674 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     295          674 :             Ok(dir) => {
     296          674 :                 let exists = dir.segments.get(&segno).is_some();
     297          674 :                 Ok(exists)
     298              :             }
     299            0 :             Err(e) => Err(PageReconstructError::from(e)),
     300              :         }
     301          674 :     }
     302              : 
     303              :     /// Locate LSN, such that all transactions that committed before
     304              :     /// 'search_timestamp' are visible, but nothing newer is.
     305              :     ///
     306              :     /// This is not exact. Commit timestamps are not guaranteed to be ordered,
     307              :     /// so it's not well defined which LSN you get if there were multiple commits
     308              :     /// "in flight" at that point in time.
     309              :     ///
     310          203 :     pub async fn find_lsn_for_timestamp(
     311          203 :         &self,
     312          203 :         search_timestamp: TimestampTz,
     313          203 :         ctx: &RequestContext,
     314          203 :     ) -> Result<LsnForTimestamp, PageReconstructError> {
     315          203 :         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
     316          203 :         let min_lsn = *gc_cutoff_lsn_guard;
     317          203 :         let max_lsn = self.get_last_record_lsn();
     318          203 : 
     319          203 :         // LSNs are always 8-byte aligned. low/mid/high represent the
     320          203 :         // LSN divided by 8.
     321          203 :         let mut low = min_lsn.0 / 8;
     322          203 :         let mut high = max_lsn.0 / 8 + 1;
     323          203 : 
     324          203 :         let mut found_smaller = false;
     325          203 :         let mut found_larger = false;
     326         3582 :         while low < high {
     327              :             // cannot overflow, high and low are both smaller than u64::MAX / 2
     328         3379 :             let mid = (high + low) / 2;
     329              : 
     330         3379 :             let cmp = self
     331         3379 :                 .is_latest_commit_timestamp_ge_than(
     332         3379 :                     search_timestamp,
     333         3379 :                     Lsn(mid * 8),
     334         3379 :                     &mut found_smaller,
     335         3379 :                     &mut found_larger,
     336         3379 :                     ctx,
     337         3379 :                 )
     338       213673 :                 .await?;
     339              : 
     340         3379 :             if cmp {
     341          904 :                 high = mid;
     342         2475 :             } else {
     343         2475 :                 low = mid + 1;
     344         2475 :             }
     345              :         }
     346          203 :         match (found_smaller, found_larger) {
     347              :             (false, false) => {
     348              :                 // This can happen if no commit records have been processed yet, e.g.
     349              :                 // just after importing a cluster.
     350           25 :                 Ok(LsnForTimestamp::NoData(max_lsn))
     351              :             }
     352              :             (true, false) => {
     353              :                 // Didn't find any commit timestamps larger than the request
     354           75 :                 Ok(LsnForTimestamp::Future(max_lsn))
     355              :             }
     356              :             (false, true) => {
     357              :                 // Didn't find any commit timestamps smaller than the request
     358            9 :                 Ok(LsnForTimestamp::Past(max_lsn))
     359              :             }
     360              :             (true, true) => {
     361              :                 // low is the LSN of the first commit record *after* the search_timestamp,
     362              :                 // Back off by one to get to the point just before the commit.
     363              :                 //
     364              :                 // FIXME: it would be better to get the LSN of the previous commit.
     365              :                 // Otherwise, if you restore to the returned LSN, the database will
     366              :                 // include physical changes from later commits that will be marked
     367              :                 // as aborted, and will need to be vacuumed away.
     368           94 :                 Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
     369              :             }
     370              :         }
     371          203 :     }
     372              : 
     373              :     ///
     374              :     /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
     375              :     /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
     376              :     ///
     377              :     /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
     378              :     /// with a smaller/larger timestamp.
     379              :     ///
     380         3379 :     pub async fn is_latest_commit_timestamp_ge_than(
     381         3379 :         &self,
     382         3379 :         search_timestamp: TimestampTz,
     383         3379 :         probe_lsn: Lsn,
     384         3379 :         found_smaller: &mut bool,
     385         3379 :         found_larger: &mut bool,
     386         3379 :         ctx: &RequestContext,
     387         3379 :     ) -> Result<bool, PageReconstructError> {
     388         3379 :         for segno in self
     389         3379 :             .list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
     390         1976 :             .await?
     391              :         {
     392         3379 :             let nblocks = self
     393         3379 :                 .get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx)
     394         1520 :                 .await?;
     395         3552 :             for blknum in (0..nblocks).rev() {
     396         3552 :                 let clog_page = self
     397         3552 :                     .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
     398       210177 :                     .await?;
     399              : 
     400         3552 :                 if clog_page.len() == BLCKSZ as usize + 8 {
     401         3323 :                     let mut timestamp_bytes = [0u8; 8];
     402         3323 :                     timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
     403         3323 :                     let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
     404         3323 : 
     405         3323 :                     if timestamp >= search_timestamp {
     406          904 :                         *found_larger = true;
     407          904 :                         return Ok(true);
     408         2419 :                     } else {
     409         2419 :                         *found_smaller = true;
     410         2419 :                     }
     411          229 :                 }
     412              :             }
     413              :         }
     414         2475 :         Ok(false)
     415         3379 :     }
     416              : 
     417              :     /// Get a list of SLRU segments
     418         5364 :     pub async fn list_slru_segments(
     419         5364 :         &self,
     420         5364 :         kind: SlruKind,
     421         5364 :         lsn: Lsn,
     422         5364 :         ctx: &RequestContext,
     423         5364 :     ) -> Result<HashSet<u32>, PageReconstructError> {
     424         5364 :         // fetch directory entry
     425         5364 :         let key = slru_dir_to_key(kind);
     426              : 
     427         5364 :         let buf = self.get(key, lsn, ctx).await?;
     428         5363 :         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
     429         5363 :             Ok(dir) => Ok(dir.segments),
     430            0 :             Err(e) => Err(PageReconstructError::from(e)),
     431              :         }
     432         5364 :     }
     433              : 
     434         2666 :     pub async fn get_relmap_file(
     435         2666 :         &self,
     436         2666 :         spcnode: Oid,
     437         2666 :         dbnode: Oid,
     438         2666 :         lsn: Lsn,
     439         2666 :         ctx: &RequestContext,
     440         2666 :     ) -> Result<Bytes, PageReconstructError> {
     441         2666 :         let key = relmap_file_key(spcnode, dbnode);
     442              : 
     443         2666 :         let buf = self.get(key, lsn, ctx).await?;
     444         2666 :         Ok(buf)
     445         2666 :     }
     446              : 
     447          661 :     pub async fn list_dbdirs(
     448          661 :         &self,
     449          661 :         lsn: Lsn,
     450          661 :         ctx: &RequestContext,
     451          661 :     ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
     452              :         // fetch directory entry
     453          661 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     454              : 
     455          661 :         match DbDirectory::des(&buf).context("deserialization failure") {
     456          661 :             Ok(dir) => Ok(dir.dbdirs),
     457            0 :             Err(e) => Err(PageReconstructError::from(e)),
     458              :         }
     459          661 :     }
     460              : 
     461            2 :     pub async fn get_twophase_file(
     462            2 :         &self,
     463            2 :         xid: TransactionId,
     464            2 :         lsn: Lsn,
     465            2 :         ctx: &RequestContext,
     466            2 :     ) -> Result<Bytes, PageReconstructError> {
     467            2 :         let key = twophase_file_key(xid);
     468            2 :         let buf = self.get(key, lsn, ctx).await?;
     469            2 :         Ok(buf)
     470            2 :     }
     471              : 
     472          661 :     pub async fn list_twophase_files(
     473          661 :         &self,
     474          661 :         lsn: Lsn,
     475          661 :         ctx: &RequestContext,
     476          661 :     ) -> Result<HashSet<TransactionId>, PageReconstructError> {
     477              :         // fetch directory entry
     478          661 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     479              : 
     480          661 :         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
     481          661 :             Ok(dir) => Ok(dir.xids),
     482            0 :             Err(e) => Err(PageReconstructError::from(e)),
     483              :         }
     484          661 :     }
     485              : 
     486          660 :     pub async fn get_control_file(
     487          660 :         &self,
     488          660 :         lsn: Lsn,
     489          660 :         ctx: &RequestContext,
     490          660 :     ) -> Result<Bytes, PageReconstructError> {
     491          660 :         self.get(CONTROLFILE_KEY, lsn, ctx).await
     492          660 :     }
     493              : 
     494         2059 :     pub async fn get_checkpoint(
     495         2059 :         &self,
     496         2059 :         lsn: Lsn,
     497         2059 :         ctx: &RequestContext,
     498         2059 :     ) -> Result<Bytes, PageReconstructError> {
     499         2059 :         self.get(CHECKPOINT_KEY, lsn, ctx).await
     500         2059 :     }
     501              : 
     502              :     /// Does the same as get_current_logical_size but counted on demand.
     503              :     /// Used to initialize the logical size tracking on startup.
     504              :     ///
     505              :     /// Only relation blocks are counted currently. That excludes metadata,
     506              :     /// SLRUs, twophase files etc.
     507          421 :     pub async fn get_current_logical_size_non_incremental(
     508          421 :         &self,
     509          421 :         lsn: Lsn,
     510          421 :         cancel: CancellationToken,
     511          421 :         ctx: &RequestContext,
     512          421 :     ) -> Result<u64, CalculateLogicalSizeError> {
     513          421 :         crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
     514              : 
     515              :         // Fetch list of database dirs and iterate them
     516          421 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
     517          414 :         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
     518              : 
     519          414 :         let mut total_size: u64 = 0;
     520         1660 :         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
     521       387109 :             for rel in self
     522         1660 :                 .list_rels(*spcnode, *dbnode, lsn, ctx)
     523          891 :                 .await
     524         1659 :                 .context("list rels")?
     525              :             {
     526       387109 :                 if cancel.is_cancelled() {
     527            1 :                     return Err(CalculateLogicalSizeError::Cancelled);
     528       387108 :                 }
     529       387108 :                 let relsize_key = rel_size_to_key(rel);
     530       387108 :                 let mut buf = self
     531       387108 :                     .get(relsize_key, lsn, ctx)
     532       140355 :                     .await
     533       387108 :                     .with_context(|| format!("read relation size of {rel:?}"))?;
     534       387107 :                 let relsize = buf.get_u32_le();
     535       387107 : 
     536       387107 :                 total_size += relsize as u64;
     537              :             }
     538              :         }
     539          411 :         Ok(total_size * BLCKSZ as u64)
     540          417 :     }
     541              : 
     542              :     ///
     543              :     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     544              :     /// Anything that's not listed maybe removed from the underlying storage (from
     545              :     /// that LSN forwards).
     546          666 :     pub async fn collect_keyspace(
     547          666 :         &self,
     548          666 :         lsn: Lsn,
     549          666 :         ctx: &RequestContext,
     550          666 :     ) -> anyhow::Result<KeySpace> {
     551          666 :         // Iterate through key ranges, greedily packing them into partitions
     552          666 :         let mut result = KeySpaceAccum::new();
     553          666 : 
     554          666 :         // The dbdir metadata always exists
     555          666 :         result.add_key(DBDIR_KEY);
     556              : 
     557              :         // Fetch list of database dirs and iterate them
     558          989 :         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
     559          666 :         let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
     560              : 
     561          666 :         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
     562          666 :         dbs.sort_unstable();
     563         2999 :         for (spcnode, dbnode) in dbs {
     564         2335 :             result.add_key(relmap_file_key(spcnode, dbnode));
     565         2335 :             result.add_key(rel_dir_to_key(spcnode, dbnode));
     566              : 
     567         2335 :             let mut rels: Vec<RelTag> = self
     568         2335 :                 .list_rels(spcnode, dbnode, lsn, ctx)
     569         2284 :                 .await?
     570         2335 :                 .into_iter()
     571         2335 :                 .collect();
     572         2335 :             rels.sort_unstable();
     573       554188 :             for rel in rels {
     574       551855 :                 let relsize_key = rel_size_to_key(rel);
     575       551855 :                 let mut buf = self.get(relsize_key, lsn, ctx).await?;
     576       551853 :                 let relsize = buf.get_u32_le();
     577       551853 : 
     578       551853 :                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
     579       551853 :                 result.add_key(relsize_key);
     580              :             }
     581              :         }
     582              : 
     583              :         // Iterate SLRUs next
     584         1992 :         for kind in [
     585          664 :             SlruKind::Clog,
     586          664 :             SlruKind::MultiXactMembers,
     587          664 :             SlruKind::MultiXactOffsets,
     588              :         ] {
     589         1992 :             let slrudir_key = slru_dir_to_key(kind);
     590         1992 :             result.add_key(slrudir_key);
     591         1992 :             let buf = self.get(slrudir_key, lsn, ctx).await?;
     592         1992 :             let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
     593         1992 :             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
     594         1992 :             segments.sort_unstable();
     595         3740 :             for segno in segments {
     596         1748 :                 let segsize_key = slru_segment_size_to_key(kind, segno);
     597         1748 :                 let mut buf = self.get(segsize_key, lsn, ctx).await?;
     598         1748 :                 let segsize = buf.get_u32_le();
     599         1748 : 
     600         1748 :                 result.add_range(
     601         1748 :                     slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
     602         1748 :                 );
     603         1748 :                 result.add_key(segsize_key);
     604              :             }
     605              :         }
     606              : 
     607              :         // Then pg_twophase
     608          664 :         result.add_key(TWOPHASEDIR_KEY);
     609          664 :         let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
     610          664 :         let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
     611          664 :         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
     612          664 :         xids.sort_unstable();
     613          664 :         for xid in xids {
     614            0 :             result.add_key(twophase_file_key(xid));
     615            0 :         }
     616              : 
     617          664 :         result.add_key(CONTROLFILE_KEY);
     618          664 :         result.add_key(CHECKPOINT_KEY);
     619          664 : 
     620          664 :         Ok(result.to_keyspace())
     621          664 :     }
     622              : 
     623              :     /// Get cached size of relation if it not updated after specified LSN
     624    156163490 :     pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
     625    156163490 :         let rel_size_cache = self.rel_size_cache.read().unwrap();
     626    156163490 :         if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
     627    155919845 :             if lsn >= *cached_lsn {
     628    155767412 :                 return Some(*nblocks);
     629       152433 :             }
     630       243645 :         }
     631       396078 :         None
     632    156163490 :     }
     633              : 
     634              :     /// Update cached relation size if there is no more recent update
     635       168447 :     pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     636       168447 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     637       168447 :         match rel_size_cache.entry(tag) {
     638       148122 :             hash_map::Entry::Occupied(mut entry) => {
     639       148122 :                 let cached_lsn = entry.get_mut();
     640       148122 :                 if lsn >= cached_lsn.0 {
     641            3 :                     *cached_lsn = (lsn, nblocks);
     642       148119 :                 }
     643              :             }
     644        20325 :             hash_map::Entry::Vacant(entry) => {
     645        20325 :                 entry.insert((lsn, nblocks));
     646        20325 :             }
     647              :         }
     648       168447 :     }
     649              : 
     650              :     /// Store cached relation size
     651      1728353 :     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
     652      1728353 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     653      1728353 :         rel_size_cache.insert(tag, (lsn, nblocks));
     654      1728353 :     }
     655              : 
     656              :     /// Remove cached relation size
     657        17781 :     pub fn remove_cached_rel_size(&self, tag: &RelTag) {
     658        17781 :         let mut rel_size_cache = self.rel_size_cache.write().unwrap();
     659        17781 :         rel_size_cache.remove(tag);
     660        17781 :     }
     661              : }
     662              : 
     663              : /// DatadirModification represents an operation to ingest an atomic set of
     664              : /// updates to the repository. It is created by the 'begin_record'
     665              : /// function. It is called for each WAL record, so that all the modifications
     666              : /// by a one WAL record appear atomic.
     667              : pub struct DatadirModification<'a> {
     668              :     /// The timeline this modification applies to. You can access this to
     669              :     /// read the state, but note that any pending updates are *not* reflected
     670              :     /// in the state in 'tline' yet.
     671              :     pub tline: &'a Timeline,
     672              : 
     673              :     /// Lsn assigned by begin_modification
     674              :     pub lsn: Lsn,
     675              : 
     676              :     // The modifications are not applied directly to the underlying key-value store.
     677              :     // The put-functions add the modifications here, and they are flushed to the
     678              :     // underlying key-value store by the 'finish' function.
     679              :     pending_updates: HashMap<Key, Value>,
     680              :     pending_deletions: Vec<Range<Key>>,
     681              :     pending_nblocks: i64,
     682              : }
     683              : 
     684              : impl<'a> DatadirModification<'a> {
     685              :     /// Initialize a completely new repository.
     686              :     ///
     687              :     /// This inserts the directory metadata entries that are assumed to
     688              :     /// always exist.
     689          682 :     pub fn init_empty(&mut self) -> anyhow::Result<()> {
     690          682 :         let buf = DbDirectory::ser(&DbDirectory {
     691          682 :             dbdirs: HashMap::new(),
     692          682 :         })?;
     693          682 :         self.put(DBDIR_KEY, Value::Image(buf.into()));
     694              : 
     695          682 :         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     696          682 :             xids: HashSet::new(),
     697          682 :         })?;
     698          682 :         self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
     699              : 
     700          682 :         let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
     701          682 :         let empty_dir = Value::Image(buf);
     702          682 :         self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
     703          682 :         self.put(
     704          682 :             slru_dir_to_key(SlruKind::MultiXactMembers),
     705          682 :             empty_dir.clone(),
     706          682 :         );
     707          682 :         self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
     708          682 : 
     709          682 :         Ok(())
     710          682 :     }
     711              : 
     712              :     #[cfg(test)]
     713              :     pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
     714           34 :         self.init_empty()?;
     715           34 :         self.put_control_file(bytes::Bytes::from_static(
     716           34 :             b"control_file contents do not matter",
     717           34 :         ))
     718           34 :         .context("put_control_file")?;
     719           34 :         self.put_checkpoint(bytes::Bytes::from_static(
     720           34 :             b"checkpoint_file contents do not matter",
     721           34 :         ))
     722           34 :         .context("put_checkpoint_file")?;
     723           34 :         Ok(())
     724           34 :     }
     725              : 
     726              :     /// Put a new page version that can be constructed from a WAL record
     727              :     ///
     728              :     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
     729              :     /// current end-of-file. It's up to the caller to check that the relation size
     730              :     /// matches the blocks inserted!
     731     75311843 :     pub fn put_rel_wal_record(
     732     75311843 :         &mut self,
     733     75311843 :         rel: RelTag,
     734     75311843 :         blknum: BlockNumber,
     735     75311843 :         rec: NeonWalRecord,
     736     75311843 :     ) -> anyhow::Result<()> {
     737     75311843 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     738     75311843 :         self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
     739     75311843 :         Ok(())
     740     75311843 :     }
     741              : 
     742              :     // Same, but for an SLRU.
     743      2313430 :     pub fn put_slru_wal_record(
     744      2313430 :         &mut self,
     745      2313430 :         kind: SlruKind,
     746      2313430 :         segno: u32,
     747      2313430 :         blknum: BlockNumber,
     748      2313430 :         rec: NeonWalRecord,
     749      2313430 :     ) -> anyhow::Result<()> {
     750      2313430 :         self.put(
     751      2313430 :             slru_block_to_key(kind, segno, blknum),
     752      2313430 :             Value::WalRecord(rec),
     753      2313430 :         );
     754      2313430 :         Ok(())
     755      2313430 :     }
     756              : 
     757              :     /// Like put_wal_record, but with ready-made image of the page.
     758      2530223 :     pub fn put_rel_page_image(
     759      2530223 :         &mut self,
     760      2530223 :         rel: RelTag,
     761      2530223 :         blknum: BlockNumber,
     762      2530223 :         img: Bytes,
     763      2530223 :     ) -> anyhow::Result<()> {
     764      2530223 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     765      2530223 :         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
     766      2530223 :         Ok(())
     767      2530223 :     }
     768              : 
     769         2615 :     pub fn put_slru_page_image(
     770         2615 :         &mut self,
     771         2615 :         kind: SlruKind,
     772         2615 :         segno: u32,
     773         2615 :         blknum: BlockNumber,
     774         2615 :         img: Bytes,
     775         2615 :     ) -> anyhow::Result<()> {
     776         2615 :         self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
     777         2615 :         Ok(())
     778         2615 :     }
     779              : 
     780              :     /// Store a relmapper file (pg_filenode.map) in the repository
     781         2649 :     pub async fn put_relmap_file(
     782         2649 :         &mut self,
     783         2649 :         spcnode: Oid,
     784         2649 :         dbnode: Oid,
     785         2649 :         img: Bytes,
     786         2649 :         ctx: &RequestContext,
     787         2649 :     ) -> anyhow::Result<()> {
     788              :         // Add it to the directory (if it doesn't exist already)
     789         2649 :         let buf = self.get(DBDIR_KEY, ctx).await?;
     790         2649 :         let mut dbdir = DbDirectory::des(&buf)?;
     791              : 
     792         2649 :         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
     793         2649 :         if r.is_none() || r == Some(false) {
     794         2604 :             // The dbdir entry didn't exist, or it contained a
     795         2604 :             // 'false'. The 'insert' call already updated it with
     796         2604 :             // 'true', now write the updated 'dbdirs' map back.
     797         2604 :             let buf = DbDirectory::ser(&dbdir)?;
     798         2604 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
     799           45 :         }
     800         2649 :         if r.is_none() {
     801           20 :             // Create RelDirectory
     802           20 :             let buf = RelDirectory::ser(&RelDirectory {
     803           20 :                 rels: HashSet::new(),
     804           20 :             })?;
     805           20 :             self.put(
     806           20 :                 rel_dir_to_key(spcnode, dbnode),
     807           20 :                 Value::Image(Bytes::from(buf)),
     808           20 :             );
     809         2629 :         }
     810              : 
     811         2649 :         self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
     812         2649 :         Ok(())
     813         2649 :     }
     814              : 
     815            4 :     pub async fn put_twophase_file(
     816            4 :         &mut self,
     817            4 :         xid: TransactionId,
     818            4 :         img: Bytes,
     819            4 :         ctx: &RequestContext,
     820            4 :     ) -> anyhow::Result<()> {
     821              :         // Add it to the directory entry
     822            4 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
     823            4 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
     824            4 :         if !dir.xids.insert(xid) {
     825            0 :             anyhow::bail!("twophase file for xid {} already exists", xid);
     826            4 :         }
     827            4 :         self.put(
     828            4 :             TWOPHASEDIR_KEY,
     829            4 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
     830              :         );
     831              : 
     832            4 :         self.put(twophase_file_key(xid), Value::Image(img));
     833            4 :         Ok(())
     834            4 :     }
     835              : 
     836          680 :     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
     837          680 :         self.put(CONTROLFILE_KEY, Value::Image(img));
     838          680 :         Ok(())
     839          680 :     }
     840              : 
     841        29078 :     pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
     842        29078 :         self.put(CHECKPOINT_KEY, Value::Image(img));
     843        29078 :         Ok(())
     844        29078 :     }
     845              : 
     846            2 :     pub async fn drop_dbdir(
     847            2 :         &mut self,
     848            2 :         spcnode: Oid,
     849            2 :         dbnode: Oid,
     850            2 :         ctx: &RequestContext,
     851            2 :     ) -> anyhow::Result<()> {
     852            2 :         let req_lsn = self.tline.get_last_record_lsn();
     853              : 
     854            2 :         let total_blocks = self
     855            2 :             .tline
     856            2 :             .get_db_size(spcnode, dbnode, req_lsn, true, ctx)
     857            0 :             .await?;
     858              : 
     859              :         // Remove entry from dbdir
     860            2 :         let buf = self.get(DBDIR_KEY, ctx).await?;
     861            2 :         let mut dir = DbDirectory::des(&buf)?;
     862            2 :         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
     863            2 :             let buf = DbDirectory::ser(&dir)?;
     864            2 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
     865              :         } else {
     866            0 :             warn!(
     867            0 :                 "dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
     868            0 :                 spcnode, dbnode
     869            0 :             );
     870              :         }
     871              : 
     872              :         // Update logical database size.
     873            2 :         self.pending_nblocks -= total_blocks as i64;
     874            2 : 
     875            2 :         // Delete all relations and metadata files for the spcnode/dnode
     876            2 :         self.delete(dbdir_key_range(spcnode, dbnode));
     877            2 :         Ok(())
     878            2 :     }
     879              : 
     880              :     /// Create a relation fork.
     881              :     ///
     882              :     /// 'nblocks' is the initial size.
     883       630094 :     pub async fn put_rel_creation(
     884       630094 :         &mut self,
     885       630094 :         rel: RelTag,
     886       630094 :         nblocks: BlockNumber,
     887       630094 :         ctx: &RequestContext,
     888       630094 :     ) -> Result<(), RelationError> {
     889       630094 :         if rel.relnode == 0 {
     890            0 :             return Err(RelationError::InvalidRelnode);
     891       630094 :         }
     892              :         // It's possible that this is the first rel for this db in this
     893              :         // tablespace.  Create the reldir entry for it if so.
     894       630094 :         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
     895       630094 :             .context("deserialize db")?;
     896       630094 :         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
     897       630094 :         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
     898              :             // Didn't exist. Update dbdir
     899         2585 :             dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
     900         2585 :             let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
     901         2585 :             self.put(DBDIR_KEY, Value::Image(buf.into()));
     902         2585 : 
     903         2585 :             // and create the RelDirectory
     904         2585 :             RelDirectory::default()
     905              :         } else {
     906              :             // reldir already exists, fetch it
     907       627509 :             RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
     908       627509 :                 .context("deserialize db")?
     909              :         };
     910              : 
     911              :         // Add the new relation to the rel directory entry, and write it back
     912       630094 :         if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
     913            0 :             return Err(RelationError::AlreadyExists);
     914       630094 :         }
     915       630094 :         self.put(
     916       630094 :             rel_dir_key,
     917       630094 :             Value::Image(Bytes::from(
     918       630094 :                 RelDirectory::ser(&rel_dir).context("serialize")?,
     919              :             )),
     920              :         );
     921              : 
     922              :         // Put size
     923       630094 :         let size_key = rel_size_to_key(rel);
     924       630094 :         let buf = nblocks.to_le_bytes();
     925       630094 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
     926       630094 : 
     927       630094 :         self.pending_nblocks += nblocks as i64;
     928       630094 : 
     929       630094 :         // Update relation size cache
     930       630094 :         self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
     931       630094 : 
     932       630094 :         // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
     933       630094 :         // caller.
     934       630094 :         Ok(())
     935       630094 :     }
     936              : 
     937              :     /// Truncate relation
     938         3095 :     pub async fn put_rel_truncation(
     939         3095 :         &mut self,
     940         3095 :         rel: RelTag,
     941         3095 :         nblocks: BlockNumber,
     942         3095 :         ctx: &RequestContext,
     943         3095 :     ) -> anyhow::Result<()> {
     944         3095 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     945         3095 :         let last_lsn = self.tline.get_last_record_lsn();
     946         3095 :         if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
     947         3095 :             let size_key = rel_size_to_key(rel);
     948              :             // Fetch the old size first
     949         3095 :             let old_size = self.get(size_key, ctx).await?.get_u32_le();
     950         3095 : 
     951         3095 :             // Update the entry with the new size.
     952         3095 :             let buf = nblocks.to_le_bytes();
     953         3095 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
     954         3095 : 
     955         3095 :             // Update relation size cache
     956         3095 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
     957         3095 : 
     958         3095 :             // Update relation size cache
     959         3095 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
     960         3095 : 
     961         3095 :             // Update logical database size.
     962         3095 :             self.pending_nblocks -= old_size as i64 - nblocks as i64;
     963            0 :         }
     964         3095 :         Ok(())
     965         3095 :     }
     966              : 
     967              :     /// Extend relation
     968              :     /// If new size is smaller, do nothing.
     969      1694438 :     pub async fn put_rel_extend(
     970      1694438 :         &mut self,
     971      1694438 :         rel: RelTag,
     972      1694438 :         nblocks: BlockNumber,
     973      1694438 :         ctx: &RequestContext,
     974      1694438 :     ) -> anyhow::Result<()> {
     975      1694438 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     976              : 
     977              :         // Put size
     978      1694438 :         let size_key = rel_size_to_key(rel);
     979      1694438 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
     980      1694438 : 
     981      1694438 :         // only extend relation here. never decrease the size
     982      1694438 :         if nblocks > old_size {
     983      1092069 :             let buf = nblocks.to_le_bytes();
     984      1092069 :             self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
     985      1092069 : 
     986      1092069 :             // Update relation size cache
     987      1092069 :             self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
     988      1092069 : 
     989      1092069 :             self.pending_nblocks += nblocks as i64 - old_size as i64;
     990      1092069 :         }
     991      1694438 :         Ok(())
     992      1694438 :     }
     993              : 
     994              :     /// Drop a relation.
     995        17781 :     pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
     996        17781 :         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
     997              : 
     998              :         // Remove it from the directory entry
     999        17781 :         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
    1000        17781 :         let buf = self.get(dir_key, ctx).await?;
    1001        17781 :         let mut dir = RelDirectory::des(&buf)?;
    1002              : 
    1003        17781 :         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
    1004        17781 :             self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
    1005              :         } else {
    1006            0 :             warn!("dropped rel {} did not exist in rel directory", rel);
    1007              :         }
    1008              : 
    1009              :         // update logical size
    1010        17781 :         let size_key = rel_size_to_key(rel);
    1011        17781 :         let old_size = self.get(size_key, ctx).await?.get_u32_le();
    1012        17781 :         self.pending_nblocks -= old_size as i64;
    1013        17781 : 
    1014        17781 :         // Remove enty from relation size cache
    1015        17781 :         self.tline.remove_cached_rel_size(&rel);
    1016        17781 : 
    1017        17781 :         // Delete size entry, as well as all blocks
    1018        17781 :         self.delete(rel_key_range(rel));
    1019        17781 : 
    1020        17781 :         Ok(())
    1021        17781 :     }
    1022              : 
    1023         1959 :     pub async fn put_slru_segment_creation(
    1024         1959 :         &mut self,
    1025         1959 :         kind: SlruKind,
    1026         1959 :         segno: u32,
    1027         1959 :         nblocks: BlockNumber,
    1028         1959 :         ctx: &RequestContext,
    1029         1959 :     ) -> anyhow::Result<()> {
    1030         1959 :         // Add it to the directory entry
    1031         1959 :         let dir_key = slru_dir_to_key(kind);
    1032         1959 :         let buf = self.get(dir_key, ctx).await?;
    1033         1959 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1034              : 
    1035         1959 :         if !dir.segments.insert(segno) {
    1036            0 :             anyhow::bail!("slru segment {kind:?}/{segno} already exists");
    1037         1959 :         }
    1038         1959 :         self.put(
    1039         1959 :             dir_key,
    1040         1959 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1041              :         );
    1042              : 
    1043              :         // Put size
    1044         1959 :         let size_key = slru_segment_size_to_key(kind, segno);
    1045         1959 :         let buf = nblocks.to_le_bytes();
    1046         1959 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1047         1959 : 
    1048         1959 :         // even if nblocks > 0, we don't insert any actual blocks here
    1049         1959 : 
    1050         1959 :         Ok(())
    1051         1959 :     }
    1052              : 
    1053              :     /// Extend SLRU segment
    1054          668 :     pub fn put_slru_extend(
    1055          668 :         &mut self,
    1056          668 :         kind: SlruKind,
    1057          668 :         segno: u32,
    1058          668 :         nblocks: BlockNumber,
    1059          668 :     ) -> anyhow::Result<()> {
    1060          668 :         // Put size
    1061          668 :         let size_key = slru_segment_size_to_key(kind, segno);
    1062          668 :         let buf = nblocks.to_le_bytes();
    1063          668 :         self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
    1064          668 :         Ok(())
    1065          668 :     }
    1066              : 
    1067              :     /// This method is used for marking truncated SLRU files
    1068            9 :     pub async fn drop_slru_segment(
    1069            9 :         &mut self,
    1070            9 :         kind: SlruKind,
    1071            9 :         segno: u32,
    1072            9 :         ctx: &RequestContext,
    1073            9 :     ) -> anyhow::Result<()> {
    1074            9 :         // Remove it from the directory entry
    1075            9 :         let dir_key = slru_dir_to_key(kind);
    1076            9 :         let buf = self.get(dir_key, ctx).await?;
    1077            9 :         let mut dir = SlruSegmentDirectory::des(&buf)?;
    1078              : 
    1079            9 :         if !dir.segments.remove(&segno) {
    1080            0 :             warn!("slru segment {:?}/{} does not exist", kind, segno);
    1081            9 :         }
    1082              :         self.put(
    1083            9 :             dir_key,
    1084            9 :             Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
    1085              :         );
    1086              : 
    1087              :         // Delete size entry, as well as all blocks
    1088            9 :         self.delete(slru_segment_key_range(kind, segno));
    1089            9 : 
    1090            9 :         Ok(())
    1091            9 :     }
    1092              : 
    1093              :     /// Drop a relmapper file (pg_filenode.map)
    1094            0 :     pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
    1095            0 :         // TODO
    1096            0 :         Ok(())
    1097            0 :     }
    1098              : 
    1099              :     /// This method is used for marking truncated SLRU files
    1100            2 :     pub async fn drop_twophase_file(
    1101            2 :         &mut self,
    1102            2 :         xid: TransactionId,
    1103            2 :         ctx: &RequestContext,
    1104            2 :     ) -> anyhow::Result<()> {
    1105              :         // Remove it from the directory entry
    1106            2 :         let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
    1107            2 :         let mut dir = TwoPhaseDirectory::des(&buf)?;
    1108              : 
    1109            2 :         if !dir.xids.remove(&xid) {
    1110            0 :             warn!("twophase file for xid {} does not exist", xid);
    1111            2 :         }
    1112              :         self.put(
    1113              :             TWOPHASEDIR_KEY,
    1114            2 :             Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
    1115              :         );
    1116              : 
    1117              :         // Delete it
    1118            2 :         self.delete(twophase_key_range(xid));
    1119            2 : 
    1120            2 :         Ok(())
    1121            2 :     }
    1122              : 
    1123              :     ///
    1124              :     /// Flush changes accumulated so far to the underlying repository.
    1125              :     ///
    1126              :     /// Usually, changes made in DatadirModification are atomic, but this allows
    1127              :     /// you to flush them to the underlying repository before the final `commit`.
    1128              :     /// That allows to free up the memory used to hold the pending changes.
    1129              :     ///
    1130              :     /// Currently only used during bulk import of a data directory. In that
    1131              :     /// context, breaking the atomicity is OK. If the import is interrupted, the
    1132              :     /// whole import fails and the timeline will be deleted anyway.
    1133              :     /// (Or to be precise, it will be left behind for debugging purposes and
    1134              :     /// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
    1135              :     ///
    1136              :     /// Note: A consequence of flushing the pending operations is that they
    1137              :     /// won't be visible to subsequent operations until `commit`. The function
    1138              :     /// retains all the metadata, but data pages are flushed. That's again OK
    1139              :     /// for bulk import, where you are just loading data pages and won't try to
    1140              :     /// modify the same pages twice.
    1141       614019 :     pub async fn flush(&mut self) -> anyhow::Result<()> {
    1142       614019 :         // Unless we have accumulated a decent amount of changes, it's not worth it
    1143       614019 :         // to scan through the pending_updates list.
    1144       614019 :         let pending_nblocks = self.pending_nblocks;
    1145       614019 :         if pending_nblocks < 10000 {
    1146       614019 :             return Ok(());
    1147            0 :         }
    1148              : 
    1149            0 :         let writer = self.tline.writer().await;
    1150              : 
    1151              :         // Flush relation and  SLRU data blocks, keep metadata.
    1152            0 :         let mut retained_pending_updates = HashMap::new();
    1153            0 :         for (key, value) in self.pending_updates.drain() {
    1154            0 :             if is_rel_block_key(key) || is_slru_block_key(key) {
    1155              :                 // This bails out on first error without modifying pending_updates.
    1156              :                 // That's Ok, cf this function's doc comment.
    1157            0 :                 writer.put(key, self.lsn, &value).await?;
    1158            0 :             } else {
    1159            0 :                 retained_pending_updates.insert(key, value);
    1160            0 :             }
    1161              :         }
    1162            0 :         self.pending_updates.extend(retained_pending_updates);
    1163            0 : 
    1164            0 :         if pending_nblocks != 0 {
    1165            0 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1166            0 :             self.pending_nblocks = 0;
    1167            0 :         }
    1168              : 
    1169            0 :         Ok(())
    1170       614019 :     }
    1171              : 
    1172              :     ///
    1173              :     /// Finish this atomic update, writing all the updated keys to the
    1174              :     /// underlying timeline.
    1175              :     /// All the modifications in this atomic update are stamped by the specified LSN.
    1176              :     ///
    1177     73661491 :     pub async fn commit(&mut self) -> anyhow::Result<()> {
    1178     73661518 :         let writer = self.tline.writer().await;
    1179     73661518 :         let lsn = self.lsn;
    1180     73661518 :         let pending_nblocks = self.pending_nblocks;
    1181     73661518 :         self.pending_nblocks = 0;
    1182              : 
    1183     81939061 :         for (key, value) in self.pending_updates.drain() {
    1184     81939061 :             writer.put(key, lsn, &value).await?;
    1185              :         }
    1186     73661508 :         for key_range in self.pending_deletions.drain(..) {
    1187        17794 :             writer.delete(key_range, lsn).await?;
    1188              :         }
    1189              : 
    1190     73661508 :         writer.finish_write(lsn);
    1191     73661508 : 
    1192     73661508 :         if pending_nblocks != 0 {
    1193      1094354 :             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
    1194     72567154 :         }
    1195              : 
    1196     73661508 :         Ok(())
    1197     73661508 :     }
    1198              : 
    1199              :     // Internal helper functions to batch the modifications
    1200              : 
    1201      2995323 :     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
    1202              :         // Have we already updated the same key? Read the pending updated
    1203              :         // version in that case.
    1204              :         //
    1205              :         // Note: we don't check pending_deletions. It is an error to request a
    1206              :         // value that has been removed, deletion only avoids leaking storage.
    1207      2995323 :         if let Some(value) = self.pending_updates.get(&key) {
    1208      1836929 :             if let Value::Image(img) = value {
    1209      1836929 :                 Ok(img.clone())
    1210              :             } else {
    1211              :                 // Currently, we never need to read back a WAL record that we
    1212              :                 // inserted in the same "transaction". All the metadata updates
    1213              :                 // work directly with Images, and we never need to read actual
    1214              :                 // data pages. We could handle this if we had to, by calling
    1215              :                 // the walredo manager, but let's keep it simple for now.
    1216            0 :                 Err(PageReconstructError::from(anyhow::anyhow!(
    1217            0 :                     "unexpected pending WAL record"
    1218            0 :                 )))
    1219              :             }
    1220              :         } else {
    1221      1158394 :             let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    1222      1158394 :             self.tline.get(key, lsn, ctx).await
    1223              :         }
    1224      2995323 :     }
    1225              : 
    1226     82576877 :     fn put(&mut self, key: Key, val: Value) {
    1227     82576877 :         self.pending_updates.insert(key, val);
    1228     82576877 :     }
    1229              : 
    1230        17794 :     fn delete(&mut self, key_range: Range<Key>) {
    1231        17794 :         trace!("DELETE {}-{}", key_range.start, key_range.end);
    1232        17794 :         self.pending_deletions.push(key_range);
    1233        17794 :     }
    1234              : }
    1235              : 
    1236              : //--- Metadata structs stored in key-value pairs in the repository.
    1237              : 
/// Directory of databases: maps each `(spcnode, dbnode)` pair to whether that
/// database's relmapper and PG_VERSION files exist.
#[derive(Debug, Serialize, Deserialize)]
struct DbDirectory {
    // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
    dbdirs: HashMap<(Oid, Oid), bool>,
}
    1243              : 
/// Directory of two-phase state files, stored under `TWOPHASEDIR_KEY`.
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectory {
    // Transaction IDs that currently have a two-phase state file.
    xids: HashSet<TransactionId>,
}
    1248              : 
/// Directory of relation forks that exist, as `(relfilenode, forknum)` pairs.
#[derive(Debug, Serialize, Deserialize, Default)]
struct RelDirectory {
    // Set of relations that exist. (relfilenode, forknum)
    //
    // TODO: Store it as a btree or radix tree or something else that spans multiple
    // key-value pairs, if you have a lot of relations
    rels: HashSet<(Oid, u8)>,
}
    1257              : 
/// Presumably the size of a relation fork, in blocks.
/// NOTE(review): not referenced in this part of the file; confirm it is still used.
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
    nblocks: u32,
}
    1262              : 
/// Directory of the segments of one SLRU kind, stored under `slru_dir_to_key(kind)`.
#[derive(Debug, Serialize, Deserialize, Default)]
struct SlruSegmentDirectory {
    // Set of SLRU segments that exist.
    segments: HashSet<u32>,
}
    1268              : 
/// A page of `BLCKSZ` zero bytes.
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
    1270              : 
    1271              : // Layout of the Key address space
    1272              : //
    1273              : // The Key struct, used to address the underlying key-value store, consists of
    1274              : // 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
    1275              : // all the data and metadata keys into those 18 bytes.
    1276              : //
    1277              : // Principles for the mapping:
    1278              : //
    1279              : // - Things that are often accessed or modified together, should be close to
    1280              : //   each other in the key space. For example, if a relation is extended by one
    1281              : //   block, we create a new key-value pair for the block data, and update the
    1282              : //   relation size entry. Because of that, the RelSize key comes after all the
    1283              : //   RelBlocks of a relation: the RelSize and the last RelBlock are always next
    1284              : //   to each other.
    1285              : //
    1286              : // The key space is divided into four major sections, identified by the first
// byte, and they form a hierarchy:
    1288              : //
    1289              : // 00 Relation data and metadata
    1290              : //
//   DbDir    () -> (spcnode, dbnode)
    1292              : //   Filenodemap
    1293              : //   RelDir   -> relnode forknum
    1294              : //       RelBlocks
    1295              : //       RelSize
    1296              : //
    1297              : // 01 SLRUs
    1298              : //
    1299              : //   SlruDir  kind
    1300              : //   SlruSegBlocks segno
    1301              : //   SlruSegSize
    1302              : //
    1303              : // 02 pg_twophase
    1304              : //
    1305              : // 03 misc
    1306              : //    controlfile
    1307              : //    checkpoint
    1308              : //    pg_version
    1309              : //
    1310              : // Below is a full list of the keyspace allocation:
    1311              : //
    1312              : // DbDir:
    1313              : // 00 00000000 00000000 00000000 00   00000000
    1314              : //
    1315              : // Filenodemap:
    1316              : // 00 SPCNODE  DBNODE   00000000 00   00000000
    1317              : //
    1318              : // RelDir:
    1319              : // 00 SPCNODE  DBNODE   00000000 00   00000001 (Postgres never uses relfilenode 0)
    1320              : //
    1321              : // RelBlock:
    1322              : // 00 SPCNODE  DBNODE   RELNODE  FORK BLKNUM
    1323              : //
    1324              : // RelSize:
    1325              : // 00 SPCNODE  DBNODE   RELNODE  FORK FFFFFFFF
    1326              : //
    1327              : // SlruDir:
    1328              : // 01 kind     00000000 00000000 00   00000000
    1329              : //
    1330              : // SlruSegBlock:
    1331              : // 01 kind     00000001 SEGNO    00   BLKNUM
    1332              : //
    1333              : // SlruSegSize:
    1334              : // 01 kind     00000001 SEGNO    00   FFFFFFFF
    1335              : //
    1336              : // TwoPhaseDir:
    1337              : // 02 00000000 00000000 00000000 00   00000000
    1338              : //
    1339              : // TwoPhaseFile:
    1340              : // 02 00000000 00000000 00000000 00   XID
    1341              : //
    1342              : // ControlFile:
    1343              : // 03 00000000 00000000 00000000 00   00000000
    1344              : //
    1345              : // Checkpoint:
    1346              : // 03 00000000 00000000 00000000 00   00000001
//-- Section 00: relation data and metadata
    1348              : 
/// Key of the DbDir entry. All fields are zero, so it is the first key of the
/// whole key space.
const DBDIR_KEY: Key = Key {
    field1: 0x00,
    field2: 0,
    field3: 0,
    field4: 0,
    field5: 0,
    field6: 0,
};
    1357              : 
    1358            2 : fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
    1359            2 :     Key {
    1360            2 :         field1: 0x00,
    1361            2 :         field2: spcnode,
    1362            2 :         field3: dbnode,
    1363            2 :         field4: 0,
    1364            2 :         field5: 0,
    1365            2 :         field6: 0,
    1366            2 :     }..Key {
    1367            2 :         field1: 0x00,
    1368            2 :         field2: spcnode,
    1369            2 :         field3: dbnode,
    1370            2 :         field4: 0xffffffff,
    1371            2 :         field5: 0xff,
    1372            2 :         field6: 0xffffffff,
    1373            2 :     }
    1374            2 : }
    1375              : 
    1376         7650 : fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
    1377         7650 :     Key {
    1378         7650 :         field1: 0x00,
    1379         7650 :         field2: spcnode,
    1380         7650 :         field3: dbnode,
    1381         7650 :         field4: 0,
    1382         7650 :         field5: 0,
    1383         7650 :         field6: 0,
    1384         7650 :     }
    1385         7650 : }
    1386              : 
    1387       761611 : fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
    1388       761611 :     Key {
    1389       761611 :         field1: 0x00,
    1390       761611 :         field2: spcnode,
    1391       761611 :         field3: dbnode,
    1392       761611 :         field4: 0,
    1393       761611 :         field5: 0,
    1394       761611 :         field6: 1,
    1395       761611 :     }
    1396       761611 : }
    1397              : 
    1398     83369133 : fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
    1399     83369133 :     Key {
    1400     83369133 :         field1: 0x00,
    1401     83369133 :         field2: rel.spcnode,
    1402     83369133 :         field3: rel.dbnode,
    1403     83369133 :         field4: rel.relnode,
    1404     83369133 :         field5: rel.forknum,
    1405     83369133 :         field6: blknum,
    1406     83369133 :     }
    1407     83369133 : }
    1408              : 
    1409      3575738 : fn rel_size_to_key(rel: RelTag) -> Key {
    1410      3575738 :     Key {
    1411      3575738 :         field1: 0x00,
    1412      3575738 :         field2: rel.spcnode,
    1413      3575738 :         field3: rel.dbnode,
    1414      3575738 :         field4: rel.relnode,
    1415      3575738 :         field5: rel.forknum,
    1416      3575738 :         field6: 0xffffffff,
    1417      3575738 :     }
    1418      3575738 : }
    1419              : 
    1420        17781 : fn rel_key_range(rel: RelTag) -> Range<Key> {
    1421        17781 :     Key {
    1422        17781 :         field1: 0x00,
    1423        17781 :         field2: rel.spcnode,
    1424        17781 :         field3: rel.dbnode,
    1425        17781 :         field4: rel.relnode,
    1426        17781 :         field5: rel.forknum,
    1427        17781 :         field6: 0,
    1428        17781 :     }..Key {
    1429        17781 :         field1: 0x00,
    1430        17781 :         field2: rel.spcnode,
    1431        17781 :         field3: rel.dbnode,
    1432        17781 :         field4: rel.relnode,
    1433        17781 :         field5: rel.forknum + 1,
    1434        17781 :         field6: 0,
    1435        17781 :     }
    1436        17781 : }
    1437              : 
//-- Section 01: SLRUs
    1439              : 
    1440        12044 : fn slru_dir_to_key(kind: SlruKind) -> Key {
    1441        12044 :     Key {
    1442        12044 :         field1: 0x01,
    1443        12044 :         field2: match kind {
    1444         6422 :             SlruKind::Clog => 0x00,
    1445         2954 :             SlruKind::MultiXactMembers => 0x01,
    1446         2668 :             SlruKind::MultiXactOffsets => 0x02,
    1447              :         },
    1448              :         field3: 0,
    1449              :         field4: 0,
    1450              :         field5: 0,
    1451              :         field6: 0,
    1452              :     }
    1453        12044 : }
    1454              : 
    1455      2325730 : fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
    1456      2325730 :     Key {
    1457      2325730 :         field1: 0x01,
    1458      2325730 :         field2: match kind {
    1459      2271547 :             SlruKind::Clog => 0x00,
    1460        27652 :             SlruKind::MultiXactMembers => 0x01,
    1461        26531 :             SlruKind::MultiXactOffsets => 0x02,
    1462              :         },
    1463              :         field3: 1,
    1464      2325730 :         field4: segno,
    1465      2325730 :         field5: 0,
    1466      2325730 :         field6: blknum,
    1467      2325730 :     }
    1468      2325730 : }
    1469              : 
    1470        10411 : fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
    1471        10411 :     Key {
    1472        10411 :         field1: 0x01,
    1473        10411 :         field2: match kind {
    1474         6005 :             SlruKind::Clog => 0x00,
    1475         2493 :             SlruKind::MultiXactMembers => 0x01,
    1476         1913 :             SlruKind::MultiXactOffsets => 0x02,
    1477              :         },
    1478              :         field3: 1,
    1479        10411 :         field4: segno,
    1480        10411 :         field5: 0,
    1481        10411 :         field6: 0xffffffff,
    1482        10411 :     }
    1483        10411 : }
    1484              : 
    1485            9 : fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
    1486            9 :     let field2 = match kind {
    1487            9 :         SlruKind::Clog => 0x00,
    1488            0 :         SlruKind::MultiXactMembers => 0x01,
    1489            0 :         SlruKind::MultiXactOffsets => 0x02,
    1490              :     };
    1491              : 
    1492            9 :     Key {
    1493            9 :         field1: 0x01,
    1494            9 :         field2,
    1495            9 :         field3: 1,
    1496            9 :         field4: segno,
    1497            9 :         field5: 0,
    1498            9 :         field6: 0,
    1499            9 :     }..Key {
    1500            9 :         field1: 0x01,
    1501            9 :         field2,
    1502            9 :         field3: 1,
    1503            9 :         field4: segno,
    1504            9 :         field5: 1,
    1505            9 :         field6: 0,
    1506            9 :     }
    1507            9 : }
    1508              : 
//-- Section 02: pg_twophase
    1510              : 
/// Key of the two-phase directory (`TwoPhaseDirectory`). The per-transaction
/// file keys share this prefix and carry the xid in `field6`.
const TWOPHASEDIR_KEY: Key = Key {
    field1: 0x02,
    field2: 0,
    field3: 0,
    field4: 0,
    field5: 0,
    field6: 0,
};
    1519              : 
    1520            6 : fn twophase_file_key(xid: TransactionId) -> Key {
    1521            6 :     Key {
    1522            6 :         field1: 0x02,
    1523            6 :         field2: 0,
    1524            6 :         field3: 0,
    1525            6 :         field4: 0,
    1526            6 :         field5: 0,
    1527            6 :         field6: xid,
    1528            6 :     }
    1529            6 : }
    1530              : 
    1531            2 : fn twophase_key_range(xid: TransactionId) -> Range<Key> {
    1532            2 :     let (next_xid, overflowed) = xid.overflowing_add(1);
    1533            2 : 
    1534            2 :     Key {
    1535            2 :         field1: 0x02,
    1536            2 :         field2: 0,
    1537            2 :         field3: 0,
    1538            2 :         field4: 0,
    1539            2 :         field5: 0,
    1540            2 :         field6: xid,
    1541            2 :     }..Key {
    1542            2 :         field1: 0x02,
    1543            2 :         field2: 0,
    1544            2 :         field3: 0,
    1545            2 :         field4: 0,
    1546            2 :         field5: u8::from(overflowed),
    1547            2 :         field6: next_xid,
    1548            2 :     }
    1549            2 : }
    1550              : 
    1551              : //-- Section 03: Control file
/// Key of the control file: the first key of the misc section (0x03).
const CONTROLFILE_KEY: Key = Key {
    field1: 0x03,
    field2: 0,
    field3: 0,
    field4: 0,
    field5: 0,
    field6: 0,
};
    1560              : 
/// Key of the checkpoint; it directly follows `CONTROLFILE_KEY` (`field6 = 1`).
const CHECKPOINT_KEY: Key = Key {
    field1: 0x03,
    field2: 0,
    field3: 0,
    field4: 0,
    field5: 0,
    field6: 1,
};
    1569              : 
    1570              : // Reverse mappings for a few Keys.
    1571              : // These are needed by WAL redo manager.
    1572              : 
    1573      2761257 : pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
    1574      2761257 :     Ok(match key.field1 {
    1575      2761257 :         0x00 => (
    1576      2761257 :             RelTag {
    1577      2761257 :                 spcnode: key.field2,
    1578      2761257 :                 dbnode: key.field3,
    1579      2761257 :                 relnode: key.field4,
    1580      2761257 :                 forknum: key.field5,
    1581      2761257 :             },
    1582      2761257 :             key.field6,
    1583      2761257 :         ),
    1584            0 :         _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
    1585              :     })
    1586      2761257 : }
    1587              : 
    1588            0 : fn is_rel_block_key(key: Key) -> bool {
    1589            0 :     key.field1 == 0x00 && key.field4 != 0
    1590            0 : }
    1591              : 
    1592            0 : pub fn is_rel_fsm_block_key(key: Key) -> bool {
    1593            0 :     key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
    1594            0 : }
    1595              : 
    1596            0 : pub fn is_rel_vm_block_key(key: Key) -> bool {
    1597            0 :     key.field1 == 0x00
    1598            0 :         && key.field4 != 0
    1599            0 :         && key.field5 == VISIBILITYMAP_FORKNUM
    1600            0 :         && key.field6 != 0xffffffff
    1601            0 : }
    1602              : 
    1603     28080239 : pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
    1604     28080239 :     Ok(match key.field1 {
    1605              :         0x01 => {
    1606     28080239 :             let kind = match key.field2 {
    1607     27984367 :                 0x00 => SlruKind::Clog,
    1608        48210 :                 0x01 => SlruKind::MultiXactMembers,
    1609        47662 :                 0x02 => SlruKind::MultiXactOffsets,
    1610            0 :                 _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
    1611              :             };
    1612     28080239 :             let segno = key.field4;
    1613     28080239 :             let blknum = key.field6;
    1614     28080239 : 
    1615     28080239 :             (kind, segno, blknum)
    1616              :         }
    1617            0 :         _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
    1618              :     })
    1619     28080239 : }
    1620              : 
    1621            0 : fn is_slru_block_key(key: Key) -> bool {
    1622            0 :     key.field1 == 0x01                // SLRU-related
    1623            0 :         && key.field3 == 0x00000001   // but not SlruDir
    1624            0 :         && key.field6 != 0xffffffff // and not SlruSegSize
    1625            0 : }
    1626              : 
    1627              : #[allow(clippy::bool_assert_comparison)]
    1628              : #[cfg(test)]
    1629              : mod tests {
    1630              :     //use super::repo_harness::*;
    1631              :     //use super::*;
    1632              : 
    1633              :     /*
    1634              :         fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
    1635              :             let incremental = timeline.get_current_logical_size();
    1636              :             let non_incremental = timeline
    1637              :                 .get_current_logical_size_non_incremental(lsn)
    1638              :                 .unwrap();
    1639              :             assert_eq!(incremental, non_incremental);
    1640              :         }
    1641              :     */
    1642              : 
    1643              :     /*
    1644              :     ///
    1645              :     /// Test list_rels() function, with branches and dropped relations
    1646              :     ///
    1647              :     #[test]
    1648              :     fn test_list_rels_drop() -> Result<()> {
    1649              :         let repo = RepoHarness::create("test_list_rels_drop")?.load();
    1650              :         let tline = create_empty_timeline(repo, TIMELINE_ID)?;
    1651              :         const TESTDB: u32 = 111;
    1652              : 
    1653              :         // Import initial dummy checkpoint record, otherwise the get_timeline() call
    1654              :         // after branching fails below
    1655              :         let mut writer = tline.begin_record(Lsn(0x10));
    1656              :         writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1657              :         writer.finish()?;
    1658              : 
    1659              :         // Create a relation on the timeline
    1660              :         let mut writer = tline.begin_record(Lsn(0x20));
    1661              :         writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
    1662              :         writer.finish()?;
    1663              : 
    1664              :         let writer = tline.begin_record(Lsn(0x00));
    1665              :         writer.finish()?;
    1666              : 
    1667              :         // Check that list_rels() lists it after LSN 2, but no before it
    1668              :         assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
    1669              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
    1670              :         assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
    1671              : 
    1672              :         // Create a branch, check that the relation is visible there
    1673              :         repo.branch_timeline(&tline, NEW_TIMELINE_ID, Lsn(0x30))?;
    1674              :         let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
    1675              :             Some(timeline) => timeline,
    1676              :             None => panic!("Should have a local timeline"),
    1677              :         };
    1678              :         let newtline = DatadirTimelineImpl::new(newtline);
    1679              :         assert!(newtline
    1680              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1681              :             .contains(&TESTREL_A));
    1682              : 
    1683              :         // Drop it on the branch
    1684              :         let mut new_writer = newtline.begin_record(Lsn(0x40));
    1685              :         new_writer.drop_relation(TESTREL_A)?;
    1686              :         new_writer.finish()?;
    1687              : 
    1688              :         // Check that it's no longer listed on the branch after the point where it was dropped
    1689              :         assert!(newtline
    1690              :             .list_rels(0, TESTDB, Lsn(0x30))?
    1691              :             .contains(&TESTREL_A));
    1692              :         assert!(!newtline
    1693              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1694              :             .contains(&TESTREL_A));
    1695              : 
    1696              :         // Run checkpoint and garbage collection and check that it's still not visible
    1697              :         newtline.checkpoint(CheckpointConfig::Forced)?;
    1698              :         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
    1699              : 
    1700              :         assert!(!newtline
    1701              :             .list_rels(0, TESTDB, Lsn(0x40))?
    1702              :             .contains(&TESTREL_A));
    1703              : 
    1704              :         Ok(())
    1705              :     }
    1706              :      */
    1707              : 
    1708              :     /*
    1709              :     #[test]
    1710              :     fn test_read_beyond_eof() -> Result<()> {
    1711              :         let repo = RepoHarness::create("test_read_beyond_eof")?.load();
    1712              :         let tline = create_test_timeline(repo, TIMELINE_ID)?;
    1713              : 
    1714              :         make_some_layers(&tline, Lsn(0x20))?;
    1715              :         let mut writer = tline.begin_record(Lsn(0x60));
    1716              :         walingest.put_rel_page_image(
    1717              :             &mut writer,
    1718              :             TESTREL_A,
    1719              :             0,
    1720              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
    1721              :         )?;
    1722              :         writer.finish()?;
    1723              : 
    1724              :         // Test read before rel creation. Should error out.
    1725              :         assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
    1726              : 
    1727              :         // Read block beyond end of relation at different points in time.
    1728              :         // These reads should fall into different delta, image, and in-memory layers.
    1729              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
    1730              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
    1731              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
    1732              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
    1733              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
    1734              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
    1735              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
    1736              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
    1737              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
    1738              : 
    1739              :         // Test on an in-memory layer with no preceding layer
    1740              :         let mut writer = tline.begin_record(Lsn(0x70));
    1741              :         walingest.put_rel_page_image(
    1742              :             &mut writer,
    1743              :             TESTREL_B,
    1744              :             0,
    1745              :             TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
    1746              :         )?;
    1747              :         writer.finish()?;
    1748              : 
    1749              :         assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
    1750              : 
    1751              :         Ok(())
    1752              :     }
    1753              :      */
    1754              : }
        

Generated by: LCOV version 2.1-beta