LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions)
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info
Test Date: 2025-07-31 15:59:03
Coverage:  Lines: 55.7 % (892 of 1601 hit)  |  Functions: 56.5 % (52 of 92 hit)

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  -> [`wal_decoder`] ->  WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers.
       9              : //! Records get decoded and interpreted in the [`wal_decoder`] module
      10              : //! and then stored to the Repository by WalIngest.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
       13              : //! page images, or as WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
       14              : //! extracts page images out of some WAL records, but most pages are stored as WAL
       15              : //! records. If a WAL record modifies multiple pages, WalIngest
       16              : //! calls the Repository::put_rel_wal_record or put_rel_page_image function
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
       22              : //! bespoke Rust code.
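//!
//! For orientation, here is a rough sketch of the driver loop implied by the
//! pipeline above. It is illustrative only: the real loop lives in the WAL
//! receiver, `decoded_records` is a placeholder, and `begin_modification`/
//! `commit` are assumed to be the usual [`Timeline`]/[`DatadirModification`]
//! entry points.
//!
//! ```ignore
//! let mut walingest = WalIngest::new(timeline, startpoint, ctx).await?;
//! let mut modification = timeline.begin_modification(startpoint);
//! for interpreted in decoded_records {
//!     walingest.ingest_record(interpreted, &mut modification, ctx).await?;
//! }
//! // Flush the accumulated writes and advance the last-record LSN.
//! modification.commit(ctx).await?;
//! ```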
      23              : 
      24              : use std::backtrace::Backtrace;
      25              : use std::collections::HashMap;
      26              : use std::sync::atomic::AtomicBool;
      27              : use std::sync::{Arc, OnceLock};
      28              : use std::time::{Duration, Instant, SystemTime};
      29              : 
      30              : use bytes::{Buf, Bytes};
      31              : use pageserver_api::key::{Key, rel_block_to_key};
      32              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      33              : use pageserver_api::shard::ShardIdentity;
      34              : use postgres_ffi::walrecord::*;
      35              : use postgres_ffi::{
      36              :     PgMajorVersion, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
      37              :     fsm_logical_to_physical, pg_constants,
      38              : };
      39              : use postgres_ffi_types::TimestampTz;
      40              : use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      41              : use tracing::*;
      42              : use utils::bin_ser::{DeserializeError, SerializeError};
      43              : use utils::lsn::Lsn;
      44              : use utils::rate_limit::RateLimit;
      45              : use utils::{critical_timeline, failpoint_support};
      46              : use wal_decoder::models::record::NeonWalRecord;
      47              : use wal_decoder::models::*;
      48              : 
      49              : use crate::ZERO_PAGE;
      50              : use crate::context::RequestContext;
      51              : use crate::metrics::WAL_INGEST;
      52              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      53              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      54              : use crate::tenant::{PageReconstructError, Timeline};
      55              : 
      56              : enum_pgversion! {CheckPoint, pgv::CheckPoint}
      57              : 
      58              : impl CheckPoint {
      59            3 :     fn encode(&self) -> Result<Bytes, SerializeError> {
      60            3 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
      61            3 :     }
      62              : 
      63        72917 :     fn update_next_xid(&mut self, xid: u32) -> bool {
      64        72917 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
      65        72917 :     }
      66              : 
      67            0 :     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
      68            0 :         enum_pgversion_dispatch!(self, CheckPoint, cp, {
      69            0 :             cp.update_next_multixid(multi_xid, multi_offset)
      70              :         })
      71            0 :     }
      72              : }
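
// Conceptually, the `enum_pgversion!` invocation above expands to a wrapper
// enum with one variant per supported Postgres major version, and
// `enum_pgversion_dispatch!` matches on that variant to evaluate the same
// expression against the version-specific type. A rough sketch of the
// expansion (the exact variant set is an assumption; it tracks the versions
// this build supports):
//
//     enum CheckPoint {
//         V14(postgres_ffi::v14::CheckPoint),
//         V15(postgres_ffi::v15::CheckPoint),
//         // ...one variant per supported major version
//     }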
      73              : 
       74              : /// Temporary suppression of WAL lag warnings after attach
       75              : ///
       76              : /// After a tenant attach, we want to suppress WAL lag warnings because
       77              : /// we don't look at the WAL until the attach is complete, which
       78              : /// might take a while.
      79              : pub struct WalLagCooldown {
       80              :     /// Until when this suppression should apply at all
      81              :     active_until: std::time::Instant,
       82              :     /// The maximum lag to suppress. Lags above this limit get reported anyway.
      83              :     max_lag: Duration,
      84              : }
      85              : 
      86              : impl WalLagCooldown {
      87            0 :     pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
      88            0 :         Self {
      89            0 :             active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
      90            0 :             max_lag: attach_duration * 2 + Duration::from_secs(60),
      91            0 :         }
      92            0 :     }
      93              : }
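
// Worked example of the arithmetic above: for an attach that started at `t0`
// and took 30 seconds, warnings are suppressed until
// t0 + 3 * 30s + 120s = t0 + 210s, and only for lags of at most
// 2 * 30s + 60s = 120s; anything larger is still reported.
//
//     let cooldown = WalLagCooldown::new(t0, Duration::from_secs(30));
//     // cooldown.active_until == t0 + 210s
//     // cooldown.max_lag      == 120s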
      94              : 
      95              : pub struct WalIngest {
      96              :     attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
      97              :     shard: ShardIdentity,
      98              :     checkpoint: CheckPoint,
      99              :     checkpoint_modified: bool,
     100              :     warn_ingest_lag: WarnIngestLag,
     101              : }
     102              : 
     103              : struct WarnIngestLag {
     104              :     lag_msg_ratelimit: RateLimit,
     105              :     future_lsn_msg_ratelimit: RateLimit,
     106              :     timestamp_invalid_msg_ratelimit: RateLimit,
     107              : }
     108              : 
     109              : pub struct WalIngestError {
     110              :     pub backtrace: std::backtrace::Backtrace,
     111              :     pub kind: WalIngestErrorKind,
     112              : }
     113              : 
     114              : #[derive(thiserror::Error, Debug)]
     115              : pub enum WalIngestErrorKind {
     116              :     #[error(transparent)]
     117              :     #[allow(private_interfaces)]
     118              :     PageReconstructError(#[from] PageReconstructError),
     119              :     #[error(transparent)]
     120              :     DeserializationFailure(#[from] DeserializeError),
     121              :     #[error(transparent)]
     122              :     SerializationFailure(#[from] SerializeError),
     123              :     #[error("the request contains data not supported by pageserver: {0} @ {1}")]
     124              :     InvalidKey(Key, Lsn),
     125              :     #[error("twophase file for xid {0} already exists")]
     126              :     FileAlreadyExists(u64),
     127              :     #[error("slru segment {0:?}/{1} already exists")]
     128              :     SlruAlreadyExists(SlruKind, u32),
     129              :     #[error("relation already exists")]
     130              :     RelationAlreadyExists(RelTag),
     131              :     #[error("invalid reldir key {0}")]
     132              :     InvalidRelDirKey(Key),
     133              : 
     134              :     #[error(transparent)]
     135              :     LogicalError(anyhow::Error),
     136              :     #[error(transparent)]
     137              :     EncodeAuxFileError(anyhow::Error),
     138              :     #[error(transparent)]
     139              :     MaybeRelSizeV2Error(anyhow::Error),
     140              : 
     141              :     #[error("timeline shutting down")]
     142              :     Cancelled,
     143              : }
     144              : 
     145              : impl<T> From<T> for WalIngestError
     146              : where
     147              :     WalIngestErrorKind: From<T>,
     148              : {
     149            0 :     fn from(value: T) -> Self {
     150            0 :         WalIngestError {
     151            0 :             backtrace: Backtrace::capture(),
     152            0 :             kind: WalIngestErrorKind::from(value),
     153            0 :         }
     154            0 :     }
     155              : }
     156              : 
     157              : impl std::error::Error for WalIngestError {
     158            0 :     fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
     159            0 :         self.kind.source()
     160            0 :     }
     161              : }
     162              : 
     163              : impl core::fmt::Display for WalIngestError {
     164            0 :     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
     165            0 :         self.kind.fmt(f)
     166            0 :     }
     167              : }
     168              : 
     169              : impl core::fmt::Debug for WalIngestError {
     170            0 :     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
     171            0 :         if f.alternate() {
     172            0 :             f.debug_map()
     173            0 :                 .key(&"backtrace")
     174            0 :                 .value(&self.backtrace)
     175            0 :                 .key(&"kind")
     176            0 :                 .value(&self.kind)
     177            0 :                 .finish()
     178              :         } else {
     179            0 :             writeln!(f, "Error: {:?}", self.kind)?;
     180            0 :             if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
     181            0 :                 writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
     182            0 :             }
     183            0 :             Ok(())
     184              :         }
     185            0 :     }
     186              : }
     187              : 
     188              : #[macro_export]
     189              : macro_rules! ensure_walingest {
     190              :     ($($t:tt)*) => {
     191       354701 :         _ = || -> Result<(), anyhow::Error> {
     192       354701 :             anyhow::ensure!($($t)*);
     193       354701 :             Ok(())
     194       354701 :         }().map_err(WalIngestErrorKind::LogicalError)?;
     195              :     };
     196              : }
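
// Usage sketch for the macro above (`blknum` and `nblocks` are placeholders):
// it takes the same arguments as `anyhow::ensure!`, but a failed condition is
// wrapped into `WalIngestErrorKind::LogicalError` and propagated with `?`, so
// it only works inside functions returning `Result<_, WalIngestError>`:
//
//     ensure_walingest!(blknum < nblocks, "block {} out of range ({})", blknum, nblocks);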
     197              : 
     198              : impl WalIngest {
     199            6 :     pub async fn new(
     200            6 :         timeline: &Timeline,
     201            6 :         startpoint: Lsn,
     202            6 :         ctx: &RequestContext,
     203            6 :     ) -> Result<WalIngest, WalIngestError> {
     204              :         // Fetch the latest checkpoint into memory, so that we can compare with it
     205              :         // quickly in `ingest_record` and update it when it changes.
     206            6 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
     207            6 :         let pgversion = timeline.pg_version;
     208              : 
     209            6 :         let checkpoint = dispatch_pgversion!(pgversion, {
     210            0 :             let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
     211            4 :             trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
     212            0 :             <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
     213              :         });
     214              : 
     215            6 :         Ok(WalIngest {
     216            6 :             shard: *timeline.get_shard_identity(),
     217            6 :             checkpoint,
     218            6 :             checkpoint_modified: false,
     219            6 :             attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
     220            6 :             warn_ingest_lag: WarnIngestLag {
     221            6 :                 lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     222            6 :                 future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     223            6 :                 timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     224            6 :             },
     225            6 :         })
     226            6 :     }
     227              : 
     228              :     /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
     229              :     /// storage of a given timeline.
     230              :     ///
      231              :     /// This function updates the `lsn` field of the `DatadirModification`.
     232              :     ///
     233              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
     234        72926 :     pub async fn ingest_record(
     235        72926 :         &mut self,
     236        72926 :         interpreted: InterpretedWalRecord,
     237        72926 :         modification: &mut DatadirModification<'_>,
     238        72926 :         ctx: &RequestContext,
     239        72926 :     ) -> Result<bool, WalIngestError> {
     240        72926 :         WAL_INGEST.records_received.inc();
     241        72926 :         let prev_len = modification.len();
     242              : 
     243        72926 :         modification.set_lsn(interpreted.next_record_lsn)?;
     244              : 
     245        72926 :         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
     246              :             // Records of this type should always be preceded by a commit(), as they
     247              :             // rely on reading data pages back from the Timeline.
     248            0 :             assert!(!modification.has_dirty_data());
     249        72926 :         }
     250              : 
     251        72926 :         assert!(!self.checkpoint_modified);
     252        72926 :         if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
     253        72917 :             && self.checkpoint.update_next_xid(interpreted.xid)
     254            1 :         {
     255            1 :             self.checkpoint_modified = true;
     256        72925 :         }
     257              : 
     258        72926 :         failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
     259              : 
     260           33 :         match interpreted.metadata_record {
     261            6 :             Some(MetadataRecord::Heapam(rec)) => match rec {
     262            6 :                 HeapamRecord::ClearVmBits(clear_vm_bits) => {
     263            6 :                     self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     264            6 :                         .await?;
     265              :                 }
     266              :             },
     267            0 :             Some(MetadataRecord::Neonrmgr(rec)) => match rec {
     268            0 :                 NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
     269            0 :                     self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     270            0 :                         .await?;
     271              :                 }
     272              :             },
     273            8 :             Some(MetadataRecord::Smgr(rec)) => match rec {
     274            8 :                 SmgrRecord::Create(create) => {
     275            8 :                     self.ingest_xlog_smgr_create(create, modification, ctx)
     276            8 :                         .await?;
     277              :                 }
     278            0 :                 SmgrRecord::Truncate(truncate) => {
     279            0 :                     self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
     280            0 :                         .await?;
     281              :                 }
     282              :             },
     283            0 :             Some(MetadataRecord::Dbase(rec)) => match rec {
     284            0 :                 DbaseRecord::Create(create) => {
     285            0 :                     self.ingest_xlog_dbase_create(create, modification, ctx)
     286            0 :                         .await?;
     287              :                 }
     288            0 :                 DbaseRecord::Drop(drop) => {
     289            0 :                     self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
     290              :                 }
     291              :             },
     292            0 :             Some(MetadataRecord::Clog(rec)) => match rec {
     293            0 :                 ClogRecord::ZeroPage(zero_page) => {
     294            0 :                     self.ingest_clog_zero_page(zero_page, modification, ctx)
     295            0 :                         .await?;
     296              :                 }
     297            0 :                 ClogRecord::Truncate(truncate) => {
     298            0 :                     self.ingest_clog_truncate(truncate, modification, ctx)
     299            0 :                         .await?;
     300              :                 }
     301              :             },
     302            4 :             Some(MetadataRecord::Xact(rec)) => {
     303            4 :                 self.ingest_xact_record(rec, modification, ctx).await?;
     304              :             }
     305            0 :             Some(MetadataRecord::MultiXact(rec)) => match rec {
     306            0 :                 MultiXactRecord::ZeroPage(zero_page) => {
     307            0 :                     self.ingest_multixact_zero_page(zero_page, modification, ctx)
     308            0 :                         .await?;
     309              :                 }
     310            0 :                 MultiXactRecord::Create(create) => {
     311            0 :                     self.ingest_multixact_create(modification, &create)?;
     312              :                 }
     313            0 :                 MultiXactRecord::Truncate(truncate) => {
     314            0 :                     self.ingest_multixact_truncate(modification, &truncate, ctx)
     315            0 :                         .await?;
     316              :                 }
     317              :             },
     318            0 :             Some(MetadataRecord::Relmap(rec)) => match rec {
     319            0 :                 RelmapRecord::Update(update) => {
     320            0 :                     self.ingest_relmap_update(update, modification, ctx).await?;
     321              :                 }
     322              :             },
     323           15 :             Some(MetadataRecord::Xlog(rec)) => match rec {
     324           15 :                 XlogRecord::Raw(raw) => {
     325           15 :                     self.ingest_raw_xlog_record(raw, modification, ctx).await?;
     326              :                 }
     327              :             },
     328            0 :             Some(MetadataRecord::LogicalMessage(rec)) => match rec {
     329            0 :                 LogicalMessageRecord::Put(put) => {
     330            0 :                     self.ingest_logical_message_put(put, modification, ctx)
     331            0 :                         .await?;
     332              :                 }
     333              :                 #[cfg(feature = "testing")]
     334              :                 LogicalMessageRecord::Failpoint => {
     335              :                     // This is a convenient way to make the WAL ingestion pause at
      336              :                     // a particular point in the WAL. For more fine-grained control,
     337              :                     // we could peek into the message and only pause if it contains
     338              :                     // a particular string, for example, but this is enough for now.
     339            0 :                     failpoint_support::sleep_millis_async!(
     340              :                         "pageserver-wal-ingest-logical-message-sleep"
     341              :                     );
     342              :                 }
     343              :             },
     344            0 :             Some(MetadataRecord::Standby(rec)) => {
     345            0 :                 self.ingest_standby_record(rec).unwrap();
     346            0 :             }
     347            0 :             Some(MetadataRecord::Replorigin(rec)) => {
     348            0 :                 self.ingest_replorigin_record(rec, modification).await?;
     349              :             }
     350        72893 :             None => {
     351        72893 :                 // There are two cases through which we end up here:
     352        72893 :                 // 1. The resource manager for the original PG WAL record
     353        72893 :                 //    is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
     354        72893 :                 //    record type within Neon.
     355        72893 :                 // 2. The resource manager id was unknown to
     356        72893 :                 //    [`wal_decoder::decoder::MetadataRecord::from_decoded`].
     357        72893 :                 // TODO(vlad): Tighten this up more once we build confidence
     358        72893 :                 // that case (2) does not happen in the field.
     359        72893 :             }
     360              :         }
     361              : 
     362        72926 :         modification
     363        72926 :             .ingest_batch(interpreted.batch, &self.shard, ctx)
     364        72926 :             .await?;
     365              : 
     366              :         // If checkpoint data was updated, store the new version in the repository
     367        72926 :         if self.checkpoint_modified {
     368            3 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     369              : 
     370            3 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     371            3 :             self.checkpoint_modified = false;
     372        72923 :         }
     373              : 
     374              :         // Note that at this point this record is only cached in the modification
     375              :         // until commit() is called to flush the data into the repository and update
     376              :         // the latest LSN.
     377              : 
     378        72926 :         Ok(modification.len() > prev_len)
     379        72926 :     }
     380              : 
     381              :     /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
     382            0 :     fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
     383            0 :         let next_full_xid =
     384            0 :             enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
     385              : 
     386            0 :         let next_xid = (next_full_xid) as u32;
     387            0 :         let mut epoch = (next_full_xid >> 32) as u32;
     388              : 
     389            0 :         if xid > next_xid {
     390              :             // Wraparound occurred, must be from a prev epoch.
     391            0 :             if epoch == 0 {
     392            0 :                 Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
     393            0 :                     "apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
     394            0 :                 )))?;
     395            0 :             }
     396            0 :             epoch -= 1;
     397            0 :         }
     398              : 
     399            0 :         Ok(((epoch as u64) << 32) | xid as u64)
     400            0 :     }
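
    // Worked example of the epoch adjustment above: with nextXid =
    // 0x0000_0002_0000_1000 (epoch 2, 32-bit part 0x1000), a 32-bit xid of
    // 0xFFFF_F000 compares greater than 0x1000, so it must stem from before
    // the last wraparound and lands in epoch 1:
    //
    //     let next_full_xid: u64 = 0x0000_0002_0000_1000;
    //     let next_xid = next_full_xid as u32;          // 0x1000
    //     let mut epoch = (next_full_xid >> 32) as u32; // 2
    //     let xid: u32 = 0xFFFF_F000;
    //     if xid > next_xid { epoch -= 1; }             // wrapped => epoch 1
    //     assert_eq!(((epoch as u64) << 32) | xid as u64, 0x0000_0001_FFFF_F000);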
     401              : 
     402            6 :     async fn ingest_clear_vm_bits(
     403            6 :         &mut self,
     404            6 :         clear_vm_bits: ClearVmBits,
     405            6 :         modification: &mut DatadirModification<'_>,
     406            6 :         ctx: &RequestContext,
     407            6 :     ) -> Result<(), WalIngestError> {
     408              :         let ClearVmBits {
     409            6 :             new_heap_blkno,
     410            6 :             old_heap_blkno,
     411            6 :             flags,
     412            6 :             vm_rel,
     413            6 :         } = clear_vm_bits;
     414              :         // Clear the VM bits if required.
     415            6 :         let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     416            6 :         let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     417              : 
     418              :         // VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
      419              :         // its view of the VM relation size. Out of caution, log an error instead of failing WAL ingestion,
      420              :         // as there have historically been cases where PostgreSQL has cleared spurious VM pages. See:
     421              :         // https://github.com/neondatabase/neon/pull/10634.
     422            6 :         let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
     423            0 :             critical_timeline!(
     424            0 :                 modification.tline.tenant_shard_id,
     425            0 :                 modification.tline.timeline_id,
     426              :                 // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
     427            0 :                 None::<&AtomicBool>,
     428            0 :                 "clear_vm_bits for unknown VM relation {vm_rel}"
     429              :             );
     430            0 :             return Ok(());
     431              :         };
     432            6 :         if let Some(blknum) = new_vm_blk {
     433            6 :             if blknum >= vm_size {
     434            0 :                 critical_timeline!(
     435            0 :                     modification.tline.tenant_shard_id,
     436            0 :                     modification.tline.timeline_id,
     437              :                     // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
     438            0 :                     None::<&AtomicBool>,
     439            0 :                     "new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
     440              :                 );
     441            0 :                 new_vm_blk = None;
     442            6 :             }
     443            0 :         }
     444            6 :         if let Some(blknum) = old_vm_blk {
     445            0 :             if blknum >= vm_size {
     446            0 :                 critical_timeline!(
     447            0 :                     modification.tline.tenant_shard_id,
     448            0 :                     modification.tline.timeline_id,
     449              :                     // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
     450            0 :                     None::<&AtomicBool>,
     451            0 :                     "old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
     452              :                 );
     453            0 :                 old_vm_blk = None;
     454            0 :             }
     455            6 :         }
     456              : 
     457            6 :         if new_vm_blk.is_none() && old_vm_blk.is_none() {
     458            0 :             return Ok(());
     459            6 :         } else if new_vm_blk == old_vm_blk {
     460              :             // An UPDATE record that needs to clear the bits for both old and the new page, both of
     461              :             // which reside on the same VM page.
     462            0 :             self.put_rel_wal_record(
     463            0 :                 modification,
     464            0 :                 vm_rel,
     465            0 :                 new_vm_blk.unwrap(),
     466            0 :                 NeonWalRecord::ClearVisibilityMapFlags {
     467            0 :                     new_heap_blkno,
     468            0 :                     old_heap_blkno,
     469            0 :                     flags,
     470            0 :                 },
     471            0 :                 ctx,
     472            0 :             )
     473            0 :             .await?;
     474              :         } else {
     475              :             // Clear VM bits for one heap page, or for two pages that reside on different VM pages.
     476            6 :             if let Some(new_vm_blk) = new_vm_blk {
     477            6 :                 self.put_rel_wal_record(
     478            6 :                     modification,
     479            6 :                     vm_rel,
     480            6 :                     new_vm_blk,
     481            6 :                     NeonWalRecord::ClearVisibilityMapFlags {
     482            6 :                         new_heap_blkno,
     483            6 :                         old_heap_blkno: None,
     484            6 :                         flags,
     485            6 :                     },
     486            6 :                     ctx,
     487            6 :                 )
     488            6 :                 .await?;
     489            0 :             }
     490            6 :             if let Some(old_vm_blk) = old_vm_blk {
     491            0 :                 self.put_rel_wal_record(
     492            0 :                     modification,
     493            0 :                     vm_rel,
     494            0 :                     old_vm_blk,
     495            0 :                     NeonWalRecord::ClearVisibilityMapFlags {
     496            0 :                         new_heap_blkno: None,
     497            0 :                         old_heap_blkno,
     498            0 :                         flags,
     499            0 :                     },
     500            0 :                     ctx,
     501            0 :                 )
     502            0 :                 .await?;
     503            6 :             }
     504              :         }
     505            6 :         Ok(())
     506            6 :     }
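
    // Worked example of the heap-to-VM block mapping used above, assuming the
    // stock layout of 8KB pages with 2 visibility bits per heap block, i.e.
    // one VM page covering 32672 heap blocks (the exact constant lives behind
    // `pg_constants::HEAPBLK_TO_MAPBLOCK`):
    //
    //     let heapblk_to_mapblock = |heap_blkno: u32| heap_blkno / 32672;
    //     assert_eq!(heapblk_to_mapblock(10), 0);
    //     assert_eq!(heapblk_to_mapblock(40_000), 1);
    //     // An UPDATE touching heap blocks 10 and 40_000 thus clears bits on
    //     // two different VM pages and emits two separate WAL records.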
     507              : 
     508              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     509            0 :     async fn ingest_xlog_dbase_create(
     510            0 :         &mut self,
     511            0 :         create: DbaseCreate,
     512            0 :         modification: &mut DatadirModification<'_>,
     513            0 :         ctx: &RequestContext,
     514            0 :     ) -> Result<(), WalIngestError> {
     515              :         let DbaseCreate {
     516            0 :             db_id,
     517            0 :             tablespace_id,
     518            0 :             src_db_id,
     519            0 :             src_tablespace_id,
     520            0 :         } = create;
     521              : 
     522            0 :         let rels = modification
     523            0 :             .tline
     524            0 :             .list_rels(
     525            0 :                 src_tablespace_id,
     526            0 :                 src_db_id,
     527            0 :                 Version::Modified(modification),
     528            0 :                 ctx,
     529            0 :             )
     530            0 :             .await?;
     531              : 
     532            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
     533              : 
     534              :         // Copy relfilemap
     535            0 :         let filemap = modification
     536            0 :             .tline
     537            0 :             .get_relmap_file(
     538            0 :                 src_tablespace_id,
     539            0 :                 src_db_id,
     540            0 :                 Version::Modified(modification),
     541            0 :                 ctx,
     542            0 :             )
     543            0 :             .await?;
     544            0 :         modification
     545            0 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
     546            0 :             .await?;
     547              : 
     548            0 :         let mut num_rels_copied = 0;
     549            0 :         let mut num_blocks_copied = 0;
     550            0 :         for src_rel in rels {
     551            0 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
     552            0 :             assert_eq!(src_rel.dbnode, src_db_id);
     553              : 
     554            0 :             let nblocks = modification
     555            0 :                 .tline
     556            0 :                 .get_rel_size(src_rel, Version::Modified(modification), ctx)
     557            0 :                 .await?;
     558            0 :             let dst_rel = RelTag {
     559            0 :                 spcnode: tablespace_id,
     560            0 :                 dbnode: db_id,
     561            0 :                 relnode: src_rel.relnode,
     562            0 :                 forknum: src_rel.forknum,
     563            0 :             };
     564              : 
     565            0 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
     566              : 
     567              :             // Copy content
     568            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
     569            0 :             for blknum in 0..nblocks {
     570              :                 // Sharding:
     571              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
     572              :                 //    dbNode is not included in the hash inputs for sharding.
     573              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
     574              :                 //    that belong to it.
     575            0 :                 let src_key = rel_block_to_key(src_rel, blknum);
     576            0 :                 if !self.shard.is_key_local(&src_key) {
     577            0 :                     debug!(
     578            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
     579              :                         src_key
     580              :                     );
     581            0 :                     continue;
     582            0 :                 }
     583            0 :                 debug!(
     584            0 :                     "copying block {} from {} ({}) to {}",
     585              :                     blknum, src_rel, src_key, dst_rel
     586              :                 );
     587              : 
     588            0 :                 let content = modification
     589            0 :                     .tline
     590            0 :                     .get_rel_page_at_lsn(
     591            0 :                         src_rel,
     592            0 :                         blknum,
     593            0 :                         Version::Modified(modification),
     594            0 :                         ctx,
     595            0 :                         crate::tenant::storage_layer::IoConcurrency::sequential(),
     596            0 :                     )
     597            0 :                     .await?;
     598            0 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
     599            0 :                 num_blocks_copied += 1;
     600              :             }
     601              : 
     602            0 :             num_rels_copied += 1;
     603              :         }
     604              : 
     605            0 :         info!(
     606            0 :             "Created database {}/{}, copied {} blocks in {} rels",
     607              :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
     608              :         );
     609            0 :         Ok(())
     610            0 :     }
     611              : 
     612            0 :     async fn ingest_xlog_dbase_drop(
     613            0 :         &mut self,
     614            0 :         dbase_drop: DbaseDrop,
     615            0 :         modification: &mut DatadirModification<'_>,
     616            0 :         ctx: &RequestContext,
     617            0 :     ) -> Result<(), WalIngestError> {
     618              :         let DbaseDrop {
     619            0 :             db_id,
     620            0 :             tablespace_ids,
     621            0 :         } = dbase_drop;
     622            0 :         for tablespace_id in tablespace_ids {
     623            0 :             trace!("Drop db {}, {}", tablespace_id, db_id);
     624            0 :             modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
     625              :         }
     626              : 
     627            0 :         Ok(())
     628            0 :     }
     629              : 
     630            8 :     async fn ingest_xlog_smgr_create(
     631            8 :         &mut self,
     632            8 :         create: SmgrCreate,
     633            8 :         modification: &mut DatadirModification<'_>,
     634            8 :         ctx: &RequestContext,
     635            8 :     ) -> Result<(), WalIngestError> {
     636            8 :         let SmgrCreate { rel } = create;
     637            8 :         self.put_rel_creation(modification, rel, ctx).await?;
     638            8 :         Ok(())
     639            8 :     }
     640              : 
     641              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
     642              :     ///
     643              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
     644            0 :     async fn ingest_xlog_smgr_truncate(
     645            0 :         &mut self,
     646            0 :         truncate: XlSmgrTruncate,
     647            0 :         modification: &mut DatadirModification<'_>,
     648            0 :         ctx: &RequestContext,
     649            0 :     ) -> Result<(), WalIngestError> {
     650              :         let XlSmgrTruncate {
     651            0 :             blkno,
     652            0 :             rnode,
     653            0 :             flags,
     654            0 :         } = truncate;
     655              : 
     656            0 :         let spcnode = rnode.spcnode;
     657            0 :         let dbnode = rnode.dbnode;
     658            0 :         let relnode = rnode.relnode;
     659              : 
     660            0 :         if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
     661            0 :             let rel = RelTag {
     662            0 :                 spcnode,
     663            0 :                 dbnode,
     664            0 :                 relnode,
     665            0 :                 forknum: MAIN_FORKNUM,
     666            0 :             };
     667              : 
     668            0 :             self.put_rel_truncation(modification, rel, blkno, ctx)
     669            0 :                 .await?;
     670            0 :         }
     671            0 :         if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
     672            0 :             let rel = RelTag {
     673            0 :                 spcnode,
     674            0 :                 dbnode,
     675            0 :                 relnode,
     676            0 :                 forknum: FSM_FORKNUM,
     677            0 :             };
     678              : 
     679              :             // Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
      680              :             // and instead of digging into the FSM bitmap format we just clear the whole page.
     681            0 :             let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
     682            0 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
     683            0 :             if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
     684            0 :                 && self
     685            0 :                     .shard
     686            0 :                     .is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
     687              :             {
     688            0 :                 modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
     689            0 :                 fsm_physical_page_no += 1;
     690            0 :             }
     691              :             // Truncate this shard's view of the FSM relation size, if it even has one.
     692            0 :             let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
     693            0 :             if nblocks > fsm_physical_page_no {
     694            0 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
     695            0 :                     .await?;
     696            0 :             }
     697            0 :         }
     698            0 :         if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
     699            0 :             let rel = RelTag {
     700            0 :                 spcnode,
     701            0 :                 dbnode,
     702            0 :                 relnode,
     703            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     704            0 :             };
     705              : 
     706              :             // last remaining block, byte, and bit
     707            0 :             let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
     708            0 :             let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
     709            0 :                 / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
     710            0 :             let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
     711            0 :                 * pg_constants::VM_BITS_PER_HEAPBLOCK;
     712              : 
     713              :             // Unless the new size is exactly at a visibility map page boundary, the
     714              :             // tail bits in the last remaining map page, representing truncated heap
     715              :             // blocks, need to be cleared. This is not only tidy, but also necessary
     716              :             // because we don't get a chance to clear the bits if the heap is extended
     717              :             // again. Only do this on the shard that owns the page.
     718            0 :             if (trunc_byte != 0 || trunc_offs != 0)
     719            0 :                 && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
     720              :             {
     721            0 :                 modification.put_rel_wal_record(
     722            0 :                     rel,
     723            0 :                     vm_page_no,
     724            0 :                     NeonWalRecord::TruncateVisibilityMap {
     725            0 :                         trunc_byte,
     726            0 :                         trunc_offs,
     727            0 :                     },
     728            0 :                 )?;
     729            0 :                 vm_page_no += 1;
     730            0 :             }
     731              :             // Truncate this shard's view of the VM relation size, if it even has one.
     732            0 :             let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
     733            0 :             if nblocks > vm_page_no {
     734            0 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
     735            0 :                     .await?;
     736            0 :             }
     737            0 :         }
     738            0 :         Ok(())
     739            0 :     }
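
    // Worked example of the VM tail-clearing math above, assuming the usual
    // constants VM_HEAPBLOCKS_PER_BYTE = 4 and VM_BITS_PER_HEAPBLOCK = 2:
    // truncating the heap to blkno = 10 lands inside VM page 0 and gives
    //
    //     trunc_byte = (10 % VM_HEAPBLOCKS_PER_PAGE) / 4 = 2
    //     trunc_offs = (10 % 4) * 2 = 4
    //
    // so the bits for heap blocks 10 and above are cleared from byte 2 of
    // that page onward, and the VM relation is then truncated to one page
    // (vm_page_no becomes 1).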
     740              : 
     741            4 :     fn warn_on_ingest_lag(
     742            4 :         &mut self,
     743            4 :         conf: &crate::config::PageServerConf,
     744            4 :         wal_timestamp: TimestampTz,
     745            4 :     ) {
     746            4 :         debug_assert_current_span_has_tenant_and_timeline_id();
     747            4 :         let now = SystemTime::now();
     748            4 :         let rate_limits = &mut self.warn_ingest_lag;
     749              : 
     750            4 :         let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
     751            0 :             pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
     752              :         });
     753              : 
     754            4 :         match ts {
     755            4 :             Ok(ts) => {
     756            4 :                 match now.duration_since(ts) {
     757            4 :                     Ok(lag) => {
     758            4 :                         if lag > conf.wait_lsn_timeout {
     759            4 :                             rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
     760            1 :                                 if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
     761            0 :                                     if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
     762            0 :                                         return;
     763            0 :                                     }
     764            1 :                                 } else {
     765            1 :                                     // Still loading? We shouldn't be here
     766            1 :                                 }
     767            1 :                                 let lag = humantime::format_duration(lag);
     768            1 :                                 warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
     769            1 :                             })
     770            0 :                         }
     771              :                     }
     772            0 :                     Err(e) => {
     773            0 :                         let delta_t = e.duration();
     774              :                         // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
     775              :                         // => https://www.robustperception.io/time-metric-from-the-node-exporter/
     776              :                         const IGNORED_DRIFT: Duration = Duration::from_millis(100);
     777            0 :                         if delta_t > IGNORED_DRIFT {
     778            0 :                             let delta_t = humantime::format_duration(delta_t);
     779            0 :                             rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
     780            0 :                                 warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
     781            0 :                             })
     782            0 :                         }
     783              :                     }
     784              :                 };
     785              :             }
     786            0 :             Err(error) => {
     787            0 :                 rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
     788            0 :                     warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
     789            0 :                 })
     790              :             }
     791              :         }
     792            4 :     }
     793              : 
      794              :     /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
     795              :     ///
     796            4 :     async fn ingest_xact_record(
     797            4 :         &mut self,
     798            4 :         record: XactRecord,
     799            4 :         modification: &mut DatadirModification<'_>,
     800            4 :         ctx: &RequestContext,
     801            4 :     ) -> Result<(), WalIngestError> {
     802            4 :         let (xact_common, is_commit, is_prepared) = match record {
     803            0 :             XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
     804            0 :                 let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
     805            0 :                     self.adjust_to_full_transaction_id(xl_xid)?
     806              :                 } else {
     807            0 :                     xl_xid as u64
     808              :                 };
     809            0 :                 return modification.put_twophase_file(xid, data, ctx).await;
     810              :             }
     811            4 :             XactRecord::Commit(common) => (common, true, false),
     812            0 :             XactRecord::Abort(common) => (common, false, false),
     813            0 :             XactRecord::CommitPrepared(common) => (common, true, true),
     814            0 :             XactRecord::AbortPrepared(common) => (common, false, true),
     815              :         };
     816              : 
     817              :         let XactCommon {
     818            4 :             parsed,
     819            4 :             origin_id,
     820            4 :             xl_xid,
     821            4 :             lsn,
     822            4 :         } = xact_common;
     823              : 
     824              :         // Record update of CLOG pages
     825            4 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
     826            4 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     827            4 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     828            4 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
     829              : 
     830            4 :         self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
     831              : 
     832            4 :         for subxact in &parsed.subxacts {
     833            0 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
     834            0 :             if subxact_pageno != pageno {
     835              :                 // This subxact goes to different page. Write the record
     836              :                 // for all the XIDs on the previous page, and continue
     837              :                 // accumulating XIDs on this new page.
     838            0 :                 modification.put_slru_wal_record(
     839            0 :                     SlruKind::Clog,
     840            0 :                     segno,
     841            0 :                     rpageno,
     842            0 :                     if is_commit {
     843            0 :                         NeonWalRecord::ClogSetCommitted {
     844            0 :                             xids: page_xids,
     845            0 :                             timestamp: parsed.xact_time,
     846            0 :                         }
     847              :                     } else {
     848            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
     849              :                     },
     850            0 :                 )?;
     851            0 :                 page_xids = Vec::new();
     852            0 :             }
     853            0 :             pageno = subxact_pageno;
     854            0 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     855            0 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     856            0 :             page_xids.push(*subxact);
     857              :         }
     858            4 :         modification.put_slru_wal_record(
     859            4 :             SlruKind::Clog,
     860            4 :             segno,
     861            4 :             rpageno,
     862            4 :             if is_commit {
     863            4 :                 NeonWalRecord::ClogSetCommitted {
     864            4 :                     xids: page_xids,
     865            4 :                     timestamp: parsed.xact_time,
     866            4 :                 }
     867              :             } else {
     868            0 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
     869              :             },
     870            0 :         )?;
     871              : 
     872              :         // Group relations to drop by dbNode.  This map will contain all relations that _might_
      873              :         // exist; we will reduce it to the ones that really exist later.  This map can be huge if
     874              :         // the transaction touches a huge number of relations (there is no bound on this in
     875              :         // postgres).
     876            4 :         let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
     877              : 
     878            4 :         for xnode in &parsed.xnodes {
     879            0 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
     880            0 :                 let rel = RelTag {
     881            0 :                     forknum,
     882            0 :                     spcnode: xnode.spcnode,
     883            0 :                     dbnode: xnode.dbnode,
     884            0 :                     relnode: xnode.relnode,
     885            0 :                 };
     886            0 :                 drop_relations
     887            0 :                     .entry((xnode.spcnode, xnode.dbnode))
     888            0 :                     .or_default()
     889            0 :                     .push(rel);
     890            0 :             }
     891              :         }
     892              : 
     893              :         // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
     894            4 :         modification.put_rel_drops(drop_relations, ctx).await?;
     895              : 
     896            4 :         if origin_id != 0 {
     897            0 :             modification
     898            0 :                 .set_replorigin(origin_id, parsed.origin_lsn)
     899            0 :                 .await?;
     900            4 :         }
     901              : 
     902            4 :         if is_prepared {
     903              :             // Remove the twophase file. See RemoveTwoPhaseFile() in the Postgres code.
     904            0 :             trace!(
     905            0 :                 "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     906              :                 xl_xid, parsed.xid, lsn,
     907              :             );
     908              : 
     909            0 :             let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
     910            0 :                 self.adjust_to_full_transaction_id(parsed.xid)?
     911              :             } else {
     912            0 :                 parsed.xid as u64
     913              :             };
     914            0 :             modification.drop_twophase_file(xid, ctx).await?;
     915            4 :         }
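
The PG17 branch above widens the 32-bit xid to a 64-bit value before dropping the twophase file, since PG17 keys twophase files by full transaction id. As a hedged sketch of the standard FullTransactionId layout (the helper's actual epoch inference is defined elsewhere in this file), the epoch occupies the high 32 bits:

```rust
// Illustration only: compose a 64-bit FullTransactionId from an epoch and
// a 32-bit xid, following the layout Postgres uses.
fn full_transaction_id(epoch: u32, xid: u32) -> u64 {
    ((epoch as u64) << 32) | xid as u64
}

// full_transaction_id(1, 42) == 0x0000_0001_0000_002A
```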
     916              : 
     917            4 :         Ok(())
     918            4 :     }
     919              : 
     920            0 :     async fn ingest_clog_truncate(
     921            0 :         &mut self,
     922            0 :         truncate: ClogTruncate,
     923            0 :         modification: &mut DatadirModification<'_>,
     924            0 :         ctx: &RequestContext,
     925            0 :     ) -> Result<(), WalIngestError> {
     926              :         let ClogTruncate {
     927            0 :             pageno,
     928            0 :             oldest_xid,
     929            0 :             oldest_xid_db,
     930            0 :         } = truncate;
     931              : 
     932            0 :         info!(
     933            0 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
     934              :             pageno, oldest_xid, oldest_xid_db
     935              :         );
     936              : 
     937              :         // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
     938              :         // truncated, but a checkpoint record with the updated values isn't written until
     939              :         // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
     940              :         // so we keep the oldestXid and oldestXidDB up-to-date.
     941            0 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
     942            0 :             cp.oldestXid = oldest_xid;
     943            0 :             cp.oldestXidDB = oldest_xid_db;
     944            0 :         });
     945            0 :         self.checkpoint_modified = true;
     946              : 
     947              :         // TODO: handle AdvanceOldestClogXid(), or add a comment explaining why we don't need it
     948              : 
     949            0 :         let latest_page_number =
     950            0 :             enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
     951              :                 / pg_constants::CLOG_XACTS_PER_PAGE;
     952              : 
     953              :         // Now delete all segments containing pages between `pageno`
     954              :         // and latest_page_number.
     955              : 
     956              :         // First, make an important safety check:
     957              :         // the current endpoint page must not be eligible for removal.
     958              :         // See SimpleLruTruncate() in slru.c
     959            0 :         if dispatch_pgversion!(modification.tline.pg_version, {
     960            0 :             pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
     961              :         }) {
     962            0 :             info!("could not truncate directory pg_xact apparent wraparound");
     963            0 :             return Ok(());
     964            0 :         }
     965              : 
     966              :         // Iterate over the SLRU CLOG segments and drop the ones that we're ready to truncate.
     967              :         //
     968              :         // We cannot pass 'lsn' to Timeline::list_slru_segments(), or it
     969              :         // will block waiting for the last valid LSN to advance up to
     970              :         // it. So we use the previous record's LSN in the get calls
     971              :         // instead.
     972            0 :         if modification.tline.get_shard_identity().is_shard_zero() {
     973            0 :             for segno in modification
     974            0 :                 .tline
     975            0 :                 .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
     976            0 :                 .await?
     977              :             {
     978            0 :                 let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
     979              : 
     980            0 :                 let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
     981            0 :                     pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
     982              :                 });
     983              : 
     984            0 :                 if may_delete {
     985            0 :                     modification
     986            0 :                         .drop_slru_segment(SlruKind::Clog, segno, ctx)
     987            0 :                         .await?;
     988            0 :                     trace!("Drop CLOG segment {:>04X}", segno);
     989            0 :                 }
     990              :             }
     991            0 :         }
     992              : 
     993            0 :         Ok(())
     994            0 :     }
     995              : 
     996            0 :     async fn ingest_clog_zero_page(
     997            0 :         &mut self,
     998            0 :         zero_page: ClogZeroPage,
     999            0 :         modification: &mut DatadirModification<'_>,
    1000            0 :         ctx: &RequestContext,
    1001            0 :     ) -> Result<(), WalIngestError> {
    1002            0 :         let ClogZeroPage { segno, rpageno } = zero_page;
    1003              : 
    1004            0 :         self.put_slru_page_image(
    1005            0 :             modification,
    1006            0 :             SlruKind::Clog,
    1007            0 :             segno,
    1008            0 :             rpageno,
    1009            0 :             ZERO_PAGE.clone(),
    1010            0 :             ctx,
    1011            0 :         )
    1012            0 :         .await
    1013            0 :     }
    1014              : 
    1015            0 :     fn ingest_multixact_create(
    1016            0 :         &mut self,
    1017            0 :         modification: &mut DatadirModification,
    1018            0 :         xlrec: &XlMultiXactCreate,
    1019            0 :     ) -> Result<(), WalIngestError> {
    1020              :         // Create WAL record for updating the multixact-offsets page
    1021            0 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1022            0 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1023            0 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1024              : 
    1025            0 :         modification.put_slru_wal_record(
    1026            0 :             SlruKind::MultiXactOffsets,
    1027            0 :             segno,
    1028            0 :             rpageno,
    1029            0 :             NeonWalRecord::MultixactOffsetCreate {
    1030            0 :                 mid: xlrec.mid,
    1031            0 :                 moff: xlrec.moff,
    1032            0 :             },
    1033            0 :         )?;
    1034              : 
    1035              :         // Create WAL records for the update of each affected multixact-members page
    1036            0 :         let mut members = xlrec.members.iter();
    1037            0 :         let mut offset = xlrec.moff;
    1038              :         loop {
    1039            0 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1040              : 
    1041              :             // How many members fit on this page?
    1042            0 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1043            0 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1044              : 
    1045            0 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1046            0 :             for _ in 0..page_remain {
    1047            0 :                 if let Some(m) = members.next() {
    1048            0 :                     this_page_members.push(m.clone());
    1049            0 :                 } else {
    1050            0 :                     break;
    1051              :                 }
    1052              :             }
    1053            0 :             if this_page_members.is_empty() {
    1054              :                 // all done
    1055            0 :                 break;
    1056            0 :             }
    1057            0 :             let n_this_page = this_page_members.len();
    1058              : 
    1059            0 :             modification.put_slru_wal_record(
    1060            0 :                 SlruKind::MultiXactMembers,
    1061            0 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1062            0 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1063            0 :                 NeonWalRecord::MultixactMembersCreate {
    1064            0 :                     moff: offset,
    1065            0 :                     members: this_page_members,
    1066            0 :                 },
    1067            0 :             )?;
    1068              : 
    1069              :             // Note: The multixact members can wrap around, even within one WAL record.
    1070            0 :             offset = offset.wrapping_add(n_this_page as u32);
    1071              :         }
    1072            0 :         let next_offset = offset;
    1073            0 :         assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
    1074              : 
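To make the member pagination above concrete, a worked sketch assuming the standard Postgres constant MULTIXACT_MEMBERS_PER_PAGE = 1636 (409 member groups of 4 members per 8 KB page):

```rust
const MULTIXACT_MEMBERS_PER_PAGE: u32 = 1636; // assumed standard value

// Split `nmembers` members starting at offset `moff` into (pageno, count)
// chunks, mirroring the loop above.
fn member_pages(mut moff: u32, mut nmembers: u32) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    while nmembers > 0 {
        let pageno = moff / MULTIXACT_MEMBERS_PER_PAGE;
        let remain = MULTIXACT_MEMBERS_PER_PAGE - moff % MULTIXACT_MEMBERS_PER_PAGE;
        let n = remain.min(nmembers);
        out.push((pageno, n));
        // wrapping_add because member offsets may wrap around u32::MAX,
        // even within a single WAL record.
        moff = moff.wrapping_add(n);
        nmembers -= n;
    }
    out
}

// member_pages(1630, 10) == [(0, 6), (1, 4)]
```
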
    1075              :         // Update next-multi-xid and next-offset
    1076              :         //
    1077              :         // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
    1078              :         // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
    1079              :         // read it, like GetNewMultiXactId(). This is different from how nextXid is
    1080              :         // incremented! nextXid skips over values below FirstNormalTransactionId when
    1081              :         // the value is stored, so it's never 0 in a checkpoint.
    1082              :         //
    1083              :         // I don't know why it's done that way, it seems less error-prone to skip over 0
    1084              :         // when the value is stored rather than when it's read. But let's do it the same
    1085              :         // way here.
    1086            0 :         let next_multi_xid = xlrec.mid.wrapping_add(1);
    1087              : 
    1088            0 :         if self
    1089            0 :             .checkpoint
    1090            0 :             .update_next_multixid(next_multi_xid, next_offset)
    1091            0 :         {
    1092            0 :             self.checkpoint_modified = true;
    1093            0 :         }
    1094              : 
    1095              :         // Also update the next-xid with the highest member. According to the comments in
    1096              :         // multixact_redo(), this shouldn't be necessary, but let's do the same here.
    1097            0 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
    1098            0 :             if let Some(max_xid) = acc {
    1099            0 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
    1100            0 :                     Some(mbr.xid)
    1101              :                 } else {
    1102            0 :                     acc
    1103              :                 }
    1104              :             } else {
    1105            0 :                 Some(mbr.xid)
    1106              :             }
    1107            0 :         });
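
The `wrapping_sub(..) as i32` comparison in the fold above (and again in the checkpoint handling further down) is the circular XID ordering Postgres uses: XIDs wrap at 2^32, so `a` follows `b` iff their wrapping difference, reinterpreted as signed, is positive. A small sanity check:

```rust
fn xid_follows(a: u32, b: u32) -> bool {
    (a.wrapping_sub(b) as i32) > 0
}

assert!(xid_follows(5, u32::MAX)); // 5 is "newer" than 0xFFFFFFFF across the wrap
assert!(!xid_follows(u32::MAX, 5));
assert!(xid_follows(1_000, 10));   // and the ordinary case works too
```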
    1108              : 
    1109            0 :         if let Some(max_xid) = max_mbr_xid {
    1110            0 :             if self.checkpoint.update_next_xid(max_xid) {
    1111            0 :                 self.checkpoint_modified = true;
    1112            0 :             }
    1113            0 :         }
    1114            0 :         Ok(())
    1115            0 :     }
    1116              : 
    1117            0 :     async fn ingest_multixact_truncate(
    1118            0 :         &mut self,
    1119            0 :         modification: &mut DatadirModification<'_>,
    1120            0 :         xlrec: &XlMultiXactTruncate,
    1121            0 :         ctx: &RequestContext,
    1122            0 :     ) -> Result<(), WalIngestError> {
    1123            0 :         let (maxsegment, startsegment, endsegment) =
    1124            0 :             enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1125            0 :                 cp.oldestMulti = xlrec.end_trunc_off;
    1126            0 :                 cp.oldestMultiDB = xlrec.oldest_multi_db;
    1127            0 :                 let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
    1128              :                     pg_constants::MAX_MULTIXACT_OFFSET,
    1129              :                 );
    1130            0 :                 let startsegment: i32 =
    1131            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1132            0 :                 let endsegment: i32 =
    1133            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1134            0 :                 (maxsegment, startsegment, endsegment)
    1135              :             });
    1136              : 
    1137            0 :         self.checkpoint_modified = true;
    1138              : 
    1139              :         // PerformMembersTruncation
    1140            0 :         let mut segment: i32 = startsegment;
    1141              : 
    1142              :         // Delete all the segments except the last one. The last segment can still
    1143              :         // contain, possibly partially, valid data.
    1144            0 :         if modification.tline.get_shard_identity().is_shard_zero() {
    1145            0 :             while segment != endsegment {
    1146            0 :                 modification
    1147            0 :                     .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1148            0 :                     .await?;
    1149              : 
    1150              :                 /* move to next segment, handling wraparound correctly */
    1151            0 :                 if segment == maxsegment {
    1152            0 :                     segment = 0;
    1153            0 :                 } else {
    1154            0 :                     segment += 1;
    1155            0 :                 }
    1156              :             }
    1157            0 :         }
    1158              : 
    1159              :         // Truncate offsets
    1160              :         // FIXME: this did not handle wraparound correctly
    1161              : 
    1162            0 :         Ok(())
    1163            0 :     }
    1164              : 
    1165            0 :     async fn ingest_multixact_zero_page(
    1166            0 :         &mut self,
    1167            0 :         zero_page: MultiXactZeroPage,
    1168            0 :         modification: &mut DatadirModification<'_>,
    1169            0 :         ctx: &RequestContext,
    1170            0 :     ) -> Result<(), WalIngestError> {
    1171              :         let MultiXactZeroPage {
    1172            0 :             slru_kind,
    1173            0 :             segno,
    1174            0 :             rpageno,
    1175            0 :         } = zero_page;
    1176            0 :         self.put_slru_page_image(
    1177            0 :             modification,
    1178            0 :             slru_kind,
    1179            0 :             segno,
    1180            0 :             rpageno,
    1181            0 :             ZERO_PAGE.clone(),
    1182            0 :             ctx,
    1183            0 :         )
    1184            0 :         .await
    1185            0 :     }
    1186              : 
    1187            0 :     async fn ingest_relmap_update(
    1188            0 :         &mut self,
    1189            0 :         update: RelmapUpdate,
    1190            0 :         modification: &mut DatadirModification<'_>,
    1191            0 :         ctx: &RequestContext,
    1192            0 :     ) -> Result<(), WalIngestError> {
    1193            0 :         let RelmapUpdate { update, buf } = update;
    1194              : 
    1195            0 :         modification
    1196            0 :             .put_relmap_file(update.tsid, update.dbid, buf, ctx)
    1197            0 :             .await
    1198            0 :     }
    1199              : 
    1200           15 :     async fn ingest_raw_xlog_record(
    1201           15 :         &mut self,
    1202           15 :         raw_record: RawXlogRecord,
    1203           15 :         modification: &mut DatadirModification<'_>,
    1204           15 :         ctx: &RequestContext,
    1205           15 :     ) -> Result<(), WalIngestError> {
    1206           15 :         let RawXlogRecord { info, lsn, mut buf } = raw_record;
    1207           15 :         let pg_version = modification.tline.pg_version;
    1208              : 
    1209           15 :         if info == pg_constants::XLOG_PARAMETER_CHANGE {
    1210            1 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    1211            0 :                 let rec = v17::XlParameterChange::decode(&mut buf);
    1212            0 :                 cp.wal_level = rec.wal_level;
    1213            0 :                 self.checkpoint_modified = true;
    1214            1 :             }
    1215           14 :         } else if info == pg_constants::XLOG_END_OF_RECOVERY {
    1216            0 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    1217            0 :                 let rec = v17::XlEndOfRecovery::decode(&mut buf);
    1218            0 :                 cp.wal_level = rec.wal_level;
    1219            0 :                 self.checkpoint_modified = true;
    1220            0 :             }
    1221           14 :         }
    1222              : 
    1223           15 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1224            0 :             if info == pg_constants::XLOG_NEXTOID {
    1225            0 :                 let next_oid = buf.get_u32_le();
    1226            0 :                 if cp.nextOid != next_oid {
    1227            0 :                     cp.nextOid = next_oid;
    1228            0 :                     self.checkpoint_modified = true;
    1229            0 :                 }
    1230            0 :             } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
    1231            0 :                 || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    1232              :             {
    1233            0 :                 let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
    1234            0 :                 buf.copy_to_slice(&mut checkpoint_bytes);
    1235            0 :                 let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
    1236            0 :                 trace!(
    1237            0 :                     "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
    1238              :                     xlog_checkpoint.oldestXid, cp.oldestXid
    1239              :                 );
    1240            0 :                 if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
    1241            0 :                     cp.oldestXid = xlog_checkpoint.oldestXid;
    1242            0 :                 }
    1243            0 :                 trace!(
    1244            0 :                     "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
    1245              :                     xlog_checkpoint.oldestActiveXid, cp.oldestActiveXid
    1246              :                 );
    1247              : 
    1248              :                 // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
    1249              :                 // because at shutdown, all in-progress transactions will implicitly
    1250              :                 // end. Postgres startup code knows that, and allows hot standby to start
    1251              :                 // immediately from a shutdown checkpoint.
    1252              :                 //
    1253              :                 // In Neon, Postgres hot standby startup always behaves as if starting from
    1254              :                 // an online checkpoint. It needs a valid `oldestActiveXid` value, so
    1255              :                 // instead of overwriting self.checkpoint.oldestActiveXid with
    1256              :                 // InvalidTransactionId from the checkpoint WAL record, update it to a
    1257              :                 // proper value, knowing that there are no in-progress transactions at this
    1258              :                 // point, except for prepared transactions.
    1259              :                 //
    1260              :                 // See also the neon code changes in the InitWalRecovery() function.
    1261            0 :                 if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
    1262            0 :                     && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    1263              :                 {
    1264            0 :                     let oldest_active_xid = if pg_version >= PgMajorVersion::PG17 {
    1265            0 :                         let mut oldest_active_full_xid = cp.nextXid.value;
    1266            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    1267            0 :                             if xid < oldest_active_full_xid {
    1268            0 :                                 oldest_active_full_xid = xid;
    1269            0 :                             }
    1270              :                         }
    1271            0 :                         oldest_active_full_xid as u32
    1272              :                     } else {
    1273            0 :                         let mut oldest_active_xid = cp.nextXid.value as u32;
    1274            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    1275            0 :                             let narrow_xid = xid as u32;
    1276            0 :                             if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
    1277            0 :                                 oldest_active_xid = narrow_xid;
    1278            0 :                             }
    1279              :                         }
    1280            0 :                         oldest_active_xid
    1281              :                     };
    1282            0 :                     cp.oldestActiveXid = oldest_active_xid;
    1283            0 :                 } else {
    1284            0 :                     cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
    1285            0 :                 }
    1286              :                 // NB: We abuse the Checkpoint.redo field:
    1287              :                 //
    1288              :                 // - In PostgreSQL, the Checkpoint struct doesn't store the information
    1289              :                 //   of whether this is an online checkpoint or a shutdown checkpoint. It's
    1290              :                 //   stored in the XLOG info field of the WAL record, shutdown checkpoints
    1291              :                 //   use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
    1292              :                 //   XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
    1293              :                 //   in the pageserver, however.
    1294              :                 //
    1295              :                 // - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
    1296              :                 //   checkpoint record, if it's a shutdown checkpoint. But when we are
    1297              :                 //   starting from a shutdown checkpoint, the basebackup LSN is the *end*
    1298              :                 //   of the shutdown checkpoint WAL record. That makes it difficult to
    1299              :                 //   correctly detect whether we're starting from a shutdown record or
    1300              :                 //   not.
    1301              :                 //
    1302              :                 // To address both of those issues, we store 0 in the redo field if it's
    1303              :                 // an online checkpoint record, and the record's *end* LSN if it's a
    1304              :                 // shutdown checkpoint. We don't need the original redo pointer in neon,
    1305              :                 // because we don't perform WAL replay at startup anyway, so we can get
    1306              :                 // away with abusing the redo field like this.
    1307              :                 //
    1308              :                 // XXX: Ideally, we would persist the extra information in a more
    1309              :                 // explicit format, rather than repurpose the fields of the Postgres
    1310              :                 // struct like this. However, we already have persisted data like this,
    1311              :                 // so we need to maintain backwards compatibility.
    1312              :                 //
    1313              :                 // NB: We didn't originally have this convention, so there are still old
    1314              :                 // persisted records that didn't do this. Before, we didn't update the
    1315              :                 // persisted redo field at all. That means that old records have a bogus
    1316              :                 // redo pointer that points to some old value, from the checkpoint record
    1317              :                 // that was originally imported from the data directory. If it was a
    1318              :                 // project created in Neon, that means it points to the first checkpoint
    1319              :                 // after initdb. That's OK for our purposes: all such old checkpoints are
    1320              :                 // treated as old online checkpoints when the basebackup is created.
    1321            0 :                 cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
    1322              :                     // Store the *end* LSN of the checkpoint record. Or to be precise,
    1323              :                     // the start LSN of the *next* record, i.e. if the record ends
    1324              :                     // exactly at a page boundary, the redo LSN points to just after the
    1325              :                     // page header on the next page.
    1326            0 :                     lsn.into()
    1327              :                 } else {
    1328            0 :                     Lsn::INVALID.into()
    1329              :                 };
    1330              : 
    1331              :                 // Write a new checkpoint key-value pair on every checkpoint record, even
    1332              :                 // if nothing really changed. Not strictly required, but it seems nice to
    1333              :                 // have some trace of the checkpoint records in the layer files at the same
    1334              :                 // LSNs.
    1335            0 :                 self.checkpoint_modified = true;
    1336            0 :             }
    1337              :         });
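
As a hedged illustration of the redo-field convention documented above (the actual consumer of this value lives elsewhere in the pageserver), a reader could distinguish the two checkpoint kinds like this:

```rust
use utils::lsn::Lsn;

// Online checkpoints store Lsn::INVALID (0) in `redo`; shutdown checkpoints
// store the record's *end* LSN, which equals the basebackup LSN when the
// server starts exactly at the shutdown checkpoint.
fn started_from_shutdown_checkpoint(redo: Lsn, basebackup_lsn: Lsn) -> bool {
    redo != Lsn::INVALID && redo == basebackup_lsn
}
```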
    1338              : 
    1339           15 :         if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
    1340            1 :             modification.tline.prepare_basebackup(lsn);
    1341           14 :         }
    1342              : 
    1343           15 :         Ok(())
    1344           15 :     }
    1345              : 
    1346            0 :     async fn ingest_logical_message_put(
    1347            0 :         &mut self,
    1348            0 :         put: PutLogicalMessage,
    1349            0 :         modification: &mut DatadirModification<'_>,
    1350            0 :         ctx: &RequestContext,
    1351            0 :     ) -> Result<(), WalIngestError> {
    1352            0 :         let PutLogicalMessage { path, buf } = put;
    1353            0 :         modification.put_file(path.as_str(), &buf, ctx).await
    1354            0 :     }
    1355              : 
    1356            0 :     fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
    1357            0 :         match record {
    1358            0 :             StandbyRecord::RunningXacts(running_xacts) => {
    1359            0 :                 enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1360            0 :                     cp.oldestActiveXid = running_xacts.oldest_running_xid;
    1361            0 :                 });
    1362              : 
    1363            0 :                 self.checkpoint_modified = true;
    1364              :             }
    1365              :         }
    1366              : 
    1367            0 :         Ok(())
    1368            0 :     }
    1369              : 
    1370            0 :     async fn ingest_replorigin_record(
    1371            0 :         &mut self,
    1372            0 :         record: ReploriginRecord,
    1373            0 :         modification: &mut DatadirModification<'_>,
    1374            0 :     ) -> Result<(), WalIngestError> {
    1375            0 :         match record {
    1376            0 :             ReploriginRecord::Set(set) => {
    1377            0 :                 modification
    1378            0 :                     .set_replorigin(set.node_id, set.remote_lsn)
    1379            0 :                     .await?;
    1380              :             }
    1381            0 :             ReploriginRecord::Drop(drop) => {
    1382            0 :                 modification.drop_replorigin(drop.node_id).await?;
    1383              :             }
    1384              :         }
    1385              : 
    1386            0 :         Ok(())
    1387            0 :     }
    1388              : 
    1389            9 :     async fn put_rel_creation(
    1390            9 :         &mut self,
    1391            9 :         modification: &mut DatadirModification<'_>,
    1392            9 :         rel: RelTag,
    1393            9 :         ctx: &RequestContext,
    1394            9 :     ) -> Result<(), WalIngestError> {
    1395            9 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1396            9 :         Ok(())
    1397            9 :     }
    1398              : 
    1399              :     #[cfg(test)]
    1400       136201 :     async fn put_rel_page_image(
    1401       136201 :         &mut self,
    1402       136201 :         modification: &mut DatadirModification<'_>,
    1403       136201 :         rel: RelTag,
    1404       136201 :         blknum: BlockNumber,
    1405       136201 :         img: Bytes,
    1406       136201 :         ctx: &RequestContext,
    1407       136201 :     ) -> Result<(), WalIngestError> {
    1408       136201 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1409       136201 :             .await?;
    1410       136201 :         modification.put_rel_page_image(rel, blknum, img)?;
    1411       136201 :         Ok(())
    1412       136201 :     }
    1413              : 
    1414            6 :     async fn put_rel_wal_record(
    1415            6 :         &mut self,
    1416            6 :         modification: &mut DatadirModification<'_>,
    1417            6 :         rel: RelTag,
    1418            6 :         blknum: BlockNumber,
    1419            6 :         rec: NeonWalRecord,
    1420            6 :         ctx: &RequestContext,
    1421            6 :     ) -> Result<(), WalIngestError> {
    1422            6 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1423            6 :             .await?;
    1424            6 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1425            6 :         Ok(())
    1426            6 :     }
    1427              : 
    1428         3006 :     async fn put_rel_truncation(
    1429         3006 :         &mut self,
    1430         3006 :         modification: &mut DatadirModification<'_>,
    1431         3006 :         rel: RelTag,
    1432         3006 :         nblocks: BlockNumber,
    1433         3006 :         ctx: &RequestContext,
    1434         3006 :     ) -> Result<(), WalIngestError> {
    1435         3006 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1436         3006 :         Ok(())
    1437         3006 :     }
    1438              : 
    1439       136207 :     async fn handle_rel_extend(
    1440       136207 :         &mut self,
    1441       136207 :         modification: &mut DatadirModification<'_>,
    1442       136207 :         rel: RelTag,
    1443       136207 :         blknum: BlockNumber,
    1444       136207 :         ctx: &RequestContext,
    1445       136207 :     ) -> Result<(), WalIngestError> {
    1446       136207 :         let new_nblocks = blknum + 1;
    1447              :         // Check if the relation exists. We implicitly create relations on the
    1448              :         // first record that touches them.
    1449       136207 :         let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;
    1450              : 
    1451       136207 :         if new_nblocks > old_nblocks {
    1452              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1453       136199 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1454              : 
    1455       136199 :             let mut key = rel_block_to_key(rel, blknum);
    1456              : 
    1457              :             // fill the gap with zeros
    1458       136199 :             let mut gap_blocks_filled: u64 = 0;
    1459       136199 :             for gap_blknum in old_nblocks..blknum {
    1460         1499 :                 key.field6 = gap_blknum;
    1461              : 
    1462         1499 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1463            0 :                     continue;
    1464         1499 :                 }
    1465              : 
    1466         1499 :                 modification.put_rel_page_image_zero(rel, gap_blknum)?;
    1467         1499 :                 gap_blocks_filled += 1;
    1468              :             }
    1469              : 
    1470       136199 :             WAL_INGEST
    1471       136199 :                 .gap_blocks_zeroed_on_rel_extend
    1472       136199 :                 .inc_by(gap_blocks_filled);
    1473              : 
    1474              :             // Log something when a relation extend causes us to fill gaps
    1475              :             // with zero pages. Logging is rate-limited per Postgres version
    1476              :             // so that one version's messages don't crowd out the others.
    1477       136199 :             if gap_blocks_filled > 0 {
    1478              :                 use std::sync::Mutex;
    1479              : 
    1480              :                 use once_cell::sync::Lazy;
    1481              :                 use utils::rate_limit::RateLimit;
    1482              : 
    1483              :                 struct RateLimitPerPgVersion {
    1484              :                     rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
    1485              :                 }
    1486              : 
    1487              :                 impl RateLimitPerPgVersion {
    1488            0 :                     const fn new() -> Self {
    1489              :                         Self {
    1490              :                             rate_limiters: [const {
    1491            1 :                                 Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
    1492              :                             }; 4],
    1493              :                         }
    1494            0 :                     }
    1495              : 
    1496            2 :                     const fn rate_limiter(
    1497            2 :                         &self,
    1498            2 :                         pg_version: PgMajorVersion,
    1499            2 :                     ) -> Option<&Lazy<Mutex<RateLimit>>> {
    1500              :                         const MIN_PG_VERSION: u32 = PgMajorVersion::PG14.major_version_num();
    1501              :                         const MAX_PG_VERSION: u32 = PgMajorVersion::PG17.major_version_num();
    1502            2 :                         let pg_version = pg_version.major_version_num();
    1503              : 
    1504            2 :                         if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
    1505            0 :                             return None;
    1506            2 :                         }
    1507              : 
    1508            2 :                         Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
    1509            2 :                     }
    1510              :                 }
    1511              : 
    1512              :                 static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
    1513            2 :                 if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
    1514            2 :                     if let Ok(mut locked) = rate_limiter.try_lock() {
    1515            2 :                         locked.call(|| {
    1516            1 :                             info!(
    1517            0 :                                 lsn=%modification.get_lsn(),
    1518              :                                 pg_version=%modification.tline.pg_version,
    1519              :                                 rel=%rel,
    1520            0 :                                 "Filled {} gap blocks on rel extend to {} from {}",
    1521              :                                 gap_blocks_filled,
    1522              :                                 new_nblocks,
    1523              :                                 old_nblocks);
    1524            1 :                         });
    1525            0 :                     }
    1526            0 :                 }
    1527       136197 :             }
    1528            8 :         }
    1529       136207 :         Ok(())
    1530       136207 :     }
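
To summarize the rule handle_rel_extend implements, a minimal sketch assuming a single shard (so no gap pages are skipped):

```rust
// If the relation currently has `old_nblocks` blocks and a record arrives
// for `blknum >= old_nblocks`, the relation is extended to blknum + 1 and
// the intervening blocks are zero-filled; the caller then writes `blknum`.
fn zero_filled_gap(old_nblocks: u32, blknum: u32) -> std::ops::Range<u32> {
    old_nblocks..blknum // e.g. old_nblocks = 3, blknum = 10 => zeros for 3..=9
}
```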
    1531              : 
    1532            0 :     async fn put_slru_page_image(
    1533            0 :         &mut self,
    1534            0 :         modification: &mut DatadirModification<'_>,
    1535            0 :         kind: SlruKind,
    1536            0 :         segno: u32,
    1537            0 :         blknum: BlockNumber,
    1538            0 :         img: Bytes,
    1539            0 :         ctx: &RequestContext,
    1540            0 :     ) -> Result<(), WalIngestError> {
    1541            0 :         if !self.shard.is_shard_zero() {
    1542            0 :             return Ok(());
    1543            0 :         }
    1544              : 
    1545            0 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1546            0 :             .await?;
    1547            0 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1548            0 :         Ok(())
    1549            0 :     }
    1550              : 
    1551            0 :     async fn handle_slru_extend(
    1552            0 :         &mut self,
    1553            0 :         modification: &mut DatadirModification<'_>,
    1554            0 :         kind: SlruKind,
    1555            0 :         segno: u32,
    1556            0 :         blknum: BlockNumber,
    1557            0 :         ctx: &RequestContext,
    1558            0 :     ) -> Result<(), WalIngestError> {
    1559              :         // We don't use a cache for this like we do for relations. SLRUs are explicitly
    1560              :         // extended with ZEROPAGE records, not with commit records, so extension happens
    1561              :         // a lot less frequently.
    1562              : 
    1563            0 :         let new_nblocks = blknum + 1;
    1564              :         // Check if the segment exists. We implicitly create SLRU segments on the
    1565              :         // first record that touches them.
    1566              :         // TODO: it would be nice to be more explicit about this.
    1567            0 :         let old_nblocks = if !modification
    1568            0 :             .tline
    1569            0 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1570            0 :             .await?
    1571              :         {
    1572              :             // create it with 0 size initially, the logic below will extend it
    1573            0 :             modification
    1574            0 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1575            0 :                 .await?;
    1576            0 :             0
    1577              :         } else {
    1578            0 :             modification
    1579            0 :                 .tline
    1580            0 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1581            0 :                 .await?
    1582              :         };
    1583              : 
    1584            0 :         if new_nblocks > old_nblocks {
    1585            0 :             trace!(
    1586            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1587              :                 kind, segno, old_nblocks, new_nblocks
    1588              :             );
    1589            0 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1590              : 
    1591              :             // fill the gap with zeros
    1592            0 :             for gap_blknum in old_nblocks..blknum {
    1593            0 :                 modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
    1594              :             }
    1595            0 :         }
    1596            0 :         Ok(())
    1597            0 :     }
    1598              : }
    1599              : 
    1600              : /// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
    1601              : ///
    1602              : /// This is only accurate on shard 0. On other shards, it will return the size up to the highest
    1603              : /// page number stored in the shard, or None if the shard does not have any pages for it.
    1604            6 : async fn get_relsize(
    1605            6 :     modification: &DatadirModification<'_>,
    1606            6 :     rel: RelTag,
    1607            6 :     ctx: &RequestContext,
    1608            6 : ) -> Result<Option<BlockNumber>, PageReconstructError> {
    1609            6 :     if !modification
    1610            6 :         .tline
    1611            6 :         .get_rel_exists(rel, Version::Modified(modification), ctx)
    1612            6 :         .await?
    1613              :     {
    1614            0 :         return Ok(None);
    1615            6 :     }
    1616            6 :     modification
    1617            6 :         .tline
    1618            6 :         .get_rel_size(rel, Version::Modified(modification), ctx)
    1619            6 :         .await
    1620            6 :         .map(Some)
    1621            6 : }
    1622              : 
    1623              : #[allow(clippy::bool_assert_comparison)]
    1624              : #[cfg(test)]
    1625              : mod tests {
    1626              :     use anyhow::Result;
    1627              :     use postgres_ffi::PgMajorVersion;
    1628              :     use postgres_ffi::RELSEG_SIZE;
    1629              : 
    1630              :     use super::*;
    1631              :     use crate::DEFAULT_PG_VERSION;
    1632              :     use crate::tenant::harness::*;
    1633              :     use crate::tenant::remote_timeline_client::{INITDB_PATH, remote_initdb_archive_path};
    1634              :     use crate::tenant::storage_layer::IoConcurrency;
    1635              : 
    1636              :     /// Arbitrary relation tag, for testing.
    1637              :     const TESTREL_A: RelTag = RelTag {
    1638              :         spcnode: 0,
    1639              :         dbnode: 111,
    1640              :         relnode: 1000,
    1641              :         forknum: 0,
    1642              :     };
    1643              : 
    1644            6 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1645              :         // TODO
    1646            6 :     }
    1647              : 
    1648              :     #[tokio::test]
    1649            1 :     async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
    1650            5 :         for i in PgMajorVersion::ALL {
    1651            4 :             dispatch_pgversion!(i, {
    1652            1 :                 pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
    1653            1 :             });
    1654            1 :         }
    1655            1 : 
    1656            1 :         Ok(())
    1657            1 :     }
    1658              : 
    1659            4 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    1660            4 :         let mut m = tline.begin_modification(Lsn(0x10));
    1661            4 :         m.put_checkpoint(dispatch_pgversion!(
    1662            4 :             tline.pg_version,
    1663            0 :             pgv::ZERO_CHECKPOINT.clone()
    1664            0 :         ))?;
    1665            4 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1666            4 :         m.commit(ctx).await?;
    1667            4 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1668              : 
    1669            4 :         Ok(walingest)
    1670            4 :     }
    1671              : 
    1672              :     #[tokio::test]
    1673            1 :     async fn test_relsize() -> Result<()> {
    1674            1 :         let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
    1675            1 :         let io_concurrency = IoConcurrency::spawn_for_test();
    1676            1 :         let tline = tenant
    1677            1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1678            1 :             .await?;
    1679            1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1680              : 
    1681            1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1682            1 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1683            1 :         walingest
    1684            1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1685            1 :             .await?;
    1686            1 :         m.commit(&ctx).await?;
    1687            1 :         let mut m = tline.begin_modification(Lsn(0x30));
    1688            1 :         walingest
    1689            1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
    1690            1 :             .await?;
    1691            1 :         m.commit(&ctx).await?;
    1692            1 :         let mut m = tline.begin_modification(Lsn(0x40));
    1693            1 :         walingest
    1694            1 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
    1695            1 :             .await?;
    1696            1 :         m.commit(&ctx).await?;
    1697            1 :         let mut m = tline.begin_modification(Lsn(0x50));
    1698            1 :         walingest
    1699            1 :             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
    1700            1 :             .await?;
    1701            1 :         m.commit(&ctx).await?;
    1702              : 
    1703            1 :         assert_current_logical_size(&tline, Lsn(0x50));
    1704              : 
    1705            1 :         let test_span = tracing::info_span!(parent: None, "test",
    1706            0 :                                             tenant_id=%tline.tenant_shard_id.tenant_id,
    1707            0 :                                             shard_id=%tline.tenant_shard_id.shard_slug(),
    1708            0 :                                             timeline_id=%tline.timeline_id);
    1709              : 
    1710              :         // The relation was created at LSN 2, not visible at LSN 1 yet.
    1711            1 :         assert_eq!(
    1712            1 :             tline
    1713            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
    1714            1 :                 .await?,
    1715              :             false
    1716              :         );
    1717            1 :         assert!(
    1718            1 :             tline
    1719            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
    1720            1 :                 .await
    1721            1 :                 .is_err()
    1722              :         );
    1723            1 :         assert_eq!(
    1724            1 :             tline
    1725            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
    1726            1 :                 .await?,
    1727              :             true
    1728              :         );
    1729            1 :         assert_eq!(
    1730            1 :             tline
    1731            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
    1732            1 :                 .await?,
    1733              :             1
    1734              :         );
    1735            1 :         assert_eq!(
    1736            1 :             tline
    1737            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
    1738            1 :                 .await?,
    1739              :             3
    1740              :         );
    1741              : 
    1742              :         // Check page contents at each LSN
    1743            1 :         assert_eq!(
    1744            1 :             tline
    1745            1 :                 .get_rel_page_at_lsn(
    1746            1 :                     TESTREL_A,
    1747            1 :                     0,
    1748            1 :                     Version::at(Lsn(0x20)),
    1749            1 :                     &ctx,
    1750            1 :                     io_concurrency.clone()
    1751            1 :                 )
    1752            1 :                 .instrument(test_span.clone())
    1753            1 :                 .await?,
    1754            1 :             test_img("foo blk 0 at 2")
    1755              :         );
    1756              : 
    1757            1 :         assert_eq!(
    1758            1 :             tline
    1759            1 :                 .get_rel_page_at_lsn(
    1760            1 :                     TESTREL_A,
    1761            1 :                     0,
    1762            1 :                     Version::at(Lsn(0x30)),
    1763            1 :                     &ctx,
    1764            1 :                     io_concurrency.clone()
    1765            1 :                 )
    1766            1 :                 .instrument(test_span.clone())
    1767            1 :                 .await?,
    1768            1 :             test_img("foo blk 0 at 3")
    1769              :         );
    1770              : 
    1771            1 :         assert_eq!(
    1772            1 :             tline
    1773            1 :                 .get_rel_page_at_lsn(
    1774            1 :                     TESTREL_A,
    1775            1 :                     0,
    1776            1 :                     Version::at(Lsn(0x40)),
    1777            1 :                     &ctx,
    1778            1 :                     io_concurrency.clone()
    1779            1 :                 )
    1780            1 :                 .instrument(test_span.clone())
    1781            1 :                 .await?,
    1782            1 :             test_img("foo blk 0 at 3")
    1783              :         );
    1784            1 :         assert_eq!(
    1785            1 :             tline
    1786            1 :                 .get_rel_page_at_lsn(
    1787            1 :                     TESTREL_A,
    1788            1 :                     1,
    1789            1 :                     Version::at(Lsn(0x40)),
    1790            1 :                     &ctx,
    1791            1 :                     io_concurrency.clone()
    1792            1 :                 )
    1793            1 :                 .instrument(test_span.clone())
    1794            1 :                 .await?,
    1795            1 :             test_img("foo blk 1 at 4")
    1796              :         );
    1797              : 
    1798            1 :         assert_eq!(
    1799            1 :             tline
    1800            1 :                 .get_rel_page_at_lsn(
    1801            1 :                     TESTREL_A,
    1802            1 :                     0,
    1803            1 :                     Version::at(Lsn(0x50)),
    1804            1 :                     &ctx,
    1805            1 :                     io_concurrency.clone()
    1806            1 :                 )
    1807            1 :                 .instrument(test_span.clone())
    1808            1 :                 .await?,
    1809            1 :             test_img("foo blk 0 at 3")
    1810              :         );
    1811            1 :         assert_eq!(
    1812            1 :             tline
    1813            1 :                 .get_rel_page_at_lsn(
    1814            1 :                     TESTREL_A,
    1815            1 :                     1,
    1816            1 :                     Version::at(Lsn(0x50)),
    1817            1 :                     &ctx,
    1818            1 :                     io_concurrency.clone()
    1819            1 :                 )
    1820            1 :                 .instrument(test_span.clone())
    1821            1 :                 .await?,
    1822            1 :             test_img("foo blk 1 at 4")
    1823              :         );
    1824            1 :         assert_eq!(
    1825            1 :             tline
    1826            1 :                 .get_rel_page_at_lsn(
    1827            1 :                     TESTREL_A,
    1828            1 :                     2,
    1829            1 :                     Version::at(Lsn(0x50)),
    1830            1 :                     &ctx,
    1831            1 :                     io_concurrency.clone()
    1832            1 :                 )
    1833            1 :                 .instrument(test_span.clone())
    1834            1 :                 .await?,
    1835            1 :             test_img("foo blk 2 at 5")
    1836              :         );
    1837              : 
    1838              :         // Truncate last block
    1839            1 :         let mut m = tline.begin_modification(Lsn(0x60));
    1840            1 :         walingest
    1841            1 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1842            1 :             .await?;
    1843            1 :         m.commit(&ctx).await?;
    1844            1 :         assert_current_logical_size(&tline, Lsn(0x60));
    1845              : 
    1846              :         // Check reported size and contents after truncation
    1847            1 :         assert_eq!(
    1848            1 :             tline
    1849            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
    1850            1 :                 .await?,
    1851              :             2
    1852              :         );
    1853            1 :         assert_eq!(
    1854            1 :             tline
    1855            1 :                 .get_rel_page_at_lsn(
    1856            1 :                     TESTREL_A,
    1857            1 :                     0,
    1858            1 :                     Version::at(Lsn(0x60)),
    1859            1 :                     &ctx,
    1860            1 :                     io_concurrency.clone()
    1861            1 :                 )
    1862            1 :                 .instrument(test_span.clone())
    1863            1 :                 .await?,
    1864            1 :             test_img("foo blk 0 at 3")
    1865              :         );
    1866            1 :         assert_eq!(
    1867            1 :             tline
    1868            1 :                 .get_rel_page_at_lsn(
    1869            1 :                     TESTREL_A,
    1870            1 :                     1,
    1871            1 :                     Version::at(Lsn(0x60)),
    1872            1 :                     &ctx,
    1873            1 :                     io_concurrency.clone()
    1874            1 :                 )
    1875            1 :                 .instrument(test_span.clone())
    1876            1 :                 .await?,
    1877            1 :             test_img("foo blk 1 at 4")
    1878              :         );
    1879              : 
    1880              :         // should still see the truncated block with older LSN
    1881            1 :         assert_eq!(
    1882            1 :             tline
    1883            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
    1884            1 :                 .await?,
    1885              :             3
    1886              :         );
    1887            1 :         assert_eq!(
    1888            1 :             tline
    1889            1 :                 .get_rel_page_at_lsn(
    1890            1 :                     TESTREL_A,
    1891            1 :                     2,
    1892            1 :                     Version::at(Lsn(0x50)),
    1893            1 :                     &ctx,
    1894            1 :                     io_concurrency.clone()
    1895            1 :                 )
    1896            1 :                 .instrument(test_span.clone())
    1897            1 :                 .await?,
    1898            1 :             test_img("foo blk 2 at 5")
    1899              :         );
    1900              : 
    1901              :         // Truncate to zero length
    1902            1 :         let mut m = tline.begin_modification(Lsn(0x68));
    1903            1 :         walingest
    1904            1 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1905            1 :             .await?;
    1906            1 :         m.commit(&ctx).await?;
    1907            1 :         assert_eq!(
    1908            1 :             tline
    1909            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
    1910            1 :                 .await?,
    1911              :             0
    1912              :         );
    1913              : 
    1914              :         // Extend from 0 to 2 blocks, leaving a gap
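                      :         // Block 0 is never written here; the pageserver is expected to
                      :         // materialize the gap as an all-zeros page, which the ZERO_PAGE
                      :         // assertion below verifies.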
    1915            1 :         let mut m = tline.begin_modification(Lsn(0x70));
    1916            1 :         walingest
    1917            1 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
    1918            1 :             .await?;
    1919            1 :         m.commit(&ctx).await?;
    1920            1 :         assert_eq!(
    1921            1 :             tline
    1922            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
    1923            1 :                 .await?,
    1924              :             2
    1925              :         );
    1926            1 :         assert_eq!(
    1927            1 :             tline
    1928            1 :                 .get_rel_page_at_lsn(
    1929            1 :                     TESTREL_A,
    1930            1 :                     0,
    1931            1 :                     Version::at(Lsn(0x70)),
    1932            1 :                     &ctx,
    1933            1 :                     io_concurrency.clone()
    1934            1 :                 )
    1935            1 :                 .instrument(test_span.clone())
    1936            1 :                 .await?,
    1937            1 :             ZERO_PAGE
    1938              :         );
    1939            1 :         assert_eq!(
    1940            1 :             tline
    1941            1 :                 .get_rel_page_at_lsn(
    1942            1 :                     TESTREL_A,
    1943            1 :                     1,
    1944            1 :                     Version::at(Lsn(0x70)),
    1945            1 :                     &ctx,
    1946            1 :                     io_concurrency.clone()
    1947            1 :                 )
    1948            1 :                 .instrument(test_span.clone())
    1949            1 :                 .await?,
    1950            1 :             test_img("foo blk 1")
    1951              :         );
    1952              : 
    1953              :         // Extend a lot more, leaving a big gap that spans across segments
    1954            1 :         let mut m = tline.begin_modification(Lsn(0x80));
    1955            1 :         walingest
    1956            1 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
    1957            1 :             .await?;
    1958            1 :         m.commit(&ctx).await?;
    1959            1 :         assert_eq!(
    1960            1 :             tline
    1961            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
    1962            1 :                 .await?,
    1963              :             1501
    1964              :         );
    1965         1499 :         for blk in 2..1500 {
    1966         1498 :             assert_eq!(
    1967         1498 :                 tline
    1968         1498 :                     .get_rel_page_at_lsn(
    1969         1498 :                         TESTREL_A,
    1970         1498 :                         blk,
    1971         1498 :                         Version::at(Lsn(0x80)),
    1972         1498 :                         &ctx,
    1973         1498 :                         io_concurrency.clone()
    1974         1498 :                     )
    1975         1498 :                     .instrument(test_span.clone())
    1976         1498 :                     .await?,
    1977         1498 :                 ZERO_PAGE
    1978              :             );
    1979              :         }
    1980            1 :         assert_eq!(
    1981            1 :             tline
    1982            1 :                 .get_rel_page_at_lsn(
    1983            1 :                     TESTREL_A,
    1984            1 :                     1500,
    1985            1 :                     Version::at(Lsn(0x80)),
    1986            1 :                     &ctx,
    1987            1 :                     io_concurrency.clone()
    1988            1 :                 )
    1989            1 :                 .instrument(test_span.clone())
    1990            1 :                 .await?,
    1991            1 :             test_img("foo blk 1500")
    1992              :         );
    1993              : 
    1994            2 :         Ok(())
    1995            1 :     }
    1996              : 
    1997              :     // Test what happens if we drop a relation
    1998              :     // and then re-create it within the same layer.
    1999              :     #[tokio::test]
    2000            1 :     async fn test_drop_extend() -> Result<()> {
    2001            1 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")
    2002            1 :             .await?
    2003            1 :             .load()
    2004            1 :             .await;
    2005            1 :         let tline = tenant
    2006            1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2007            1 :             .await?;
    2008            1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2009              : 
    2010            1 :         let mut m = tline.begin_modification(Lsn(0x20));
    2011            1 :         walingest
    2012            1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    2013            1 :             .await?;
    2014            1 :         m.commit(&ctx).await?;
    2015              : 
    2016              :         // Check that rel exists and size is correct
    2017            1 :         assert_eq!(
    2018            1 :             tline
    2019            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
    2020            1 :                 .await?,
    2021              :             true
    2022              :         );
    2023            1 :         assert_eq!(
    2024            1 :             tline
    2025            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
    2026            1 :                 .await?,
    2027              :             1
    2028              :         );
    2029              : 
    2030              :         // Drop rel
    2031            1 :         let mut m = tline.begin_modification(Lsn(0x30));
    2032            1 :         let mut rel_drops = HashMap::new();
    2033            1 :         rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
    2034            1 :         m.put_rel_drops(rel_drops, &ctx).await?;
    2035            1 :         m.commit(&ctx).await?;
    2036              : 
    2037              :         // Check that rel is not visible anymore
    2038            1 :         assert_eq!(
    2039            1 :             tline
    2040            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
    2041            1 :                 .await?,
    2042              :             false
    2043              :         );
    2044              : 
    2045              :         // FIXME: should fail
    2046              :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    2047              : 
    2048              :         // Re-create it
    2049            1 :         let mut m = tline.begin_modification(Lsn(0x40));
    2050            1 :         walingest
    2051            1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
    2052            1 :             .await?;
    2053            1 :         m.commit(&ctx).await?;
    2054              : 
    2055              :         // Check that rel exists and size is correct
    2056            1 :         assert_eq!(
    2057            1 :             tline
    2058            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
    2059            1 :                 .await?,
    2060              :             true
    2061              :         );
    2062            1 :         assert_eq!(
    2063            1 :             tline
    2064            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
    2065            1 :                 .await?,
    2066              :             1
    2067              :         );
    2068              : 
    2069            2 :         Ok(())
    2070            1 :     }
    2071              : 
    2072              :     // Test what happens if we truncate a relation
    2073              :     // so that one of its segments is dropped,
    2074              :     // and then extend it again within the same layer.
    2075              :     #[tokio::test]
    2076            1 :     async fn test_truncate_extend() -> Result<()> {
    2077            1 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
    2078            1 :             .await?
    2079            1 :             .load()
    2080            1 :             .await;
    2081            1 :         let io_concurrency = IoConcurrency::spawn_for_test();
    2082            1 :         let tline = tenant
    2083            1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2084            1 :             .await?;
    2085            1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2086              : 
    2087              :         // Create a 20 MB relation (the size is arbitrary)
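                      :         // (20 MiB / 8 KiB page size = 2560 blocks)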
    2088            1 :         let relsize = 20 * 1024 * 1024 / 8192;
    2089            1 :         let mut m = tline.begin_modification(Lsn(0x20));
    2090         2560 :         for blkno in 0..relsize {
    2091         2560 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    2092         2560 :             walingest
    2093         2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2094         2560 :                 .await?;
    2095              :         }
    2096            1 :         m.commit(&ctx).await?;
    2097              : 
    2098            1 :         let test_span = tracing::info_span!(parent: None, "test",
    2099            0 :                                             tenant_id=%tline.tenant_shard_id.tenant_id,
    2100            0 :                                             shard_id=%tline.tenant_shard_id.shard_slug(),
    2101            0 :                                             timeline_id=%tline.timeline_id);
    2102              : 
    2103              :         // The relation was created at LSN 0x20, so it is not yet visible at LSN 0x10.
    2104            1 :         assert_eq!(
    2105            1 :             tline
    2106            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
    2107            1 :                 .await?,
    2108              :             false
    2109              :         );
    2110            1 :         assert!(
    2111            1 :             tline
    2112            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
    2113            1 :                 .await
    2114            1 :                 .is_err()
    2115              :         );
    2116              : 
    2117            1 :         assert_eq!(
    2118            1 :             tline
    2119            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
    2120            1 :                 .await?,
    2121              :             true
    2122              :         );
    2123            1 :         assert_eq!(
    2124            1 :             tline
    2125            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
    2126            1 :                 .await?,
    2127              :             relsize
    2128              :         );
    2129              : 
    2130              :         // Check relation content
    2131         2560 :         for blkno in 0..relsize {
    2132         2560 :             let lsn = Lsn(0x20);
    2133         2560 :             let data = format!("foo blk {blkno} at {lsn}");
    2134         2560 :             assert_eq!(
    2135         2560 :                 tline
    2136         2560 :                     .get_rel_page_at_lsn(
    2137         2560 :                         TESTREL_A,
    2138         2560 :                         blkno,
    2139         2560 :                         Version::at(lsn),
    2140         2560 :                         &ctx,
    2141         2560 :                         io_concurrency.clone()
    2142         2560 :                     )
    2143         2560 :                     .instrument(test_span.clone())
    2144         2560 :                     .await?,
    2145         2560 :                 test_img(&data)
    2146              :             );
    2147              :         }
    2148              : 
    2149              :         // Truncate the relation so that its second segment is dropped,
    2150              :         // leaving only one page.
    2151            1 :         let mut m = tline.begin_modification(Lsn(0x60));
    2152            1 :         walingest
    2153            1 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2154            1 :             .await?;
    2155            1 :         m.commit(&ctx).await?;
    2156              : 
    2157              :         // Check reported size and contents after truncation
    2158            1 :         assert_eq!(
    2159            1 :             tline
    2160            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
    2161            1 :                 .await?,
    2162              :             1
    2163              :         );
    2164              : 
    2165            2 :         for blkno in 0..1 {
    2166            1 :             let lsn = Lsn(0x20);
    2167            1 :             let data = format!("foo blk {blkno} at {lsn}");
    2168            1 :             assert_eq!(
    2169            1 :                 tline
    2170            1 :                     .get_rel_page_at_lsn(
    2171            1 :                         TESTREL_A,
    2172            1 :                         blkno,
    2173            1 :                         Version::at(Lsn(0x60)),
    2174            1 :                         &ctx,
    2175            1 :                         io_concurrency.clone()
    2176            1 :                     )
    2177            1 :                     .instrument(test_span.clone())
    2178            1 :                     .await?,
    2179            1 :                 test_img(&data)
    2180              :             );
    2181              :         }
    2182              : 
    2183              :         // should still see all blocks with older LSN
    2184            1 :         assert_eq!(
    2185            1 :             tline
    2186            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
    2187            1 :                 .await?,
    2188              :             relsize
    2189              :         );
    2190         2560 :         for blkno in 0..relsize {
    2191         2560 :             let lsn = Lsn(0x20);
    2192         2560 :             let data = format!("foo blk {blkno} at {lsn}");
    2193         2560 :             assert_eq!(
    2194         2560 :                 tline
    2195         2560 :                     .get_rel_page_at_lsn(
    2196         2560 :                         TESTREL_A,
    2197         2560 :                         blkno,
    2198         2560 :                         Version::at(Lsn(0x50)),
    2199         2560 :                         &ctx,
    2200         2560 :                         io_concurrency.clone()
    2201         2560 :                     )
    2202         2560 :                     .instrument(test_span.clone())
    2203         2560 :                     .await?,
    2204         2560 :                 test_img(&data)
    2205              :             );
    2206              :         }
    2207              : 
    2208              :         // Extend the relation again,
    2209              :         // adding enough blocks to re-create the second segment.
    2210            1 :         let lsn = Lsn(0x80);
    2211            1 :         let mut m = tline.begin_modification(lsn);
    2212         2560 :         for blkno in 0..relsize {
    2213         2560 :             let data = format!("foo blk {blkno} at {lsn}");
    2214         2560 :             walingest
    2215         2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2216         2560 :                 .await?;
    2217              :         }
    2218            1 :         m.commit(&ctx).await?;
    2219              : 
    2220            1 :         assert_eq!(
    2221            1 :             tline
    2222            1 :                 .get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
    2223            1 :                 .await?,
    2224              :             true
    2225              :         );
    2226            1 :         assert_eq!(
    2227            1 :             tline
    2228            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
    2229            1 :                 .await?,
    2230              :             relsize
    2231              :         );
    2232              :         // Check relation content
    2233         2560 :         for blkno in 0..relsize {
    2234         2560 :             let lsn = Lsn(0x80);
    2235         2560 :             let data = format!("foo blk {blkno} at {lsn}");
    2236         2560 :             assert_eq!(
    2237         2560 :                 tline
    2238         2560 :                     .get_rel_page_at_lsn(
    2239         2560 :                         TESTREL_A,
    2240         2560 :                         blkno,
    2241         2560 :                         Version::at(Lsn(0x80)),
    2242         2560 :                         &ctx,
    2243         2560 :                         io_concurrency.clone()
    2244         2560 :                     )
    2245         2560 :                     .instrument(test_span.clone())
    2246         2560 :                     .await?,
    2247         2560 :                 test_img(&data)
    2248            1 :             );
    2249            1 :         }
    2250            1 : 
    2251            1 :         Ok(())
    2252            1 :     }
    2253              : 
    2254              :     /// Test get_rel_size() and truncation with a relation larger than 1 GB, so that
    2255              :     /// it's split into multiple 1 GB segments in Postgres.
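                      :     ///
                      :     /// With the default 8 KiB Postgres page size, RELSEG_SIZE is 131072 blocks
                      :     /// (131072 * 8192 bytes = 1 GiB), so the RELSEG_SIZE + 1 blocks written
                      :     /// below force the relation to spill into a second segment.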
    2256              :     #[tokio::test]
    2257            1 :     async fn test_large_rel() -> Result<()> {
    2258            1 :         let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
    2259            1 :         let tline = tenant
    2260            1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2261            1 :             .await?;
    2262            1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2263              : 
    2264            1 :         let mut lsn = 0x10;
    2265       131073 :         for blknum in 0..RELSEG_SIZE + 1 {
    2266       131073 :             lsn += 0x10;
    2267       131073 :             let mut m = tline.begin_modification(Lsn(lsn));
    2268       131073 :             let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2269       131073 :             walingest
    2270       131073 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2271       131073 :                 .await?;
    2272       131073 :             m.commit(&ctx).await?;
    2273              :         }
    2274              : 
    2275            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2276              : 
    2277            1 :         assert_eq!(
    2278            1 :             tline
    2279            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
    2280            1 :                 .await?,
    2281            1 :             RELSEG_SIZE + 1
    2282              :         );
    2283              : 
    2284              :         // Truncate one block
    2285            1 :         lsn += 0x10;
    2286            1 :         let mut m = tline.begin_modification(Lsn(lsn));
    2287            1 :         walingest
    2288            1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2289            1 :             .await?;
    2290            1 :         m.commit(&ctx).await?;
    2291            1 :         assert_eq!(
    2292            1 :             tline
    2293            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
    2294            1 :                 .await?,
    2295              :             RELSEG_SIZE
    2296              :         );
    2297            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2298              : 
    2299              :         // Truncate another block
    2300            1 :         lsn += 0x10;
    2301            1 :         let mut m = tline.begin_modification(Lsn(lsn));
    2302            1 :         walingest
    2303            1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2304            1 :             .await?;
    2305            1 :         m.commit(&ctx).await?;
    2306            1 :         assert_eq!(
    2307            1 :             tline
    2308            1 :                 .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
    2309            1 :                 .await?,
    2310            1 :             RELSEG_SIZE - 1
    2311              :         );
    2312            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2313              : 
    2314              :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time.
    2315              :         // This tests the behavior at segment boundaries.
    2316            1 :         let mut size: i32 = 3000;
    2317         3002 :         while size >= 0 {
    2318         3001 :             lsn += 0x10;
    2319         3001 :             let mut m = tline.begin_modification(Lsn(lsn));
    2320         3001 :             walingest
    2321         3001 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2322         3001 :                 .await?;
    2323         3001 :             m.commit(&ctx).await?;
    2324         3001 :             assert_eq!(
    2325         3001 :                 tline
    2326         3001 :                     .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
    2327         3001 :                     .await?,
    2328         3001 :                 size as BlockNumber
    2329              :             );
    2330              : 
    2331         3001 :             size -= 1;
    2332              :         }
    2333            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2334              : 
    2335            2 :         Ok(())
    2336            1 :     }
    2337              : 
    2338              :     /// Replay a WAL segment file taken directly from the safekeepers.
    2339              :     ///
    2340              :     /// This test is useful for benchmarking since it allows us to profile only
    2341              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2342              :     /// without waiting for unrelated steps.
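                      :     ///
                      :     /// A plausible invocation (package name assumed from the directory layout):
                      :     /// `cargo test -p pageserver test_ingest_real_wal -- --nocapture`, run under
                      :     /// the profiler of your choice.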
    2343              :     #[tokio::test]
    2344            1 :     async fn test_ingest_real_wal() {
    2345              :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2346              :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2347              : 
    2348              :         use crate::tenant::harness::*;
    2349              : 
    2350              :         // Define test data path and constants.
    2351              :         //
    2352              :         // Steps to reconstruct the data, if needed:
    2353              :         // 1. Run the pgbench Python test
    2354              :         // 2. Take the first WAL segment file from a safekeeper
    2355              :         // 3. Compress it using `zstd --long input_file`
    2356              :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2357              :         // 5. Grep the safekeeper logs for "restart decoder" to get the startpoint
    2358              :         // 6. Run just the decoder from this test to get the endpoint.
    2359              :         //    It's the last LSN the decoder will output.
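                      :         //
                      :         // Note: step 3 should yield `000000010000000000000001.zst`, the file
                      :         // name this test opens below.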
    2360            1 :         let pg_version = PgMajorVersion::PG15; // The test data was generated by pg15
    2361            1 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2362            1 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2363            1 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2364            1 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2365            1 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2366              : 
    2367            1 :         let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
    2368            1 :         let span = harness
    2369            1 :             .span()
    2370            1 :             .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
    2371            1 :         let (tenant, ctx) = harness.load().await;
    2372              : 
    2373            1 :         let remote_initdb_path =
    2374            1 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    2375            1 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2376              : 
    2377            1 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2378            1 :             .expect("creating test dir should work");
    2379            1 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2380              : 
    2381              :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2382              :         // it doesn't create a real checkpoint, and WalIngest::new tries to parse
    2383              :         // the garbage data.
    2384            1 :         let tline = tenant
    2385            1 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2386            1 :             .await
    2387            1 :             .unwrap();
    2388              : 
    2389              :         // We fully read and decompress this into memory before decoding
    2390              :         // to get a more accurate perf profile of the decoder.
    2391            1 :         let bytes = {
    2392              :             use async_compression::tokio::bufread::ZstdDecoder;
    2393            1 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2394            1 :             let reader = tokio::io::BufReader::new(file);
    2395            1 :             let decoder = ZstdDecoder::new(reader);
    2396            1 :             let mut reader = tokio::io::BufReader::new(decoder);
    2397            1 :             let mut buffer = Vec::new();
    2398            1 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2399            1 :             buffer
    2400              :         };
    2401              : 
    2402              :         // TODO start a profiler too
    2403            1 :         let started_at = std::time::Instant::now();
    2404              : 
    2405              :         // Initialize walingest
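                      :         // startpoint is an absolute LSN; its offset within the WAL segment
                      :         // (16 MiB by default) marks where valid records begin in the
                      :         // decompressed buffer.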
    2406            1 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    2407            1 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2408            1 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2409            1 :             .await
    2410            1 :             .unwrap();
    2411            1 :         let mut modification = tline.begin_modification(startpoint);
    2412            1 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2413              : 
    2414              :         // Decode and ingest WAL. We feed the decoder in small chunks because
    2415              :         // that is how the bytes arrive from the safekeepers.
    2416       237343 :         for chunk in bytes[xlogoff..].chunks(50) {
    2417       237343 :             decoder.feed_bytes(chunk);
    2418       310268 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
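                      :                 // Interpret the raw record for our (single) shard, then pull
                      :                 // the interpreted record back out of the per-shard map.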
    2419        72925 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
    2420        72925 :                     recdata,
    2421        72925 :                     &[*modification.tline.get_shard_identity()],
    2422        72925 :                     lsn,
    2423        72925 :                     modification.tline.pg_version,
    2424        72925 :                 )
    2425        72925 :                 .unwrap()
    2426        72925 :                 .remove(modification.tline.get_shard_identity())
    2427        72925 :                 .unwrap();
    2428              : 
    2429        72925 :                 walingest
    2430        72925 :                     .ingest_record(interpreted, &mut modification, &ctx)
    2431        72925 :                     .instrument(span.clone())
    2432        72925 :                     .await
    2433        72925 :                     .unwrap();
    2434              :             }
    2435       237343 :             modification.commit(&ctx).await.unwrap();
    2436              :         }
    2437              : 
    2438            1 :         let duration = started_at.elapsed();
    2439            1 :         println!("done in {duration:?}");
    2440            1 :     }
    2441              : }
        

Generated by: LCOV version 2.1-beta