LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions)
Test:      07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02

             Coverage    Total      Hit
Lines:         59.8 %     1809     1081
Functions:     58.2 %       79       46

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  -> [`wal_decoder`] ->  WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
       9              : //! Records get decoded and interpreted in the [`wal_decoder`] module
      10              : //! and then stored in the Repository by WalIngest.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
      13              : //! page images, or as WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
      14              : //! extracts page images out of some WAL records, but most page versions
      15              : //! are stored as WAL records. If a WAL record modifies multiple pages,
      16              : //! WalIngest calls the Repository's put_rel_wal_record or put_rel_page_image
      17              : //! function separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
      22              : //! bespoke Rust code.
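                      : //!
                      : //! As a minimal sketch of how a caller might drive this module (a hypothetical
                      : //! driver loop: `recv_interpreted_record` and the exact `begin_modification`
                      : //! call are stand-ins for the real WAL-receiver plumbing, not part of this file):
                      : //!
                      : //! ```ignore
                      : //! let mut ingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
                      : //! while let Some(interpreted) = recv_interpreted_record().await {
                      : //!     let mut modification = timeline.begin_modification(interpreted.next_record_lsn);
                      : //!     ingest.ingest_record(interpreted, &mut modification, &ctx).await?;
                      : //!     modification.commit(&ctx).await?; // flush the writes to the Repository
                      : //! }
                      : //! ```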
      23              : 
      24              : use std::collections::HashMap;
      25              : use std::sync::Arc;
      26              : use std::sync::OnceLock;
      27              : use std::time::Duration;
      28              : use std::time::Instant;
      29              : use std::time::SystemTime;
      30              : 
      31              : use anyhow::{bail, Result};
      32              : use bytes::{Buf, Bytes};
      33              : use tracing::*;
      34              : 
      35              : use crate::context::RequestContext;
      36              : use crate::metrics::WAL_INGEST;
      37              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      38              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      39              : use crate::tenant::PageReconstructError;
      40              : use crate::tenant::Timeline;
      41              : use crate::ZERO_PAGE;
      42              : use pageserver_api::key::rel_block_to_key;
      43              : use pageserver_api::record::NeonWalRecord;
      44              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      45              : use pageserver_api::shard::ShardIdentity;
      46              : use postgres_ffi::fsm_logical_to_physical;
      47              : use postgres_ffi::pg_constants;
      48              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      49              : use postgres_ffi::walrecord::*;
      50              : use postgres_ffi::TransactionId;
      51              : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
      52              : use utils::bin_ser::SerializeError;
      53              : use utils::lsn::Lsn;
      54              : use utils::rate_limit::RateLimit;
      55              : use utils::{critical, failpoint_support};
      56              : use wal_decoder::models::*;
      57              : 
      58              : enum_pgversion! {CheckPoint, pgv::CheckPoint}
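                      : 
                      : // A simplified sketch of what the two macros provide (illustrative only, not
                      : // the actual expansion): `enum_pgversion!` declares `CheckPoint` as a
                      : // version-tagged wrapper around the per-version `postgres_ffi` type, and
                      : // `enum_pgversion_dispatch!` (used below) becomes a match with one arm per
                      : // supported PostgreSQL version:
                      : //
                      : // ```ignore
                      : // enum CheckPoint {
                      : //     V14(postgres_ffi::v14::CheckPoint),
                      : //     V15(postgres_ffi::v15::CheckPoint),
                      : //     // ... one variant per supported PostgreSQL version
                      : // }
                      : //
                      : // match self {
                      : //     CheckPoint::V14(cp) => cp.encode(),
                      : //     CheckPoint::V15(cp) => cp.encode(),
                      : //     // ...
                      : // }
                      : // ```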
      59              : 
      60              : impl CheckPoint {
      61           12 :     fn encode(&self) -> Result<Bytes, SerializeError> {
      62           12 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
      63           12 :     }
      64              : 
      65       291668 :     fn update_next_xid(&mut self, xid: u32) -> bool {
      66       291668 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
      67       291668 :     }
      68              : 
      69            0 :     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
      70            0 :         enum_pgversion_dispatch!(self, CheckPoint, cp, {
      71            0 :             cp.update_next_multixid(multi_xid, multi_offset)
      72              :         })
      73            0 :     }
      74              : }
      75              : 
      76              : /// Temporary suppression of WAL lag warnings after attach
      77              : ///
      78              : /// After tenant attach, we want to limit WAL lag warnings because
      79              : /// we don't look at the WAL until the attach is complete, which
      80              : /// might take a while.
      81              : pub struct WalLagCooldown {
      82              :     /// Until when this suppression applies
      83              :     active_until: std::time::Instant,
      84              :     /// The maximum lag to suppress. Lags above this limit are reported anyway.
      85              :     max_lag: Duration,
      86              : }
      87              : 
      88              : impl WalLagCooldown {
      89            0 :     pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
      90            0 :         Self {
      91            0 :             active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
      92            0 :             max_lag: attach_duration * 2 + Duration::from_secs(60),
      93            0 :         }
      94            0 :     }
      95              : }
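                      : 
                      : // Worked example (illustrative numbers): if the attach took 10s, warnings are
                      : // suppressed for lags up to max_lag = 2 * 10s + 60s = 80s, and the suppression
                      : // stays active until active_until = attach_start + 3 * 10s + 120s, i.e. for
                      : // 150s after the attach began.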
      96              : 
      97              : pub struct WalIngest {
      98              :     attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
      99              :     shard: ShardIdentity,
     100              :     checkpoint: CheckPoint,
     101              :     checkpoint_modified: bool,
     102              :     warn_ingest_lag: WarnIngestLag,
     103              : }
     104              : 
     105              : struct WarnIngestLag {
     106              :     lag_msg_ratelimit: RateLimit,
     107              :     future_lsn_msg_ratelimit: RateLimit,
     108              :     timestamp_invalid_msg_ratelimit: RateLimit,
     109              : }
     110              : 
     111              : impl WalIngest {
     112           24 :     pub async fn new(
     113           24 :         timeline: &Timeline,
     114           24 :         startpoint: Lsn,
     115           24 :         ctx: &RequestContext,
     116           24 :     ) -> anyhow::Result<WalIngest> {
     117              :         // Fetch the latest checkpoint into memory, so that we can compare with it
     118              :         // quickly in `ingest_record` and update it when it changes.
     119           24 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
     120           24 :         let pgversion = timeline.pg_version;
     121              : 
     122           24 :         let checkpoint = dispatch_pgversion!(pgversion, {
     123            0 :             let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
     124            0 :             trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
     125            0 :             <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
     126              :         });
     127              : 
     128           24 :         Ok(WalIngest {
     129           24 :             shard: *timeline.get_shard_identity(),
     130           24 :             checkpoint,
     131           24 :             checkpoint_modified: false,
     132           24 :             attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
     133           24 :             warn_ingest_lag: WarnIngestLag {
     134           24 :                 lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     135           24 :                 future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     136           24 :                 timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     137           24 :             },
     138           24 :         })
     139           24 :     }
     140              : 
     141              :     /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
     142              :     /// storage of a given timeline.
     143              :     ///
      144              :     /// This function updates the `lsn` field of `DatadirModification`
     145              :     ///
     146              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
     147       291704 :     pub async fn ingest_record(
     148       291704 :         &mut self,
     149       291704 :         interpreted: InterpretedWalRecord,
     150       291704 :         modification: &mut DatadirModification<'_>,
     151       291704 :         ctx: &RequestContext,
     152       291704 :     ) -> anyhow::Result<bool> {
     153       291704 :         WAL_INGEST.records_received.inc();
     154       291704 :         let prev_len = modification.len();
     155       291704 : 
     156       291704 :         modification.set_lsn(interpreted.next_record_lsn)?;
     157              : 
     158       291704 :         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
     159              :             // Records of this type should always be preceded by a commit(), as they
     160              :             // rely on reading data pages back from the Timeline.
     161            0 :             assert!(!modification.has_dirty_data());
     162       291704 :         }
     163              : 
     164       291704 :         assert!(!self.checkpoint_modified);
     165       291704 :         if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
     166       291668 :             && self.checkpoint.update_next_xid(interpreted.xid)
     167            4 :         {
     168            4 :             self.checkpoint_modified = true;
     169       291700 :         }
     170              : 
     171       291704 :         failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
     172              : 
     173          132 :         match interpreted.metadata_record {
     174           24 :             Some(MetadataRecord::Heapam(rec)) => match rec {
     175           24 :                 HeapamRecord::ClearVmBits(clear_vm_bits) => {
     176           24 :                     self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     177           24 :                         .await?;
     178              :                 }
     179              :             },
     180            0 :             Some(MetadataRecord::Neonrmgr(rec)) => match rec {
     181            0 :                 NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
     182            0 :                     self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     183            0 :                         .await?;
     184              :                 }
     185              :             },
     186           32 :             Some(MetadataRecord::Smgr(rec)) => match rec {
     187           32 :                 SmgrRecord::Create(create) => {
     188           32 :                     self.ingest_xlog_smgr_create(create, modification, ctx)
     189           32 :                         .await?;
     190              :                 }
     191            0 :                 SmgrRecord::Truncate(truncate) => {
     192            0 :                     self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
     193            0 :                         .await?;
     194              :                 }
     195              :             },
     196            0 :             Some(MetadataRecord::Dbase(rec)) => match rec {
     197            0 :                 DbaseRecord::Create(create) => {
     198            0 :                     self.ingest_xlog_dbase_create(create, modification, ctx)
     199            0 :                         .await?;
     200              :                 }
     201            0 :                 DbaseRecord::Drop(drop) => {
     202            0 :                     self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
     203              :                 }
     204              :             },
     205            0 :             Some(MetadataRecord::Clog(rec)) => match rec {
     206            0 :                 ClogRecord::ZeroPage(zero_page) => {
     207            0 :                     self.ingest_clog_zero_page(zero_page, modification, ctx)
     208            0 :                         .await?;
     209              :                 }
     210            0 :                 ClogRecord::Truncate(truncate) => {
     211            0 :                     self.ingest_clog_truncate(truncate, modification, ctx)
     212            0 :                         .await?;
     213              :                 }
     214              :             },
     215           16 :             Some(MetadataRecord::Xact(rec)) => {
     216           16 :                 self.ingest_xact_record(rec, modification, ctx).await?;
     217              :             }
     218            0 :             Some(MetadataRecord::MultiXact(rec)) => match rec {
     219            0 :                 MultiXactRecord::ZeroPage(zero_page) => {
     220            0 :                     self.ingest_multixact_zero_page(zero_page, modification, ctx)
     221            0 :                         .await?;
     222              :                 }
     223            0 :                 MultiXactRecord::Create(create) => {
     224            0 :                     self.ingest_multixact_create(modification, &create)?;
     225              :                 }
     226            0 :                 MultiXactRecord::Truncate(truncate) => {
     227            0 :                     self.ingest_multixact_truncate(modification, &truncate, ctx)
     228            0 :                         .await?;
     229              :                 }
     230              :             },
     231            0 :             Some(MetadataRecord::Relmap(rec)) => match rec {
     232            0 :                 RelmapRecord::Update(update) => {
     233            0 :                     self.ingest_relmap_update(update, modification, ctx).await?;
     234              :                 }
     235              :             },
     236           60 :             Some(MetadataRecord::Xlog(rec)) => match rec {
     237           60 :                 XlogRecord::Raw(raw) => {
     238           60 :                     self.ingest_raw_xlog_record(raw, modification, ctx).await?;
     239              :                 }
     240              :             },
     241            0 :             Some(MetadataRecord::LogicalMessage(rec)) => match rec {
     242            0 :                 LogicalMessageRecord::Put(put) => {
     243            0 :                     self.ingest_logical_message_put(put, modification, ctx)
     244            0 :                         .await?;
     245              :                 }
     246              :                 #[cfg(feature = "testing")]
     247              :                 LogicalMessageRecord::Failpoint => {
     248              :                     // This is a convenient way to make the WAL ingestion pause at
      249              :                     // a particular point in the WAL. For more fine-grained control,
     250              :                     // we could peek into the message and only pause if it contains
     251              :                     // a particular string, for example, but this is enough for now.
     252            0 :                     failpoint_support::sleep_millis_async!(
     253            0 :                         "pageserver-wal-ingest-logical-message-sleep"
     254            0 :                     );
     255              :                 }
     256              :             },
     257            0 :             Some(MetadataRecord::Standby(rec)) => {
     258            0 :                 self.ingest_standby_record(rec).unwrap();
     259            0 :             }
     260            0 :             Some(MetadataRecord::Replorigin(rec)) => {
     261            0 :                 self.ingest_replorigin_record(rec, modification).await?;
     262              :             }
     263       291572 :             None => {
     264       291572 :                 // There are two cases through which we end up here:
     265       291572 :                 // 1. The resource manager for the original PG WAL record
     266       291572 :                 //    is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
     267       291572 :                 //    record type within Neon.
     268       291572 :                 // 2. The resource manager id was unknown to
     269       291572 :                 //    [`wal_decoder::decoder::MetadataRecord::from_decoded`].
     270       291572 :                 // TODO(vlad): Tighten this up more once we build confidence
     271       291572 :                 // that case (2) does not happen in the field.
     272       291572 :             }
     273              :         }
     274              : 
     275       291704 :         modification
     276       291704 :             .ingest_batch(interpreted.batch, &self.shard, ctx)
     277       291704 :             .await?;
     278              : 
     279              :         // If checkpoint data was updated, store the new version in the repository
     280       291704 :         if self.checkpoint_modified {
     281           12 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     282              : 
     283           12 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     284           12 :             self.checkpoint_modified = false;
     285       291692 :         }
     286              : 
     287              :         // Note that at this point this record is only cached in the modification
     288              :         // until commit() is called to flush the data into the repository and update
     289              :         // the latest LSN.
     290              : 
     291       291704 :         Ok(modification.len() > prev_len)
     292       291704 :     }
     293              : 
     294              :     /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
     295            0 :     fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
     296            0 :         let next_full_xid =
     297            0 :             enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
     298              : 
     299            0 :         let next_xid = (next_full_xid) as u32;
     300            0 :         let mut epoch = (next_full_xid >> 32) as u32;
     301            0 : 
     302            0 :         if xid > next_xid {
     303              :             // Wraparound occurred, must be from a prev epoch.
     304            0 :             if epoch == 0 {
     305            0 :                 bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
     306            0 :             }
     307            0 :             epoch -= 1;
     308            0 :         }
     309              : 
     310            0 :         Ok(((epoch as u64) << 32) | xid as u64)
     311            0 :     }
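                      : 
                      :     // Worked example for the epoch adjustment above (illustrative values):
                      :     // with nextXid = (1 << 32) | 100, we get next_xid = 100 and epoch = 1; a
                      :     // prepared-transaction xid of 4_000_000_000 is greater than next_xid, so
                      :     // it must come from before the wraparound: epoch becomes 0 and the full
                      :     // xid is (0u64 << 32) | 4_000_000_000.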
     312              : 
     313           24 :     async fn ingest_clear_vm_bits(
     314           24 :         &mut self,
     315           24 :         clear_vm_bits: ClearVmBits,
     316           24 :         modification: &mut DatadirModification<'_>,
     317           24 :         ctx: &RequestContext,
     318           24 :     ) -> anyhow::Result<()> {
     319           24 :         let ClearVmBits {
     320           24 :             new_heap_blkno,
     321           24 :             old_heap_blkno,
     322           24 :             flags,
     323           24 :             vm_rel,
     324           24 :         } = clear_vm_bits;
     325           24 :         // Clear the VM bits if required.
     326           24 :         let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     327           24 :         let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     328              : 
     329              :         // VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
      330              :         // its view of the VM relation size. Out of caution, we log a critical error instead of
      331              :         // failing WAL ingestion, as there have historically been cases where PostgreSQL has cleared spurious VM pages. See:
     332              :         // https://github.com/neondatabase/neon/pull/10634.
     333           24 :         let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
     334            0 :             critical!("clear_vm_bits for unknown VM relation {vm_rel}");
     335            0 :             return Ok(());
     336              :         };
     337           24 :         if let Some(blknum) = new_vm_blk {
     338           24 :             if blknum >= vm_size {
     339            0 :                 critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
     340            0 :                 new_vm_blk = None;
     341           24 :             }
     342            0 :         }
     343           24 :         if let Some(blknum) = old_vm_blk {
     344            0 :             if blknum >= vm_size {
     345            0 :                 critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
     346            0 :                 old_vm_blk = None;
     347            0 :             }
     348           24 :         }
     349              : 
     350           24 :         if new_vm_blk.is_none() && old_vm_blk.is_none() {
     351            0 :             return Ok(());
     352           24 :         } else if new_vm_blk == old_vm_blk {
     353              :             // An UPDATE record that needs to clear the bits for both old and the new page, both of
     354              :             // which reside on the same VM page.
     355            0 :             self.put_rel_wal_record(
     356            0 :                 modification,
     357            0 :                 vm_rel,
     358            0 :                 new_vm_blk.unwrap(),
     359            0 :                 NeonWalRecord::ClearVisibilityMapFlags {
     360            0 :                     new_heap_blkno,
     361            0 :                     old_heap_blkno,
     362            0 :                     flags,
     363            0 :                 },
     364            0 :                 ctx,
     365            0 :             )
     366            0 :             .await?;
     367              :         } else {
     368              :             // Clear VM bits for one heap page, or for two pages that reside on different VM pages.
     369           24 :             if let Some(new_vm_blk) = new_vm_blk {
     370           24 :                 self.put_rel_wal_record(
     371           24 :                     modification,
     372           24 :                     vm_rel,
     373           24 :                     new_vm_blk,
     374           24 :                     NeonWalRecord::ClearVisibilityMapFlags {
     375           24 :                         new_heap_blkno,
     376           24 :                         old_heap_blkno: None,
     377           24 :                         flags,
     378           24 :                     },
     379           24 :                     ctx,
     380           24 :                 )
     381           24 :                 .await?;
     382            0 :             }
     383           24 :             if let Some(old_vm_blk) = old_vm_blk {
     384            0 :                 self.put_rel_wal_record(
     385            0 :                     modification,
     386            0 :                     vm_rel,
     387            0 :                     old_vm_blk,
     388            0 :                     NeonWalRecord::ClearVisibilityMapFlags {
     389            0 :                         new_heap_blkno: None,
     390            0 :                         old_heap_blkno,
     391            0 :                         flags,
     392            0 :                     },
     393            0 :                     ctx,
     394            0 :                 )
     395            0 :                 .await?;
     396           24 :             }
     397              :         }
     398           24 :         Ok(())
     399           24 :     }
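                      : 
                      :     // Worked example for the heap-block-to-VM-block mapping above (assuming
                      :     // 8KB pages with 2 VM bits per heap block, i.e. roughly 32K heap blocks
                      :     // tracked per VM page): HEAPBLK_TO_MAPBLOCK sends heap blocks 0..~32K to
                      :     // VM block 0, so clearing the bit for heap block 70_000 touches VM block 2.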
     400              : 
     401              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     402            0 :     async fn ingest_xlog_dbase_create(
     403            0 :         &mut self,
     404            0 :         create: DbaseCreate,
     405            0 :         modification: &mut DatadirModification<'_>,
     406            0 :         ctx: &RequestContext,
     407            0 :     ) -> anyhow::Result<()> {
     408            0 :         let DbaseCreate {
     409            0 :             db_id,
     410            0 :             tablespace_id,
     411            0 :             src_db_id,
     412            0 :             src_tablespace_id,
     413            0 :         } = create;
     414              : 
     415            0 :         let rels = modification
     416            0 :             .tline
     417            0 :             .list_rels(
     418            0 :                 src_tablespace_id,
     419            0 :                 src_db_id,
     420            0 :                 Version::Modified(modification),
     421            0 :                 ctx,
     422            0 :             )
     423            0 :             .await?;
     424              : 
     425            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
     426              : 
     427              :         // Copy relfilemap
     428            0 :         let filemap = modification
     429            0 :             .tline
     430            0 :             .get_relmap_file(
     431            0 :                 src_tablespace_id,
     432            0 :                 src_db_id,
     433            0 :                 Version::Modified(modification),
     434            0 :                 ctx,
     435            0 :             )
     436            0 :             .await?;
     437            0 :         modification
     438            0 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
     439            0 :             .await?;
     440              : 
     441            0 :         let mut num_rels_copied = 0;
     442            0 :         let mut num_blocks_copied = 0;
     443            0 :         for src_rel in rels {
     444            0 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
     445            0 :             assert_eq!(src_rel.dbnode, src_db_id);
     446              : 
     447            0 :             let nblocks = modification
     448            0 :                 .tline
     449            0 :                 .get_rel_size(src_rel, Version::Modified(modification), ctx)
     450            0 :                 .await?;
     451            0 :             let dst_rel = RelTag {
     452            0 :                 spcnode: tablespace_id,
     453            0 :                 dbnode: db_id,
     454            0 :                 relnode: src_rel.relnode,
     455            0 :                 forknum: src_rel.forknum,
     456            0 :             };
     457            0 : 
     458            0 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
     459              : 
     460              :             // Copy content
     461            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
     462            0 :             for blknum in 0..nblocks {
     463              :                 // Sharding:
     464              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
     465              :                 //    dbNode is not included in the hash inputs for sharding.
     466              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
     467              :                 //    that belong to it.
     468            0 :                 let src_key = rel_block_to_key(src_rel, blknum);
     469            0 :                 if !self.shard.is_key_local(&src_key) {
     470            0 :                     debug!(
     471            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
     472              :                         src_key
     473              :                     );
     474            0 :                     continue;
     475            0 :                 }
     476            0 :                 debug!(
     477            0 :                     "copying block {} from {} ({}) to {}",
     478              :                     blknum, src_rel, src_key, dst_rel
     479              :                 );
     480              : 
     481            0 :                 let content = modification
     482            0 :                     .tline
     483            0 :                     .get_rel_page_at_lsn(
     484            0 :                         src_rel,
     485            0 :                         blknum,
     486            0 :                         Version::Modified(modification),
     487            0 :                         ctx,
     488            0 :                         crate::tenant::storage_layer::IoConcurrency::sequential(),
     489            0 :                     )
     490            0 :                     .await?;
     491            0 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
     492            0 :                 num_blocks_copied += 1;
     493              :             }
     494              : 
     495            0 :             num_rels_copied += 1;
     496              :         }
     497              : 
     498            0 :         info!(
     499            0 :             "Created database {}/{}, copied {} blocks in {} rels",
     500              :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
     501              :         );
     502            0 :         Ok(())
     503            0 :     }
     504              : 
     505            0 :     async fn ingest_xlog_dbase_drop(
     506            0 :         &mut self,
     507            0 :         dbase_drop: DbaseDrop,
     508            0 :         modification: &mut DatadirModification<'_>,
     509            0 :         ctx: &RequestContext,
     510            0 :     ) -> anyhow::Result<()> {
     511            0 :         let DbaseDrop {
     512            0 :             db_id,
     513            0 :             tablespace_ids,
     514            0 :         } = dbase_drop;
     515            0 :         for tablespace_id in tablespace_ids {
     516            0 :             trace!("Drop db {}, {}", tablespace_id, db_id);
     517            0 :             modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
     518              :         }
     519              : 
     520            0 :         Ok(())
     521            0 :     }
     522              : 
     523           32 :     async fn ingest_xlog_smgr_create(
     524           32 :         &mut self,
     525           32 :         create: SmgrCreate,
     526           32 :         modification: &mut DatadirModification<'_>,
     527           32 :         ctx: &RequestContext,
     528           32 :     ) -> anyhow::Result<()> {
     529           32 :         let SmgrCreate { rel } = create;
     530           32 :         self.put_rel_creation(modification, rel, ctx).await?;
     531           32 :         Ok(())
     532           32 :     }
     533              : 
     534              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
     535              :     ///
     536              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
     537            0 :     async fn ingest_xlog_smgr_truncate(
     538            0 :         &mut self,
     539            0 :         truncate: XlSmgrTruncate,
     540            0 :         modification: &mut DatadirModification<'_>,
     541            0 :         ctx: &RequestContext,
     542            0 :     ) -> anyhow::Result<()> {
     543            0 :         let XlSmgrTruncate {
     544            0 :             blkno,
     545            0 :             rnode,
     546            0 :             flags,
     547            0 :         } = truncate;
     548            0 : 
     549            0 :         let spcnode = rnode.spcnode;
     550            0 :         let dbnode = rnode.dbnode;
     551            0 :         let relnode = rnode.relnode;
     552            0 : 
     553            0 :         if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
     554            0 :             let rel = RelTag {
     555            0 :                 spcnode,
     556            0 :                 dbnode,
     557            0 :                 relnode,
     558            0 :                 forknum: MAIN_FORKNUM,
     559            0 :             };
     560            0 : 
     561            0 :             self.put_rel_truncation(modification, rel, blkno, ctx)
     562            0 :                 .await?;
     563            0 :         }
     564            0 :         if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
     565            0 :             let rel = RelTag {
     566            0 :                 spcnode,
     567            0 :                 dbnode,
     568            0 :                 relnode,
     569            0 :                 forknum: FSM_FORKNUM,
     570            0 :             };
     571            0 : 
     572            0 :             // Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
      573            0 :             // and instead of digging into the FSM bitmap format we just clear the whole page.
     574            0 :             let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
     575            0 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
     576            0 :             if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
     577            0 :                 && self
     578            0 :                     .shard
     579            0 :                     .is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
     580              :             {
     581            0 :                 modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
     582            0 :                 fsm_physical_page_no += 1;
     583            0 :             }
     584              :             // Truncate this shard's view of the FSM relation size, if it even has one.
     585            0 :             let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
     586            0 :             if nblocks > fsm_physical_page_no {
     587            0 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
     588            0 :                     .await?;
     589            0 :             }
     590            0 :         }
     591            0 :         if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
     592            0 :             let rel = RelTag {
     593            0 :                 spcnode,
     594            0 :                 dbnode,
     595            0 :                 relnode,
     596            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     597            0 :             };
     598            0 : 
     599            0 :             // last remaining block, byte, and bit
     600            0 :             let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
     601            0 :             let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
     602            0 :                 / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
     603            0 :             let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
     604            0 :                 * pg_constants::VM_BITS_PER_HEAPBLOCK;
     605            0 : 
     606            0 :             // Unless the new size is exactly at a visibility map page boundary, the
     607            0 :             // tail bits in the last remaining map page, representing truncated heap
     608            0 :             // blocks, need to be cleared. This is not only tidy, but also necessary
     609            0 :             // because we don't get a chance to clear the bits if the heap is extended
     610            0 :             // again. Only do this on the shard that owns the page.
     611            0 :             if (trunc_byte != 0 || trunc_offs != 0)
     612            0 :                 && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
     613              :             {
     614            0 :                 modification.put_rel_wal_record(
     615            0 :                     rel,
     616            0 :                     vm_page_no,
     617            0 :                     NeonWalRecord::TruncateVisibilityMap {
     618            0 :                         trunc_byte,
     619            0 :                         trunc_offs,
     620            0 :                     },
     621            0 :                 )?;
     622            0 :                 vm_page_no += 1;
     623            0 :             }
     624              :             // Truncate this shard's view of the VM relation size, if it even has one.
     625            0 :             let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
     626            0 :             if nblocks > vm_page_no {
     627            0 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
     628            0 :                     .await?;
     629            0 :             }
     630            0 :         }
     631            0 :         Ok(())
     632            0 :     }
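                      : 
                      :     // Worked example for the VM truncation math above (assuming 8KB pages,
                      :     // VM_HEAPBLOCKS_PER_BYTE = 4 and VM_BITS_PER_HEAPBLOCK = 2): truncating
                      :     // the heap to blkno = 1000 gives vm_page_no = 0,
                      :     // trunc_byte = (1000 % VM_HEAPBLOCKS_PER_PAGE) / 4 = 250 and
                      :     // trunc_offs = (1000 % 4) * 2 = 0, so the VM page is cleared from byte
                      :     // 250 onwards.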
     633              : 
     634           16 :     fn warn_on_ingest_lag(
     635           16 :         &mut self,
     636           16 :         conf: &crate::config::PageServerConf,
     637           16 :         wal_timestamp: TimestampTz,
     638           16 :     ) {
     639           16 :         debug_assert_current_span_has_tenant_and_timeline_id();
     640           16 :         let now = SystemTime::now();
     641           16 :         let rate_limits = &mut self.warn_ingest_lag;
     642              : 
     643           16 :         let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
     644            0 :             pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
     645              :         });
     646              : 
     647           16 :         match ts {
     648           16 :             Ok(ts) => {
     649           16 :                 match now.duration_since(ts) {
     650           16 :                     Ok(lag) => {
     651           16 :                         if lag > conf.wait_lsn_timeout {
     652           16 :                             rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
     653            4 :                                 if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
     654            0 :                                     if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
     655            0 :                                         return;
     656            0 :                                     }
     657            4 :                                 } else {
     658            4 :                                     // Still loading? We shouldn't be here
     659            4 :                                 }
     660            4 :                                 let lag = humantime::format_duration(lag);
     661            4 :                                 warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
     662           16 :                             })
     663            0 :                         }
     664              :                     }
     665            0 :                     Err(e) => {
     666            0 :                         let delta_t = e.duration();
     667              :                         // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
     668              :                         // => https://www.robustperception.io/time-metric-from-the-node-exporter/
     669              :                         const IGNORED_DRIFT: Duration = Duration::from_millis(100);
     670            0 :                         if delta_t > IGNORED_DRIFT {
     671            0 :                             let delta_t = humantime::format_duration(delta_t);
     672            0 :                             rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
     673            0 :                                 warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
     674            0 :                             })
     675            0 :                         }
     676              :                     }
     677              :                 };
     678              :             }
     679            0 :             Err(error) => {
     680            0 :                 rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
     681            0 :                     warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
     682            0 :                 })
     683              :             }
     684              :         }
     685           16 :     }
     686              : 
      687              :     /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
     688              :     ///
     689           16 :     async fn ingest_xact_record(
     690           16 :         &mut self,
     691           16 :         record: XactRecord,
     692           16 :         modification: &mut DatadirModification<'_>,
     693           16 :         ctx: &RequestContext,
     694           16 :     ) -> anyhow::Result<()> {
     695           16 :         let (xact_common, is_commit, is_prepared) = match record {
     696            0 :             XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
     697            0 :                 let xid: u64 = if modification.tline.pg_version >= 17 {
     698            0 :                     self.adjust_to_full_transaction_id(xl_xid)?
     699              :                 } else {
     700            0 :                     xl_xid as u64
     701              :                 };
     702            0 :                 return modification.put_twophase_file(xid, data, ctx).await;
     703              :             }
     704           16 :             XactRecord::Commit(common) => (common, true, false),
     705            0 :             XactRecord::Abort(common) => (common, false, false),
     706            0 :             XactRecord::CommitPrepared(common) => (common, true, true),
     707            0 :             XactRecord::AbortPrepared(common) => (common, false, true),
     708              :         };
     709              : 
     710              :         let XactCommon {
     711           16 :             parsed,
     712           16 :             origin_id,
     713           16 :             xl_xid,
     714           16 :             lsn,
     715           16 :         } = xact_common;
     716           16 : 
     717           16 :         // Record update of CLOG pages
     718           16 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
     719           16 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     720           16 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     721           16 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
     722           16 : 
     723           16 :         self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
     724              : 
     725           16 :         for subxact in &parsed.subxacts {
     726            0 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
     727            0 :             if subxact_pageno != pageno {
     728              :                 // This subxact goes to different page. Write the record
     729              :                 // for all the XIDs on the previous page, and continue
     730              :                 // accumulating XIDs on this new page.
     731            0 :                 modification.put_slru_wal_record(
     732            0 :                     SlruKind::Clog,
     733            0 :                     segno,
     734            0 :                     rpageno,
     735            0 :                     if is_commit {
     736            0 :                         NeonWalRecord::ClogSetCommitted {
     737            0 :                             xids: page_xids,
     738            0 :                             timestamp: parsed.xact_time,
     739            0 :                         }
     740              :                     } else {
     741            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
     742              :                     },
     743            0 :                 )?;
     744            0 :                 page_xids = Vec::new();
     745            0 :             }
     746            0 :             pageno = subxact_pageno;
     747            0 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     748            0 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     749            0 :             page_xids.push(*subxact);
     750              :         }
     751           16 :         modification.put_slru_wal_record(
     752           16 :             SlruKind::Clog,
     753           16 :             segno,
     754           16 :             rpageno,
     755           16 :             if is_commit {
     756           16 :                 NeonWalRecord::ClogSetCommitted {
     757           16 :                     xids: page_xids,
     758           16 :                     timestamp: parsed.xact_time,
     759           16 :                 }
     760              :             } else {
     761            0 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
     762              :             },
     763            0 :         )?;
     764              : 
     765              :         // Group relations to drop by dbNode.  This map will contain all relations that _might_
      766              :         // exist; we will later reduce it to the ones that really exist.  This map can be huge if
     767              :         // the transaction touches a huge number of relations (there is no bound on this in
     768              :         // postgres).
     769           16 :         let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
     770              : 
     771           16 :         for xnode in &parsed.xnodes {
     772            0 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
     773            0 :                 let rel = RelTag {
     774            0 :                     forknum,
     775            0 :                     spcnode: xnode.spcnode,
     776            0 :                     dbnode: xnode.dbnode,
     777            0 :                     relnode: xnode.relnode,
     778            0 :                 };
     779            0 :                 drop_relations
     780            0 :                     .entry((xnode.spcnode, xnode.dbnode))
     781            0 :                     .or_default()
     782            0 :                     .push(rel);
     783            0 :             }
     784              :         }
     785              : 
     786              :         // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
     787           16 :         modification.put_rel_drops(drop_relations, ctx).await?;
     788              : 
     789           16 :         if origin_id != 0 {
     790            0 :             modification
     791            0 :                 .set_replorigin(origin_id, parsed.origin_lsn)
     792            0 :                 .await?;
     793           16 :         }
     794              : 
     795           16 :         if is_prepared {
     796              :             // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     797            0 :             trace!(
     798            0 :                 "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     799              :                 xl_xid,
     800              :                 parsed.xid,
     801              :                 lsn,
     802              :             );
     803              : 
     804            0 :             let xid: u64 = if modification.tline.pg_version >= 17 {
     805            0 :                 self.adjust_to_full_transaction_id(parsed.xid)?
     806              :             } else {
     807            0 :                 parsed.xid as u64
     808              :             };
     809            0 :             modification.drop_twophase_file(xid, ctx).await?;
     810           16 :         }
     811              : 
     812           16 :         Ok(())
     813           16 :     }
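                      : 
                      :     // Worked example for the CLOG bookkeeping above (assuming 8KB pages, so
                      :     // CLOG_XACTS_PER_PAGE = 32768, and SLRU_PAGES_PER_SEGMENT = 32): a commit
                      :     // for xid = 100_000 lands on pageno = 100_000 / 32768 = 3, hence segno = 0
                      :     // and rpageno = 3, i.e. page 3 of CLOG segment 0000.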
     814              : 
     815            0 :     async fn ingest_clog_truncate(
     816            0 :         &mut self,
     817            0 :         truncate: ClogTruncate,
     818            0 :         modification: &mut DatadirModification<'_>,
     819            0 :         ctx: &RequestContext,
     820            0 :     ) -> anyhow::Result<()> {
     821            0 :         let ClogTruncate {
     822            0 :             pageno,
     823            0 :             oldest_xid,
     824            0 :             oldest_xid_db,
     825            0 :         } = truncate;
     826            0 : 
     827            0 :         info!(
     828            0 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
     829              :             pageno, oldest_xid, oldest_xid_db
     830              :         );
     831              : 
     832              :         // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
     833              :         // truncated, but a checkpoint record with the updated values isn't written until
     834              :         // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
     835              :         // so we keep the oldestXid and oldestXidDB up-to-date.
     836            0 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
     837            0 :             cp.oldestXid = oldest_xid;
     838            0 :             cp.oldestXidDB = oldest_xid_db;
     839            0 :         });
     840            0 :         self.checkpoint_modified = true;
     841              : 
      842              :         // TODO: handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
     843              : 
     844            0 :         let latest_page_number =
     845            0 :             enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
     846              :                 / pg_constants::CLOG_XACTS_PER_PAGE;
     847              : 
     848              :         // Now delete all segments containing pages between xlrec.pageno
     849              :         // and latest_page_number.
     850              : 
     851              :         // First, make an important safety check:
     852              :         // the current endpoint page must not be eligible for removal.
     853              :         // See SimpleLruTruncate() in slru.c
     854            0 :         if dispatch_pgversion!(modification.tline.pg_version, {
     855            0 :             pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
     856              :         }) {
      857            0 :             info!("could not truncate directory pg_xact: apparent wraparound");
     858            0 :             return Ok(());
     859            0 :         }
     860            0 : 
      861            0 :         // Iterate over SLRU CLOG segments and drop the segments that we're ready to truncate
     862            0 :         //
     863            0 :         // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
     864            0 :         // will block waiting for the last valid LSN to advance up to
     865            0 :         // it. So we use the previous record's LSN in the get calls
     866            0 :         // instead.
     867            0 :         if modification.tline.get_shard_identity().is_shard_zero() {
     868            0 :             for segno in modification
     869            0 :                 .tline
     870            0 :                 .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
     871            0 :                 .await?
     872              :             {
     873            0 :                 let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
     874              : 
     875            0 :                 let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
     876            0 :                     pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
     877              :                 });
     878              : 
     879            0 :                 if may_delete {
     880            0 :                     modification
     881            0 :                         .drop_slru_segment(SlruKind::Clog, segno, ctx)
     882            0 :                         .await?;
     883            0 :                     trace!("Drop CLOG segment {:>04X}", segno);
     884            0 :                 }
     885              :             }
     886            0 :         }
     887              : 
     888            0 :         Ok(())
     889            0 :     }
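                      : 
                      :     // Worked example for the truncation cutoff above (assuming
                      :     // CLOG_XACTS_PER_PAGE = 32768): with nextXid = 1_000_000 the latest page
                      :     // number is 1_000_000 / 32768 = 30, and a CLOG segment may only be dropped
                      :     // if every page it contains precedes the truncation target `pageno`.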
     890              : 
     891            0 :     async fn ingest_clog_zero_page(
     892            0 :         &mut self,
     893            0 :         zero_page: ClogZeroPage,
     894            0 :         modification: &mut DatadirModification<'_>,
     895            0 :         ctx: &RequestContext,
     896            0 :     ) -> anyhow::Result<()> {
     897            0 :         let ClogZeroPage { segno, rpageno } = zero_page;
     898            0 : 
     899            0 :         self.put_slru_page_image(
     900            0 :             modification,
     901            0 :             SlruKind::Clog,
     902            0 :             segno,
     903            0 :             rpageno,
     904            0 :             ZERO_PAGE.clone(),
     905            0 :             ctx,
     906            0 :         )
     907            0 :         .await
     908            0 :     }
     909              : 
     910            0 :     fn ingest_multixact_create(
     911            0 :         &mut self,
     912            0 :         modification: &mut DatadirModification,
     913            0 :         xlrec: &XlMultiXactCreate,
     914            0 :     ) -> Result<()> {
     915            0 :         // Create WAL record for updating the multixact-offsets page
     916            0 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     917            0 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     918            0 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
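                       :         // Worked example (editor's sketch, assuming 8KB pages, where
                       :         // MULTIXACT_OFFSETS_PER_PAGE = 8192 / 4 = 2048 and
                       :         // SLRU_PAGES_PER_SEGMENT = 32): mid = 100_000 gives
                       :         // pageno = 48, segno = 1, rpageno = 16.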
     919            0 : 
     920            0 :         modification.put_slru_wal_record(
     921            0 :             SlruKind::MultiXactOffsets,
     922            0 :             segno,
     923            0 :             rpageno,
     924            0 :             NeonWalRecord::MultixactOffsetCreate {
     925            0 :                 mid: xlrec.mid,
     926            0 :                 moff: xlrec.moff,
     927            0 :             },
     928            0 :         )?;
     929              : 
     930              :         // Create WAL records for the update of each affected multixact-members page
     931            0 :         let mut members = xlrec.members.iter();
     932            0 :         let mut offset = xlrec.moff;
     933              :         loop {
     934            0 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     935            0 : 
     936            0 :             // How many members fit on this page?
     937            0 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
     938            0 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     939            0 : 
     940            0 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
     941            0 :             for _ in 0..page_remain {
     942            0 :                 if let Some(m) = members.next() {
     943            0 :                     this_page_members.push(m.clone());
     944            0 :                 } else {
     945            0 :                     break;
     946              :                 }
     947              :             }
     948            0 :             if this_page_members.is_empty() {
     949              :                 // all done
     950            0 :                 break;
     951            0 :             }
     952            0 :             let n_this_page = this_page_members.len();
     953            0 : 
     954            0 :             modification.put_slru_wal_record(
     955            0 :                 SlruKind::MultiXactMembers,
     956            0 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
     957            0 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
     958            0 :                 NeonWalRecord::MultixactMembersCreate {
     959            0 :                     moff: offset,
     960            0 :                     members: this_page_members,
     961            0 :                 },
     962            0 :             )?;
     963              : 
     964              :             // Note: The multixact members can wrap around, even within one WAL record.
     965            0 :             offset = offset.wrapping_add(n_this_page as u32);
     966              :         }
     967            0 :         let next_offset = offset;
     968            0 :         assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
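                       :         // Worked example of the loop above (editor's sketch; for 8KB pages
                       :         // MULTIXACT_MEMBERS_PER_PAGE should be 1636): with moff = 1630 and
                       :         // nmembers = 10, the first iteration emits 6 members (offsets
                       :         // 1630..=1635) on one page and the second emits the remaining 4 on
                       :         // the next page, so next_offset = 1640 and the assertion holds.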
     969              : 
     970              :         // Update next-multi-xid and next-offset
     971              :         //
     972              :         // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
     973              :         // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
     974              :         // read it, like GetNewMultiXactId(). This is different from how nextXid is
      975              :         // incremented! nextXid skips over values < FirstNormalTransactionId when the
      976              :         // value is stored, so it's never 0 in a checkpoint.
     977              :         //
      978              :         // I don't know why it's done that way; it seems less error-prone to skip over 0
      979              :         // when the value is stored rather than when it's read. But let's do it the same
     980              :         // way here.
     981            0 :         let next_multi_xid = xlrec.mid.wrapping_add(1);
     982            0 : 
     983            0 :         if self
     984            0 :             .checkpoint
     985            0 :             .update_next_multixid(next_multi_xid, next_offset)
     986            0 :         {
     987            0 :             self.checkpoint_modified = true;
     988            0 :         }
     989              : 
     990              :         // Also update the next-xid with the highest member. According to the comments in
     991              :         // multixact_redo(), this shouldn't be necessary, but let's do the same here.
     992            0 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
     993            0 :             if let Some(max_xid) = acc {
     994            0 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
     995            0 :                     Some(mbr.xid)
     996              :                 } else {
     997            0 :                     acc
     998              :                 }
     999              :             } else {
    1000            0 :                 Some(mbr.xid)
    1001              :             }
    1002            0 :         });
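                       :         // The fold above compares XIDs in modular arithmetic:
                       :         // (a.wrapping_sub(b) as i32) > 0 means "a is newer than b" even
                       :         // across a wraparound. E.g. with acc = Some(0xFFFF_FFF0) and
                       :         // mbr.xid = 0x10, the difference wraps to 0x20 (> 0), so 0x10 wins
                       :         // as the newer XID. (Editor's note, illustrating the existing logic.)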
    1003              : 
    1004            0 :         if let Some(max_xid) = max_mbr_xid {
    1005            0 :             if self.checkpoint.update_next_xid(max_xid) {
    1006            0 :                 self.checkpoint_modified = true;
    1007            0 :             }
    1008            0 :         }
    1009            0 :         Ok(())
    1010            0 :     }
    1011              : 
    1012            0 :     async fn ingest_multixact_truncate(
    1013            0 :         &mut self,
    1014            0 :         modification: &mut DatadirModification<'_>,
    1015            0 :         xlrec: &XlMultiXactTruncate,
    1016            0 :         ctx: &RequestContext,
    1017            0 :     ) -> Result<()> {
    1018            0 :         let (maxsegment, startsegment, endsegment) =
    1019            0 :             enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1020            0 :                 cp.oldestMulti = xlrec.end_trunc_off;
    1021            0 :                 cp.oldestMultiDB = xlrec.oldest_multi_db;
    1022            0 :                 let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
    1023            0 :                     pg_constants::MAX_MULTIXACT_OFFSET,
    1024            0 :                 );
    1025            0 :                 let startsegment: i32 =
    1026            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1027            0 :                 let endsegment: i32 =
    1028            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1029            0 :                 (maxsegment, startsegment, endsegment)
    1030              :             });
    1031              : 
    1032            0 :         self.checkpoint_modified = true;
    1033            0 : 
    1034            0 :         // PerformMembersTruncation
    1035            0 :         let mut segment: i32 = startsegment;
    1036            0 : 
     1037            0 :         // Delete all the segments except the last one. The last segment can still
     1038            0 :         // contain valid data, possibly only partially.
    1039            0 :         if modification.tline.get_shard_identity().is_shard_zero() {
    1040            0 :             while segment != endsegment {
    1041            0 :                 modification
    1042            0 :                     .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1043            0 :                     .await?;
    1044              : 
    1045              :                 /* move to next segment, handling wraparound correctly */
    1046            0 :                 if segment == maxsegment {
    1047            0 :                     segment = 0;
    1048            0 :                 } else {
    1049            0 :                     segment += 1;
    1050            0 :                 }
    1051              :             }
    1052            0 :         }
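                       :         // Worked example of the wrapping loop above (editor's sketch with
                       :         // hypothetical segment numbers): if maxsegment = 5, startsegment = 4
                       :         // and endsegment = 1, the loop drops segments 4, 5 and 0 and then
                       :         // stops, keeping segment 1 as the (possibly partial) last segment.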
    1053              : 
    1054              :         // Truncate offsets
    1055              :         // FIXME: this did not handle wraparound correctly
    1056              : 
    1057            0 :         Ok(())
    1058            0 :     }
    1059              : 
    1060            0 :     async fn ingest_multixact_zero_page(
    1061            0 :         &mut self,
    1062            0 :         zero_page: MultiXactZeroPage,
    1063            0 :         modification: &mut DatadirModification<'_>,
    1064            0 :         ctx: &RequestContext,
    1065            0 :     ) -> Result<()> {
    1066            0 :         let MultiXactZeroPage {
    1067            0 :             slru_kind,
    1068            0 :             segno,
    1069            0 :             rpageno,
    1070            0 :         } = zero_page;
    1071            0 :         self.put_slru_page_image(
    1072            0 :             modification,
    1073            0 :             slru_kind,
    1074            0 :             segno,
    1075            0 :             rpageno,
    1076            0 :             ZERO_PAGE.clone(),
    1077            0 :             ctx,
    1078            0 :         )
    1079            0 :         .await
    1080            0 :     }
    1081              : 
    1082            0 :     async fn ingest_relmap_update(
    1083            0 :         &mut self,
    1084            0 :         update: RelmapUpdate,
    1085            0 :         modification: &mut DatadirModification<'_>,
    1086            0 :         ctx: &RequestContext,
    1087            0 :     ) -> Result<()> {
    1088            0 :         let RelmapUpdate { update, buf } = update;
    1089            0 : 
    1090            0 :         modification
    1091            0 :             .put_relmap_file(update.tsid, update.dbid, buf, ctx)
    1092            0 :             .await
    1093            0 :     }
    1094              : 
    1095           60 :     async fn ingest_raw_xlog_record(
    1096           60 :         &mut self,
    1097           60 :         raw_record: RawXlogRecord,
    1098           60 :         modification: &mut DatadirModification<'_>,
    1099           60 :         ctx: &RequestContext,
    1100           60 :     ) -> Result<()> {
    1101           60 :         let RawXlogRecord { info, lsn, mut buf } = raw_record;
    1102           60 :         let pg_version = modification.tline.pg_version;
    1103           60 : 
    1104           60 :         if info == pg_constants::XLOG_PARAMETER_CHANGE {
    1105            4 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    1106            0 :                 let rec = v17::XlParameterChange::decode(&mut buf);
    1107            0 :                 cp.wal_level = rec.wal_level;
    1108            0 :                 self.checkpoint_modified = true;
    1109            4 :             }
    1110           56 :         } else if info == pg_constants::XLOG_END_OF_RECOVERY {
    1111            0 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    1112            0 :                 let rec = v17::XlEndOfRecovery::decode(&mut buf);
    1113            0 :                 cp.wal_level = rec.wal_level;
    1114            0 :                 self.checkpoint_modified = true;
    1115            0 :             }
    1116           56 :         }
    1117              : 
    1118           60 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1119            0 :             if info == pg_constants::XLOG_NEXTOID {
    1120            0 :                 let next_oid = buf.get_u32_le();
    1121            0 :                 if cp.nextOid != next_oid {
    1122            0 :                     cp.nextOid = next_oid;
    1123            0 :                     self.checkpoint_modified = true;
    1124            0 :                 }
    1125            0 :             } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
    1126            0 :                 || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    1127              :             {
    1128            0 :                 let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
    1129            0 :                 buf.copy_to_slice(&mut checkpoint_bytes);
    1130            0 :                 let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
    1131            0 :                 trace!(
    1132            0 :                     "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
    1133              :                     xlog_checkpoint.oldestXid,
    1134              :                     cp.oldestXid
    1135              :                 );
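                       :                 // Advance oldestXid only forward in modular XID order: a
                       :                 // negative signed difference means our stored value is older
                       :                 // than the one in the checkpoint record.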
    1136            0 :                 if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
    1137            0 :                     cp.oldestXid = xlog_checkpoint.oldestXid;
    1138            0 :                 }
    1139            0 :                 trace!(
    1140            0 :                     "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
    1141              :                     xlog_checkpoint.oldestActiveXid,
    1142              :                     cp.oldestActiveXid
    1143              :                 );
    1144              : 
     1145              :                 // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
    1146              :                 // because at shutdown, all in-progress transactions will implicitly
    1147              :                 // end. Postgres startup code knows that, and allows hot standby to start
    1148              :                 // immediately from a shutdown checkpoint.
    1149              :                 //
    1150              :                 // In Neon, Postgres hot standby startup always behaves as if starting from
    1151              :                 // an online checkpoint. It needs a valid `oldestActiveXid` value, so
    1152              :                 // instead of overwriting self.checkpoint.oldestActiveXid with
     1153              :                 // InvalidTransactionId from the checkpoint WAL record, update it to a
    1154              :                 // proper value, knowing that there are no in-progress transactions at this
    1155              :                 // point, except for prepared transactions.
    1156              :                 //
    1157              :                 // See also the neon code changes in the InitWalRecovery() function.
    1158            0 :                 if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
    1159            0 :                     && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    1160              :                 {
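                       :                     // On PG >= 17 the two-phase XIDs are handled as
                       :                     // 64-bit values, so a plain `<` comparison suffices;
                       :                     // on older versions they are 32-bit XIDs and need the
                       :                     // wraparound-aware comparison. (Editor's note,
                       :                     // describing the two branches below.)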
    1161            0 :                     let oldest_active_xid = if pg_version >= 17 {
    1162            0 :                         let mut oldest_active_full_xid = cp.nextXid.value;
    1163            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    1164            0 :                             if xid < oldest_active_full_xid {
    1165            0 :                                 oldest_active_full_xid = xid;
    1166            0 :                             }
    1167              :                         }
    1168            0 :                         oldest_active_full_xid as u32
    1169              :                     } else {
    1170            0 :                         let mut oldest_active_xid = cp.nextXid.value as u32;
    1171            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    1172            0 :                             let narrow_xid = xid as u32;
    1173            0 :                             if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
    1174            0 :                                 oldest_active_xid = narrow_xid;
    1175            0 :                             }
    1176              :                         }
    1177            0 :                         oldest_active_xid
    1178              :                     };
    1179            0 :                     cp.oldestActiveXid = oldest_active_xid;
    1180            0 :                 } else {
    1181            0 :                     cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
    1182            0 :                 }
    1183              :                 // NB: We abuse the Checkpoint.redo field:
    1184              :                 //
    1185              :                 // - In PostgreSQL, the Checkpoint struct doesn't store the information
    1186              :                 //   of whether this is an online checkpoint or a shutdown checkpoint. It's
    1187              :                 //   stored in the XLOG info field of the WAL record, shutdown checkpoints
    1188              :                 //   use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
    1189              :                 //   XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
    1190              :                 //   in the pageserver, however.
    1191              :                 //
    1192              :                 // - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
    1193              :                 //   checkpoint record, if it's a shutdown checkpoint. But when we are
    1194              :                 //   starting from a shutdown checkpoint, the basebackup LSN is the *end*
    1195              :                 //   of the shutdown checkpoint WAL record. That makes it difficult to
    1196              :                 //   correctly detect whether we're starting from a shutdown record or
    1197              :                 //   not.
    1198              :                 //
    1199              :                 // To address both of those issues, we store 0 in the redo field if it's
    1200              :                 // an online checkpoint record, and the record's *end* LSN if it's a
    1201              :                 // shutdown checkpoint. We don't need the original redo pointer in neon,
    1202              :                 // because we don't perform WAL replay at startup anyway, so we can get
    1203              :                 // away with abusing the redo field like this.
    1204              :                 //
    1205              :                 // XXX: Ideally, we would persist the extra information in a more
    1206              :                 // explicit format, rather than repurpose the fields of the Postgres
    1207              :                 // struct like this. However, we already have persisted data like this,
    1208              :                 // so we need to maintain backwards compatibility.
    1209              :                 //
    1210              :                 // NB: We didn't originally have this convention, so there are still old
    1211              :                 // persisted records that didn't do this. Before, we didn't update the
    1212              :                 // persisted redo field at all. That means that old records have a bogus
    1213              :                 // redo pointer that points to some old value, from the checkpoint record
    1214              :                 // that was originally imported from the data directory. If it was a
    1215              :                 // project created in Neon, that means it points to the first checkpoint
    1216              :                 // after initdb. That's OK for our purposes: all such old checkpoints are
    1217              :                 // treated as old online checkpoints when the basebackup is created.
    1218            0 :                 cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
    1219              :                     // Store the *end* LSN of the checkpoint record. Or to be precise,
     1220              :                     // the start LSN of the *next* record; i.e., if the record ends
     1221              :                     // exactly at a page boundary, the redo LSN points to just after the
    1222              :                     // page header on the next page.
    1223            0 :                     lsn.into()
    1224              :                 } else {
    1225            0 :                     Lsn::INVALID.into()
    1226              :                 };
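                       :                 // With this convention, later code can distinguish the two
                       :                 // kinds of checkpoint without the WAL record header. A minimal
                       :                 // sketch (editor's illustration with a hypothetical
                       :                 // `basebackup_lsn`, not a quote of the actual basebackup code):
                       :                 //
                       :                 //     let was_shutdown = Lsn(cp.redo) == basebackup_lsn;
                       :                 //
                       :                 // i.e. a basebackup taken exactly at the stored end-LSN of a
                       :                 // shutdown checkpoint knows all transactions had ended.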
    1227              : 
    1228              :                 // Write a new checkpoint key-value pair on every checkpoint record, even
    1229              :                 // if nothing really changed. Not strictly required, but it seems nice to
    1230              :                 // have some trace of the checkpoint records in the layer files at the same
    1231              :                 // LSNs.
    1232            0 :                 self.checkpoint_modified = true;
    1233            0 :             }
    1234              :         });
    1235              : 
    1236           60 :         Ok(())
    1237           60 :     }
    1238              : 
    1239            0 :     async fn ingest_logical_message_put(
    1240            0 :         &mut self,
    1241            0 :         put: PutLogicalMessage,
    1242            0 :         modification: &mut DatadirModification<'_>,
    1243            0 :         ctx: &RequestContext,
    1244            0 :     ) -> Result<()> {
    1245            0 :         let PutLogicalMessage { path, buf } = put;
    1246            0 :         modification.put_file(path.as_str(), &buf, ctx).await
    1247            0 :     }
    1248              : 
    1249            0 :     fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
    1250            0 :         match record {
    1251            0 :             StandbyRecord::RunningXacts(running_xacts) => {
    1252            0 :                 enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1253            0 :                     cp.oldestActiveXid = running_xacts.oldest_running_xid;
    1254            0 :                 });
    1255              : 
    1256            0 :                 self.checkpoint_modified = true;
    1257            0 :             }
    1258            0 :         }
    1259            0 : 
    1260            0 :         Ok(())
    1261            0 :     }
    1262              : 
    1263            0 :     async fn ingest_replorigin_record(
    1264            0 :         &mut self,
    1265            0 :         record: ReploriginRecord,
    1266            0 :         modification: &mut DatadirModification<'_>,
    1267            0 :     ) -> Result<()> {
    1268            0 :         match record {
    1269            0 :             ReploriginRecord::Set(set) => {
    1270            0 :                 modification
    1271            0 :                     .set_replorigin(set.node_id, set.remote_lsn)
    1272            0 :                     .await?;
    1273              :             }
    1274            0 :             ReploriginRecord::Drop(drop) => {
    1275            0 :                 modification.drop_replorigin(drop.node_id).await?;
    1276              :             }
    1277              :         }
    1278              : 
    1279            0 :         Ok(())
    1280            0 :     }
    1281              : 
    1282           36 :     async fn put_rel_creation(
    1283           36 :         &mut self,
    1284           36 :         modification: &mut DatadirModification<'_>,
    1285           36 :         rel: RelTag,
    1286           36 :         ctx: &RequestContext,
    1287           36 :     ) -> Result<()> {
    1288           36 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1289           36 :         Ok(())
    1290           36 :     }
    1291              : 
    1292              :     #[cfg(test)]
    1293       544804 :     async fn put_rel_page_image(
    1294       544804 :         &mut self,
    1295       544804 :         modification: &mut DatadirModification<'_>,
    1296       544804 :         rel: RelTag,
    1297       544804 :         blknum: BlockNumber,
    1298       544804 :         img: Bytes,
    1299       544804 :         ctx: &RequestContext,
    1300       544804 :     ) -> Result<(), PageReconstructError> {
    1301       544804 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1302       544804 :             .await?;
    1303       544804 :         modification.put_rel_page_image(rel, blknum, img)?;
    1304       544804 :         Ok(())
    1305       544804 :     }
    1306              : 
    1307           24 :     async fn put_rel_wal_record(
    1308           24 :         &mut self,
    1309           24 :         modification: &mut DatadirModification<'_>,
    1310           24 :         rel: RelTag,
    1311           24 :         blknum: BlockNumber,
    1312           24 :         rec: NeonWalRecord,
    1313           24 :         ctx: &RequestContext,
    1314           24 :     ) -> Result<()> {
    1315           24 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1316           24 :             .await?;
    1317           24 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1318           24 :         Ok(())
    1319           24 :     }
    1320              : 
    1321        12024 :     async fn put_rel_truncation(
    1322        12024 :         &mut self,
    1323        12024 :         modification: &mut DatadirModification<'_>,
    1324        12024 :         rel: RelTag,
    1325        12024 :         nblocks: BlockNumber,
    1326        12024 :         ctx: &RequestContext,
    1327        12024 :     ) -> anyhow::Result<()> {
    1328        12024 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1329        12024 :         Ok(())
    1330        12024 :     }
    1331              : 
    1332       544828 :     async fn handle_rel_extend(
    1333       544828 :         &mut self,
    1334       544828 :         modification: &mut DatadirModification<'_>,
    1335       544828 :         rel: RelTag,
    1336       544828 :         blknum: BlockNumber,
    1337       544828 :         ctx: &RequestContext,
    1338       544828 :     ) -> Result<(), PageReconstructError> {
    1339       544828 :         let new_nblocks = blknum + 1;
     1340              :         // Check if the relation exists. We implicitly create relations on the
     1341              :         // first record touching them.
    1342       544828 :         let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
    1343              : 
    1344       544828 :         if new_nblocks > old_nblocks {
    1345              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1346       544796 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1347              : 
    1348       544796 :             let mut key = rel_block_to_key(rel, blknum);
    1349       544796 : 
    1350       544796 :             // fill the gap with zeros
    1351       544796 :             let mut gap_blocks_filled: u64 = 0;
    1352       544796 :             for gap_blknum in old_nblocks..blknum {
    1353         5996 :                 key.field6 = gap_blknum;
    1354         5996 : 
    1355         5996 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1356            0 :                     continue;
    1357         5996 :                 }
    1358         5996 : 
    1359         5996 :                 modification.put_rel_page_image_zero(rel, gap_blknum)?;
    1360         5996 :                 gap_blocks_filled += 1;
    1361              :             }
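                       :             // Worked example (editor's sketch): if the relation had 3 blocks
                       :             // and this record writes block 10, then old_nblocks = 3 and
                       :             // blknum = 10, so blocks 3..=9 are zero-filled above; only the
                       :             // blocks this shard owns, hence the get_shard_number() check.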
    1362              : 
    1363       544796 :             WAL_INGEST
    1364       544796 :                 .gap_blocks_zeroed_on_rel_extend
    1365       544796 :                 .inc_by(gap_blocks_filled);
    1366       544796 : 
     1367       544796 :             // Log something when a relation extend causes us to fill gaps
     1368       544796 :             // with zero pages. Logging is rate limited per pg version so
     1369       544796 :             // that a chatty version doesn't drown out logs from the others.
    1370       544796 :             if gap_blocks_filled > 0 {
    1371              :                 use once_cell::sync::Lazy;
    1372              :                 use std::sync::Mutex;
    1373              :                 use utils::rate_limit::RateLimit;
    1374              : 
    1375              :                 struct RateLimitPerPgVersion {
    1376              :                     rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
    1377              :                 }
    1378              : 
    1379              :                 impl RateLimitPerPgVersion {
    1380            0 :                     const fn new() -> Self {
    1381            0 :                         Self {
    1382            0 :                             rate_limiters: [const {
    1383            4 :                                 Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
    1384            0 :                             }; 4],
    1385            0 :                         }
    1386            0 :                     }
    1387              : 
    1388            8 :                     const fn rate_limiter(
    1389            8 :                         &self,
    1390            8 :                         pg_version: u32,
    1391            8 :                     ) -> Option<&Lazy<Mutex<RateLimit>>> {
    1392              :                         const MIN_PG_VERSION: u32 = 14;
    1393              :                         const MAX_PG_VERSION: u32 = 17;
    1394              : 
    1395            8 :                         if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
    1396            0 :                             return None;
    1397            8 :                         }
    1398            8 : 
    1399            8 :                         Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
    1400            8 :                     }
    1401              :                 }
    1402              : 
    1403              :                 static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
    1404            8 :                 if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
    1405            8 :                     if let Ok(mut locked) = rate_limiter.try_lock() {
    1406            8 :                         locked.call(|| {
    1407            4 :                             info!(
    1408            0 :                                 lsn=%modification.get_lsn(),
    1409            0 :                                 pg_version=%modification.tline.pg_version,
    1410            0 :                                 rel=%rel,
    1411            0 :                                 "Filled {} gap blocks on rel extend to {} from {}",
    1412              :                                 gap_blocks_filled,
    1413              :                                 new_nblocks,
    1414              :                                 old_nblocks);
    1415            8 :                         });
    1416            8 :                     }
    1417            0 :                 }
    1418       544788 :             }
    1419           32 :         }
    1420       544828 :         Ok(())
    1421       544828 :     }
    1422              : 
    1423            0 :     async fn put_slru_page_image(
    1424            0 :         &mut self,
    1425            0 :         modification: &mut DatadirModification<'_>,
    1426            0 :         kind: SlruKind,
    1427            0 :         segno: u32,
    1428            0 :         blknum: BlockNumber,
    1429            0 :         img: Bytes,
    1430            0 :         ctx: &RequestContext,
    1431            0 :     ) -> Result<()> {
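                       :         // SLRU content is materialized only on shard zero; the other shards
                       :         // silently drop SLRU writes.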
    1432            0 :         if !self.shard.is_shard_zero() {
    1433            0 :             return Ok(());
    1434            0 :         }
    1435            0 : 
    1436            0 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1437            0 :             .await?;
    1438            0 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1439            0 :         Ok(())
    1440            0 :     }
    1441              : 
    1442            0 :     async fn handle_slru_extend(
    1443            0 :         &mut self,
    1444            0 :         modification: &mut DatadirModification<'_>,
    1445            0 :         kind: SlruKind,
    1446            0 :         segno: u32,
    1447            0 :         blknum: BlockNumber,
    1448            0 :         ctx: &RequestContext,
    1449            0 :     ) -> anyhow::Result<()> {
     1450            0 :         // We don't use a cache for this like we do for relations. SLRUs are explicitly
     1451            0 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1452            0 :         // a lot less frequently.
    1453            0 : 
    1454            0 :         let new_nblocks = blknum + 1;
     1455              :         // Check if the segment exists. We implicitly create SLRU segments on the
     1456              :         // first record.
     1457              :         // TODO: would be nice to be more explicit about it
    1458            0 :         let old_nblocks = if !modification
    1459            0 :             .tline
    1460            0 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1461            0 :             .await?
    1462              :         {
    1463              :             // create it with 0 size initially, the logic below will extend it
    1464            0 :             modification
    1465            0 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1466            0 :                 .await?;
    1467            0 :             0
    1468              :         } else {
    1469            0 :             modification
    1470            0 :                 .tline
    1471            0 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1472            0 :                 .await?
    1473              :         };
    1474              : 
    1475            0 :         if new_nblocks > old_nblocks {
    1476            0 :             trace!(
    1477            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1478              :                 kind,
    1479              :                 segno,
    1480              :                 old_nblocks,
    1481              :                 new_nblocks
    1482              :             );
    1483            0 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1484              : 
    1485              :             // fill the gap with zeros
    1486            0 :             for gap_blknum in old_nblocks..blknum {
    1487            0 :                 modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
    1488              :             }
    1489            0 :         }
    1490            0 :         Ok(())
    1491            0 :     }
    1492              : }
    1493              : 
    1494              : /// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
    1495              : ///
    1496              : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
    1497              : /// page number stored in the shard, or None if the shard does not have any pages for it.
    1498           24 : async fn get_relsize(
    1499           24 :     modification: &DatadirModification<'_>,
    1500           24 :     rel: RelTag,
    1501           24 :     ctx: &RequestContext,
    1502           24 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
    1503           24 :     if !modification
    1504           24 :         .tline
    1505           24 :         .get_rel_exists(rel, Version::Modified(modification), ctx)
    1506           24 :         .await?
    1507              :     {
    1508            0 :         return Ok(None);
    1509           24 :     }
    1510           24 :     modification
    1511           24 :         .tline
    1512           24 :         .get_rel_size(rel, Version::Modified(modification), ctx)
    1513           24 :         .await
    1514           24 :         .map(Some)
    1515           24 : }
    1516              : 
    1517              : #[allow(clippy::bool_assert_comparison)]
    1518              : #[cfg(test)]
    1519              : mod tests {
    1520              :     use super::*;
    1521              :     use crate::tenant::harness::*;
    1522              :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    1523              :     use crate::tenant::storage_layer::IoConcurrency;
    1524              :     use postgres_ffi::RELSEG_SIZE;
    1525              : 
    1526              :     use crate::DEFAULT_PG_VERSION;
    1527              : 
    1528              :     /// Arbitrary relation tag, for testing.
    1529              :     const TESTREL_A: RelTag = RelTag {
    1530              :         spcnode: 0,
    1531              :         dbnode: 111,
    1532              :         relnode: 1000,
    1533              :         forknum: 0,
    1534              :     };
    1535              : 
    1536           24 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1537           24 :         // TODO
    1538           24 :     }
    1539              : 
    1540              :     #[tokio::test]
    1541            4 :     async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
    1542           16 :         for i in 14..=16 {
    1543           12 :             dispatch_pgversion!(i, {
    1544            4 :                 pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
    1545            4 :             });
    1546            4 :         }
    1547            4 : 
    1548            4 :         Ok(())
    1549            4 :     }
    1550              : 
    1551           16 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    1552           16 :         let mut m = tline.begin_modification(Lsn(0x10));
    1553           16 :         m.put_checkpoint(dispatch_pgversion!(
    1554           16 :             tline.pg_version,
    1555            0 :             pgv::ZERO_CHECKPOINT.clone()
    1556            0 :         ))?;
    1557           16 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1558           16 :         m.commit(ctx).await?;
    1559           16 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1560              : 
    1561           16 :         Ok(walingest)
    1562           16 :     }
    1563              : 
    1564              :     #[tokio::test]
    1565            4 :     async fn test_relsize() -> Result<()> {
    1566            4 :         let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
    1567            4 :         let io_concurrency = IoConcurrency::spawn_for_test();
    1568            4 :         let tline = tenant
    1569            4 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1570            4 :             .await?;
    1571            4 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1572            4 : 
    1573            4 :         let mut m = tline.begin_modification(Lsn(0x20));
    1574            4 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1575            4 :         walingest
    1576            4 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1577            4 :             .await?;
    1578            4 :         m.commit(&ctx).await?;
    1579            4 :         let mut m = tline.begin_modification(Lsn(0x30));
    1580            4 :         walingest
    1581            4 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
    1582            4 :             .await?;
    1583            4 :         m.commit(&ctx).await?;
    1584            4 :         let mut m = tline.begin_modification(Lsn(0x40));
    1585            4 :         walingest
    1586            4 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
    1587            4 :             .await?;
    1588            4 :         m.commit(&ctx).await?;
    1589            4 :         let mut m = tline.begin_modification(Lsn(0x50));
    1590            4 :         walingest
    1591            4 :             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
    1592            4 :             .await?;
    1593            4 :         m.commit(&ctx).await?;
    1594            4 : 
    1595            4 :         assert_current_logical_size(&tline, Lsn(0x50));
    1596            4 : 
    1597            4 :         let test_span = tracing::info_span!(parent: None, "test",
    1598            4 :                                             tenant_id=%tline.tenant_shard_id.tenant_id,
    1599            0 :                                             shard_id=%tline.tenant_shard_id.shard_slug(),
    1600            0 :                                             timeline_id=%tline.timeline_id);
    1601            4 : 
     1602            4 :         // The relation was created at LSN 0x20; it's not visible at LSN 0x10 yet.
    1603            4 :         assert_eq!(
    1604            4 :             tline
    1605            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1606            4 :                 .await?,
    1607            4 :             false
    1608            4 :         );
    1609            4 :         assert!(tline
    1610            4 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1611            4 :             .await
    1612            4 :             .is_err());
    1613            4 :         assert_eq!(
    1614            4 :             tline
    1615            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1616            4 :                 .await?,
    1617            4 :             true
    1618            4 :         );
    1619            4 :         assert_eq!(
    1620            4 :             tline
    1621            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1622            4 :                 .await?,
    1623            4 :             1
    1624            4 :         );
    1625            4 :         assert_eq!(
    1626            4 :             tline
    1627            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1628            4 :                 .await?,
    1629            4 :             3
    1630            4 :         );
    1631            4 : 
    1632            4 :         // Check page contents at each LSN
    1633            4 :         assert_eq!(
    1634            4 :             tline
    1635            4 :                 .get_rel_page_at_lsn(
    1636            4 :                     TESTREL_A,
    1637            4 :                     0,
    1638            4 :                     Version::Lsn(Lsn(0x20)),
    1639            4 :                     &ctx,
    1640            4 :                     io_concurrency.clone()
    1641            4 :                 )
    1642            4 :                 .instrument(test_span.clone())
    1643            4 :                 .await?,
    1644            4 :             test_img("foo blk 0 at 2")
    1645            4 :         );
    1646            4 : 
    1647            4 :         assert_eq!(
    1648            4 :             tline
    1649            4 :                 .get_rel_page_at_lsn(
    1650            4 :                     TESTREL_A,
    1651            4 :                     0,
    1652            4 :                     Version::Lsn(Lsn(0x30)),
    1653            4 :                     &ctx,
    1654            4 :                     io_concurrency.clone()
    1655            4 :                 )
    1656            4 :                 .instrument(test_span.clone())
    1657            4 :                 .await?,
    1658            4 :             test_img("foo blk 0 at 3")
    1659            4 :         );
    1660            4 : 
    1661            4 :         assert_eq!(
    1662            4 :             tline
    1663            4 :                 .get_rel_page_at_lsn(
    1664            4 :                     TESTREL_A,
    1665            4 :                     0,
    1666            4 :                     Version::Lsn(Lsn(0x40)),
    1667            4 :                     &ctx,
    1668            4 :                     io_concurrency.clone()
    1669            4 :                 )
    1670            4 :                 .instrument(test_span.clone())
    1671            4 :                 .await?,
    1672            4 :             test_img("foo blk 0 at 3")
    1673            4 :         );
    1674            4 :         assert_eq!(
    1675            4 :             tline
    1676            4 :                 .get_rel_page_at_lsn(
    1677            4 :                     TESTREL_A,
    1678            4 :                     1,
    1679            4 :                     Version::Lsn(Lsn(0x40)),
    1680            4 :                     &ctx,
    1681            4 :                     io_concurrency.clone()
    1682            4 :                 )
    1683            4 :                 .instrument(test_span.clone())
    1684            4 :                 .await?,
    1685            4 :             test_img("foo blk 1 at 4")
    1686            4 :         );
    1687            4 : 
    1688            4 :         assert_eq!(
    1689            4 :             tline
    1690            4 :                 .get_rel_page_at_lsn(
    1691            4 :                     TESTREL_A,
    1692            4 :                     0,
    1693            4 :                     Version::Lsn(Lsn(0x50)),
    1694            4 :                     &ctx,
    1695            4 :                     io_concurrency.clone()
    1696            4 :                 )
    1697            4 :                 .instrument(test_span.clone())
    1698            4 :                 .await?,
    1699            4 :             test_img("foo blk 0 at 3")
    1700            4 :         );
    1701            4 :         assert_eq!(
    1702            4 :             tline
    1703            4 :                 .get_rel_page_at_lsn(
    1704            4 :                     TESTREL_A,
    1705            4 :                     1,
    1706            4 :                     Version::Lsn(Lsn(0x50)),
    1707            4 :                     &ctx,
    1708            4 :                     io_concurrency.clone()
    1709            4 :                 )
    1710            4 :                 .instrument(test_span.clone())
    1711            4 :                 .await?,
    1712            4 :             test_img("foo blk 1 at 4")
    1713            4 :         );
    1714            4 :         assert_eq!(
    1715            4 :             tline
    1716            4 :                 .get_rel_page_at_lsn(
    1717            4 :                     TESTREL_A,
    1718            4 :                     2,
    1719            4 :                     Version::Lsn(Lsn(0x50)),
    1720            4 :                     &ctx,
    1721            4 :                     io_concurrency.clone()
    1722            4 :                 )
    1723            4 :                 .instrument(test_span.clone())
    1724            4 :                 .await?,
    1725            4 :             test_img("foo blk 2 at 5")
    1726            4 :         );
    1727            4 : 
    1728            4 :         // Truncate last block
    1729            4 :         let mut m = tline.begin_modification(Lsn(0x60));
    1730            4 :         walingest
    1731            4 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1732            4 :             .await?;
    1733            4 :         m.commit(&ctx).await?;
    1734            4 :         assert_current_logical_size(&tline, Lsn(0x60));
    1735            4 : 
    1736            4 :         // Check reported size and contents after truncation
    1737            4 :         assert_eq!(
    1738            4 :             tline
    1739            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    1740            4 :                 .await?,
    1741            4 :             2
    1742            4 :         );
    1743            4 :         assert_eq!(
    1744            4 :             tline
    1745            4 :                 .get_rel_page_at_lsn(
    1746            4 :                     TESTREL_A,
    1747            4 :                     0,
    1748            4 :                     Version::Lsn(Lsn(0x60)),
    1749            4 :                     &ctx,
    1750            4 :                     io_concurrency.clone()
    1751            4 :                 )
    1752            4 :                 .instrument(test_span.clone())
    1753            4 :                 .await?,
    1754            4 :             test_img("foo blk 0 at 3")
    1755            4 :         );
    1756            4 :         assert_eq!(
    1757            4 :             tline
    1758            4 :                 .get_rel_page_at_lsn(
    1759            4 :                     TESTREL_A,
    1760            4 :                     1,
    1761            4 :                     Version::Lsn(Lsn(0x60)),
    1762            4 :                     &ctx,
    1763            4 :                     io_concurrency.clone()
    1764            4 :                 )
    1765            4 :                 .instrument(test_span.clone())
    1766            4 :                 .await?,
    1767            4 :             test_img("foo blk 1 at 4")
    1768            4 :         );
    1769            4 : 
    1770            4 :         // should still see the truncated block with older LSN
    1771            4 :         assert_eq!(
    1772            4 :             tline
    1773            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1774            4 :                 .await?,
    1775            4 :             3
    1776            4 :         );
    1777            4 :         assert_eq!(
    1778            4 :             tline
    1779            4 :                 .get_rel_page_at_lsn(
    1780            4 :                     TESTREL_A,
    1781            4 :                     2,
    1782            4 :                     Version::Lsn(Lsn(0x50)),
    1783            4 :                     &ctx,
    1784            4 :                     io_concurrency.clone()
    1785            4 :                 )
    1786            4 :                 .instrument(test_span.clone())
    1787            4 :                 .await?,
    1788            4 :             test_img("foo blk 2 at 5")
    1789            4 :         );
    1790            4 : 
    1791            4 :         // Truncate to zero length
    1792            4 :         let mut m = tline.begin_modification(Lsn(0x68));
    1793            4 :         walingest
    1794            4 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1795            4 :             .await?;
    1796            4 :         m.commit(&ctx).await?;
    1797            4 :         assert_eq!(
    1798            4 :             tline
    1799            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
    1800            4 :                 .await?,
    1801            4 :             0
    1802            4 :         );
    1803            4 : 
    1804            4 :         // Extend from 0 to 2 blocks, leaving a gap
    1805            4 :         let mut m = tline.begin_modification(Lsn(0x70));
    1806            4 :         walingest
    1807            4 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
    1808            4 :             .await?;
    1809            4 :         m.commit(&ctx).await?;
    1810            4 :         assert_eq!(
    1811            4 :             tline
    1812            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
    1813            4 :                 .await?,
    1814            4 :             2
    1815            4 :         );
    1816            4 :         assert_eq!(
    1817            4 :             tline
    1818            4 :                 .get_rel_page_at_lsn(
    1819            4 :                     TESTREL_A,
    1820            4 :                     0,
    1821            4 :                     Version::Lsn(Lsn(0x70)),
    1822            4 :                     &ctx,
    1823            4 :                     io_concurrency.clone()
    1824            4 :                 )
    1825            4 :                 .instrument(test_span.clone())
    1826            4 :                 .await?,
    1827            4 :             ZERO_PAGE
    1828            4 :         );
    1829            4 :         assert_eq!(
    1830            4 :             tline
    1831            4 :                 .get_rel_page_at_lsn(
    1832            4 :                     TESTREL_A,
    1833            4 :                     1,
    1834            4 :                     Version::Lsn(Lsn(0x70)),
    1835            4 :                     &ctx,
    1836            4 :                     io_concurrency.clone()
    1837            4 :                 )
    1838            4 :                 .instrument(test_span.clone())
    1839            4 :                 .await?,
    1840            4 :             test_img("foo blk 1")
    1841            4 :         );
    1842            4 : 
    1843            4 :         // Extend a lot more, leaving a big gap that spans across segments
    1844            4 :         let mut m = tline.begin_modification(Lsn(0x80));
    1845            4 :         walingest
    1846            4 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
    1847            4 :             .await?;
    1848            4 :         m.commit(&ctx).await?;
    1849            4 :         assert_eq!(
    1850            4 :             tline
    1851            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    1852            4 :                 .await?,
    1853            4 :             1501
    1854            4 :         );
    1855         5996 :         for blk in 2..1500 {
    1856         5992 :             assert_eq!(
    1857         5992 :                 tline
    1858         5992 :                     .get_rel_page_at_lsn(
    1859         5992 :                         TESTREL_A,
    1860         5992 :                         blk,
    1861         5992 :                         Version::Lsn(Lsn(0x80)),
    1862         5992 :                         &ctx,
    1863         5992 :                         io_concurrency.clone()
    1864         5992 :                     )
    1865         5992 :                     .instrument(test_span.clone())
    1866         5992 :                     .await?,
    1867         5992 :                 ZERO_PAGE
    1868            4 :             );
    1869            4 :         }
    1870            4 :         assert_eq!(
    1871            4 :             tline
    1872            4 :                 .get_rel_page_at_lsn(
    1873            4 :                     TESTREL_A,
    1874            4 :                     1500,
    1875            4 :                     Version::Lsn(Lsn(0x80)),
    1876            4 :                     &ctx,
    1877            4 :                     io_concurrency.clone()
    1878            4 :                 )
    1879            4 :                 .instrument(test_span.clone())
    1880            4 :                 .await?,
    1881            4 :             test_img("foo blk 1500")
    1882            4 :         );
    1883            4 : 
    1884            4 :         Ok(())
    1885            4 :     }
    1886              : 
     1887              :     // Test what happens if we drop a relation
     1888              :     // and then create it again within the same layer.
    1889              :     #[tokio::test]
    1890            4 :     async fn test_drop_extend() -> Result<()> {
    1891            4 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")
    1892            4 :             .await?
    1893            4 :             .load()
    1894            4 :             .await;
    1895            4 :         let tline = tenant
    1896            4 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1897            4 :             .await?;
    1898            4 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1899            4 : 
    1900            4 :         let mut m = tline.begin_modification(Lsn(0x20));
    1901            4 :         walingest
    1902            4 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1903            4 :             .await?;
    1904            4 :         m.commit(&ctx).await?;
    1905            4 : 
    1906            4 :         // Check that rel exists and size is correct
    1907            4 :         assert_eq!(
    1908            4 :             tline
    1909            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1910            4 :                 .await?,
    1911            4 :             true
    1912            4 :         );
    1913            4 :         assert_eq!(
    1914            4 :             tline
    1915            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1916            4 :                 .await?,
    1917            4 :             1
    1918            4 :         );
    1919            4 : 
    1920            4 :         // Drop rel
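                       :         // (put_rel_drops takes the relations to drop grouped by
                       :         // (spcnode, dbnode), i.e. per database)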
    1921            4 :         let mut m = tline.begin_modification(Lsn(0x30));
    1922            4 :         let mut rel_drops = HashMap::new();
    1923            4 :         rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
    1924            4 :         m.put_rel_drops(rel_drops, &ctx).await?;
    1925            4 :         m.commit(&ctx).await?;
    1926            4 : 
    1927            4 :         // Check that rel is not visible anymore
    1928            4 :         assert_eq!(
    1929            4 :             tline
    1930            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
    1931            4 :                 .await?,
    1932            4 :             false
    1933            4 :         );
    1934            4 : 
    1935            4 :         // FIXME: should fail
    1936            4 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    1937            4 : 
    1938            4 :         // Re-create it
    1939            4 :         let mut m = tline.begin_modification(Lsn(0x40));
    1940            4 :         walingest
    1941            4 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
    1942            4 :             .await?;
    1943            4 :         m.commit(&ctx).await?;
    1944            4 : 
    1945            4 :         // Check that rel exists and size is correct
    1946            4 :         assert_eq!(
    1947            4 :             tline
    1948            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    1949            4 :                 .await?,
    1950            4 :             true
    1951            4 :         );
    1952            4 :         assert_eq!(
    1953            4 :             tline
    1954            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    1955            4 :                 .await?,
    1956            4 :             1
    1957            4 :         );
    1958            4 : 
    1959            4 :         Ok(())
    1960            4 :     }
    1961              : 
     1962              :     // Test what happens if we truncate a relation
     1963              :     // so that one of its segments is dropped
     1964              :     // and then extend it again within the same layer.
    1965              :     #[tokio::test]
    1966            4 :     async fn test_truncate_extend() -> Result<()> {
    1967            4 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
    1968            4 :             .await?
    1969            4 :             .load()
    1970            4 :             .await;
    1971            4 :         let io_concurrency = IoConcurrency::spawn_for_test();
    1972            4 :         let tline = tenant
    1973            4 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1974            4 :             .await?;
    1975            4 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1976            4 : 
     1977            4 :         // Create a 20 MB relation (2560 blocks of 8 KB; the size is arbitrary)
    1978            4 :         let relsize = 20 * 1024 * 1024 / 8192;
    1979            4 :         let mut m = tline.begin_modification(Lsn(0x20));
    1980        10240 :         for blkno in 0..relsize {
    1981        10240 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    1982        10240 :             walingest
    1983        10240 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    1984        10240 :                 .await?;
    1985            4 :         }
    1986            4 :         m.commit(&ctx).await?;
    1987            4 : 
    1988            4 :         let test_span = tracing::info_span!(parent: None, "test",
    1989            4 :                                             tenant_id=%tline.tenant_shard_id.tenant_id,
    1990            0 :                                             shard_id=%tline.tenant_shard_id.shard_slug(),
    1991            0 :                                             timeline_id=%tline.timeline_id);
    1992            4 : 
     1993            4 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
    1994            4 :         assert_eq!(
    1995            4 :             tline
    1996            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1997            4 :                 .await?,
    1998            4 :             false
    1999            4 :         );
    2000            4 :         assert!(tline
    2001            4 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2002            4 :             .await
    2003            4 :             .is_err());
    2004            4 : 
    2005            4 :         assert_eq!(
    2006            4 :             tline
    2007            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2008            4 :                 .await?,
    2009            4 :             true
    2010            4 :         );
    2011            4 :         assert_eq!(
    2012            4 :             tline
    2013            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2014            4 :                 .await?,
    2015            4 :             relsize
    2016            4 :         );
    2017            4 : 
    2018            4 :         // Check relation content
    2019        10240 :         for blkno in 0..relsize {
    2020        10240 :             let lsn = Lsn(0x20);
    2021        10240 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2022        10240 :             assert_eq!(
    2023        10240 :                 tline
    2024        10240 :                     .get_rel_page_at_lsn(
    2025        10240 :                         TESTREL_A,
    2026        10240 :                         blkno,
    2027        10240 :                         Version::Lsn(lsn),
    2028        10240 :                         &ctx,
    2029        10240 :                         io_concurrency.clone()
    2030        10240 :                     )
    2031        10240 :                     .instrument(test_span.clone())
    2032        10240 :                     .await?,
    2033        10240 :                 test_img(&data)
    2034            4 :             );
    2035            4 :         }
    2036            4 : 
     2037            4 :         // Truncate the relation so that its second segment is dropped,
     2038            4 :         // leaving only one page
    2039            4 :         let mut m = tline.begin_modification(Lsn(0x60));
    2040            4 :         walingest
    2041            4 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2042            4 :             .await?;
    2043            4 :         m.commit(&ctx).await?;
    2044            4 : 
    2045            4 :         // Check reported size and contents after truncation
    2046            4 :         assert_eq!(
    2047            4 :             tline
    2048            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    2049            4 :                 .await?,
    2050            4 :             1
    2051            4 :         );
    2052            4 : 
    2053            8 :         for blkno in 0..1 {
    2054            4 :             let lsn = Lsn(0x20);
    2055            4 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2056            4 :             assert_eq!(
    2057            4 :                 tline
    2058            4 :                     .get_rel_page_at_lsn(
    2059            4 :                         TESTREL_A,
    2060            4 :                         blkno,
    2061            4 :                         Version::Lsn(Lsn(0x60)),
    2062            4 :                         &ctx,
    2063            4 :                         io_concurrency.clone()
    2064            4 :                     )
    2065            4 :                     .instrument(test_span.clone())
    2066            4 :                     .await?,
    2067            4 :                 test_img(&data)
    2068            4 :             );
    2069            4 :         }
    2070            4 : 
    2071            4 :         // should still see all blocks with older LSN
    2072            4 :         assert_eq!(
    2073            4 :             tline
    2074            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2075            4 :                 .await?,
    2076            4 :             relsize
    2077            4 :         );
    2078        10240 :         for blkno in 0..relsize {
    2079        10240 :             let lsn = Lsn(0x20);
    2080        10240 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2081        10240 :             assert_eq!(
    2082        10240 :                 tline
    2083        10240 :                     .get_rel_page_at_lsn(
    2084        10240 :                         TESTREL_A,
    2085        10240 :                         blkno,
    2086        10240 :                         Version::Lsn(Lsn(0x50)),
    2087        10240 :                         &ctx,
    2088        10240 :                         io_concurrency.clone()
    2089        10240 :                     )
    2090        10240 :                     .instrument(test_span.clone())
    2091        10240 :                     .await?,
    2092        10240 :                 test_img(&data)
    2093            4 :             );
    2094            4 :         }
    2095            4 : 
     2096            4 :         // Extend the relation again,
     2097            4 :         // adding enough blocks to re-create the second segment
    2098            4 :         let lsn = Lsn(0x80);
    2099            4 :         let mut m = tline.begin_modification(lsn);
    2100        10240 :         for blkno in 0..relsize {
    2101        10240 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2102        10240 :             walingest
    2103        10240 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2104        10240 :                 .await?;
    2105            4 :         }
    2106            4 :         m.commit(&ctx).await?;
    2107            4 : 
    2108            4 :         assert_eq!(
    2109            4 :             tline
    2110            4 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2111            4 :                 .await?,
    2112            4 :             true
    2113            4 :         );
    2114            4 :         assert_eq!(
    2115            4 :             tline
    2116            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2117            4 :                 .await?,
    2118            4 :             relsize
    2119            4 :         );
    2120            4 :         // Check relation content
    2121        10240 :         for blkno in 0..relsize {
    2122        10240 :             let lsn = Lsn(0x80);
    2123        10240 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2124        10240 :             assert_eq!(
    2125        10240 :                 tline
    2126        10240 :                     .get_rel_page_at_lsn(
    2127        10240 :                         TESTREL_A,
    2128        10240 :                         blkno,
    2129        10240 :                         Version::Lsn(Lsn(0x80)),
    2130        10240 :                         &ctx,
    2131        10240 :                         io_concurrency.clone()
    2132        10240 :                     )
    2133        10240 :                     .instrument(test_span.clone())
    2134        10240 :                     .await?,
    2135        10240 :                 test_img(&data)
    2136            4 :             );
    2137            4 :         }
    2138            4 : 
    2139            4 :         Ok(())
    2140            4 :     }
    2141              : 
     2142              :     /// Test get_rel_size() and truncation with a file larger than 1 GB, so that it's
    2143              :     /// split into multiple 1 GB segments in Postgres.
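                       :     /// (RELSEG_SIZE is the segment size in blocks; with the default 8 KB
                       :     /// block size that is 131072 blocks, i.e. 1 GB per segment)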
    2144              :     #[tokio::test]
    2145            4 :     async fn test_large_rel() -> Result<()> {
    2146            4 :         let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
    2147            4 :         let tline = tenant
    2148            4 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2149            4 :             .await?;
    2150            4 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2151            4 : 
    2152            4 :         let mut lsn = 0x10;
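                       :         // Write RELSEG_SIZE + 1 blocks, committing after each one, so the
                       :         // relation just spills over into a second segment.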
    2153       524292 :         for blknum in 0..RELSEG_SIZE + 1 {
    2154       524292 :             lsn += 0x10;
    2155       524292 :             let mut m = tline.begin_modification(Lsn(lsn));
    2156       524292 :             let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2157       524292 :             walingest
    2158       524292 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2159       524292 :                 .await?;
    2160       524292 :             m.commit(&ctx).await?;
    2161            4 :         }
    2162            4 : 
    2163            4 :         assert_current_logical_size(&tline, Lsn(lsn));
    2164            4 : 
    2165            4 :         assert_eq!(
    2166            4 :             tline
    2167            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2168            4 :                 .await?,
    2169            4 :             RELSEG_SIZE + 1
    2170            4 :         );
    2171            4 : 
    2172            4 :         // Truncate one block
    2173            4 :         lsn += 0x10;
    2174            4 :         let mut m = tline.begin_modification(Lsn(lsn));
    2175            4 :         walingest
    2176            4 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2177            4 :             .await?;
    2178            4 :         m.commit(&ctx).await?;
    2179            4 :         assert_eq!(
    2180            4 :             tline
    2181            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2182            4 :                 .await?,
    2183            4 :             RELSEG_SIZE
    2184            4 :         );
    2185            4 :         assert_current_logical_size(&tline, Lsn(lsn));
    2186            4 : 
    2187            4 :         // Truncate another block
    2188            4 :         lsn += 0x10;
    2189            4 :         let mut m = tline.begin_modification(Lsn(lsn));
    2190            4 :         walingest
    2191            4 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2192            4 :             .await?;
    2193            4 :         m.commit(&ctx).await?;
    2194            4 :         assert_eq!(
    2195            4 :             tline
    2196            4 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2197            4 :                 .await?,
    2198            4 :             RELSEG_SIZE - 1
    2199            4 :         );
    2200            4 :         assert_current_logical_size(&tline, Lsn(lsn));
    2201            4 : 
     2202            4 :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time.
     2203            4 :         // This tests the behavior at segment boundaries.
    2204            4 :         let mut size: i32 = 3000;
    2205        12008 :         while size >= 0 {
    2206        12004 :             lsn += 0x10;
    2207        12004 :             let mut m = tline.begin_modification(Lsn(lsn));
    2208        12004 :             walingest
    2209        12004 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2210        12004 :                 .await?;
    2211        12004 :             m.commit(&ctx).await?;
    2212        12004 :             assert_eq!(
    2213        12004 :                 tline
    2214        12004 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2215        12004 :                     .await?,
    2216        12004 :                 size as BlockNumber
    2217            4 :             );
    2218            4 : 
    2219        12004 :             size -= 1;
    2220            4 :         }
    2221            4 :         assert_current_logical_size(&tline, Lsn(lsn));
    2222            4 : 
    2223            4 :         Ok(())
    2224            4 :     }
    2225              : 
     2226              :     /// Replay a WAL segment file taken directly from safekeepers.
    2227              :     ///
    2228              :     /// This test is useful for benchmarking since it allows us to profile only
    2229              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2230              :     /// without waiting for unrelated steps.
    2231              :     #[tokio::test]
    2232            4 :     async fn test_ingest_real_wal() {
    2233            4 :         use crate::tenant::harness::*;
    2234            4 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2235            4 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2236            4 : 
    2237            4 :         // Define test data path and constants.
    2238            4 :         //
    2239            4 :         // Steps to reconstruct the data, if needed:
    2240            4 :         // 1. Run the pgbench python test
    2241            4 :         // 2. Take the first wal segment file from safekeeper
    2242            4 :         // 3. Compress it using `zstd --long input_file`
    2243            4 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2244            4 :         // 5. Grep sk logs for "restart decoder" to get startpoint
    2245            4 :         // 6. Run just the decoder from this test to get the endpoint.
    2246            4 :         //    It's the last LSN the decoder will output.
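                       :         // For example, step 3 amounts to `zstd --long 000000010000000000000001`,
                       :         // which produces the `.zst` file referenced below.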
    2247            4 :         let pg_version = 15; // The test data was generated by pg15
    2248            4 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2249            4 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2250            4 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2251            4 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2252            4 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2253            4 : 
    2254            4 :         let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
    2255            4 :         let span = harness
    2256            4 :             .span()
    2257            4 :             .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
    2258            4 :         let (tenant, ctx) = harness.load().await;
    2259            4 : 
    2260            4 :         let remote_initdb_path =
    2261            4 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    2262            4 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2263            4 : 
    2264            4 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2265            4 :             .expect("creating test dir should work");
    2266            4 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2267            4 : 
    2268            4 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2269            4 :         // it doesn't create a real checkpoint, and Walingest::new tries to parse
    2270            4 :         // the garbage data.
    2271            4 :         let tline = tenant
    2272            4 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2273            4 :             .await
    2274            4 :             .unwrap();
    2275            4 : 
    2276            4 :         // We fully read and decompress this into memory before decoding
    2277            4 :         // to get a more accurate perf profile of the decoder.
    2278            4 :         let bytes = {
    2279            4 :             use async_compression::tokio::bufread::ZstdDecoder;
    2280            4 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2281            4 :             let reader = tokio::io::BufReader::new(file);
    2282            4 :             let decoder = ZstdDecoder::new(reader);
    2283            4 :             let mut reader = tokio::io::BufReader::new(decoder);
    2284            4 :             let mut buffer = Vec::new();
    2285            4 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2286            4 :             buffer
    2287            4 :         };
    2288            4 : 
    2289            4 :         // TODO start a profiler too
    2290            4 :         let started_at = std::time::Instant::now();
    2291            4 : 
    2292            4 :         // Initialize walingest
    2293            4 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
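                       :         // (the startpoint lies in the middle of the segment; `segment_offset`
                       :         // is its byte offset within the segment, so we skip the bytes before it)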
    2294            4 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2295            4 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2296            4 :             .await
    2297            4 :             .unwrap();
    2298            4 :         let mut modification = tline.begin_modification(startpoint);
    2299            4 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2300            4 : 
     2301            4 :         // Decode and ingest WAL. We process the WAL in chunks because
     2302            4 :         // that's how the bytes arrive from the safekeepers.
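                       :         // A record may straddle chunk boundaries: `poll_decode` yields records
                       :         // only once the decoder has buffered a complete one.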
    2303       949372 :         for chunk in bytes[xlogoff..].chunks(50) {
    2304       949372 :             decoder.feed_bytes(chunk);
    2305      1241072 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    2306       291700 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
    2307       291700 :                     recdata,
    2308       291700 :                     &[*modification.tline.get_shard_identity()],
    2309       291700 :                     lsn,
    2310       291700 :                     modification.tline.pg_version,
    2311       291700 :                 )
    2312       291700 :                 .unwrap()
    2313       291700 :                 .remove(modification.tline.get_shard_identity())
    2314       291700 :                 .unwrap();
    2315       291700 : 
    2316       291700 :                 walingest
    2317       291700 :                     .ingest_record(interpreted, &mut modification, &ctx)
    2318       291700 :                     .instrument(span.clone())
    2319       291700 :                     .await
    2320       291700 :                     .unwrap();
    2321            4 :             }
    2322       949372 :             modification.commit(&ctx).await.unwrap();
    2323            4 :         }
    2324            4 : 
    2325            4 :         let duration = started_at.elapsed();
    2326            4 :         println!("done in {:?}", duration);
    2327            4 :     }
    2328              : }
        

Generated by: LCOV version 2.1-beta