LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 58.2 % 1613 939
Test Date: 2024-11-13 18:23:39 Functions: 58.8 % 80 47

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  -> [`wal_decoder`] ->  WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
       9              : //! Records get decoded and interpreted in the [`wal_decoder`] module
      10              : //! and then stored to the Repository by WalIngest.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
      13              : //! page images, or a WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
      14              : //! extracts page images out of some WAL records, but mostly it's WAL
      15              : //! records. If a WAL record modifies multiple pages, WalIngest
      16              : //! will call Repository::put_rel_wal_record or put_rel_page_image functions
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
      22              : //! bespoken Rust code.
      23              : 
      24              : use std::collections::HashMap;
      25              : use std::sync::Arc;
      26              : use std::sync::OnceLock;
      27              : use std::time::Duration;
      28              : use std::time::Instant;
      29              : use std::time::SystemTime;
      30              : 
      31              : use pageserver_api::shard::ShardIdentity;
      32              : use postgres_ffi::fsm_logical_to_physical;
      33              : use postgres_ffi::walrecord::*;
      34              : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
      35              : use wal_decoder::models::*;
      36              : 
      37              : use anyhow::{bail, Result};
      38              : use bytes::{Buf, Bytes};
      39              : use tracing::*;
      40              : use utils::failpoint_support;
      41              : use utils::rate_limit::RateLimit;
      42              : 
      43              : use crate::context::RequestContext;
      44              : use crate::metrics::WAL_INGEST;
      45              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      46              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      47              : use crate::tenant::PageReconstructError;
      48              : use crate::tenant::Timeline;
      49              : use crate::ZERO_PAGE;
      50              : use pageserver_api::key::rel_block_to_key;
      51              : use pageserver_api::record::NeonWalRecord;
      52              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      53              : use postgres_ffi::pg_constants;
      54              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      55              : use postgres_ffi::TransactionId;
      56              : use utils::bin_ser::SerializeError;
      57              : use utils::lsn::Lsn;
      58              : 
      59              : enum_pgversion! {CheckPoint, pgv::CheckPoint}
      60              : 
      61              : impl CheckPoint {
      62            6 :     fn encode(&self) -> Result<Bytes, SerializeError> {
      63            6 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
      64            6 :     }
      65              : 
      66       145834 :     fn update_next_xid(&mut self, xid: u32) -> bool {
      67       145834 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
      68       145834 :     }
      69              : 
      70            0 :     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
      71            0 :         enum_pgversion_dispatch!(self, CheckPoint, cp, {
      72            0 :             cp.update_next_multixid(multi_xid, multi_offset)
      73              :         })
      74            0 :     }
      75              : }
      76              : 
      77              : /// Temporary limitation of WAL lag warnings after attach
      78              : ///
      79              : /// After tenant attach, we want to limit WAL lag warnings because
      80              : /// we don't look at the WAL until the attach is complete, which
      81              : /// might take a while.
      82              : pub struct WalLagCooldown {
      83              :     /// Until when should this limitation apply at all
      84              :     active_until: std::time::Instant,
      85              :     /// The maximum lag to suppress. Lags above this limit get reported anyways.
      86              :     max_lag: Duration,
      87              : }
      88              : 
      89              : impl WalLagCooldown {
      90            0 :     pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
      91            0 :         Self {
      92            0 :             active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
      93            0 :             max_lag: attach_duration * 2 + Duration::from_secs(60),
      94            0 :         }
      95            0 :     }
      96              : }
      97              : 
      98              : pub struct WalIngest {
      99              :     attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
     100              :     shard: ShardIdentity,
     101              :     checkpoint: CheckPoint,
     102              :     checkpoint_modified: bool,
     103              :     warn_ingest_lag: WarnIngestLag,
     104              : }
     105              : 
     106              : struct WarnIngestLag {
     107              :     lag_msg_ratelimit: RateLimit,
     108              :     future_lsn_msg_ratelimit: RateLimit,
     109              :     timestamp_invalid_msg_ratelimit: RateLimit,
     110              : }
     111              : 
     112              : impl WalIngest {
     113           12 :     pub async fn new(
     114           12 :         timeline: &Timeline,
     115           12 :         startpoint: Lsn,
     116           12 :         ctx: &RequestContext,
     117           12 :     ) -> anyhow::Result<WalIngest> {
     118              :         // Fetch the latest checkpoint into memory, so that we can compare with it
     119              :         // quickly in `ingest_record` and update it when it changes.
     120           12 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
     121           12 :         let pgversion = timeline.pg_version;
     122              : 
     123           12 :         let checkpoint = dispatch_pgversion!(pgversion, {
     124            0 :             let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
     125            0 :             trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
     126            0 :             <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
     127              :         });
     128              : 
     129           12 :         Ok(WalIngest {
     130           12 :             shard: *timeline.get_shard_identity(),
     131           12 :             checkpoint,
     132           12 :             checkpoint_modified: false,
     133           12 :             attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
     134           12 :             warn_ingest_lag: WarnIngestLag {
     135           12 :                 lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     136           12 :                 future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     137           12 :                 timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     138           12 :             },
     139           12 :         })
     140           12 :     }
     141              : 
     142              :     /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
     143              :     /// storage of a given timeline.
     144              :     ///
     145              :     /// This function updates `lsn` field of `DatadirModification`
     146              :     ///
     147              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
     148       145852 :     pub async fn ingest_record(
     149       145852 :         &mut self,
     150       145852 :         interpreted: InterpretedWalRecord,
     151       145852 :         modification: &mut DatadirModification<'_>,
     152       145852 :         ctx: &RequestContext,
     153       145852 :     ) -> anyhow::Result<bool> {
     154       145852 :         WAL_INGEST.records_received.inc();
     155       145852 :         let prev_len = modification.len();
     156       145852 : 
     157       145852 :         modification.set_lsn(interpreted.end_lsn)?;
     158              : 
     159       145852 :         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
     160              :             // Records of this type should always be preceded by a commit(), as they
     161              :             // rely on reading data pages back from the Timeline.
     162            0 :             assert!(!modification.has_dirty_data());
     163       145852 :         }
     164              : 
     165       145852 :         assert!(!self.checkpoint_modified);
     166       145852 :         if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
     167       145834 :             && self.checkpoint.update_next_xid(interpreted.xid)
     168            2 :         {
     169            2 :             self.checkpoint_modified = true;
     170       145850 :         }
     171              : 
     172       145852 :         failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
     173              : 
     174           66 :         match interpreted.metadata_record {
     175           12 :             Some(MetadataRecord::Heapam(rec)) => match rec {
     176           12 :                 HeapamRecord::ClearVmBits(clear_vm_bits) => {
     177           12 :                     self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     178            0 :                         .await?;
     179              :                 }
     180              :             },
     181            0 :             Some(MetadataRecord::Neonrmgr(rec)) => match rec {
     182            0 :                 NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
     183            0 :                     self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     184            0 :                         .await?;
     185              :                 }
     186              :             },
     187           16 :             Some(MetadataRecord::Smgr(rec)) => match rec {
     188           16 :                 SmgrRecord::Create(create) => {
     189           16 :                     self.ingest_xlog_smgr_create(create, modification, ctx)
     190           12 :                         .await?;
     191              :                 }
     192            0 :                 SmgrRecord::Truncate(truncate) => {
     193            0 :                     self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
     194            0 :                         .await?;
     195              :                 }
     196              :             },
     197            0 :             Some(MetadataRecord::Dbase(rec)) => match rec {
     198            0 :                 DbaseRecord::Create(create) => {
     199            0 :                     self.ingest_xlog_dbase_create(create, modification, ctx)
     200            0 :                         .await?;
     201              :                 }
     202            0 :                 DbaseRecord::Drop(drop) => {
     203            0 :                     self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
     204              :                 }
     205              :             },
     206            0 :             Some(MetadataRecord::Clog(rec)) => match rec {
     207            0 :                 ClogRecord::ZeroPage(zero_page) => {
     208            0 :                     self.ingest_clog_zero_page(zero_page, modification, ctx)
     209            0 :                         .await?;
     210              :                 }
     211            0 :                 ClogRecord::Truncate(truncate) => {
     212            0 :                     self.ingest_clog_truncate(truncate, modification, ctx)
     213            0 :                         .await?;
     214              :                 }
     215              :             },
     216            8 :             Some(MetadataRecord::Xact(rec)) => {
     217            8 :                 self.ingest_xact_record(rec, modification, ctx).await?;
     218              :             }
     219            0 :             Some(MetadataRecord::MultiXact(rec)) => match rec {
     220            0 :                 MultiXactRecord::ZeroPage(zero_page) => {
     221            0 :                     self.ingest_multixact_zero_page(zero_page, modification, ctx)
     222            0 :                         .await?;
     223              :                 }
     224            0 :                 MultiXactRecord::Create(create) => {
     225            0 :                     self.ingest_multixact_create(modification, &create)?;
     226              :                 }
     227            0 :                 MultiXactRecord::Truncate(truncate) => {
     228            0 :                     self.ingest_multixact_truncate(modification, &truncate, ctx)
     229            0 :                         .await?;
     230              :                 }
     231              :             },
     232            0 :             Some(MetadataRecord::Relmap(rec)) => match rec {
     233            0 :                 RelmapRecord::Update(update) => {
     234            0 :                     self.ingest_relmap_update(update, modification, ctx).await?;
     235              :                 }
     236              :             },
     237           30 :             Some(MetadataRecord::Xlog(rec)) => match rec {
     238           30 :                 XlogRecord::Raw(raw) => {
     239           30 :                     self.ingest_raw_xlog_record(raw, modification, ctx).await?;
     240              :                 }
     241              :             },
     242            0 :             Some(MetadataRecord::LogicalMessage(rec)) => match rec {
     243            0 :                 LogicalMessageRecord::Put(put) => {
     244            0 :                     self.ingest_logical_message_put(put, modification, ctx)
     245            0 :                         .await?;
     246              :                 }
     247              :                 #[cfg(feature = "testing")]
     248              :                 LogicalMessageRecord::Failpoint => {
     249              :                     // This is a convenient way to make the WAL ingestion pause at
     250              :                     // particular point in the WAL. For more fine-grained control,
     251              :                     // we could peek into the message and only pause if it contains
     252              :                     // a particular string, for example, but this is enough for now.
     253            0 :                     failpoint_support::sleep_millis_async!(
     254            0 :                         "pageserver-wal-ingest-logical-message-sleep"
     255            0 :                     );
     256              :                 }
     257              :             },
     258            0 :             Some(MetadataRecord::Standby(rec)) => {
     259            0 :                 self.ingest_standby_record(rec).unwrap();
     260            0 :             }
     261            0 :             Some(MetadataRecord::Replorigin(rec)) => {
     262            0 :                 self.ingest_replorigin_record(rec, modification).await?;
     263              :             }
     264       145786 :             None => {
     265       145786 :                 // There are two cases through which we end up here:
     266       145786 :                 // 1. The resource manager for the original PG WAL record
     267       145786 :                 //    is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
     268       145786 :                 //    record type within Neon.
     269       145786 :                 // 2. The resource manager id was unknown to
     270       145786 :                 //    [`wal_decoder::decoder::MetadataRecord::from_decoded`].
     271       145786 :                 // TODO(vlad): Tighten this up more once we build confidence
     272       145786 :                 // that case (2) does not happen in the field.
     273       145786 :             }
     274              :         }
     275              : 
     276       145852 :         modification
     277       145852 :             .ingest_batch(interpreted.batch, &self.shard, ctx)
     278          284 :             .await?;
     279              : 
     280              :         // If checkpoint data was updated, store the new version in the repository
     281       145852 :         if self.checkpoint_modified {
     282            6 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     283              : 
     284            6 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     285            6 :             self.checkpoint_modified = false;
     286       145846 :         }
     287              : 
     288              :         // Note that at this point this record is only cached in the modification
     289              :         // until commit() is called to flush the data into the repository and update
     290              :         // the latest LSN.
     291              : 
     292       145852 :         Ok(modification.len() > prev_len)
     293       145852 :     }
     294              : 
     295              :     /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
     296            0 :     fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
     297            0 :         let next_full_xid =
     298            0 :             enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
     299              : 
     300            0 :         let next_xid = (next_full_xid) as u32;
     301            0 :         let mut epoch = (next_full_xid >> 32) as u32;
     302            0 : 
     303            0 :         if xid > next_xid {
     304              :             // Wraparound occurred, must be from a prev epoch.
     305            0 :             if epoch == 0 {
     306            0 :                 bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
     307            0 :             }
     308            0 :             epoch -= 1;
     309            0 :         }
     310              : 
     311            0 :         Ok((epoch as u64) << 32 | xid as u64)
     312            0 :     }
     313              : 
     314           12 :     async fn ingest_clear_vm_bits(
     315           12 :         &mut self,
     316           12 :         clear_vm_bits: ClearVmBits,
     317           12 :         modification: &mut DatadirModification<'_>,
     318           12 :         ctx: &RequestContext,
     319           12 :     ) -> anyhow::Result<()> {
     320           12 :         let ClearVmBits {
     321           12 :             new_heap_blkno,
     322           12 :             old_heap_blkno,
     323           12 :             flags,
     324           12 :             vm_rel,
     325           12 :         } = clear_vm_bits;
     326           12 :         // Clear the VM bits if required.
     327           12 :         let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     328           12 :         let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     329              : 
     330              :         // Sometimes, Postgres seems to create heap WAL records with the
     331              :         // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     332              :         // not set. In fact, it's possible that the VM page does not exist at all.
     333              :         // In that case, we don't want to store a record to clear the VM bit;
     334              :         // replaying it would fail to find the previous image of the page, because
     335              :         // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     336              :         // record if it doesn't.
     337           12 :         let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     338           12 :         if let Some(blknum) = new_vm_blk {
     339           12 :             if blknum >= vm_size {
     340            0 :                 new_vm_blk = None;
     341           12 :             }
     342            0 :         }
     343           12 :         if let Some(blknum) = old_vm_blk {
     344            0 :             if blknum >= vm_size {
     345            0 :                 old_vm_blk = None;
     346            0 :             }
     347           12 :         }
     348              : 
     349           12 :         if new_vm_blk.is_some() || old_vm_blk.is_some() {
     350           12 :             if new_vm_blk == old_vm_blk {
     351              :                 // An UPDATE record that needs to clear the bits for both old and the
     352              :                 // new page, both of which reside on the same VM page.
     353            0 :                 self.put_rel_wal_record(
     354            0 :                     modification,
     355            0 :                     vm_rel,
     356            0 :                     new_vm_blk.unwrap(),
     357            0 :                     NeonWalRecord::ClearVisibilityMapFlags {
     358            0 :                         new_heap_blkno,
     359            0 :                         old_heap_blkno,
     360            0 :                         flags,
     361            0 :                     },
     362            0 :                     ctx,
     363            0 :                 )
     364            0 :                 .await?;
     365              :             } else {
     366              :                 // Clear VM bits for one heap page, or for two pages that reside on
     367              :                 // different VM pages.
     368           12 :                 if let Some(new_vm_blk) = new_vm_blk {
     369           12 :                     self.put_rel_wal_record(
     370           12 :                         modification,
     371           12 :                         vm_rel,
     372           12 :                         new_vm_blk,
     373           12 :                         NeonWalRecord::ClearVisibilityMapFlags {
     374           12 :                             new_heap_blkno,
     375           12 :                             old_heap_blkno: None,
     376           12 :                             flags,
     377           12 :                         },
     378           12 :                         ctx,
     379           12 :                     )
     380            0 :                     .await?;
     381            0 :                 }
     382           12 :                 if let Some(old_vm_blk) = old_vm_blk {
     383            0 :                     self.put_rel_wal_record(
     384            0 :                         modification,
     385            0 :                         vm_rel,
     386            0 :                         old_vm_blk,
     387            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     388            0 :                             new_heap_blkno: None,
     389            0 :                             old_heap_blkno,
     390            0 :                             flags,
     391            0 :                         },
     392            0 :                         ctx,
     393            0 :                     )
     394            0 :                     .await?;
     395           12 :                 }
     396              :             }
     397            0 :         }
     398              : 
     399           12 :         Ok(())
     400           12 :     }
     401              : 
     402              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     403            0 :     async fn ingest_xlog_dbase_create(
     404            0 :         &mut self,
     405            0 :         create: DbaseCreate,
     406            0 :         modification: &mut DatadirModification<'_>,
     407            0 :         ctx: &RequestContext,
     408            0 :     ) -> anyhow::Result<()> {
     409            0 :         let DbaseCreate {
     410            0 :             db_id,
     411            0 :             tablespace_id,
     412            0 :             src_db_id,
     413            0 :             src_tablespace_id,
     414            0 :         } = create;
     415              : 
     416            0 :         let rels = modification
     417            0 :             .tline
     418            0 :             .list_rels(
     419            0 :                 src_tablespace_id,
     420            0 :                 src_db_id,
     421            0 :                 Version::Modified(modification),
     422            0 :                 ctx,
     423            0 :             )
     424            0 :             .await?;
     425              : 
     426            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
     427              : 
     428              :         // Copy relfilemap
     429            0 :         let filemap = modification
     430            0 :             .tline
     431            0 :             .get_relmap_file(
     432            0 :                 src_tablespace_id,
     433            0 :                 src_db_id,
     434            0 :                 Version::Modified(modification),
     435            0 :                 ctx,
     436            0 :             )
     437            0 :             .await?;
     438            0 :         modification
     439            0 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
     440            0 :             .await?;
     441              : 
     442            0 :         let mut num_rels_copied = 0;
     443            0 :         let mut num_blocks_copied = 0;
     444            0 :         for src_rel in rels {
     445            0 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
     446            0 :             assert_eq!(src_rel.dbnode, src_db_id);
     447              : 
     448            0 :             let nblocks = modification
     449            0 :                 .tline
     450            0 :                 .get_rel_size(src_rel, Version::Modified(modification), ctx)
     451            0 :                 .await?;
     452            0 :             let dst_rel = RelTag {
     453            0 :                 spcnode: tablespace_id,
     454            0 :                 dbnode: db_id,
     455            0 :                 relnode: src_rel.relnode,
     456            0 :                 forknum: src_rel.forknum,
     457            0 :             };
     458            0 : 
     459            0 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
     460              : 
     461              :             // Copy content
     462            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
     463            0 :             for blknum in 0..nblocks {
     464              :                 // Sharding:
     465              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
     466              :                 //    dbNode is not included in the hash inputs for sharding.
     467              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
     468              :                 //    that belong to it.
     469            0 :                 let src_key = rel_block_to_key(src_rel, blknum);
     470            0 :                 if !self.shard.is_key_local(&src_key) {
     471            0 :                     debug!(
     472            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
     473              :                         src_key
     474              :                     );
     475            0 :                     continue;
     476            0 :                 }
     477            0 :                 debug!(
     478            0 :                     "copying block {} from {} ({}) to {}",
     479              :                     blknum, src_rel, src_key, dst_rel
     480              :                 );
     481              : 
     482            0 :                 let content = modification
     483            0 :                     .tline
     484            0 :                     .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
     485            0 :                     .await?;
     486            0 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
     487            0 :                 num_blocks_copied += 1;
     488              :             }
     489              : 
     490            0 :             num_rels_copied += 1;
     491              :         }
     492              : 
     493            0 :         info!(
     494            0 :             "Created database {}/{}, copied {} blocks in {} rels",
     495              :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
     496              :         );
     497            0 :         Ok(())
     498            0 :     }
     499              : 
     500            0 :     async fn ingest_xlog_dbase_drop(
     501            0 :         &mut self,
     502            0 :         dbase_drop: DbaseDrop,
     503            0 :         modification: &mut DatadirModification<'_>,
     504            0 :         ctx: &RequestContext,
     505            0 :     ) -> anyhow::Result<()> {
     506            0 :         let DbaseDrop {
     507            0 :             db_id,
     508            0 :             tablespace_ids,
     509            0 :         } = dbase_drop;
     510            0 :         for tablespace_id in tablespace_ids {
     511            0 :             trace!("Drop db {}, {}", tablespace_id, db_id);
     512            0 :             modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
     513              :         }
     514              : 
     515            0 :         Ok(())
     516            0 :     }
     517              : 
     518           16 :     async fn ingest_xlog_smgr_create(
     519           16 :         &mut self,
     520           16 :         create: SmgrCreate,
     521           16 :         modification: &mut DatadirModification<'_>,
     522           16 :         ctx: &RequestContext,
     523           16 :     ) -> anyhow::Result<()> {
     524           16 :         let SmgrCreate { rel } = create;
     525           16 :         self.put_rel_creation(modification, rel, ctx).await?;
     526           16 :         Ok(())
     527           16 :     }
     528              : 
     529              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
     530              :     ///
     531              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
     532            0 :     async fn ingest_xlog_smgr_truncate(
     533            0 :         &mut self,
     534            0 :         truncate: XlSmgrTruncate,
     535            0 :         modification: &mut DatadirModification<'_>,
     536            0 :         ctx: &RequestContext,
     537            0 :     ) -> anyhow::Result<()> {
     538            0 :         let XlSmgrTruncate {
     539            0 :             blkno,
     540            0 :             rnode,
     541            0 :             flags,
     542            0 :         } = truncate;
     543            0 : 
     544            0 :         let spcnode = rnode.spcnode;
     545            0 :         let dbnode = rnode.dbnode;
     546            0 :         let relnode = rnode.relnode;
     547            0 : 
     548            0 :         if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
     549            0 :             let rel = RelTag {
     550            0 :                 spcnode,
     551            0 :                 dbnode,
     552            0 :                 relnode,
     553            0 :                 forknum: MAIN_FORKNUM,
     554            0 :             };
     555            0 : 
     556            0 :             self.put_rel_truncation(modification, rel, blkno, ctx)
     557            0 :                 .await?;
     558            0 :         }
     559            0 :         if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
     560            0 :             let rel = RelTag {
     561            0 :                 spcnode,
     562            0 :                 dbnode,
     563            0 :                 relnode,
     564            0 :                 forknum: FSM_FORKNUM,
     565            0 :             };
     566            0 : 
     567            0 :             let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
     568            0 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
     569            0 :             if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
     570              :                 // Tail of last remaining FSM page has to be zeroed.
     571              :                 // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
     572            0 :                 modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
     573            0 :                 fsm_physical_page_no += 1;
     574            0 :             }
     575            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
     576            0 :             if nblocks > fsm_physical_page_no {
     577              :                 // check if something to do: FSM is larger than truncate position
     578            0 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
     579            0 :                     .await?;
     580            0 :             }
     581            0 :         }
     582            0 :         if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
     583            0 :             let rel = RelTag {
     584            0 :                 spcnode,
     585            0 :                 dbnode,
     586            0 :                 relnode,
     587            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     588            0 :             };
     589            0 : 
     590            0 :             let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
     591            0 :             if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
     592              :                 // Tail of last remaining vm page has to be zeroed.
     593              :                 // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
     594            0 :                 modification.put_rel_page_image_zero(rel, vm_page_no)?;
     595            0 :                 vm_page_no += 1;
     596            0 :             }
     597            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
     598            0 :             if nblocks > vm_page_no {
     599              :                 // check if something to do: VM is larger than truncate position
     600            0 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
     601            0 :                     .await?;
     602            0 :             }
     603            0 :         }
     604            0 :         Ok(())
     605            0 :     }
     606              : 
     607            8 :     fn warn_on_ingest_lag(
     608            8 :         &mut self,
     609            8 :         conf: &crate::config::PageServerConf,
     610            8 :         wal_timestamp: TimestampTz,
     611            8 :     ) {
     612            8 :         debug_assert_current_span_has_tenant_and_timeline_id();
     613            8 :         let now = SystemTime::now();
     614            8 :         let rate_limits = &mut self.warn_ingest_lag;
     615              : 
     616            8 :         let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
     617            0 :             pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
     618              :         });
     619              : 
     620            8 :         match ts {
     621            8 :             Ok(ts) => {
     622            8 :                 match now.duration_since(ts) {
     623            8 :                     Ok(lag) => {
     624            8 :                         if lag > conf.wait_lsn_timeout {
     625            8 :                             rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
     626            2 :                                 if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
     627            0 :                                     if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
     628            0 :                                         return;
     629            0 :                                     }
     630            2 :                                 } else {
     631            2 :                                     // Still loading? We shouldn't be here
     632            2 :                                 }
     633            2 :                                 let lag = humantime::format_duration(lag);
     634            2 :                                 warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
     635            8 :                             })
     636            0 :                         }
     637              :                     }
     638            0 :                     Err(e) => {
     639            0 :                         let delta_t = e.duration();
     640              :                         // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
     641              :                         // => https://www.robustperception.io/time-metric-from-the-node-exporter/
     642              :                         const IGNORED_DRIFT: Duration = Duration::from_millis(100);
     643            0 :                         if delta_t > IGNORED_DRIFT {
     644            0 :                             let delta_t = humantime::format_duration(delta_t);
     645            0 :                             rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
     646            0 :                                 warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
     647            0 :                             })
     648            0 :                         }
     649              :                     }
     650              :                 };
     651              :             }
     652            0 :             Err(error) => {
     653            0 :                 rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
     654            0 :                     warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
     655            0 :                 })
     656              :             }
     657              :         }
     658            8 :     }
     659              : 
     660              :     /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
     661              :     ///
     662            8 :     async fn ingest_xact_record(
     663            8 :         &mut self,
     664            8 :         record: XactRecord,
     665            8 :         modification: &mut DatadirModification<'_>,
     666            8 :         ctx: &RequestContext,
     667            8 :     ) -> anyhow::Result<()> {
     668            8 :         let (xact_common, is_commit, is_prepared) = match record {
     669            0 :             XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
     670            0 :                 let xid: u64 = if modification.tline.pg_version >= 17 {
     671            0 :                     self.adjust_to_full_transaction_id(xl_xid)?
     672              :                 } else {
     673            0 :                     xl_xid as u64
     674              :                 };
     675            0 :                 return modification.put_twophase_file(xid, data, ctx).await;
     676              :             }
     677            8 :             XactRecord::Commit(common) => (common, true, false),
     678            0 :             XactRecord::Abort(common) => (common, false, false),
     679            0 :             XactRecord::CommitPrepared(common) => (common, true, true),
     680            0 :             XactRecord::AbortPrepared(common) => (common, false, true),
     681              :         };
     682              : 
     683              :         let XactCommon {
     684            8 :             parsed,
     685            8 :             origin_id,
     686            8 :             xl_xid,
     687            8 :             lsn,
     688            8 :         } = xact_common;
     689            8 : 
     690            8 :         // Record update of CLOG pages
     691            8 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
     692            8 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     693            8 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     694            8 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
     695            8 : 
     696            8 :         self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
     697              : 
     698            8 :         for subxact in &parsed.subxacts {
     699            0 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
     700            0 :             if subxact_pageno != pageno {
     701              :                 // This subxact goes to different page. Write the record
     702              :                 // for all the XIDs on the previous page, and continue
     703              :                 // accumulating XIDs on this new page.
     704            0 :                 modification.put_slru_wal_record(
     705            0 :                     SlruKind::Clog,
     706            0 :                     segno,
     707            0 :                     rpageno,
     708            0 :                     if is_commit {
     709            0 :                         NeonWalRecord::ClogSetCommitted {
     710            0 :                             xids: page_xids,
     711            0 :                             timestamp: parsed.xact_time,
     712            0 :                         }
     713              :                     } else {
     714            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
     715              :                     },
     716            0 :                 )?;
     717            0 :                 page_xids = Vec::new();
     718            0 :             }
     719            0 :             pageno = subxact_pageno;
     720            0 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     721            0 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     722            0 :             page_xids.push(*subxact);
     723              :         }
     724            8 :         modification.put_slru_wal_record(
     725            8 :             SlruKind::Clog,
     726            8 :             segno,
     727            8 :             rpageno,
     728            8 :             if is_commit {
     729            8 :                 NeonWalRecord::ClogSetCommitted {
     730            8 :                     xids: page_xids,
     731            8 :                     timestamp: parsed.xact_time,
     732            8 :                 }
     733              :             } else {
     734            0 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
     735              :             },
     736            0 :         )?;
     737              : 
     738              :         // Group relations to drop by dbNode.  This map will contain all relations that _might_
     739              :         // exist, we will reduce it to which ones really exist later.  This map can be huge if
     740              :         // the transaction touches a huge number of relations (there is no bound on this in
     741              :         // postgres).
     742            8 :         let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
     743              : 
     744            8 :         for xnode in &parsed.xnodes {
     745            0 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
     746            0 :                 let rel = RelTag {
     747            0 :                     forknum,
     748            0 :                     spcnode: xnode.spcnode,
     749            0 :                     dbnode: xnode.dbnode,
     750            0 :                     relnode: xnode.relnode,
     751            0 :                 };
     752            0 :                 drop_relations
     753            0 :                     .entry((xnode.spcnode, xnode.dbnode))
     754            0 :                     .or_default()
     755            0 :                     .push(rel);
     756            0 :             }
     757              :         }
     758              : 
     759              :         // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
     760            8 :         modification.put_rel_drops(drop_relations, ctx).await?;
     761              : 
     762            8 :         if origin_id != 0 {
     763            0 :             modification
     764            0 :                 .set_replorigin(origin_id, parsed.origin_lsn)
     765            0 :                 .await?;
     766            8 :         }
     767              : 
     768            8 :         if is_prepared {
     769              :             // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     770            0 :             trace!(
     771            0 :                 "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     772              :                 xl_xid,
     773              :                 parsed.xid,
     774              :                 lsn,
     775              :             );
     776              : 
     777            0 :             let xid: u64 = if modification.tline.pg_version >= 17 {
     778            0 :                 self.adjust_to_full_transaction_id(parsed.xid)?
     779              :             } else {
     780            0 :                 parsed.xid as u64
     781              :             };
     782            0 :             modification.drop_twophase_file(xid, ctx).await?;
     783            8 :         }
     784              : 
     785            8 :         Ok(())
     786            8 :     }
     787              : 
     788            0 :     async fn ingest_clog_truncate(
     789            0 :         &mut self,
     790            0 :         truncate: ClogTruncate,
     791            0 :         modification: &mut DatadirModification<'_>,
     792            0 :         ctx: &RequestContext,
     793            0 :     ) -> anyhow::Result<()> {
     794            0 :         let ClogTruncate {
     795            0 :             pageno,
     796            0 :             oldest_xid,
     797            0 :             oldest_xid_db,
     798            0 :         } = truncate;
     799            0 : 
     800            0 :         info!(
     801            0 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
     802              :             pageno, oldest_xid, oldest_xid_db
     803              :         );
     804              : 
     805              :         // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
     806              :         // truncated, but a checkpoint record with the updated values isn't written until
     807              :         // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
     808              :         // so we keep the oldestXid and oldestXidDB up-to-date.
     809            0 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
     810            0 :             cp.oldestXid = oldest_xid;
     811            0 :             cp.oldestXidDB = oldest_xid_db;
     812            0 :         });
     813            0 :         self.checkpoint_modified = true;
     814              : 
     815              :         // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
     816              : 
     817            0 :         let latest_page_number =
     818            0 :             enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
     819              :                 / pg_constants::CLOG_XACTS_PER_PAGE;
     820              : 
     821              :         // Now delete all segments containing pages between xlrec.pageno
     822              :         // and latest_page_number.
     823              : 
     824              :         // First, make an important safety check:
     825              :         // the current endpoint page must not be eligible for removal.
     826              :         // See SimpleLruTruncate() in slru.c
     827            0 :         if dispatch_pgversion!(modification.tline.pg_version, {
     828            0 :             pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
     829              :         }) {
     830            0 :             info!("could not truncate directory pg_xact apparent wraparound");
     831            0 :             return Ok(());
     832            0 :         }
     833              : 
     834              :         // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
     835              :         //
     836              :         // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
     837              :         // will block waiting for the last valid LSN to advance up to
     838              :         // it. So we use the previous record's LSN in the get calls
     839              :         // instead.
     840            0 :         for segno in modification
     841            0 :             .tline
     842            0 :             .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
     843            0 :             .await?
     844              :         {
     845            0 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
     846              : 
     847            0 :             let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
     848            0 :                 pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
     849              :             });
     850              : 
     851            0 :             if may_delete {
     852            0 :                 modification
     853            0 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
     854            0 :                     .await?;
     855            0 :                 trace!("Drop CLOG segment {:>04X}", segno);
     856            0 :             }
     857              :         }
     858              : 
     859            0 :         Ok(())
     860            0 :     }
     861              : 
     862            0 :     async fn ingest_clog_zero_page(
     863            0 :         &mut self,
     864            0 :         zero_page: ClogZeroPage,
     865            0 :         modification: &mut DatadirModification<'_>,
     866            0 :         ctx: &RequestContext,
     867            0 :     ) -> anyhow::Result<()> {
     868            0 :         let ClogZeroPage { segno, rpageno } = zero_page;
     869            0 : 
     870            0 :         self.put_slru_page_image(
     871            0 :             modification,
     872            0 :             SlruKind::Clog,
     873            0 :             segno,
     874            0 :             rpageno,
     875            0 :             ZERO_PAGE.clone(),
     876            0 :             ctx,
     877            0 :         )
     878            0 :         .await
     879            0 :     }
     880              : 
     881            0 :     fn ingest_multixact_create(
     882            0 :         &mut self,
     883            0 :         modification: &mut DatadirModification,
     884            0 :         xlrec: &XlMultiXactCreate,
     885            0 :     ) -> Result<()> {
     886            0 :         // Create WAL record for updating the multixact-offsets page
     887            0 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     888            0 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     889            0 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     890            0 : 
     891            0 :         modification.put_slru_wal_record(
     892            0 :             SlruKind::MultiXactOffsets,
     893            0 :             segno,
     894            0 :             rpageno,
     895            0 :             NeonWalRecord::MultixactOffsetCreate {
     896            0 :                 mid: xlrec.mid,
     897            0 :                 moff: xlrec.moff,
     898            0 :             },
     899            0 :         )?;
     900              : 
     901              :         // Create WAL records for the update of each affected multixact-members page
     902            0 :         let mut members = xlrec.members.iter();
     903            0 :         let mut offset = xlrec.moff;
     904              :         loop {
     905            0 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     906            0 : 
     907            0 :             // How many members fit on this page?
     908            0 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
     909            0 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     910            0 : 
     911            0 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
     912            0 :             for _ in 0..page_remain {
     913            0 :                 if let Some(m) = members.next() {
     914            0 :                     this_page_members.push(m.clone());
     915            0 :                 } else {
     916            0 :                     break;
     917              :                 }
     918              :             }
     919            0 :             if this_page_members.is_empty() {
     920              :                 // all done
     921            0 :                 break;
     922            0 :             }
     923            0 :             let n_this_page = this_page_members.len();
     924            0 : 
     925            0 :             modification.put_slru_wal_record(
     926            0 :                 SlruKind::MultiXactMembers,
     927            0 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
     928            0 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
     929            0 :                 NeonWalRecord::MultixactMembersCreate {
     930            0 :                     moff: offset,
     931            0 :                     members: this_page_members,
     932            0 :                 },
     933            0 :             )?;
     934              : 
     935              :             // Note: The multixact members can wrap around, even within one WAL record.
     936            0 :             offset = offset.wrapping_add(n_this_page as u32);
     937              :         }
     938            0 :         let next_offset = offset;
     939            0 :         assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
     940              : 
     941              :         // Update next-multi-xid and next-offset
     942              :         //
     943              :         // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
     944              :         // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
     945              :         // read it, like GetNewMultiXactId(). This is different from how nextXid is
     946              :         // incremented! nextXid skips over < FirstNormalTransactionId when the the value
     947              :         // is stored, so it's never 0 in a checkpoint.
     948              :         //
     949              :         // I don't know why it's done that way, it seems less error-prone to skip over 0
     950              :         // when the value is stored rather than when it's read. But let's do it the same
     951              :         // way here.
     952            0 :         let next_multi_xid = xlrec.mid.wrapping_add(1);
     953            0 : 
     954            0 :         if self
     955            0 :             .checkpoint
     956            0 :             .update_next_multixid(next_multi_xid, next_offset)
     957            0 :         {
     958            0 :             self.checkpoint_modified = true;
     959            0 :         }
     960              : 
     961              :         // Also update the next-xid with the highest member. According to the comments in
     962              :         // multixact_redo(), this shouldn't be necessary, but let's do the same here.
     963            0 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
     964            0 :             if let Some(max_xid) = acc {
     965            0 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
     966            0 :                     Some(mbr.xid)
     967              :                 } else {
     968            0 :                     acc
     969              :                 }
     970              :             } else {
     971            0 :                 Some(mbr.xid)
     972              :             }
     973            0 :         });
     974              : 
     975            0 :         if let Some(max_xid) = max_mbr_xid {
     976            0 :             if self.checkpoint.update_next_xid(max_xid) {
     977            0 :                 self.checkpoint_modified = true;
     978            0 :             }
     979            0 :         }
     980            0 :         Ok(())
     981            0 :     }
     982              : 
     983            0 :     async fn ingest_multixact_truncate(
     984            0 :         &mut self,
     985            0 :         modification: &mut DatadirModification<'_>,
     986            0 :         xlrec: &XlMultiXactTruncate,
     987            0 :         ctx: &RequestContext,
     988            0 :     ) -> Result<()> {
     989            0 :         let (maxsegment, startsegment, endsegment) =
     990            0 :             enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
     991            0 :                 cp.oldestMulti = xlrec.end_trunc_off;
     992            0 :                 cp.oldestMultiDB = xlrec.oldest_multi_db;
     993            0 :                 let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
     994            0 :                     pg_constants::MAX_MULTIXACT_OFFSET,
     995            0 :                 );
     996            0 :                 let startsegment: i32 =
     997            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
     998            0 :                 let endsegment: i32 =
     999            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1000            0 :                 (maxsegment, startsegment, endsegment)
    1001              :             });
    1002              : 
    1003            0 :         self.checkpoint_modified = true;
    1004            0 : 
    1005            0 :         // PerformMembersTruncation
    1006            0 :         let mut segment: i32 = startsegment;
    1007              : 
    1008              :         // Delete all the segments except the last one. The last segment can still
    1009              :         // contain, possibly partially, valid data.
    1010            0 :         while segment != endsegment {
    1011            0 :             modification
    1012            0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1013            0 :                 .await?;
    1014              : 
    1015              :             /* move to next segment, handling wraparound correctly */
    1016            0 :             if segment == maxsegment {
    1017            0 :                 segment = 0;
    1018            0 :             } else {
    1019            0 :                 segment += 1;
    1020            0 :             }
    1021              :         }
    1022              : 
    1023              :         // Truncate offsets
    1024              :         // FIXME: this did not handle wraparound correctly
    1025              : 
    1026            0 :         Ok(())
    1027            0 :     }
    1028              : 
    1029            0 :     async fn ingest_multixact_zero_page(
    1030            0 :         &mut self,
    1031            0 :         zero_page: MultiXactZeroPage,
    1032            0 :         modification: &mut DatadirModification<'_>,
    1033            0 :         ctx: &RequestContext,
    1034            0 :     ) -> Result<()> {
    1035            0 :         let MultiXactZeroPage {
    1036            0 :             slru_kind,
    1037            0 :             segno,
    1038            0 :             rpageno,
    1039            0 :         } = zero_page;
    1040            0 :         self.put_slru_page_image(
    1041            0 :             modification,
    1042            0 :             slru_kind,
    1043            0 :             segno,
    1044            0 :             rpageno,
    1045            0 :             ZERO_PAGE.clone(),
    1046            0 :             ctx,
    1047            0 :         )
    1048            0 :         .await
    1049            0 :     }
    1050              : 
    1051            0 :     async fn ingest_relmap_update(
    1052            0 :         &mut self,
    1053            0 :         update: RelmapUpdate,
    1054            0 :         modification: &mut DatadirModification<'_>,
    1055            0 :         ctx: &RequestContext,
    1056            0 :     ) -> Result<()> {
    1057            0 :         let RelmapUpdate { update, buf } = update;
    1058            0 : 
    1059            0 :         modification
    1060            0 :             .put_relmap_file(update.tsid, update.dbid, buf, ctx)
    1061            0 :             .await
    1062            0 :     }
    1063              : 
    1064           30 :     async fn ingest_raw_xlog_record(
    1065           30 :         &mut self,
    1066           30 :         raw_record: RawXlogRecord,
    1067           30 :         modification: &mut DatadirModification<'_>,
    1068           30 :         ctx: &RequestContext,
    1069           30 :     ) -> Result<()> {
    1070           30 :         let RawXlogRecord { info, lsn, mut buf } = raw_record;
    1071           30 :         let pg_version = modification.tline.pg_version;
    1072           30 : 
    1073           30 :         if info == pg_constants::XLOG_PARAMETER_CHANGE {
    1074            2 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    1075            0 :                 let rec = v17::XlParameterChange::decode(&mut buf);
    1076            0 :                 cp.wal_level = rec.wal_level;
    1077            0 :                 self.checkpoint_modified = true;
    1078            2 :             }
    1079           28 :         } else if info == pg_constants::XLOG_END_OF_RECOVERY {
    1080            0 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    1081            0 :                 let rec = v17::XlEndOfRecovery::decode(&mut buf);
    1082            0 :                 cp.wal_level = rec.wal_level;
    1083            0 :                 self.checkpoint_modified = true;
    1084            0 :             }
    1085           28 :         }
    1086              : 
    1087           30 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1088            0 :             if info == pg_constants::XLOG_NEXTOID {
    1089            0 :                 let next_oid = buf.get_u32_le();
    1090            0 :                 if cp.nextOid != next_oid {
    1091            0 :                     cp.nextOid = next_oid;
    1092            0 :                     self.checkpoint_modified = true;
    1093            0 :                 }
    1094            0 :             } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
    1095            0 :                 || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    1096              :             {
    1097            0 :                 let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
    1098            0 :                 buf.copy_to_slice(&mut checkpoint_bytes);
    1099            0 :                 let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
    1100            0 :                 trace!(
    1101            0 :                     "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
    1102              :                     xlog_checkpoint.oldestXid,
    1103              :                     cp.oldestXid
    1104              :                 );
    1105            0 :                 if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
    1106            0 :                     cp.oldestXid = xlog_checkpoint.oldestXid;
    1107            0 :                 }
    1108            0 :                 trace!(
    1109            0 :                     "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
    1110              :                     xlog_checkpoint.oldestActiveXid,
    1111              :                     cp.oldestActiveXid
    1112              :                 );
    1113              : 
    1114              :                 // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
    1115              :                 // because at shutdown, all in-progress transactions will implicitly
    1116              :                 // end. Postgres startup code knows that, and allows hot standby to start
    1117              :                 // immediately from a shutdown checkpoint.
    1118              :                 //
    1119              :                 // In Neon, Postgres hot standby startup always behaves as if starting from
    1120              :                 // an online checkpoint. It needs a valid `oldestActiveXid` value, so
    1121              :                 // instead of overwriting self.checkpoint.oldestActiveXid with
    1122              :                 // InvalidTransactionid from the checkpoint WAL record, update it to a
    1123              :                 // proper value, knowing that there are no in-progress transactions at this
    1124              :                 // point, except for prepared transactions.
    1125              :                 //
    1126              :                 // See also the neon code changes in the InitWalRecovery() function.
    1127            0 :                 if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
    1128            0 :                     && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    1129              :                 {
    1130            0 :                     let oldest_active_xid = if pg_version >= 17 {
    1131            0 :                         let mut oldest_active_full_xid = cp.nextXid.value;
    1132            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    1133            0 :                             if xid < oldest_active_full_xid {
    1134            0 :                                 oldest_active_full_xid = xid;
    1135            0 :                             }
    1136              :                         }
    1137            0 :                         oldest_active_full_xid as u32
    1138              :                     } else {
    1139            0 :                         let mut oldest_active_xid = cp.nextXid.value as u32;
    1140            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    1141            0 :                             let narrow_xid = xid as u32;
    1142            0 :                             if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
    1143            0 :                                 oldest_active_xid = narrow_xid;
    1144            0 :                             }
    1145              :                         }
    1146            0 :                         oldest_active_xid
    1147              :                     };
    1148            0 :                     cp.oldestActiveXid = oldest_active_xid;
    1149            0 :                 } else {
    1150            0 :                     cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
    1151            0 :                 }
    1152              : 
    1153              :                 // Write a new checkpoint key-value pair on every checkpoint record, even
    1154              :                 // if nothing really changed. Not strictly required, but it seems nice to
    1155              :                 // have some trace of the checkpoint records in the layer files at the same
    1156              :                 // LSNs.
    1157            0 :                 self.checkpoint_modified = true;
    1158            0 :             }
    1159              :         });
    1160              : 
    1161           30 :         Ok(())
    1162           30 :     }
    1163              : 
    1164            0 :     async fn ingest_logical_message_put(
    1165            0 :         &mut self,
    1166            0 :         put: PutLogicalMessage,
    1167            0 :         modification: &mut DatadirModification<'_>,
    1168            0 :         ctx: &RequestContext,
    1169            0 :     ) -> Result<()> {
    1170            0 :         let PutLogicalMessage { path, buf } = put;
    1171            0 :         modification.put_file(path.as_str(), &buf, ctx).await
    1172            0 :     }
    1173              : 
    1174            0 :     fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
    1175            0 :         match record {
    1176            0 :             StandbyRecord::RunningXacts(running_xacts) => {
    1177            0 :                 enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1178            0 :                     cp.oldestActiveXid = running_xacts.oldest_running_xid;
    1179            0 :                 });
    1180              : 
    1181            0 :                 self.checkpoint_modified = true;
    1182            0 :             }
    1183            0 :         }
    1184            0 : 
    1185            0 :         Ok(())
    1186            0 :     }
    1187              : 
    1188            0 :     async fn ingest_replorigin_record(
    1189            0 :         &mut self,
    1190            0 :         record: ReploriginRecord,
    1191            0 :         modification: &mut DatadirModification<'_>,
    1192            0 :     ) -> Result<()> {
    1193            0 :         match record {
    1194            0 :             ReploriginRecord::Set(set) => {
    1195            0 :                 modification
    1196            0 :                     .set_replorigin(set.node_id, set.remote_lsn)
    1197            0 :                     .await?;
    1198              :             }
    1199            0 :             ReploriginRecord::Drop(drop) => {
    1200            0 :                 modification.drop_replorigin(drop.node_id).await?;
    1201              :             }
    1202              :         }
    1203              : 
    1204            0 :         Ok(())
    1205            0 :     }
    1206              : 
    1207           18 :     async fn put_rel_creation(
    1208           18 :         &mut self,
    1209           18 :         modification: &mut DatadirModification<'_>,
    1210           18 :         rel: RelTag,
    1211           18 :         ctx: &RequestContext,
    1212           18 :     ) -> Result<()> {
    1213           18 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1214           18 :         Ok(())
    1215           18 :     }
    1216              : 
    1217              :     #[cfg(test)]
    1218       272402 :     async fn put_rel_page_image(
    1219       272402 :         &mut self,
    1220       272402 :         modification: &mut DatadirModification<'_>,
    1221       272402 :         rel: RelTag,
    1222       272402 :         blknum: BlockNumber,
    1223       272402 :         img: Bytes,
    1224       272402 :         ctx: &RequestContext,
    1225       272402 :     ) -> Result<(), PageReconstructError> {
    1226       272402 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1227         5420 :             .await?;
    1228       272402 :         modification.put_rel_page_image(rel, blknum, img)?;
    1229       272402 :         Ok(())
    1230       272402 :     }
    1231              : 
    1232           12 :     async fn put_rel_wal_record(
    1233           12 :         &mut self,
    1234           12 :         modification: &mut DatadirModification<'_>,
    1235           12 :         rel: RelTag,
    1236           12 :         blknum: BlockNumber,
    1237           12 :         rec: NeonWalRecord,
    1238           12 :         ctx: &RequestContext,
    1239           12 :     ) -> Result<()> {
    1240           12 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1241            0 :             .await?;
    1242           12 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1243           12 :         Ok(())
    1244           12 :     }
    1245              : 
    1246         6012 :     async fn put_rel_truncation(
    1247         6012 :         &mut self,
    1248         6012 :         modification: &mut DatadirModification<'_>,
    1249         6012 :         rel: RelTag,
    1250         6012 :         nblocks: BlockNumber,
    1251         6012 :         ctx: &RequestContext,
    1252         6012 :     ) -> anyhow::Result<()> {
    1253         6012 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1254         6012 :         Ok(())
    1255         6012 :     }
    1256              : 
    1257       272414 :     async fn handle_rel_extend(
    1258       272414 :         &mut self,
    1259       272414 :         modification: &mut DatadirModification<'_>,
    1260       272414 :         rel: RelTag,
    1261       272414 :         blknum: BlockNumber,
    1262       272414 :         ctx: &RequestContext,
    1263       272414 :     ) -> Result<(), PageReconstructError> {
    1264       272414 :         let new_nblocks = blknum + 1;
    1265              :         // Check if the relation exists. We implicitly create relations on first
    1266              :         // record.
    1267       272414 :         let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
    1268              : 
    1269       272414 :         if new_nblocks > old_nblocks {
    1270              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1271       272398 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1272              : 
    1273       272398 :             let mut key = rel_block_to_key(rel, blknum);
    1274       272398 : 
    1275       272398 :             // fill the gap with zeros
    1276       272398 :             let mut gap_blocks_filled: u64 = 0;
    1277       272398 :             for gap_blknum in old_nblocks..blknum {
    1278         2998 :                 key.field6 = gap_blknum;
    1279         2998 : 
    1280         2998 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1281            0 :                     continue;
    1282         2998 :                 }
    1283         2998 : 
    1284         2998 :                 modification.put_rel_page_image_zero(rel, gap_blknum)?;
    1285         2998 :                 gap_blocks_filled += 1;
    1286              :             }
    1287              : 
    1288       272398 :             WAL_INGEST
    1289       272398 :                 .gap_blocks_zeroed_on_rel_extend
    1290       272398 :                 .inc_by(gap_blocks_filled);
    1291       272398 : 
    1292       272398 :             // Log something when relation extends cause use to fill gaps
    1293       272398 :             // with zero pages. Logging is rate limited per pg version to
    1294       272398 :             // avoid skewing.
    1295       272398 :             if gap_blocks_filled > 0 {
    1296              :                 use once_cell::sync::Lazy;
    1297              :                 use std::sync::Mutex;
    1298              :                 use utils::rate_limit::RateLimit;
    1299              : 
    1300              :                 struct RateLimitPerPgVersion {
    1301              :                     rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
    1302              :                 }
    1303              : 
    1304              :                 impl RateLimitPerPgVersion {
    1305            0 :                     const fn new() -> Self {
    1306            0 :                         Self {
    1307            0 :                             rate_limiters: [const {
    1308            2 :                                 Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
    1309            0 :                             }; 4],
    1310            0 :                         }
    1311            0 :                     }
    1312              : 
    1313            4 :                     const fn rate_limiter(
    1314            4 :                         &self,
    1315            4 :                         pg_version: u32,
    1316            4 :                     ) -> Option<&Lazy<Mutex<RateLimit>>> {
    1317              :                         const MIN_PG_VERSION: u32 = 14;
    1318              :                         const MAX_PG_VERSION: u32 = 17;
    1319              : 
    1320            4 :                         if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
    1321            0 :                             return None;
    1322            4 :                         }
    1323            4 : 
    1324            4 :                         Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
    1325            4 :                     }
    1326              :                 }
    1327              : 
    1328              :                 static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
    1329            4 :                 if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
    1330            4 :                     if let Ok(mut locked) = rate_limiter.try_lock() {
    1331            4 :                         locked.call(|| {
    1332            2 :                             info!(
    1333            0 :                                 lsn=%modification.get_lsn(),
    1334            0 :                                 pg_version=%modification.tline.pg_version,
    1335            0 :                                 rel=%rel,
    1336            0 :                                 "Filled {} gap blocks on rel extend to {} from {}",
    1337              :                                 gap_blocks_filled,
    1338              :                                 new_nblocks,
    1339              :                                 old_nblocks);
    1340            4 :                         });
    1341            4 :                     }
    1342            0 :                 }
    1343       272394 :             }
    1344           16 :         }
    1345       272414 :         Ok(())
    1346       272414 :     }
    1347              : 
    1348            0 :     async fn put_slru_page_image(
    1349            0 :         &mut self,
    1350            0 :         modification: &mut DatadirModification<'_>,
    1351            0 :         kind: SlruKind,
    1352            0 :         segno: u32,
    1353            0 :         blknum: BlockNumber,
    1354            0 :         img: Bytes,
    1355            0 :         ctx: &RequestContext,
    1356            0 :     ) -> Result<()> {
    1357            0 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1358            0 :             .await?;
    1359            0 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1360            0 :         Ok(())
    1361            0 :     }
    1362              : 
    1363            0 :     async fn handle_slru_extend(
    1364            0 :         &mut self,
    1365            0 :         modification: &mut DatadirModification<'_>,
    1366            0 :         kind: SlruKind,
    1367            0 :         segno: u32,
    1368            0 :         blknum: BlockNumber,
    1369            0 :         ctx: &RequestContext,
    1370            0 :     ) -> anyhow::Result<()> {
    1371            0 :         // we don't use a cache for this like we do for relations. SLRUS are explcitly
    1372            0 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1373            0 :         // a lot less frequently.
    1374            0 : 
    1375            0 :         let new_nblocks = blknum + 1;
    1376              :         // Check if the relation exists. We implicitly create relations on first
    1377              :         // record.
    1378              :         // TODO: would be nice if to be more explicit about it
    1379            0 :         let old_nblocks = if !modification
    1380            0 :             .tline
    1381            0 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1382            0 :             .await?
    1383              :         {
    1384              :             // create it with 0 size initially, the logic below will extend it
    1385            0 :             modification
    1386            0 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1387            0 :                 .await?;
    1388            0 :             0
    1389              :         } else {
    1390            0 :             modification
    1391            0 :                 .tline
    1392            0 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1393            0 :                 .await?
    1394              :         };
    1395              : 
    1396            0 :         if new_nblocks > old_nblocks {
    1397            0 :             trace!(
    1398            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1399              :                 kind,
    1400              :                 segno,
    1401              :                 old_nblocks,
    1402              :                 new_nblocks
    1403              :             );
    1404            0 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1405              : 
    1406              :             // fill the gap with zeros
    1407            0 :             for gap_blknum in old_nblocks..blknum {
    1408            0 :                 modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
    1409              :             }
    1410            0 :         }
    1411            0 :         Ok(())
    1412            0 :     }
    1413              : }
    1414              : 
    1415           12 : async fn get_relsize(
    1416           12 :     modification: &DatadirModification<'_>,
    1417           12 :     rel: RelTag,
    1418           12 :     ctx: &RequestContext,
    1419           12 : ) -> Result<BlockNumber, PageReconstructError> {
    1420           12 :     let nblocks = if !modification
    1421           12 :         .tline
    1422           12 :         .get_rel_exists(rel, Version::Modified(modification), ctx)
    1423            0 :         .await?
    1424              :     {
    1425            0 :         0
    1426              :     } else {
    1427           12 :         modification
    1428           12 :             .tline
    1429           12 :             .get_rel_size(rel, Version::Modified(modification), ctx)
    1430            0 :             .await?
    1431              :     };
    1432           12 :     Ok(nblocks)
    1433           12 : }
    1434              : 
    1435              : #[allow(clippy::bool_assert_comparison)]
    1436              : #[cfg(test)]
    1437              : mod tests {
    1438              :     use super::*;
    1439              :     use crate::tenant::harness::*;
    1440              :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    1441              :     use postgres_ffi::RELSEG_SIZE;
    1442              : 
    1443              :     use crate::DEFAULT_PG_VERSION;
    1444              : 
    1445              :     /// Arbitrary relation tag, for testing.
    1446              :     const TESTREL_A: RelTag = RelTag {
    1447              :         spcnode: 0,
    1448              :         dbnode: 111,
    1449              :         relnode: 1000,
    1450              :         forknum: 0,
    1451              :     };
    1452              : 
    1453           12 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1454           12 :         // TODO
    1455           12 :     }
    1456              : 
    1457              :     #[tokio::test]
    1458            2 :     async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
    1459            8 :         for i in 14..=16 {
    1460            6 :             dispatch_pgversion!(i, {
    1461            2 :                 pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
    1462            2 :             });
    1463            2 :         }
    1464            2 : 
    1465            2 :         Ok(())
    1466            2 :     }
    1467              : 
    1468            8 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    1469            8 :         let mut m = tline.begin_modification(Lsn(0x10));
    1470            8 :         m.put_checkpoint(dispatch_pgversion!(
    1471            8 :             tline.pg_version,
    1472            0 :             pgv::ZERO_CHECKPOINT.clone()
    1473            0 :         ))?;
    1474           16 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1475            8 :         m.commit(ctx).await?;
    1476            8 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1477              : 
    1478            8 :         Ok(walingest)
    1479            8 :     }
    1480              : 
    1481              :     #[tokio::test]
    1482            2 :     async fn test_relsize() -> Result<()> {
    1483           20 :         let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
    1484            2 :         let tline = tenant
    1485            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1486            6 :             .await?;
    1487            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1488            2 : 
    1489            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1490            2 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1491            2 :         walingest
    1492            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1493            2 :             .await?;
    1494            2 :         m.commit(&ctx).await?;
    1495            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    1496            2 :         walingest
    1497            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
    1498            2 :             .await?;
    1499            2 :         m.commit(&ctx).await?;
    1500            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    1501            2 :         walingest
    1502            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
    1503            2 :             .await?;
    1504            2 :         m.commit(&ctx).await?;
    1505            2 :         let mut m = tline.begin_modification(Lsn(0x50));
    1506            2 :         walingest
    1507            2 :             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
    1508            2 :             .await?;
    1509            2 :         m.commit(&ctx).await?;
    1510            2 : 
    1511            2 :         assert_current_logical_size(&tline, Lsn(0x50));
    1512            2 : 
    1513            2 :         // The relation was created at LSN 2, not visible at LSN 1 yet.
    1514            2 :         assert_eq!(
    1515            2 :             tline
    1516            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1517            2 :                 .await?,
    1518            2 :             false
    1519            2 :         );
    1520            2 :         assert!(tline
    1521            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1522            2 :             .await
    1523            2 :             .is_err());
    1524            2 :         assert_eq!(
    1525            2 :             tline
    1526            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1527            2 :                 .await?,
    1528            2 :             true
    1529            2 :         );
    1530            2 :         assert_eq!(
    1531            2 :             tline
    1532            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1533            2 :                 .await?,
    1534            2 :             1
    1535            2 :         );
    1536            2 :         assert_eq!(
    1537            2 :             tline
    1538            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1539            2 :                 .await?,
    1540            2 :             3
    1541            2 :         );
    1542            2 : 
    1543            2 :         // Check page contents at each LSN
    1544            2 :         assert_eq!(
    1545            2 :             tline
    1546            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
    1547            2 :                 .await?,
    1548            2 :             test_img("foo blk 0 at 2")
    1549            2 :         );
    1550            2 : 
    1551            2 :         assert_eq!(
    1552            2 :             tline
    1553            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
    1554            2 :                 .await?,
    1555            2 :             test_img("foo blk 0 at 3")
    1556            2 :         );
    1557            2 : 
    1558            2 :         assert_eq!(
    1559            2 :             tline
    1560            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
    1561            2 :                 .await?,
    1562            2 :             test_img("foo blk 0 at 3")
    1563            2 :         );
    1564            2 :         assert_eq!(
    1565            2 :             tline
    1566            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
    1567            2 :                 .await?,
    1568            2 :             test_img("foo blk 1 at 4")
    1569            2 :         );
    1570            2 : 
    1571            2 :         assert_eq!(
    1572            2 :             tline
    1573            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
    1574            2 :                 .await?,
    1575            2 :             test_img("foo blk 0 at 3")
    1576            2 :         );
    1577            2 :         assert_eq!(
    1578            2 :             tline
    1579            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
    1580            2 :                 .await?,
    1581            2 :             test_img("foo blk 1 at 4")
    1582            2 :         );
    1583            2 :         assert_eq!(
    1584            2 :             tline
    1585            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    1586            2 :                 .await?,
    1587            2 :             test_img("foo blk 2 at 5")
    1588            2 :         );
    1589            2 : 
    1590            2 :         // Truncate last block
    1591            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    1592            2 :         walingest
    1593            2 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1594            2 :             .await?;
    1595            2 :         m.commit(&ctx).await?;
    1596            2 :         assert_current_logical_size(&tline, Lsn(0x60));
    1597            2 : 
    1598            2 :         // Check reported size and contents after truncation
    1599            2 :         assert_eq!(
    1600            2 :             tline
    1601            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    1602            2 :                 .await?,
    1603            2 :             2
    1604            2 :         );
    1605            2 :         assert_eq!(
    1606            2 :             tline
    1607            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
    1608            2 :                 .await?,
    1609            2 :             test_img("foo blk 0 at 3")
    1610            2 :         );
    1611            2 :         assert_eq!(
    1612            2 :             tline
    1613            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
    1614            2 :                 .await?,
    1615            2 :             test_img("foo blk 1 at 4")
    1616            2 :         );
    1617            2 : 
    1618            2 :         // should still see the truncated block with older LSN
    1619            2 :         assert_eq!(
    1620            2 :             tline
    1621            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1622            2 :                 .await?,
    1623            2 :             3
    1624            2 :         );
    1625            2 :         assert_eq!(
    1626            2 :             tline
    1627            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    1628            2 :                 .await?,
    1629            2 :             test_img("foo blk 2 at 5")
    1630            2 :         );
    1631            2 : 
    1632            2 :         // Truncate to zero length
    1633            2 :         let mut m = tline.begin_modification(Lsn(0x68));
    1634            2 :         walingest
    1635            2 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1636            2 :             .await?;
    1637            2 :         m.commit(&ctx).await?;
    1638            2 :         assert_eq!(
    1639            2 :             tline
    1640            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
    1641            2 :                 .await?,
    1642            2 :             0
    1643            2 :         );
    1644            2 : 
    1645            2 :         // Extend from 0 to 2 blocks, leaving a gap
    1646            2 :         let mut m = tline.begin_modification(Lsn(0x70));
    1647            2 :         walingest
    1648            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
    1649            2 :             .await?;
    1650            2 :         m.commit(&ctx).await?;
    1651            2 :         assert_eq!(
    1652            2 :             tline
    1653            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
    1654            2 :                 .await?,
    1655            2 :             2
    1656            2 :         );
    1657            2 :         assert_eq!(
    1658            2 :             tline
    1659            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
    1660            2 :                 .await?,
    1661            2 :             ZERO_PAGE
    1662            2 :         );
    1663            2 :         assert_eq!(
    1664            2 :             tline
    1665            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
    1666            2 :                 .await?,
    1667            2 :             test_img("foo blk 1")
    1668            2 :         );
    1669            2 : 
    1670            2 :         // Extend a lot more, leaving a big gap that spans across segments
    1671            2 :         let mut m = tline.begin_modification(Lsn(0x80));
    1672            2 :         walingest
    1673            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
    1674            2 :             .await?;
    1675          190 :         m.commit(&ctx).await?;
    1676            2 :         assert_eq!(
    1677            2 :             tline
    1678            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    1679            2 :                 .await?,
    1680            2 :             1501
    1681            2 :         );
    1682         2998 :         for blk in 2..1500 {
    1683         2996 :             assert_eq!(
    1684         2996 :                 tline
    1685         2996 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
    1686         1540 :                     .await?,
    1687         2996 :                 ZERO_PAGE
    1688            2 :             );
    1689            2 :         }
    1690            2 :         assert_eq!(
    1691            2 :             tline
    1692            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
    1693            2 :                 .await?,
    1694            2 :             test_img("foo blk 1500")
    1695            2 :         );
    1696            2 : 
    1697            2 :         Ok(())
    1698            2 :     }
    1699              : 
    1700              :     // Test what happens if we dropped a relation
    1701              :     // and then created it again within the same layer.
    1702              :     #[tokio::test]
    1703            2 :     async fn test_drop_extend() -> Result<()> {
    1704            2 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")
    1705            2 :             .await?
    1706            2 :             .load()
    1707           20 :             .await;
    1708            2 :         let tline = tenant
    1709            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1710            6 :             .await?;
    1711            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1712            2 : 
    1713            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1714            2 :         walingest
    1715            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1716            2 :             .await?;
    1717            2 :         m.commit(&ctx).await?;
    1718            2 : 
    1719            2 :         // Check that rel exists and size is correct
    1720            2 :         assert_eq!(
    1721            2 :             tline
    1722            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1723            2 :                 .await?,
    1724            2 :             true
    1725            2 :         );
    1726            2 :         assert_eq!(
    1727            2 :             tline
    1728            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1729            2 :                 .await?,
    1730            2 :             1
    1731            2 :         );
    1732            2 : 
    1733            2 :         // Drop rel
    1734            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    1735            2 :         let mut rel_drops = HashMap::new();
    1736            2 :         rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
    1737            2 :         m.put_rel_drops(rel_drops, &ctx).await?;
    1738            2 :         m.commit(&ctx).await?;
    1739            2 : 
    1740            2 :         // Check that rel is not visible anymore
    1741            2 :         assert_eq!(
    1742            2 :             tline
    1743            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
    1744            2 :                 .await?,
    1745            2 :             false
    1746            2 :         );
    1747            2 : 
    1748            2 :         // FIXME: should fail
    1749            2 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    1750            2 : 
    1751            2 :         // Re-create it
    1752            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    1753            2 :         walingest
    1754            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
    1755            2 :             .await?;
    1756            2 :         m.commit(&ctx).await?;
    1757            2 : 
    1758            2 :         // Check that rel exists and size is correct
    1759            2 :         assert_eq!(
    1760            2 :             tline
    1761            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    1762            2 :                 .await?,
    1763            2 :             true
    1764            2 :         );
    1765            2 :         assert_eq!(
    1766            2 :             tline
    1767            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    1768            2 :                 .await?,
    1769            2 :             1
    1770            2 :         );
    1771            2 : 
    1772            2 :         Ok(())
    1773            2 :     }
    1774              : 
    1775              :     // Test what happens if we truncated a relation
    1776              :     // so that one of its segments was dropped
    1777              :     // and then extended it again within the same layer.
    1778              :     #[tokio::test]
    1779            2 :     async fn test_truncate_extend() -> Result<()> {
    1780            2 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
    1781            2 :             .await?
    1782            2 :             .load()
    1783           20 :             .await;
    1784            2 :         let tline = tenant
    1785            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1786            6 :             .await?;
    1787            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1788            2 : 
    1789            2 :         // Create a 20 MB relation (the size is arbitrary)
    1790            2 :         let relsize = 20 * 1024 * 1024 / 8192;
    1791            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1792         5120 :         for blkno in 0..relsize {
    1793         5120 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    1794         5120 :             walingest
    1795         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    1796            2 :                 .await?;
    1797            2 :         }
    1798            2 :         m.commit(&ctx).await?;
    1799            2 : 
    1800            2 :         // The relation was created at LSN 20, not visible at LSN 1 yet.
    1801            2 :         assert_eq!(
    1802            2 :             tline
    1803            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1804            2 :                 .await?,
    1805            2 :             false
    1806            2 :         );
    1807            2 :         assert!(tline
    1808            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1809            2 :             .await
    1810            2 :             .is_err());
    1811            2 : 
    1812            2 :         assert_eq!(
    1813            2 :             tline
    1814            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1815            2 :                 .await?,
    1816            2 :             true
    1817            2 :         );
    1818            2 :         assert_eq!(
    1819            2 :             tline
    1820            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1821            2 :                 .await?,
    1822            2 :             relsize
    1823            2 :         );
    1824            2 : 
    1825            2 :         // Check relation content
    1826         5120 :         for blkno in 0..relsize {
    1827         5120 :             let lsn = Lsn(0x20);
    1828         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1829         5120 :             assert_eq!(
    1830         5120 :                 tline
    1831         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
    1832         1803 :                     .await?,
    1833         5120 :                 test_img(&data)
    1834            2 :             );
    1835            2 :         }
    1836            2 : 
    1837            2 :         // Truncate relation so that second segment was dropped
    1838            2 :         // - only leave one page
    1839            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    1840            2 :         walingest
    1841            2 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    1842            2 :             .await?;
    1843            2 :         m.commit(&ctx).await?;
    1844            2 : 
    1845            2 :         // Check reported size and contents after truncation
    1846            2 :         assert_eq!(
    1847            2 :             tline
    1848            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    1849            2 :                 .await?,
    1850            2 :             1
    1851            2 :         );
    1852            2 : 
    1853            4 :         for blkno in 0..1 {
    1854            2 :             let lsn = Lsn(0x20);
    1855            2 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1856            2 :             assert_eq!(
    1857            2 :                 tline
    1858            2 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
    1859            2 :                     .await?,
    1860            2 :                 test_img(&data)
    1861            2 :             );
    1862            2 :         }
    1863            2 : 
    1864            2 :         // should still see all blocks with older LSN
    1865            2 :         assert_eq!(
    1866            2 :             tline
    1867            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1868            2 :                 .await?,
    1869            2 :             relsize
    1870            2 :         );
    1871         5120 :         for blkno in 0..relsize {
    1872         5120 :             let lsn = Lsn(0x20);
    1873         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1874         5120 :             assert_eq!(
    1875         5120 :                 tline
    1876         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
    1877         1856 :                     .await?,
    1878         5120 :                 test_img(&data)
    1879            2 :             );
    1880            2 :         }
    1881            2 : 
    1882            2 :         // Extend relation again.
    1883            2 :         // Add enough blocks to create second segment
    1884            2 :         let lsn = Lsn(0x80);
    1885            2 :         let mut m = tline.begin_modification(lsn);
    1886         5120 :         for blkno in 0..relsize {
    1887         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1888         5120 :             walingest
    1889         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    1890            2 :                 .await?;
    1891            2 :         }
    1892            3 :         m.commit(&ctx).await?;
    1893            2 : 
    1894            2 :         assert_eq!(
    1895            2 :             tline
    1896            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    1897            2 :                 .await?,
    1898            2 :             true
    1899            2 :         );
    1900            2 :         assert_eq!(
    1901            2 :             tline
    1902            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    1903            2 :                 .await?,
    1904            2 :             relsize
    1905            2 :         );
    1906            2 :         // Check relation content
    1907         5120 :         for blkno in 0..relsize {
    1908         5120 :             let lsn = Lsn(0x80);
    1909         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1910         5120 :             assert_eq!(
    1911         5120 :                 tline
    1912         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
    1913         1828 :                     .await?,
    1914         5120 :                 test_img(&data)
    1915            2 :             );
    1916            2 :         }
    1917            2 : 
    1918            2 :         Ok(())
    1919            2 :     }
    1920              : 
    1921              :     /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
    1922              :     /// split into multiple 1 GB segments in Postgres.
    1923              :     #[tokio::test]
    1924            2 :     async fn test_large_rel() -> Result<()> {
    1925           20 :         let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
    1926            2 :         let tline = tenant
    1927            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1928            6 :             .await?;
    1929            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1930            2 : 
    1931            2 :         let mut lsn = 0x10;
    1932       262146 :         for blknum in 0..RELSEG_SIZE + 1 {
    1933       262146 :             lsn += 0x10;
    1934       262146 :             let mut m = tline.begin_modification(Lsn(lsn));
    1935       262146 :             let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    1936       262146 :             walingest
    1937       262146 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    1938         5420 :                 .await?;
    1939       262146 :             m.commit(&ctx).await?;
    1940            2 :         }
    1941            2 : 
    1942            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    1943            2 : 
    1944            2 :         assert_eq!(
    1945            2 :             tline
    1946            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    1947            2 :                 .await?,
    1948            2 :             RELSEG_SIZE + 1
    1949            2 :         );
    1950            2 : 
    1951            2 :         // Truncate one block
    1952            2 :         lsn += 0x10;
    1953            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    1954            2 :         walingest
    1955            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    1956            2 :             .await?;
    1957            2 :         m.commit(&ctx).await?;
    1958            2 :         assert_eq!(
    1959            2 :             tline
    1960            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    1961            2 :                 .await?,
    1962            2 :             RELSEG_SIZE
    1963            2 :         );
    1964            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    1965            2 : 
    1966            2 :         // Truncate another block
    1967            2 :         lsn += 0x10;
    1968            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    1969            2 :         walingest
    1970            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    1971            2 :             .await?;
    1972            2 :         m.commit(&ctx).await?;
    1973            2 :         assert_eq!(
    1974            2 :             tline
    1975            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    1976            2 :                 .await?,
    1977            2 :             RELSEG_SIZE - 1
    1978            2 :         );
    1979            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    1980            2 : 
    1981            2 :         // Truncate to 1500, and then truncate all the way down to 0, one block at a time
    1982            2 :         // This tests the behavior at segment boundaries
    1983            2 :         let mut size: i32 = 3000;
    1984         6004 :         while size >= 0 {
    1985         6002 :             lsn += 0x10;
    1986         6002 :             let mut m = tline.begin_modification(Lsn(lsn));
    1987         6002 :             walingest
    1988         6002 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    1989            2 :                 .await?;
    1990         6002 :             m.commit(&ctx).await?;
    1991         6002 :             assert_eq!(
    1992         6002 :                 tline
    1993         6002 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    1994            2 :                     .await?,
    1995         6002 :                 size as BlockNumber
    1996            2 :             );
    1997            2 : 
    1998         6002 :             size -= 1;
    1999            2 :         }
    2000            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2001            2 : 
    2002            2 :         Ok(())
    2003            2 :     }
    2004              : 
    2005              :     /// Replay a wal segment file taken directly from safekeepers.
    2006              :     ///
    2007              :     /// This test is useful for benchmarking since it allows us to profile only
    2008              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2009              :     /// without waiting for unrelated steps.
    2010              :     #[tokio::test]
    2011            2 :     async fn test_ingest_real_wal() {
    2012            2 :         use crate::tenant::harness::*;
    2013            2 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2014            2 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2015            2 : 
    2016            2 :         // Define test data path and constants.
    2017            2 :         //
    2018            2 :         // Steps to reconstruct the data, if needed:
    2019            2 :         // 1. Run the pgbench python test
    2020            2 :         // 2. Take the first wal segment file from safekeeper
    2021            2 :         // 3. Compress it using `zstd --long input_file`
    2022            2 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2023            2 :         // 5. Grep sk logs for "restart decoder" to get startpoint
    2024            2 :         // 6. Run just the decoder from this test to get the endpoint.
    2025            2 :         //    It's the last LSN the decoder will output.
    2026            2 :         let pg_version = 15; // The test data was generated by pg15
    2027            2 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2028            2 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2029            2 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2030            2 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2031            2 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2032            2 : 
    2033            2 :         let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
    2034            2 :         let span = harness
    2035            2 :             .span()
    2036            2 :             .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
    2037           20 :         let (tenant, ctx) = harness.load().await;
    2038            2 : 
    2039            2 :         let remote_initdb_path =
    2040            2 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    2041            2 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2042            2 : 
    2043            2 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2044            2 :             .expect("creating test dir should work");
    2045            2 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2046            2 : 
    2047            2 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2048            2 :         // it doesn't create a real checkpoint, and Walingest::new tries to parse
    2049            2 :         // the garbage data.
    2050            2 :         let tline = tenant
    2051            2 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2052        21296 :             .await
    2053            2 :             .unwrap();
    2054            2 : 
    2055            2 :         // We fully read and decompress this into memory before decoding
    2056            2 :         // to get a more accurate perf profile of the decoder.
    2057            2 :         let bytes = {
    2058            2 :             use async_compression::tokio::bufread::ZstdDecoder;
    2059            2 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2060            2 :             let reader = tokio::io::BufReader::new(file);
    2061            2 :             let decoder = ZstdDecoder::new(reader);
    2062            2 :             let mut reader = tokio::io::BufReader::new(decoder);
    2063            2 :             let mut buffer = Vec::new();
    2064          224 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2065            2 :             buffer
    2066            2 :         };
    2067            2 : 
    2068            2 :         // TODO start a profiler too
    2069            2 :         let started_at = std::time::Instant::now();
    2070            2 : 
    2071            2 :         // Initialize walingest
    2072            2 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    2073            2 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2074            2 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2075            5 :             .await
    2076            2 :             .unwrap();
    2077            2 :         let mut modification = tline.begin_modification(startpoint);
    2078            2 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2079            2 : 
    2080            2 :         // Decode and ingest wal. We process the wal in chunks because
    2081            2 :         // that's what happens when we get bytes from safekeepers.
    2082       474686 :         for chunk in bytes[xlogoff..].chunks(50) {
    2083       474686 :             decoder.feed_bytes(chunk);
    2084       620536 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    2085       145850 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
    2086       145850 :                     recdata,
    2087       145850 :                     modification.tline.get_shard_identity(),
    2088       145850 :                     lsn,
    2089       145850 :                     modification.tline.pg_version,
    2090       145850 :                 )
    2091       145850 :                 .unwrap();
    2092       145850 : 
    2093       145850 :                 walingest
    2094       145850 :                     .ingest_record(interpreted, &mut modification, &ctx)
    2095       145850 :                     .instrument(span.clone())
    2096          296 :                     .await
    2097       145850 :                     .unwrap();
    2098            2 :             }
    2099       474686 :             modification.commit(&ctx).await.unwrap();
    2100            2 :         }
    2101            2 : 
    2102            2 :         let duration = started_at.elapsed();
    2103            2 :         println!("done in {:?}", duration);
    2104            2 :     }
    2105              : }
        

Generated by: LCOV version 2.1-beta