LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions) Coverage Total Hit
Test: 903780b8ddc62f532be8f220102da7b91c63a235.info Lines: 48.1 % 2389 1149
Test Date: 2024-10-25 10:10:57 Functions: 56.4 % 94 53

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  ->   WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9              : //! and decodes it to individual WAL records. It feeds the WAL records
      10              : //! to WalIngest, which parses them and stores them in the Repository.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
      13              : //! page images, or as WAL records. WalIngest::ingest_record() extracts
      14              : //! page images out of some WAL records, but most it stores as WAL
      15              : //! records. If a WAL record modifies multiple pages, WalIngest
      16              : //! will call Repository::put_wal_record or put_page_image functions
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
      22              : //! bespoke Rust code.
      23              : 
      24              : use std::sync::Arc;
      25              : use std::sync::OnceLock;
      26              : use std::time::Duration;
      27              : use std::time::Instant;
      28              : use std::time::SystemTime;
      29              : 
      30              : use pageserver_api::shard::ShardIdentity;
      31              : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
      32              : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      33              : 
      34              : use anyhow::{bail, Context, Result};
      35              : use bytes::{Buf, Bytes, BytesMut};
      36              : use tracing::*;
      37              : use utils::failpoint_support;
      38              : use utils::rate_limit::RateLimit;
      39              : 
      40              : use crate::context::RequestContext;
      41              : use crate::metrics::WAL_INGEST;
      42              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      43              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      44              : use crate::tenant::PageReconstructError;
      45              : use crate::tenant::Timeline;
      46              : use crate::walrecord::*;
      47              : use crate::ZERO_PAGE;
      48              : use pageserver_api::key::rel_block_to_key;
      49              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      50              : use postgres_ffi::pg_constants;
      51              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      52              : use postgres_ffi::TransactionId;
      53              : use postgres_ffi::BLCKSZ;
      54              : use utils::bin_ser::SerializeError;
      55              : use utils::lsn::Lsn;
      56              : 
      57              : enum_pgversion! {CheckPoint, pgv::CheckPoint}
      58              : 
      59              : impl CheckPoint {
      60            6 :     fn encode(&self) -> Result<Bytes, SerializeError> {
      61            6 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
      62            6 :     }
      63              : 
      64       145834 :     fn update_next_xid(&mut self, xid: u32) -> bool {
      65       145834 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
      66       145834 :     }
      67              : 
      68            0 :     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
      69            0 :         enum_pgversion_dispatch!(self, CheckPoint, cp, {
      70            0 :             cp.update_next_multixid(multi_xid, multi_offset)
      71              :         })
      72            0 :     }
      73              : }
      74              : 
      75              : /// Temporary limitation of WAL lag warnings after attach
      76              : ///
      77              : /// After tenant attach, we want to limit WAL lag warnings because
      78              : /// we don't look at the WAL until the attach is complete, which
      79              : /// might take a while.
      80              : pub struct WalLagCooldown {
      81              :     /// Until when should this limitation apply at all
      82              :     active_until: std::time::Instant,
      83              :     /// The maximum lag to suppress. Lags above this limit get reported anyways.
      84              :     max_lag: Duration,
      85              : }
      86              : 
      87              : impl WalLagCooldown {
      88            0 :     pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
      89            0 :         Self {
      90            0 :             active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
      91            0 :             max_lag: attach_duration * 2 + Duration::from_secs(60),
      92            0 :         }
      93            0 :     }
      94              : }
      95              : 
      96              : pub struct WalIngest {
      97              :     attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
      98              :     shard: ShardIdentity,
      99              :     checkpoint: CheckPoint,
     100              :     checkpoint_modified: bool,
     101              :     warn_ingest_lag: WarnIngestLag,
     102              : }
     103              : 
     104              : struct WarnIngestLag {
     105              :     lag_msg_ratelimit: RateLimit,
     106              :     future_lsn_msg_ratelimit: RateLimit,
     107              :     timestamp_invalid_msg_ratelimit: RateLimit,
     108              : }
     109              : 
     110              : // These structs are an intermediary representation of the PostgreSQL WAL records.
     111              : // The ones prefixed with `Xl` are lower level, while the ones that are not have
     112              : // all the required context to be acted upon by the pageserver.
     113              : 
     114              : enum HeapamRecord {
     115              :     ClearVmBits(ClearVmBits),
     116              : }
     117              : 
     118              : struct ClearVmBits {
     119              :     new_heap_blkno: Option<u32>,
     120              :     old_heap_blkno: Option<u32>,
     121              :     vm_rel: RelTag,
     122              :     flags: u8,
     123              : }
     124              : 
     125              : enum NeonrmgrRecord {
     126              :     ClearVmBits(ClearVmBits),
     127              : }
     128              : 
     129              : enum SmgrRecord {
     130              :     Create(SmgrCreate),
     131              :     Truncate(XlSmgrTruncate),
     132              : }
     133              : 
     134              : struct SmgrCreate {
     135              :     rel: RelTag,
     136              : }
     137              : 
     138              : enum DbaseRecord {
     139              :     Create(DbaseCreate),
     140              :     Drop(DbaseDrop),
     141              : }
     142              : 
     143              : struct DbaseCreate {
     144              :     db_id: u32,
     145              :     tablespace_id: u32,
     146              :     src_db_id: u32,
     147              :     src_tablespace_id: u32,
     148              : }
     149              : 
     150              : struct DbaseDrop {
     151              :     db_id: u32,
     152              :     tablespace_ids: Vec<u32>,
     153              : }
     154              : 
     155              : enum ClogRecord {
     156              :     ZeroPage(ClogZeroPage),
     157              :     Truncate(ClogTruncate),
     158              : }
     159              : 
     160              : struct ClogZeroPage {
     161              :     segno: u32,
     162              :     rpageno: u32,
     163              : }
     164              : 
     165              : struct ClogTruncate {
     166              :     pageno: u32,
     167              :     oldest_xid: u32,
     168              :     oldest_xid_db: u32,
     169              : }
     170              : 
     171              : enum XactRecord {
     172              :     Commit(XactCommon),
     173              :     Abort(XactCommon),
     174              :     CommitPrepared(XactCommon),
     175              :     AbortPrepared(XactCommon),
     176              :     Prepare(XactPrepare),
     177              : }
     178              : 
     179              : struct XactCommon {
     180              :     parsed: XlXactParsedRecord,
     181              :     origin_id: u16,
     182              :     // Fields below are only used for logging
     183              :     xl_xid: u32,
     184              :     lsn: Lsn,
     185              : }
     186              : 
     187              : struct XactPrepare {
     188              :     xl_xid: u32,
     189              :     data: Bytes,
     190              : }
     191              : 
     192              : enum MultiXactRecord {
     193              :     ZeroPage(MultiXactZeroPage),
     194              :     Create(XlMultiXactCreate),
     195              :     Truncate(XlMultiXactTruncate),
     196              : }
     197              : 
     198              : struct MultiXactZeroPage {
     199              :     slru_kind: SlruKind,
     200              :     segno: u32,
     201              :     rpageno: u32,
     202              : }
     203              : 
     204              : enum RelmapRecord {
     205              :     Update(RelmapUpdate),
     206              : }
     207              : 
     208              : struct RelmapUpdate {
     209              :     update: XlRelmapUpdate,
     210              :     buf: Bytes,
     211              : }
     212              : 
     213              : enum XlogRecord {
     214              :     Raw(RawXlogRecord),
     215              : }
     216              : 
     217              : struct RawXlogRecord {
     218              :     info: u8,
     219              :     lsn: Lsn,
     220              :     buf: Bytes,
     221              : }
     222              : 
     223              : enum LogicalMessageRecord {
     224              :     Put(PutLogicalMessage),
     225              :     #[cfg(feature = "testing")]
     226              :     Failpoint,
     227              : }
     228              : 
     229              : struct PutLogicalMessage {
     230              :     path: String,
     231              :     buf: Bytes,
     232              : }
     233              : 
     234              : enum StandbyRecord {
     235              :     RunningXacts(StandbyRunningXacts),
     236              : }
     237              : 
     238              : struct StandbyRunningXacts {
     239              :     oldest_running_xid: u32,
     240              : }
     241              : 
     242              : enum ReploriginRecord {
     243              :     Set(XlReploriginSet),
     244              :     Drop(XlReploriginDrop),
     245              : }
     246              : 
     247              : impl WalIngest {
     248           12 :     pub async fn new(
     249           12 :         timeline: &Timeline,
     250           12 :         startpoint: Lsn,
     251           12 :         ctx: &RequestContext,
     252           12 :     ) -> anyhow::Result<WalIngest> {
     253              :         // Fetch the latest checkpoint into memory, so that we can compare with it
     254              :         // quickly in `ingest_record` and update it when it changes.
     255           12 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
     256           12 :         let pgversion = timeline.pg_version;
     257              : 
     258           12 :         let checkpoint = dispatch_pgversion!(pgversion, {
     259            0 :             let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
     260            0 :             trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
     261            0 :             <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
     262              :         });
     263              : 
     264           12 :         Ok(WalIngest {
     265           12 :             shard: *timeline.get_shard_identity(),
     266           12 :             checkpoint,
     267           12 :             checkpoint_modified: false,
     268           12 :             attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
     269           12 :             warn_ingest_lag: WarnIngestLag {
     270           12 :                 lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     271           12 :                 future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     272           12 :                 timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     273           12 :             },
     274           12 :         })
     275           12 :     }
     276              : 
     277              :     ///
     278              :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
     279              :     ///
     280              :     /// This function updates `lsn` field of `DatadirModification`
     281              :     ///
     282              :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
     283              :     /// relations/pages that the record affects.
     284              :     ///
     285              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
     286              :     ///
     287       145852 :     pub async fn ingest_record(
     288       145852 :         &mut self,
     289       145852 :         decoded: DecodedWALRecord,
     290       145852 :         lsn: Lsn,
     291       145852 :         modification: &mut DatadirModification<'_>,
     292       145852 :         ctx: &RequestContext,
     293       145852 :     ) -> anyhow::Result<bool> {
     294       145852 :         WAL_INGEST.records_received.inc();
     295       145852 :         let pg_version = modification.tline.pg_version;
     296       145852 :         let prev_len = modification.len();
     297       145852 : 
     298       145852 :         modification.set_lsn(lsn)?;
     299              : 
     300       145852 :         if decoded.is_dbase_create_copy(pg_version) {
     301              :             // Records of this type should always be preceded by a commit(), as they
     302              :             // rely on reading data pages back from the Timeline.
     303            0 :             assert!(!modification.has_dirty_data_pages());
     304       145852 :         }
     305              : 
     306       145852 :         let mut buf = decoded.record.clone();
     307       145852 :         buf.advance(decoded.main_data_offset);
     308       145852 : 
     309       145852 :         assert!(!self.checkpoint_modified);
     310       145852 :         if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
     311       145834 :             && self.checkpoint.update_next_xid(decoded.xl_xid)
     312            2 :         {
     313            2 :             self.checkpoint_modified = true;
     314       145850 :         }
     315              : 
     316       145852 :         failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
     317              : 
     318       145852 :         match decoded.xl_rmid {
     319              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
     320              :                 // Heap AM records need some special handling, because they modify VM pages
     321              :                 // without registering them with the standard mechanism.
     322       145474 :                 let maybe_heapam_record =
     323       145474 :                     Self::decode_heapam_record(&mut buf, &decoded, pg_version)?;
     324       145474 :                 if let Some(heapam_record) = maybe_heapam_record {
     325           12 :                     match heapam_record {
     326           12 :                         HeapamRecord::ClearVmBits(clear_vm_bits) => {
     327           12 :                             self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     328            0 :                                 .await?;
     329              :                         }
     330              :                     }
     331       145462 :                 }
     332              :             }
     333              :             pg_constants::RM_NEON_ID => {
     334            0 :                 let maybe_nenonrmgr_record =
     335            0 :                     Self::decode_neonmgr_record(&mut buf, &decoded, pg_version)?;
     336            0 :                 if let Some(neonrmgr_record) = maybe_nenonrmgr_record {
     337            0 :                     match neonrmgr_record {
     338            0 :                         NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
     339            0 :                             self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
     340            0 :                                 .await?;
     341              :                         }
     342              :                     }
     343            0 :                 }
     344              :             }
     345              :             // Handle other special record types
     346              :             pg_constants::RM_SMGR_ID => {
     347           16 :                 let maybe_smgr_record =
     348           16 :                     Self::decode_smgr_record(&mut buf, &decoded, pg_version).unwrap();
     349           16 :                 if let Some(smgr_record) = maybe_smgr_record {
     350           16 :                     match smgr_record {
     351           16 :                         SmgrRecord::Create(create) => {
     352           16 :                             self.ingest_xlog_smgr_create(create, modification, ctx)
     353           12 :                                 .await?;
     354              :                         }
     355            0 :                         SmgrRecord::Truncate(truncate) => {
     356            0 :                             self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
     357            0 :                                 .await?;
     358              :                         }
     359              :                     }
     360            0 :                 }
     361              :             }
     362              :             pg_constants::RM_DBASE_ID => {
     363            0 :                 let maybe_dbase_record =
     364            0 :                     Self::decode_dbase_record(&mut buf, &decoded, pg_version).unwrap();
     365              : 
     366            0 :                 if let Some(dbase_record) = maybe_dbase_record {
     367            0 :                     match dbase_record {
     368            0 :                         DbaseRecord::Create(create) => {
     369            0 :                             self.ingest_xlog_dbase_create(create, modification, ctx)
     370            0 :                                 .await?;
     371              :                         }
     372            0 :                         DbaseRecord::Drop(drop) => {
     373            0 :                             self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
     374              :                         }
     375              :                     }
     376            0 :                 }
     377              :             }
     378              :             pg_constants::RM_TBLSPC_ID => {
     379            0 :                 trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     380              :             }
     381              :             pg_constants::RM_CLOG_ID => {
     382              :                 // [`Self::decode_clog_record`] never fails and always returns a record.
     383              :                 // It has this interface to match all the other decoding methods.
     384            0 :                 let clog_record = Self::decode_clog_record(&mut buf, &decoded, pg_version)
     385            0 :                     .unwrap()
     386            0 :                     .unwrap();
     387            0 : 
     388            0 :                 match clog_record {
     389            0 :                     ClogRecord::ZeroPage(zero_page) => {
     390            0 :                         self.ingest_clog_zero_page(zero_page, modification, ctx)
     391            0 :                             .await?;
     392              :                     }
     393            0 :                     ClogRecord::Truncate(truncate) => {
     394            0 :                         self.ingest_clog_truncate(truncate, modification, ctx)
     395            0 :                             .await?;
     396              :                     }
     397              :                 }
     398              :             }
     399              :             pg_constants::RM_XACT_ID => {
     400           24 :                 let maybe_xact_record =
     401           24 :                     Self::decode_xact_record(&mut buf, &decoded, lsn, pg_version).unwrap();
     402           24 :                 if let Some(xact_record) = maybe_xact_record {
     403            8 :                     self.ingest_xact_record(xact_record, modification, ctx)
     404            0 :                         .await?;
     405           16 :                 }
     406              :             }
     407              :             pg_constants::RM_MULTIXACT_ID => {
     408            0 :                 let maybe_multixact_record =
     409            0 :                     Self::decode_multixact_record(&mut buf, &decoded, pg_version).unwrap();
     410            0 :                 if let Some(multixact_record) = maybe_multixact_record {
     411            0 :                     match multixact_record {
     412            0 :                         MultiXactRecord::ZeroPage(zero_page) => {
     413            0 :                             self.ingest_multixact_zero_page(zero_page, modification, ctx)
     414            0 :                                 .await?;
     415              :                         }
     416            0 :                         MultiXactRecord::Create(create) => {
     417            0 :                             self.ingest_multixact_create(modification, &create)?;
     418              :                         }
     419            0 :                         MultiXactRecord::Truncate(truncate) => {
     420            0 :                             self.ingest_multixact_truncate(modification, &truncate, ctx)
     421            0 :                                 .await?;
     422              :                         }
     423              :                     }
     424            0 :                 }
     425              :             }
     426              :             pg_constants::RM_RELMAP_ID => {
     427            0 :                 let relmap_record = Self::decode_relmap_record(&mut buf, &decoded, pg_version)
     428            0 :                     .unwrap()
     429            0 :                     .unwrap();
     430            0 :                 match relmap_record {
     431            0 :                     RelmapRecord::Update(update) => {
     432            0 :                         self.ingest_relmap_update(update, modification, ctx).await?;
     433              :                     }
     434              :                 }
     435              :             }
     436              :             // This is an odd duck. It needs to go to all shards.
     437              :             // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
     438              :             // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
     439              :             // the pageserver and decode it there.
     440              :             //
     441              :             // Alternatively, one can make the checkpoint part of the subscription protocol
     442              :             // to the pageserver. This should work fine, but can be done at a later point.
     443              :             pg_constants::RM_XLOG_ID => {
     444           30 :                 let xlog_record = Self::decode_xlog_record(&mut buf, &decoded, lsn, pg_version)
     445           30 :                     .unwrap()
     446           30 :                     .unwrap();
     447           30 : 
     448           30 :                 match xlog_record {
     449           30 :                     XlogRecord::Raw(raw) => {
     450           30 :                         self.ingest_raw_xlog_record(raw, modification, ctx).await?;
     451              :                     }
     452              :                 }
     453              :             }
     454              :             pg_constants::RM_LOGICALMSG_ID => {
     455            0 :                 let maybe_logical_message_record =
     456            0 :                     Self::decode_logical_message_record(&mut buf, &decoded, pg_version).unwrap();
     457            0 :                 if let Some(logical_message_record) = maybe_logical_message_record {
     458            0 :                     match logical_message_record {
     459            0 :                         LogicalMessageRecord::Put(put) => {
     460            0 :                             self.ingest_logical_message_put(put, modification, ctx)
     461            0 :                                 .await?;
     462              :                         }
     463              :                         #[cfg(feature = "testing")]
     464              :                         LogicalMessageRecord::Failpoint => {
     465              :                             // This is a convenient way to make the WAL ingestion pause at
     466              :                             // a particular point in the WAL. For more fine-grained control,
     467              :                             // we could peek into the message and only pause if it contains
     468              :                             // a particular string, for example, but this is enough for now.
     469            0 :                             failpoint_support::sleep_millis_async!(
     470            0 :                                 "pageserver-wal-ingest-logical-message-sleep"
     471            0 :                             );
     472              :                         }
     473              :                     }
     474            0 :                 }
     475              :             }
     476              :             pg_constants::RM_STANDBY_ID => {
     477           16 :                 let maybe_standby_record =
     478           16 :                     Self::decode_standby_record(&mut buf, &decoded, pg_version).unwrap();
     479           16 :                 if let Some(standby_record) = maybe_standby_record {
     480            0 :                     self.ingest_standby_record(standby_record).unwrap();
     481           16 :                 }
     482              :             }
     483              :             pg_constants::RM_REPLORIGIN_ID => {
     484            0 :                 let maybe_replorigin_record =
     485            0 :                     Self::decode_replorigin_record(&mut buf, &decoded, pg_version).unwrap();
     486            0 :                 if let Some(replorigin_record) = maybe_replorigin_record {
     487            0 :                     self.ingest_replorigin_record(replorigin_record, modification)
     488            0 :                         .await?;
     489            0 :                 }
     490              :             }
     491          292 :             _x => {
     492          292 :                 // TODO: should probably log & fail here instead of blindly
     493          292 :                 // doing something without understanding the protocol
     494          292 :             }
     495              :         }
     496              : 
     497              :         // Iterate through all the blocks that the record modifies, and
     498              :         // "put" a separate copy of the record for each block.
     499       145852 :         for blk in decoded.blocks.iter() {
     500       145642 :             let rel = RelTag {
     501       145642 :                 spcnode: blk.rnode_spcnode,
     502       145642 :                 dbnode: blk.rnode_dbnode,
     503       145642 :                 relnode: blk.rnode_relnode,
     504       145642 :                 forknum: blk.forknum,
     505       145642 :             };
     506       145642 : 
     507       145642 :             let key = rel_block_to_key(rel, blk.blkno);
     508       145642 :             let key_is_local = self.shard.is_key_local(&key);
     509       145642 : 
     510       145642 :             tracing::debug!(
     511              :                 lsn=%lsn,
     512              :                 key=%key,
     513            0 :                 "ingest: shard decision {} (checkpoint={})",
     514            0 :                 if !key_is_local { "drop" } else { "keep" },
     515              :                 self.checkpoint_modified
     516              :             );
     517              : 
     518       145642 :             if !key_is_local {
     519            0 :                 if self.shard.is_shard_zero() {
     520              :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     521              :                     // its blkno in case it implicitly extends a relation.
     522            0 :                     self.observe_decoded_block(modification, blk, ctx).await?;
     523            0 :                 }
     524              : 
     525            0 :                 continue;
     526       145642 :             }
     527       145642 :             self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
     528          284 :                 .await?;
     529              :         }
     530              : 
     531              :         // If checkpoint data was updated, store the new version in the repository
     532       145852 :         if self.checkpoint_modified {
     533            6 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     534              : 
     535            6 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     536            6 :             self.checkpoint_modified = false;
     537       145846 :         }
     538              : 
     539              :         // Note that at this point this record is only cached in the modification
     540              :         // until commit() is called to flush the data into the repository and update
     541              :         // the latest LSN.
     542              : 
     543       145852 :         modification.on_record_end();
     544       145852 : 
     545       145852 :         Ok(modification.len() > prev_len)
     546       145852 :     }
     547              : 
     548              :     /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL.
                            ///
                            /// Widens `xid` to 64 bits: the result carries an epoch in the upper 32 bits and
                            /// `xid` in the lower 32. The epoch is taken from the upper half of the
                            /// checkpoint's `nextXid`; if `xid` is numerically greater than the lower half
                            /// of `nextXid`, it is attributed to the previous epoch instead.
                            ///
                            /// # Errors
                            ///
                            /// Fails if `xid` would have to belong to a previous epoch while the current
                            /// epoch is already 0.
     549            0 :     fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
     550            0 :         let next_full_xid =
     551            0 :             enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
     552              : 
                                // Split the 64-bit next XID into its lower (XID) and upper (epoch) halves.
     553            0 :         let next_xid = (next_full_xid) as u32;
     554            0 :         let mut epoch = (next_full_xid >> 32) as u32;
     555            0 : 
     556            0 :         if xid > next_xid {
     557              :             // Wraparound occurred, so the XID must be from the previous epoch.
     558            0 :             if epoch == 0 {
                                        // There is no epoch before 0 to attribute the XID to.
     559            0 :                 bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
     560            0 :             }
     561            0 :             epoch -= 1;
     562            0 :         }
     563              : 
                                // `<<` binds tighter than `|`, so this is `(epoch << 32) | xid`.
     564            0 :         Ok((epoch as u64) << 32 | xid as u64)
     565            0 :     }
     566              : 
     567              :     /// Do not store this block, but observe it for the purposes of updating our relation size state.
                            ///
                            /// Builds the `RelTag` for the block from its relation fields and passes it,
                            /// together with the block number, to `handle_rel_extend`. The result of that
                            /// call is returned unchanged.
                            ///
                            /// The per-block ingest loop calls this on shard zero for blocks whose key is
                            /// not local to this shard.
     568            0 :     async fn observe_decoded_block(
     569            0 :         &mut self,
     570            0 :         modification: &mut DatadirModification<'_>,
     571            0 :         blk: &DecodedBkpBlock,
     572            0 :         ctx: &RequestContext,
     573            0 :     ) -> Result<(), PageReconstructError> {
     574            0 :         let rel = RelTag {
     575            0 :             spcnode: blk.rnode_spcnode,
     576            0 :             dbnode: blk.rnode_dbnode,
     577            0 :             relnode: blk.rnode_relnode,
     578            0 :             forknum: blk.forknum,
     579            0 :         };
     580            0 :         self.handle_rel_extend(modification, rel, blk.blkno, ctx)
     581            0 :             .await
     582            0 :     }
     583              : 
     584       145642 :     async fn ingest_decoded_block(
     585       145642 :         &mut self,
     586       145642 :         modification: &mut DatadirModification<'_>,
     587       145642 :         lsn: Lsn,
     588       145642 :         decoded: &DecodedWALRecord,
     589       145642 :         blk: &DecodedBkpBlock,
     590       145642 :         ctx: &RequestContext,
     591       145642 :     ) -> Result<(), PageReconstructError> {
                                // Store one block of a decoded WAL record in `modification`: as a page image
                                // when the record is an uncompressed, non-empty XLOG full-page image that is
                                // to be applied, and as the WAL record itself otherwise. Errors from either
                                // put are propagated to the caller.
     592       145642 :         let rel = RelTag {
     593       145642 :             spcnode: blk.rnode_spcnode,
     594       145642 :             dbnode: blk.rnode_dbnode,
     595       145642 :             relnode: blk.rnode_relnode,
     596       145642 :             forknum: blk.forknum,
     597       145642 :         };
     598       145642 : 
     599       145642 :         //
     600       145642 :         // Instead of storing a full-page-image WAL record,
     601       145642 :         // it is better to store the extracted image: we can skip wal-redo
     602       145642 :         // in this case. Also, some FPI records may contain multiple (up to 32) pages,
     603       145642 :         // so the record would have to be copied multiple times.
     604       145642 :         //
     605       145642 :         if blk.apply_image
     606           60 :             && blk.has_image
     607           60 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     608           24 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     609            0 :             || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     610              :             // Compression of WAL is not yet supported: fall back to storing the original WAL record.
     611           24 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
     612              :             // Do not materialize null pages because they will most likely be replaced with real data soon.
     613           24 :             && blk.bimg_len != 0
     614              :         {
     615              :             // Extract the page image from the FPI record: `bimg_len` bytes starting at `bimg_offset`.
     616           24 :             let img_len = blk.bimg_len as usize;
     617           24 :             let img_offs = blk.bimg_offset as usize;
     618           24 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     619           24 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     620           24 : 
                                    // Re-insert `hole_length` zero bytes at `hole_offset`: split off the tail,
                                    // pad with zeroes, then append the tail again.
     621           24 :             if blk.hole_length != 0 {
     622            0 :                 let tail = image.split_off(blk.hole_offset as usize);
     623            0 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     624            0 :                 image.unsplit(tail);
     625           24 :             }
     626              :             //
     627              :             // Match the logic of XLogReadBufferForRedoExtended:
     628              :             // The page may be uninitialized. If so, we can't set the LSN because
     629              :             // that would corrupt the page.
     630              :             //
     631           24 :             if !page_is_new(&image) {
     632           18 :                 page_set_lsn(&mut image, lsn)
     633            6 :             }
                                    // Whether or not a hole was restored, the image must now be exactly one
                                    // block long; anything else panics here.
     634           24 :             assert_eq!(image.len(), BLCKSZ as usize);
     635              : 
     636           24 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     637            5 :                 .await?;
     638              :         } else {
                                    // Everything else is stored as the original WAL record. `will_init` is set
                                    // when the block is flagged `will_init` or carries an image to apply.
     639       145618 :             let rec = NeonWalRecord::Postgres {
     640       145618 :                 will_init: blk.will_init || blk.apply_image,
     641       145618 :                 rec: decoded.record.clone(),
     642       145618 :             };
     643       145618 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     644          279 :                 .await?;
     645              :         }
     646       145642 :         Ok(())
     647       145642 :     }
     648              : 
     649           12 :     async fn ingest_clear_vm_bits(
     650           12 :         &mut self,
     651           12 :         clear_vm_bits: ClearVmBits,
     652           12 :         modification: &mut DatadirModification<'_>,
     653           12 :         ctx: &RequestContext,
     654           12 :     ) -> anyhow::Result<()> {
                                // Store `ClearVisibilityMapFlags` records on `vm_rel` for the heap block(s)
                                // named in `clear_vm_bits`. Heap blocks whose visibility-map block lies at or
                                // beyond the current size of `vm_rel` are skipped; if none remain, nothing is
                                // stored and the call still succeeds.
     655           12 :         let ClearVmBits {
     656           12 :             new_heap_blkno,
     657           12 :             old_heap_blkno,
     658           12 :             flags,
     659           12 :             vm_rel,
     660           12 :         } = clear_vm_bits;
     661           12 :         // Clear the VM bits if required. First map each heap block number (if any) to a VM block number.
     662           12 :         let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     663           12 :         let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     664              : 
     665              :         // Sometimes, Postgres seems to create heap WAL records with the
     666              :         // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     667              :         // not set. In fact, it's possible that the VM page does not exist at all.
     668              :         // In that case, we don't want to store a record to clear the VM bit;
     669              :         // replaying it would fail to find the previous image of the page, because
     670              :         // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     671              :         // record if they don't.
     672           12 :         let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     673           12 :         if let Some(blknum) = new_vm_blk {
     674           12 :             if blknum >= vm_size {
     675            0 :                 new_vm_blk = None;
     676           12 :             }
     677            0 :         }
     678           12 :         if let Some(blknum) = old_vm_blk {
     679            0 :             if blknum >= vm_size {
     680            0 :                 old_vm_blk = None;
     681            0 :             }
     682           12 :         }
     683              : 
     684           12 :         if new_vm_blk.is_some() || old_vm_blk.is_some() {
     685           12 :             if new_vm_blk == old_vm_blk {
     686              :                 // An UPDATE record that needs to clear the bits for both the old and the
     687              :                 // new page, both of which reside on the same VM page.
     688            0 :                 self.put_rel_wal_record(
     689            0 :                     modification,
     690            0 :                     vm_rel,
                                            // Cannot panic: at least one of the two is `Some` and they are
                                            // equal, so both are `Some`.
     691            0 :                     new_vm_blk.unwrap(),
     692            0 :                     NeonWalRecord::ClearVisibilityMapFlags {
     693            0 :                         new_heap_blkno,
     694            0 :                         old_heap_blkno,
     695            0 :                         flags,
     696            0 :                     },
     697            0 :                     ctx,
     698            0 :                 )
     699            0 :                 .await?;
     700              :             } else {
     701              :                 // Clear VM bits for one heap page, or for two pages that reside on
     702              :                 // different VM pages. Each record names only the heap block that
     703              :                 // belongs to the VM block it is stored on.
     703           12 :                 if let Some(new_vm_blk) = new_vm_blk {
     704           12 :                     self.put_rel_wal_record(
     705           12 :                         modification,
     706           12 :                         vm_rel,
     707           12 :                         new_vm_blk,
     708           12 :                         NeonWalRecord::ClearVisibilityMapFlags {
     709           12 :                             new_heap_blkno,
     710           12 :                             old_heap_blkno: None,
     711           12 :                             flags,
     712           12 :                         },
     713           12 :                         ctx,
     714           12 :                     )
     715            0 :                     .await?;
     716            0 :                 }
     717           12 :                 if let Some(old_vm_blk) = old_vm_blk {
     718            0 :                     self.put_rel_wal_record(
     719            0 :                         modification,
     720            0 :                         vm_rel,
     721            0 :                         old_vm_blk,
     722            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     723            0 :                             new_heap_blkno: None,
     724            0 :                             old_heap_blkno,
     725            0 :                             flags,
     726            0 :                         },
     727            0 :                         ctx,
     728            0 :                     )
     729            0 :                     .await?;
     730           12 :                 }
     731              :             }
     732            0 :         }
     733              : 
     734           12 :         Ok(())
     735           12 :     }
     736              : 
     737       145474 :     fn decode_heapam_record(
     738       145474 :         buf: &mut Bytes,
     739       145474 :         decoded: &DecodedWALRecord,
     740       145474 :         pg_version: u32,
     741       145474 :     ) -> anyhow::Result<Option<HeapamRecord>> {
     742       145474 :         // Handle VM bit updates that are implicitly part of heap records.
     743       145474 : 
     744       145474 :         // First, look at the record to determine which VM bits need
     745       145474 :         // to be cleared. If either of these variables is set, we
     746       145474 :         // need to clear the corresponding bits in the visibility map.
     747       145474 :         let mut new_heap_blkno: Option<u32> = None;
     748       145474 :         let mut old_heap_blkno: Option<u32> = None;
     749       145474 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     750       145474 : 
     751       145474 :         match pg_version {
     752              :             14 => {
     753            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     754            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     755            0 : 
     756            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     757            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     758            0 :                         assert_eq!(0, buf.remaining());
     759            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     760            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     761            0 :                         }
     762            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     763            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     764            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     765            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     766            0 :                         }
     767            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     768            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     769              :                     {
     770            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     771            0 :                         // the size of tuple data is inferred from the size of the record.
     772            0 :                         // we can't validate the remaining number of bytes without parsing
     773            0 :                         // the tuple data.
     774            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     775            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     776            0 :                         }
     777            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     778            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     779            0 :                             // non-HOT update where the new tuple goes to different page than
     780            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     781            0 :                             // set.
     782            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     783            0 :                         }
     784            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     785            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     786            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     787            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     788            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     789            0 :                         }
     790            0 :                     }
     791            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     792            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     793            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     794            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     795              : 
     796            0 :                         let offset_array_len =
     797            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     798              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     799            0 :                                 0
     800              :                             } else {
     801            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     802              :                             };
     803            0 :                         assert_eq!(offset_array_len, buf.remaining());
     804              : 
     805            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     806            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     807            0 :                         }
     808            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     809            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     810            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     811            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     812            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     813            0 :                         }
     814            0 :                     }
     815              :                 } else {
     816            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     817              :                 }
     818              :             }
     819              :             15 => {
     820       145474 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     821       145286 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     822       145286 : 
     823       145286 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     824       145276 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     825       145276 :                         assert_eq!(0, buf.remaining());
     826       145276 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     827            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     828       145272 :                         }
     829           10 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     830            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     831            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     832            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     833            0 :                         }
     834           10 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     835            2 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     836              :                     {
     837            8 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     838            8 :                         // the size of tuple data is inferred from the size of the record.
     839            8 :                         // we can't validate the remaining number of bytes without parsing
     840            8 :                         // the tuple data.
     841            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     842            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     843            8 :                         }
     844            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     845            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     846            0 :                             // non-HOT update where the new tuple goes to different page than
     847            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     848            0 :                             // set.
     849            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     850            8 :                         }
     851            2 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     852            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     853            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     854            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     855            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     856            0 :                         }
     857            2 :                     }
     858          188 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     859          188 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     860          188 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     861           42 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     862              : 
     863           42 :                         let offset_array_len =
     864           42 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     865              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     866            2 :                                 0
     867              :                             } else {
     868           40 :                                 size_of::<u16>() * xlrec.ntuples as usize
     869              :                             };
     870           42 :                         assert_eq!(offset_array_len, buf.remaining());
     871              : 
     872           42 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     873            8 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     874           34 :                         }
     875          146 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     876            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     877            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     878            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     879            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     880            0 :                         }
     881          146 :                     }
     882              :                 } else {
     883            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     884              :                 }
     885              :             }
     886              :             16 => {
     887            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     888            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     889            0 : 
     890            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     891            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     892            0 :                         assert_eq!(0, buf.remaining());
     893            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     894            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     895            0 :                         }
     896            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     897            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     898            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     899            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     900            0 :                         }
     901            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     902            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     903              :                     {
     904            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     905            0 :                         // the size of tuple data is inferred from the size of the record.
     906            0 :                         // we can't validate the remaining number of bytes without parsing
     907            0 :                         // the tuple data.
     908            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     909            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     910            0 :                         }
     911            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     912            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     913            0 :                             // non-HOT update where the new tuple goes to different page than
     914            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     915            0 :                             // set.
     916            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     917            0 :                         }
     918            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     919            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     920            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     921            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     922            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     923            0 :                         }
     924            0 :                     }
     925            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     926            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     927            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     928            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     929              : 
     930            0 :                         let offset_array_len =
     931            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     932              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     933            0 :                                 0
     934              :                             } else {
     935            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     936              :                             };
     937            0 :                         assert_eq!(offset_array_len, buf.remaining());
     938              : 
     939            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     940            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     941            0 :                         }
     942            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     943            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     944            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     945            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     946            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     947            0 :                         }
     948            0 :                     }
     949              :                 } else {
     950            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     951              :                 }
     952              :             }
     953              :             17 => {
     954            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     955            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     956            0 : 
     957            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     958            0 :                         let xlrec = v17::XlHeapInsert::decode(buf);
     959            0 :                         assert_eq!(0, buf.remaining());
     960            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     961            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     962            0 :                         }
     963            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     964            0 :                         let xlrec = v17::XlHeapDelete::decode(buf);
     965            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     966            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     967            0 :                         }
     968            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     969            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     970              :                     {
     971            0 :                         let xlrec = v17::XlHeapUpdate::decode(buf);
     972            0 :                         // the size of tuple data is inferred from the size of the record.
     973            0 :                         // we can't validate the remaining number of bytes without parsing
     974            0 :                         // the tuple data.
     975            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     976            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     977            0 :                         }
     978            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     979            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     980            0 :                             // non-HOT update where the new tuple goes to different page than
     981            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     982            0 :                             // set.
     983            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     984            0 :                         }
     985            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     986            0 :                         let xlrec = v17::XlHeapLock::decode(buf);
     987            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     988            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     989            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     990            0 :                         }
     991            0 :                     }
     992            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     993            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     994            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     995            0 :                         let xlrec = v17::XlHeapMultiInsert::decode(buf);
     996              : 
     997            0 :                         let offset_array_len =
     998            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     999              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
    1000            0 :                                 0
    1001              :                             } else {
    1002            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
    1003              :                             };
    1004            0 :                         assert_eq!(offset_array_len, buf.remaining());
    1005              : 
    1006            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
    1007            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1008            0 :                         }
    1009            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
    1010            0 :                         let xlrec = v17::XlHeapLockUpdated::decode(buf);
    1011            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
    1012            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
    1013            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
    1014            0 :                         }
    1015            0 :                     }
    1016              :                 } else {
    1017            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
    1018              :                 }
    1019              :             }
    1020            0 :             _ => {}
    1021              :         }
    1022              : 
    1023       145474 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
    1024           12 :             let vm_rel = RelTag {
    1025           12 :                 forknum: VISIBILITYMAP_FORKNUM,
    1026           12 :                 spcnode: decoded.blocks[0].rnode_spcnode,
    1027           12 :                 dbnode: decoded.blocks[0].rnode_dbnode,
    1028           12 :                 relnode: decoded.blocks[0].rnode_relnode,
    1029           12 :             };
    1030           12 : 
    1031           12 :             Ok(Some(HeapamRecord::ClearVmBits(ClearVmBits {
    1032           12 :                 new_heap_blkno,
    1033           12 :                 old_heap_blkno,
    1034           12 :                 vm_rel,
    1035           12 :                 flags,
    1036           12 :             })))
    1037              :         } else {
    1038       145462 :             Ok(None)
    1039              :         }
    1040       145474 :     }
    1041              : 
    1042            0 :     fn decode_neonmgr_record(
    1043            0 :         buf: &mut Bytes,
    1044            0 :         decoded: &DecodedWALRecord,
    1045            0 :         pg_version: u32,
    1046            0 :     ) -> anyhow::Result<Option<NeonrmgrRecord>> {
                                // Decodes a record of the Neon resource manager (the assert below
                                // requires `decoded.xl_rmid == RM_NEON_ID`) and works out which
                                // visibility-map (VM) bits must be cleared as a consequence of it.
                                //
                                // Returns `Ok(Some(NeonrmgrRecord::ClearVmBits(..)))` when at least one
                                // heap block needs its VM bits cleared and `Ok(None)` otherwise. Fails
                                // with an error for an unrecognized record type or for a `pg_version`
                                // other than 16 or 17.
                                //
    1047            0 :         // Handle VM bit updates that are implicitly part of heap records.
    1048            0 : 
    1049            0 :         // First, look at the record to determine which VM bits need
    1050            0 :         // to be cleared. If either of these variables is set, we
    1051            0 :         // need to clear the corresponding bits in the visibility map.
    1052            0 :         let mut new_heap_blkno: Option<u32> = None;
    1053            0 :         let mut old_heap_blkno: Option<u32> = None;
    1054            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
    1055            0 : 
    1056            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
    1057              : 
    1058            0 :         match pg_version {
    1059              :             16 | 17 => {
                                        // NOTE(review): the `v17::rm_neon` decoders are applied to
                                        // pg_version 16 records as well; presumably the record layouts
                                        // are identical in both versions -- confirm.
    1060            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1061            0 : 
    1062            0 :                 match info {
    1063              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
    1064            0 :                         let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
                                                // The insert header must consume the whole buffer;
                                                // leftover bytes trigger a panic here.
    1065            0 :                         assert_eq!(0, buf.remaining());
    1066            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
    1067            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1068            0 :                         }
    1069              :                     }
    1070              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
    1071            0 :                         let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
    1072            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
    1073            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1074            0 :                         }
    1075              :                     }
    1076              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
    1077              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
    1078            0 :                         let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
    1079            0 :                         // the size of tuple data is inferred from the size of the record.
    1080            0 :                         // we can't validate the remaining number of bytes without parsing
    1081            0 :                         // the tuple data.
    1082            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                                                    // The old tuple's page is taken from the last block
                                                    // reference of the record.
    1083            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
    1084            0 :                         }
    1085            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
    1086            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
    1087            0 :                             // non-HOT update where the new tuple goes to a different page than
    1088            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
    1089            0 :                             // set.
    1090            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1091            0 :                         }
    1092              :                     }
    1093              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
    1094            0 :                         let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
    1095              : 
                                                // Expected size of what follows the header: one u16 per
                                                // tuple, or nothing when the page is being initialized.
    1096            0 :                         let offset_array_len =
    1097            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
    1098              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
    1099            0 :                                 0
    1100              :                             } else {
    1101            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
    1102              :                             };
    1103            0 :                         assert_eq!(offset_array_len, buf.remaining());
    1104              : 
    1105            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
    1106            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1107            0 :                         }
    1108              :                     }
    1109              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
    1110            0 :                         let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
    1111            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
    1112            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
                                                    // Narrow the mask from the default
                                                    // VISIBILITYMAP_VALID_BITS to the all-frozen bit only.
    1113            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
    1114            0 :                         }
    1115              :                     }
    1116            0 :                     info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
    1117              :                 }
    1118              :             }
    1119            0 :             _ => bail!(
    1120            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
    1121            0 :                 pg_version
    1122            0 :             ),
    1123              :         }
    1124              : 
    1125            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
                                    // The VM relation reuses block 0's tablespace/database/relnode;
                                    // only the fork number differs.
    1126            0 :             let vm_rel = RelTag {
    1127            0 :                 forknum: VISIBILITYMAP_FORKNUM,
    1128            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
    1129            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
    1130            0 :                 relnode: decoded.blocks[0].rnode_relnode,
    1131            0 :             };
    1132            0 : 
    1133            0 :             Ok(Some(NeonrmgrRecord::ClearVmBits(ClearVmBits {
    1134            0 :                 new_heap_blkno,
    1135            0 :                 old_heap_blkno,
    1136            0 :                 vm_rel,
    1137            0 :                 flags,
    1138            0 :             })))
    1139              :         } else {
    1140            0 :             Ok(None)
    1141              :         }
    1142            0 :     }
    1143              : 
    1144              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
                            ///
                            /// Copies the relmap file and every relation listed for the source
                            /// tablespace/database into the destination tablespace/database. Each
                            /// destination relation is created with the source's block count and the
                            /// pages are then copied one block at a time; blocks whose key is not
                            /// local to this shard are skipped. Any error from the underlying reads or
                            /// writes is propagated to the caller.
    1145            0 :     async fn ingest_xlog_dbase_create(
    1146            0 :         &mut self,
    1147            0 :         create: DbaseCreate,
    1148            0 :         modification: &mut DatadirModification<'_>,
    1149            0 :         ctx: &RequestContext,
    1150            0 :     ) -> anyhow::Result<()> {
    1151            0 :         let DbaseCreate {
    1152            0 :             db_id,
    1153            0 :             tablespace_id,
    1154            0 :             src_db_id,
    1155            0 :             src_tablespace_id,
    1156            0 :         } = create;
    1157              : 
                                // Enumerate the relations of the source database.
    1158            0 :         let rels = modification
    1159            0 :             .tline
    1160            0 :             .list_rels(
    1161            0 :                 src_tablespace_id,
    1162            0 :                 src_db_id,
    1163            0 :                 Version::Modified(modification),
    1164            0 :                 ctx,
    1165            0 :             )
    1166            0 :             .await?;
    1167              : 
    1168            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
    1169              : 
    1170              :         // Copy relfilemap
    1171            0 :         let filemap = modification
    1172            0 :             .tline
    1173            0 :             .get_relmap_file(
    1174            0 :                 src_tablespace_id,
    1175            0 :                 src_db_id,
    1176            0 :                 Version::Modified(modification),
    1177            0 :                 ctx,
    1178            0 :             )
    1179            0 :             .await?;
    1180            0 :         modification
    1181            0 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
    1182            0 :             .await?;
    1183              : 
                                // Counters used only for the summary log line at the end.
    1184            0 :         let mut num_rels_copied = 0;
    1185            0 :         let mut num_blocks_copied = 0;
    1186            0 :         for src_rel in rels {
                                    // Sanity check: every listed relation must belong to the source
                                    // tablespace and database that were requested above.
    1187            0 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
    1188            0 :             assert_eq!(src_rel.dbnode, src_db_id);
    1189              : 
    1190            0 :             let nblocks = modification
    1191            0 :                 .tline
    1192            0 :                 .get_rel_size(src_rel, Version::Modified(modification), ctx)
    1193            0 :                 .await?;
                                    // Same relnode and fork as the source, but in the destination
                                    // tablespace and database.
    1194            0 :             let dst_rel = RelTag {
    1195            0 :                 spcnode: tablespace_id,
    1196            0 :                 dbnode: db_id,
    1197            0 :                 relnode: src_rel.relnode,
    1198            0 :                 forknum: src_rel.forknum,
    1199            0 :             };
    1200            0 : 
                                    // Create the destination relation with the source's block count
                                    // before any page is copied.
    1201            0 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
    1202              : 
    1203              :             // Copy content
    1204            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
    1205            0 :             for blknum in 0..nblocks {
    1206              :                 // Sharding:
    1207              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
    1208              :                 //    dbNode is not included in the hash inputs for sharding.
    1209              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
    1210              :                 //    that belong to it.
    1211            0 :                 let src_key = rel_block_to_key(src_rel, blknum);
    1212            0 :                 if !self.shard.is_key_local(&src_key) {
    1213            0 :                     debug!(
    1214            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
    1215              :                         src_key
    1216              :                     );
    1217            0 :                     continue;
    1218            0 :                 }
    1219            0 :                 debug!(
    1220            0 :                     "copying block {} from {} ({}) to {}",
    1221              :                     blknum, src_rel, src_key, dst_rel
    1222              :                 );
    1223              : 
    1224            0 :                 let content = modification
    1225            0 :                     .tline
    1226            0 :                     .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
    1227            0 :                     .await?;
    1228            0 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
    1229            0 :                 num_blocks_copied += 1;
    1230              :             }
    1231              : 
    1232            0 :             num_rels_copied += 1;
    1233              :         }
    1234              : 
    1235            0 :         info!(
    1236            0 :             "Created database {}/{}, copied {} blocks in {} rels",
    1237              :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
    1238              :         );
    1239            0 :         Ok(())
    1240            0 :     }
    1241              : 
    1242            0 :     async fn ingest_xlog_dbase_drop(
    1243            0 :         &mut self,
    1244            0 :         dbase_drop: DbaseDrop,
    1245            0 :         modification: &mut DatadirModification<'_>,
    1246            0 :         ctx: &RequestContext,
    1247            0 :     ) -> anyhow::Result<()> {
                                // Handles a decoded database-drop record: calls `drop_dbdir` for
                                // `db_id` once per tablespace id carried by the record. The first
                                // failing call aborts the loop and its error is returned.
    1248            0 :         let DbaseDrop {
    1249            0 :             db_id,
    1250            0 :             tablespace_ids,
    1251            0 :         } = dbase_drop;
    1252            0 :         for tablespace_id in tablespace_ids {
    1253            0 :             trace!("Drop db {}, {}", tablespace_id, db_id);
    1254            0 :             modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
    1255              :         }
    1256              : 
    1257            0 :         Ok(())
    1258            0 :     }
    1259              : 
    1260            0 :     fn decode_dbase_record(
    1261            0 :         buf: &mut Bytes,
    1262            0 :         decoded: &DecodedWALRecord,
    1263            0 :         pg_version: u32,
    1264            0 :     ) -> anyhow::Result<Option<DbaseRecord>> {
                                // Decodes a database (RM_DBASE_ID) WAL record into a `DbaseRecord`,
                                // picking the record-type constants that match `pg_version`.
                                //
                                // Returns `Ok(Some(DbaseRecord::Create(..)))` for XLOG_DBASE_CREATE (v14)
                                // and XLOG_DBASE_CREATE_FILE_COPY (v15-v17), and
                                // `Ok(Some(DbaseRecord::Drop(..)))` for XLOG_DBASE_DROP. Everything else
                                // yields `Ok(None)` without reading `buf`: XLOG_DBASE_CREATE_WAL_LOG on
                                // v15-v17, any other info value, and any `pg_version` outside 14-17.
                                // No branch in this function produces an `Err`.
                                //
    1265            0 :         // TODO: Refactor this to avoid the duplication between postgres versions.
    1266            0 : 
    1267            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1268            0 :         debug!(%info, %pg_version, "handle RM_DBASE_ID");
    1269              : 
    1270            0 :         if pg_version == 14 {
    1271            0 :             if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
    1272            0 :                 let createdb = XlCreateDatabase::decode(buf);
    1273            0 :                 debug!("XLOG_DBASE_CREATE v14");
    1274              : 
    1275            0 :                 let record = DbaseRecord::Create(DbaseCreate {
    1276            0 :                     db_id: createdb.db_id,
    1277            0 :                     tablespace_id: createdb.tablespace_id,
    1278            0 :                     src_db_id: createdb.src_db_id,
    1279            0 :                     src_tablespace_id: createdb.src_tablespace_id,
    1280            0 :                 });
    1281            0 : 
    1282            0 :                 return Ok(Some(record));
    1283            0 :             } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
    1284            0 :                 let dropdb = XlDropDatabase::decode(buf);
    1285            0 : 
    1286            0 :                 let record = DbaseRecord::Drop(DbaseDrop {
    1287            0 :                     db_id: dropdb.db_id,
    1288            0 :                     tablespace_ids: dropdb.tablespace_ids,
    1289            0 :                 });
    1290            0 : 
    1291            0 :                 return Ok(Some(record));
    1292            0 :             }
    1293            0 :         } else if pg_version == 15 {
    1294            0 :             if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
                                        // Nothing is decoded for this record type; control falls
                                        // through to the final `Ok(None)`.
    1295            0 :                 debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
    1296            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
    1297              :                 // The XLOG record was renamed between v14 and v15,
    1298              :                 // but the record format is the same.
    1299              :                 // So we can reuse XlCreateDatabase here.
    1300            0 :                 debug!("XLOG_DBASE_CREATE_FILE_COPY");
    1301              : 
    1302            0 :                 let createdb = XlCreateDatabase::decode(buf);
    1303            0 :                 let record = DbaseRecord::Create(DbaseCreate {
    1304            0 :                     db_id: createdb.db_id,
    1305            0 :                     tablespace_id: createdb.tablespace_id,
    1306            0 :                     src_db_id: createdb.src_db_id,
    1307            0 :                     src_tablespace_id: createdb.src_tablespace_id,
    1308            0 :                 });
    1309            0 : 
    1310            0 :                 return Ok(Some(record));
    1311            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
    1312            0 :                 let dropdb = XlDropDatabase::decode(buf);
    1313            0 :                 let record = DbaseRecord::Drop(DbaseDrop {
    1314            0 :                     db_id: dropdb.db_id,
    1315            0 :                     tablespace_ids: dropdb.tablespace_ids,
    1316            0 :                 });
    1317            0 : 
    1318            0 :                 return Ok(Some(record));
    1319            0 :             }
    1320            0 :         } else if pg_version == 16 {
                                    // Same structure as the v15 branch, using the v16 constants.
    1321            0 :             if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
    1322            0 :                 debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
    1323            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
    1324              :                 // The XLOG record was renamed between v14 and v15,
    1325              :                 // but the record format is the same.
    1326              :                 // So we can reuse XlCreateDatabase here.
    1327            0 :                 debug!("XLOG_DBASE_CREATE_FILE_COPY");
    1328              : 
    1329            0 :                 let createdb = XlCreateDatabase::decode(buf);
    1330            0 :                 let record = DbaseRecord::Create(DbaseCreate {
    1331            0 :                     db_id: createdb.db_id,
    1332            0 :                     tablespace_id: createdb.tablespace_id,
    1333            0 :                     src_db_id: createdb.src_db_id,
    1334            0 :                     src_tablespace_id: createdb.src_tablespace_id,
    1335            0 :                 });
    1336            0 : 
    1337            0 :                 return Ok(Some(record));
    1338            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
    1339            0 :                 let dropdb = XlDropDatabase::decode(buf);
    1340            0 :                 let record = DbaseRecord::Drop(DbaseDrop {
    1341            0 :                     db_id: dropdb.db_id,
    1342            0 :                     tablespace_ids: dropdb.tablespace_ids,
    1343            0 :                 });
    1344            0 : 
    1345            0 :                 return Ok(Some(record));
    1346            0 :             }
    1347            0 :         } else if pg_version == 17 {
                                    // Same structure as the v15 branch, using the v17 constants.
    1348            0 :             if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
    1349            0 :                 debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
    1350            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
    1351              :                 // The XLOG record was renamed between v14 and v15,
    1352              :                 // but the record format is the same.
    1353              :                 // So we can reuse XlCreateDatabase here.
    1354            0 :                 debug!("XLOG_DBASE_CREATE_FILE_COPY");
    1355              : 
    1356            0 :                 let createdb = XlCreateDatabase::decode(buf);
    1357            0 :                 let record = DbaseRecord::Create(DbaseCreate {
    1358            0 :                     db_id: createdb.db_id,
    1359            0 :                     tablespace_id: createdb.tablespace_id,
    1360            0 :                     src_db_id: createdb.src_db_id,
    1361            0 :                     src_tablespace_id: createdb.src_tablespace_id,
    1362            0 :                 });
    1363            0 : 
    1364            0 :                 return Ok(Some(record));
    1365            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
    1366            0 :                 let dropdb = XlDropDatabase::decode(buf);
    1367            0 :                 let record = DbaseRecord::Drop(DbaseDrop {
    1368            0 :                     db_id: dropdb.db_id,
    1369            0 :                     tablespace_ids: dropdb.tablespace_ids,
    1370            0 :                 });
    1371            0 : 
    1372            0 :                 return Ok(Some(record));
    1373            0 :             }
    1374            0 :         }
    1375              : 
                                // Reached for no-op record types, unrecognized info values and
                                // pg_version values not handled above.
    1376            0 :         Ok(None)
    1377            0 :     }
    1378              : 
    1379           16 :     async fn ingest_xlog_smgr_create(
    1380           16 :         &mut self,
    1381           16 :         create: SmgrCreate,
    1382           16 :         modification: &mut DatadirModification<'_>,
    1383           16 :         ctx: &RequestContext,
    1384           16 :     ) -> anyhow::Result<()> {
                                // Handles a decoded smgr-create record: unpacks the relation tag and
                                // hands it to `self.put_rel_creation`, propagating any error it returns.
    1385           16 :         let SmgrCreate { rel } = create;
    1386           16 :         self.put_rel_creation(modification, rel, ctx).await?;
    1387           16 :         Ok(())
    1388           16 :     }
    1389              : 
    1390           16 :     fn decode_smgr_record(
    1391           16 :         buf: &mut Bytes,
    1392           16 :         decoded: &DecodedWALRecord,
    1393           16 :         _pg_version: u32,
    1394           16 :     ) -> anyhow::Result<Option<SmgrRecord>> {
                                // Decodes a storage-manager WAL record into an `SmgrRecord`.
                                //
                                // Returns `Ok(Some(SmgrRecord::Create(..)))` for XLOG_SMGR_CREATE,
                                // `Ok(Some(SmgrRecord::Truncate(..)))` for XLOG_SMGR_TRUNCATE, and
                                // `Ok(None)` for any other info value (in which case `buf` is left
                                // unread). `_pg_version` is not consulted. No branch in this function
                                // produces an `Err`.
    1395           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1396           16 :         if info == pg_constants::XLOG_SMGR_CREATE {
    1397           16 :             let create = XlSmgrCreate::decode(buf);
                                    // Repack the decoded rnode and fork number into a `RelTag`.
    1398           16 :             let rel = RelTag {
    1399           16 :                 spcnode: create.rnode.spcnode,
    1400           16 :                 dbnode: create.rnode.dbnode,
    1401           16 :                 relnode: create.rnode.relnode,
    1402           16 :                 forknum: create.forknum,
    1403           16 :             };
    1404           16 : 
    1405           16 :             return Ok(Some(SmgrRecord::Create(SmgrCreate { rel })));
    1406            0 :         } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
                                    // The decoded truncate struct is passed through unchanged.
    1407            0 :             let truncate = XlSmgrTruncate::decode(buf);
    1408            0 :             return Ok(Some(SmgrRecord::Truncate(truncate)));
    1409            0 :         }
    1410            0 : 
    1411            0 :         Ok(None)
    1412           16 :     }
    1413              : 
    1414              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    1415              :     ///
    1416              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
    1417            0 :     async fn ingest_xlog_smgr_truncate(
    1418            0 :         &mut self,
    1419            0 :         truncate: XlSmgrTruncate,
    1420            0 :         modification: &mut DatadirModification<'_>,
    1421            0 :         ctx: &RequestContext,
    1422            0 :     ) -> anyhow::Result<()> {
    1423            0 :         let XlSmgrTruncate {
    1424            0 :             blkno,
    1425            0 :             rnode,
    1426            0 :             flags,
    1427            0 :         } = truncate;
    1428            0 : 
    1429            0 :         let spcnode = rnode.spcnode;
    1430            0 :         let dbnode = rnode.dbnode;
    1431            0 :         let relnode = rnode.relnode;
    1432            0 : 
    1433            0 :         if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
    1434            0 :             let rel = RelTag {
    1435            0 :                 spcnode,
    1436            0 :                 dbnode,
    1437            0 :                 relnode,
    1438            0 :                 forknum: MAIN_FORKNUM,
    1439            0 :             };
    1440            0 : 
    1441            0 :             self.put_rel_truncation(modification, rel, blkno, ctx)
    1442            0 :                 .await?;
    1443            0 :         }
    1444            0 :         if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
    1445            0 :             let rel = RelTag {
    1446            0 :                 spcnode,
    1447            0 :                 dbnode,
    1448            0 :                 relnode,
    1449            0 :                 forknum: FSM_FORKNUM,
    1450            0 :             };
    1451            0 : 
    1452            0 :             let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
    1453            0 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
    1454            0 :             if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
    1455              :                 // Tail of last remaining FSM page has to be zeroed.
    1456              :                 // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
    1457            0 :                 modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
    1458            0 :                 fsm_physical_page_no += 1;
    1459            0 :             }
    1460            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1461            0 :             if nblocks > fsm_physical_page_no {
    1462              :                 // check if something to do: FSM is larger than truncate position
    1463            0 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
    1464            0 :                     .await?;
    1465            0 :             }
    1466            0 :         }
    1467            0 :         if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
    1468            0 :             let rel = RelTag {
    1469            0 :                 spcnode,
    1470            0 :                 dbnode,
    1471            0 :                 relnode,
    1472            0 :                 forknum: VISIBILITYMAP_FORKNUM,
    1473            0 :             };
    1474            0 : 
    1475            0 :             let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
    1476            0 :             if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
    1477              :                 // Tail of last remaining vm page has to be zeroed.
    1478              :                 // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
    1479            0 :                 modification.put_rel_page_image_zero(rel, vm_page_no)?;
    1480            0 :                 vm_page_no += 1;
    1481            0 :             }
    1482            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1483            0 :             if nblocks > vm_page_no {
    1484              :                 // check if something to do: VM is larger than truncate position
    1485            0 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
    1486            0 :                     .await?;
    1487            0 :             }
    1488            0 :         }
    1489            0 :         Ok(())
    1490            0 :     }
    1491              : 
    1492            8 :     fn warn_on_ingest_lag(
    1493            8 :         &mut self,
    1494            8 :         conf: &crate::config::PageServerConf,
    1495            8 :         wal_timestamp: TimestampTz,
    1496            8 :     ) {
    1497            8 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1498            8 :         let now = SystemTime::now();
    1499            8 :         let rate_limits = &mut self.warn_ingest_lag;
    1500              : 
    1501            8 :         let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
    1502            0 :             pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
    1503              :         });
    1504              : 
    1505            8 :         match ts {
    1506            8 :             Ok(ts) => {
    1507            8 :                 match now.duration_since(ts) {
    1508            8 :                     Ok(lag) => {
    1509            8 :                         if lag > conf.wait_lsn_timeout {
    1510            8 :                             rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
    1511            2 :                                 if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
    1512            0 :                                     if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
    1513            0 :                                         return;
    1514            0 :                                     }
    1515            2 :                                 } else {
    1516            2 :                                     // Still loading? We shouldn't be here
    1517            2 :                                 }
    1518            2 :                                 let lag = humantime::format_duration(lag);
    1519            2 :                                 warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
    1520            8 :                             })
    1521            0 :                         }
    1522              :                     }
    1523            0 :                     Err(e) => {
    1524            0 :                         let delta_t = e.duration();
    1525              :                         // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
    1526              :                         // => https://www.robustperception.io/time-metric-from-the-node-exporter/
    1527              :                         const IGNORED_DRIFT: Duration = Duration::from_millis(100);
    1528            0 :                         if delta_t > IGNORED_DRIFT {
    1529            0 :                             let delta_t = humantime::format_duration(delta_t);
    1530            0 :                             rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
    1531            0 :                                 warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
    1532            0 :                             })
    1533            0 :                         }
    1534              :                     }
    1535              :                 };
    1536              :             }
    1537            0 :             Err(error) => {
    1538            0 :                 rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
    1539            0 :                     warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
    1540            0 :                 })
    1541              :             }
    1542              :         }
    1543            8 :     }
    1544              : 
    1545              :     /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
    1546              :     ///
    1547            8 :     async fn ingest_xact_record(
    1548            8 :         &mut self,
    1549            8 :         record: XactRecord,
    1550            8 :         modification: &mut DatadirModification<'_>,
    1551            8 :         ctx: &RequestContext,
    1552            8 :     ) -> anyhow::Result<()> {
    1553            8 :         let (xact_common, is_commit, is_prepared) = match record {
    1554            0 :             XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
    1555            0 :                 let xid: u64 = if modification.tline.pg_version >= 17 {
    1556            0 :                     self.adjust_to_full_transaction_id(xl_xid)?
    1557              :                 } else {
    1558            0 :                     xl_xid as u64
    1559              :                 };
    1560            0 :                 return modification.put_twophase_file(xid, data, ctx).await;
    1561              :             }
    1562            8 :             XactRecord::Commit(common) => (common, true, false),
    1563            0 :             XactRecord::Abort(common) => (common, false, false),
    1564            0 :             XactRecord::CommitPrepared(common) => (common, true, true),
    1565            0 :             XactRecord::AbortPrepared(common) => (common, false, true),
    1566              :         };
    1567              : 
    1568              :         let XactCommon {
    1569            8 :             parsed,
    1570            8 :             origin_id,
    1571            8 :             xl_xid,
    1572            8 :             lsn,
    1573            8 :         } = xact_common;
    1574            8 : 
    1575            8 :         // Record update of CLOG pages
    1576            8 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
    1577            8 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1578            8 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1579            8 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
    1580            8 : 
    1581            8 :         self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
    1582              : 
    1583            8 :         for subxact in &parsed.subxacts {
    1584            0 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
    1585            0 :             if subxact_pageno != pageno {
    1586              :                 // This subxact goes to different page. Write the record
    1587              :                 // for all the XIDs on the previous page, and continue
    1588              :                 // accumulating XIDs on this new page.
    1589            0 :                 modification.put_slru_wal_record(
    1590            0 :                     SlruKind::Clog,
    1591            0 :                     segno,
    1592            0 :                     rpageno,
    1593            0 :                     if is_commit {
    1594            0 :                         NeonWalRecord::ClogSetCommitted {
    1595            0 :                             xids: page_xids,
    1596            0 :                             timestamp: parsed.xact_time,
    1597            0 :                         }
    1598              :                     } else {
    1599            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
    1600              :                     },
    1601            0 :                 )?;
    1602            0 :                 page_xids = Vec::new();
    1603            0 :             }
    1604            0 :             pageno = subxact_pageno;
    1605            0 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1606            0 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1607            0 :             page_xids.push(*subxact);
    1608              :         }
    1609            8 :         modification.put_slru_wal_record(
    1610            8 :             SlruKind::Clog,
    1611            8 :             segno,
    1612            8 :             rpageno,
    1613            8 :             if is_commit {
    1614            8 :                 NeonWalRecord::ClogSetCommitted {
    1615            8 :                     xids: page_xids,
    1616            8 :                     timestamp: parsed.xact_time,
    1617            8 :                 }
    1618              :             } else {
    1619            0 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
    1620              :             },
    1621            0 :         )?;
    1622              : 
    1623            8 :         for xnode in &parsed.xnodes {
    1624            0 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
    1625            0 :                 let rel = RelTag {
    1626            0 :                     forknum,
    1627            0 :                     spcnode: xnode.spcnode,
    1628            0 :                     dbnode: xnode.dbnode,
    1629            0 :                     relnode: xnode.relnode,
    1630            0 :                 };
    1631            0 :                 if modification
    1632            0 :                     .tline
    1633            0 :                     .get_rel_exists(rel, Version::Modified(modification), ctx)
    1634            0 :                     .await?
    1635              :                 {
    1636            0 :                     self.put_rel_drop(modification, rel, ctx).await?;
    1637            0 :                 }
    1638              :             }
    1639              :         }
    1640            8 :         if origin_id != 0 {
    1641            0 :             modification
    1642            0 :                 .set_replorigin(origin_id, parsed.origin_lsn)
    1643            0 :                 .await?;
    1644            8 :         }
    1645              : 
    1646            8 :         if is_prepared {
    1647              :             // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
    1648            0 :             trace!(
    1649            0 :                 "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
    1650              :                 xl_xid,
    1651              :                 parsed.xid,
    1652              :                 lsn,
    1653              :             );
    1654              : 
    1655            0 :             let xid: u64 = if modification.tline.pg_version >= 17 {
    1656            0 :                 self.adjust_to_full_transaction_id(parsed.xid)?
    1657              :             } else {
    1658            0 :                 parsed.xid as u64
    1659              :             };
    1660            0 :             modification.drop_twophase_file(xid, ctx).await?;
    1661            8 :         }
    1662              : 
    1663            8 :         Ok(())
    1664            8 :     }
    1665              : 
    1666              :     // TODO(vlad): Standardise interface for `decode_...`
    1667           24 :     fn decode_xact_record(
    1668           24 :         buf: &mut Bytes,
    1669           24 :         decoded: &DecodedWALRecord,
    1670           24 :         lsn: Lsn,
    1671           24 :         _pg_version: u32,
    1672           24 :     ) -> anyhow::Result<Option<XactRecord>> {
    1673           24 :         let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
    1674           24 :         let origin_id = decoded.origin_id;
    1675           24 :         let xl_xid = decoded.xl_xid;
    1676           24 : 
    1677           24 :         if info == pg_constants::XLOG_XACT_COMMIT {
    1678            8 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
    1679            8 :             return Ok(Some(XactRecord::Commit(XactCommon {
    1680            8 :                 parsed,
    1681            8 :                 origin_id,
    1682            8 :                 xl_xid,
    1683            8 :                 lsn,
    1684            8 :             })));
    1685           16 :         } else if info == pg_constants::XLOG_XACT_ABORT {
    1686            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
    1687            0 :             return Ok(Some(XactRecord::Abort(XactCommon {
    1688            0 :                 parsed,
    1689            0 :                 origin_id,
    1690            0 :                 xl_xid,
    1691            0 :                 lsn,
    1692            0 :             })));
    1693           16 :         } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
    1694            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
    1695            0 :             return Ok(Some(XactRecord::CommitPrepared(XactCommon {
    1696            0 :                 parsed,
    1697            0 :                 origin_id,
    1698            0 :                 xl_xid,
    1699            0 :                 lsn,
    1700            0 :             })));
    1701           16 :         } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
    1702            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
    1703            0 :             return Ok(Some(XactRecord::AbortPrepared(XactCommon {
    1704            0 :                 parsed,
    1705            0 :                 origin_id,
    1706            0 :                 xl_xid,
    1707            0 :                 lsn,
    1708            0 :             })));
    1709           16 :         } else if info == pg_constants::XLOG_XACT_PREPARE {
    1710            0 :             return Ok(Some(XactRecord::Prepare(XactPrepare {
    1711            0 :                 xl_xid: decoded.xl_xid,
    1712            0 :                 data: Bytes::copy_from_slice(&buf[..]),
    1713            0 :             })));
    1714           16 :         }
    1715           16 : 
    1716           16 :         Ok(None)
    1717           24 :     }
    1718              : 
    1719            0 :     async fn ingest_clog_truncate(
    1720            0 :         &mut self,
    1721            0 :         truncate: ClogTruncate,
    1722            0 :         modification: &mut DatadirModification<'_>,
    1723            0 :         ctx: &RequestContext,
    1724            0 :     ) -> anyhow::Result<()> {
    1725            0 :         let ClogTruncate {
    1726            0 :             pageno,
    1727            0 :             oldest_xid,
    1728            0 :             oldest_xid_db,
    1729            0 :         } = truncate;
    1730            0 : 
    1731            0 :         info!(
    1732            0 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
    1733              :             pageno, oldest_xid, oldest_xid_db
    1734              :         );
    1735              : 
    1736              :         // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
    1737              :         // truncated, but a checkpoint record with the updated values isn't written until
    1738              :         // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
    1739              :         // so we keep the oldestXid and oldestXidDB up-to-date.
    1740            0 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1741            0 :             cp.oldestXid = oldest_xid;
    1742            0 :             cp.oldestXidDB = oldest_xid_db;
    1743            0 :         });
    1744            0 :         self.checkpoint_modified = true;
    1745              : 
    1746              :         // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
    1747              : 
    1748            0 :         let latest_page_number =
    1749            0 :             enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
    1750              :                 / pg_constants::CLOG_XACTS_PER_PAGE;
    1751              : 
    1752              :         // Now delete all segments containing pages between xlrec.pageno
    1753              :         // and latest_page_number.
    1754              : 
    1755              :         // First, make an important safety check:
    1756              :         // the current endpoint page must not be eligible for removal.
    1757              :         // See SimpleLruTruncate() in slru.c
    1758            0 :         if dispatch_pgversion!(modification.tline.pg_version, {
    1759            0 :             pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
    1760              :         }) {
    1761            0 :             info!("could not truncate directory pg_xact apparent wraparound");
    1762            0 :             return Ok(());
    1763            0 :         }
    1764              : 
    1765              :         // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
    1766              :         //
    1767              :         // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
    1768              :         // will block waiting for the last valid LSN to advance up to
    1769              :         // it. So we use the previous record's LSN in the get calls
    1770              :         // instead.
    1771            0 :         for segno in modification
    1772            0 :             .tline
    1773            0 :             .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
    1774            0 :             .await?
    1775              :         {
    1776            0 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
    1777              : 
    1778            0 :             let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
    1779            0 :                 pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
    1780              :             });
    1781              : 
    1782            0 :             if may_delete {
    1783            0 :                 modification
    1784            0 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
    1785            0 :                     .await?;
    1786            0 :                 trace!("Drop CLOG segment {:>04X}", segno);
    1787            0 :             }
    1788              :         }
    1789              : 
    1790            0 :         Ok(())
    1791            0 :     }
    1792              : 
    1793            0 :     async fn ingest_clog_zero_page(
    1794            0 :         &mut self,
    1795            0 :         zero_page: ClogZeroPage,
    1796            0 :         modification: &mut DatadirModification<'_>,
    1797            0 :         ctx: &RequestContext,
    1798            0 :     ) -> anyhow::Result<()> {
    1799            0 :         let ClogZeroPage { segno, rpageno } = zero_page;
    1800            0 : 
    1801            0 :         self.put_slru_page_image(
    1802            0 :             modification,
    1803            0 :             SlruKind::Clog,
    1804            0 :             segno,
    1805            0 :             rpageno,
    1806            0 :             ZERO_PAGE.clone(),
    1807            0 :             ctx,
    1808            0 :         )
    1809            0 :         .await
    1810            0 :     }
    1811              : 
    1812            0 :     fn decode_clog_record(
    1813            0 :         buf: &mut Bytes,
    1814            0 :         decoded: &DecodedWALRecord,
    1815            0 :         pg_version: u32,
    1816            0 :     ) -> anyhow::Result<Option<ClogRecord>> {
    1817            0 :         let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
    1818            0 : 
    1819            0 :         if info == pg_constants::CLOG_ZEROPAGE {
    1820            0 :             let pageno = if pg_version < 17 {
    1821            0 :                 buf.get_u32_le()
    1822              :             } else {
    1823            0 :                 buf.get_u64_le() as u32
    1824              :             };
    1825            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1826            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1827            0 : 
    1828            0 :             Ok(Some(ClogRecord::ZeroPage(ClogZeroPage { segno, rpageno })))
    1829              :         } else {
    1830            0 :             assert!(info == pg_constants::CLOG_TRUNCATE);
    1831            0 :             let xlrec = XlClogTruncate::decode(buf, pg_version);
    1832            0 : 
    1833            0 :             Ok(Some(ClogRecord::Truncate(ClogTruncate {
    1834            0 :                 pageno: xlrec.pageno,
    1835            0 :                 oldest_xid: xlrec.oldest_xid,
    1836            0 :                 oldest_xid_db: xlrec.oldest_xid_db,
    1837            0 :             })))
    1838              :         }
    1839            0 :     }
    1840              : 
    1841            0 :     fn ingest_multixact_create(
    1842            0 :         &mut self,
    1843            0 :         modification: &mut DatadirModification,
    1844            0 :         xlrec: &XlMultiXactCreate,
    1845            0 :     ) -> Result<()> {
    1846            0 :         // Create WAL record for updating the multixact-offsets page
    1847            0 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1848            0 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1849            0 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1850            0 : 
    1851            0 :         modification.put_slru_wal_record(
    1852            0 :             SlruKind::MultiXactOffsets,
    1853            0 :             segno,
    1854            0 :             rpageno,
    1855            0 :             NeonWalRecord::MultixactOffsetCreate {
    1856            0 :                 mid: xlrec.mid,
    1857            0 :                 moff: xlrec.moff,
    1858            0 :             },
    1859            0 :         )?;
    1860              : 
    1861              :         // Create WAL records for the update of each affected multixact-members page
    1862            0 :         let mut members = xlrec.members.iter();
    1863            0 :         let mut offset = xlrec.moff;
    1864              :         loop {
    1865            0 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1866            0 : 
    1867            0 :             // How many members fit on this page?
    1868            0 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1869            0 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1870            0 : 
    1871            0 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1872            0 :             for _ in 0..page_remain {
    1873            0 :                 if let Some(m) = members.next() {
    1874            0 :                     this_page_members.push(m.clone());
    1875            0 :                 } else {
    1876            0 :                     break;
    1877              :                 }
    1878              :             }
    1879            0 :             if this_page_members.is_empty() {
    1880              :                 // all done
    1881            0 :                 break;
    1882            0 :             }
    1883            0 :             let n_this_page = this_page_members.len();
    1884            0 : 
    1885            0 :             modification.put_slru_wal_record(
    1886            0 :                 SlruKind::MultiXactMembers,
    1887            0 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1888            0 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1889            0 :                 NeonWalRecord::MultixactMembersCreate {
    1890            0 :                     moff: offset,
    1891            0 :                     members: this_page_members,
    1892            0 :                 },
    1893            0 :             )?;
    1894              : 
    1895              :             // Note: The multixact members can wrap around, even within one WAL record.
    1896            0 :             offset = offset.wrapping_add(n_this_page as u32);
    1897              :         }
    1898            0 :         let next_offset = offset;
    1899            0 :         assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
    1900              : 
    1901              :         // Update next-multi-xid and next-offset
    1902              :         //
    1903              :         // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
    1904              :         // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
    1905              :         // read it, like GetNewMultiXactId(). This is different from how nextXid is
    1906              :         // incremented! nextXid skips over < FirstNormalTransactionId when the the value
    1907              :         // is stored, so it's never 0 in a checkpoint.
    1908              :         //
    1909              :         // I don't know why it's done that way, it seems less error-prone to skip over 0
    1910              :         // when the value is stored rather than when it's read. But let's do it the same
    1911              :         // way here.
    1912            0 :         let next_multi_xid = xlrec.mid.wrapping_add(1);
    1913            0 : 
    1914            0 :         if self
    1915            0 :             .checkpoint
    1916            0 :             .update_next_multixid(next_multi_xid, next_offset)
    1917            0 :         {
    1918            0 :             self.checkpoint_modified = true;
    1919            0 :         }
    1920              : 
    1921              :         // Also update the next-xid with the highest member. According to the comments in
    1922              :         // multixact_redo(), this shouldn't be necessary, but let's do the same here.
    1923            0 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
    1924            0 :             if let Some(max_xid) = acc {
    1925            0 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
    1926            0 :                     Some(mbr.xid)
    1927              :                 } else {
    1928            0 :                     acc
    1929              :                 }
    1930              :             } else {
    1931            0 :                 Some(mbr.xid)
    1932              :             }
    1933            0 :         });
    1934              : 
    1935            0 :         if let Some(max_xid) = max_mbr_xid {
    1936            0 :             if self.checkpoint.update_next_xid(max_xid) {
    1937            0 :                 self.checkpoint_modified = true;
    1938            0 :             }
    1939            0 :         }
    1940            0 :         Ok(())
    1941            0 :     }
    1942              : 
    1943            0 :     async fn ingest_multixact_truncate(
    1944            0 :         &mut self,
    1945            0 :         modification: &mut DatadirModification<'_>,
    1946            0 :         xlrec: &XlMultiXactTruncate,
    1947            0 :         ctx: &RequestContext,
    1948            0 :     ) -> Result<()> {
                                // Apply a multixact truncation record: store the new oldest multixact
                                // (and its database) in the checkpoint, then drop MultiXactMembers SLRU
                                // segments from the start segment up to, but not including, the end
                                // segment.
    1949            0 :         let (maxsegment, startsegment, endsegment) =
    1950            0 :             enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1951            0 :                 cp.oldestMulti = xlrec.end_trunc_off;
    1952            0 :                 cp.oldestMultiDB = xlrec.oldest_multi_db;
                                        // Segment number of the largest possible member offset; used below
                                        // as the wraparound point when stepping through segments.
    1953            0 :                 let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
    1954            0 :                     pg_constants::MAX_MULTIXACT_OFFSET,
    1955            0 :                 );
    1956            0 :                 let startsegment: i32 =
    1957            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1958            0 :                 let endsegment: i32 =
    1959            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1960            0 :                 (maxsegment, startsegment, endsegment)
    1961              :             });
    1962              : 
                                // The checkpoint fields above are written unconditionally, so the
                                // checkpoint is always marked dirty.
    1963            0 :         self.checkpoint_modified = true;
    1964            0 : 
    1965            0 :         // PerformMembersTruncation
    1966            0 :         let mut segment: i32 = startsegment;
    1967              : 
    1968              :         // Delete all the segments except the last one. The last segment can still
    1969              :         // contain, possibly partially, valid data.
    1970            0 :         while segment != endsegment {
    1971            0 :             modification
    1972            0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1973            0 :                 .await?;
    1974              : 
    1975              :             /* move to next segment, handling wraparound correctly */
    1976            0 :             if segment == maxsegment {
    1977            0 :                 segment = 0;
    1978            0 :             } else {
    1979            0 :                 segment += 1;
    1980            0 :             }
    1981              :         }
    1982              : 
    1983              :         // Truncate offsets
    1984              :         // FIXME: this did not handle wraparound correctly
                                // NOTE(review): no MultiXactOffsets segments are dropped anywhere in this
                                // function; only MultiXactMembers segments are removed above.
    1985              : 
    1986            0 :         Ok(())
    1987            0 :     }
    1988              : 
    1989            0 :     async fn ingest_multixact_zero_page(
    1990            0 :         &mut self,
    1991            0 :         zero_page: MultiXactZeroPage,
    1992            0 :         modification: &mut DatadirModification<'_>,
    1993            0 :         ctx: &RequestContext,
    1994            0 :     ) -> Result<()> {
                                // Store an all-zero page image (a clone of ZERO_PAGE) at the SLRU
                                // kind / segment / page named by the decoded record.
    1995            0 :         let MultiXactZeroPage {
    1996            0 :             slru_kind,
    1997            0 :             segno,
    1998            0 :             rpageno,
    1999            0 :         } = zero_page;
                                // put_slru_page_image runs handle_slru_extend before writing the page.
    2000            0 :         self.put_slru_page_image(
    2001            0 :             modification,
    2002            0 :             slru_kind,
    2003            0 :             segno,
    2004            0 :             rpageno,
    2005            0 :             ZERO_PAGE.clone(),
    2006            0 :             ctx,
    2007            0 :         )
    2008            0 :         .await
    2009            0 :     }
    2010              : 
    2011            0 :     fn decode_multixact_record(
    2012            0 :         buf: &mut Bytes,
    2013            0 :         decoded: &DecodedWALRecord,
    2014            0 :         pg_version: u32,
    2015            0 :     ) -> anyhow::Result<Option<MultiXactRecord>> {
                                // Decode a multixact WAL record into a MultiXactRecord:
                                //   - ZERO_OFF_PAGE / ZERO_MEM_PAGE -> ZeroPage
                                //   - CREATE_ID                     -> Create
                                //   - TRUNCATE_ID                   -> Truncate
                                // Any other info value yields Ok(None).
    2016            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    2017            0 : 
    2018            0 :         if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
    2019            0 :             || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
    2020              :         {
                                    // The page number is read as a 32-bit value before version 17 and as
                                    // a 64-bit value from version 17 on.
                                    // NOTE(review): the `as u32` cast silently drops the upper 32 bits;
                                    // confirm page numbers always fit in 32 bits.
    2021            0 :             let pageno = if pg_version < 17 {
    2022            0 :                 buf.get_u32_le()
    2023              :             } else {
    2024            0 :                 buf.get_u64_le() as u32
    2025              :             };
                                    // Split the page number into a segment number and a page index
                                    // within that segment.
    2026            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    2027            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    2028              : 
    2029            0 :             let slru_kind = match info {
    2030            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
    2031            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
                                        // The enclosing `if` admits only the two values matched above.
    2032            0 :                 _ => unreachable!(),
    2033              :             };
    2034              : 
    2035            0 :             return Ok(Some(MultiXactRecord::ZeroPage(MultiXactZeroPage {
    2036            0 :                 slru_kind,
    2037            0 :                 segno,
    2038            0 :                 rpageno,
    2039            0 :             })));
    2040            0 :         } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
    2041            0 :             let xlrec = XlMultiXactCreate::decode(buf);
    2042            0 :             return Ok(Some(MultiXactRecord::Create(xlrec)));
    2043            0 :         } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
    2044            0 :             let xlrec = XlMultiXactTruncate::decode(buf);
    2045            0 :             return Ok(Some(MultiXactRecord::Truncate(xlrec)));
    2046            0 :         }
    2047            0 : 
    2048            0 :         Ok(None)
    2049            0 :     }
    2050              : 
    2051            0 :     async fn ingest_relmap_update(
    2052            0 :         &mut self,
    2053            0 :         update: RelmapUpdate,
    2054            0 :         modification: &mut DatadirModification<'_>,
    2055            0 :         ctx: &RequestContext,
    2056            0 :     ) -> Result<()> {
                                // Store the relmap file contents carried by the record, keyed by the
                                // tablespace id and database id taken from the decoded header.
    2057            0 :         let RelmapUpdate { update, buf } = update;
    2058            0 : 
    2059            0 :         modification
    2060            0 :             .put_relmap_file(update.tsid, update.dbid, buf, ctx)
    2061            0 :             .await
    2062            0 :     }
    2063              : 
    2064            0 :     fn decode_relmap_record(
    2065            0 :         buf: &mut Bytes,
    2066            0 :         decoded: &DecodedWALRecord,
    2067            0 :         _pg_version: u32,
    2068            0 :     ) -> anyhow::Result<Option<RelmapRecord>> {
                                // Decode a relmap update record. The header is decoded from `buf`; the
                                // payload is re-sliced from the raw record. This function never inspects
                                // `xl_info` and always returns `Some(RelmapRecord::Update(..))`.
    2069            0 :         let update = XlRelmapUpdate::decode(buf);
    2070            0 : 
                                // From here on `buf` shadows the parameter with a fresh view of the whole
                                // record, positioned at the start of the main data.
    2071            0 :         let mut buf = decoded.record.clone();
    2072            0 :         buf.advance(decoded.main_data_offset);
    2073            0 :         // skip xl_relmap_update
                                // NOTE(review): 12 is hard-coded here; presumably the size of the
                                // xl_relmap_update header — confirm against XlRelmapUpdate::decode.
    2074            0 :         buf.advance(12);
    2075            0 : 
                                // Everything left after the header is copied into an owned buffer.
    2076            0 :         Ok(Some(RelmapRecord::Update(RelmapUpdate {
    2077            0 :             update,
    2078            0 :             buf: Bytes::copy_from_slice(&buf[..]),
    2079            0 :         })))
    2080            0 :     }
    2081              : 
    2082           30 :     async fn ingest_raw_xlog_record(
    2083           30 :         &mut self,
    2084           30 :         raw_record: RawXlogRecord,
    2085           30 :         modification: &mut DatadirModification<'_>,
    2086           30 :         ctx: &RequestContext,
    2087           30 :     ) -> Result<()> {
                                // Apply an XLOG record to the in-memory checkpoint. Handled info values:
                                //   - PARAMETER_CHANGE / END_OF_RECOVERY: copy wal_level (V17 checkpoint only)
                                //   - NEXTOID: update nextOid
                                //   - CHECKPOINT_ONLINE / CHECKPOINT_SHUTDOWN: merge oldestXid and
                                //     oldestActiveXid from the record's checkpoint
                                // Any other info value leaves the checkpoint untouched.
    2088           30 :         let RawXlogRecord { info, lsn, mut buf } = raw_record;
    2089           30 :         let pg_version = modification.tline.pg_version;
    2090           30 : 
                                // These two record types are only acted on when the checkpoint is the
                                // V17 variant; for other variants they are ignored.
    2091           30 :         if info == pg_constants::XLOG_PARAMETER_CHANGE {
    2092            2 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    2093            0 :                 let rec = v17::XlParameterChange::decode(&mut buf);
    2094            0 :                 cp.wal_level = rec.wal_level;
    2095            0 :                 self.checkpoint_modified = true;
    2096            2 :             }
    2097           28 :         } else if info == pg_constants::XLOG_END_OF_RECOVERY {
    2098            0 :             if let CheckPoint::V17(cp) = &mut self.checkpoint {
    2099            0 :                 let rec = v17::XlEndOfRecovery::decode(&mut buf);
    2100            0 :                 cp.wal_level = rec.wal_level;
    2101            0 :                 self.checkpoint_modified = true;
    2102            0 :             }
    2103           28 :         }
    2104              : 
    2105           30 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    2106            0 :             if info == pg_constants::XLOG_NEXTOID {
                                        // Only mark the checkpoint dirty when the value actually changes.
    2107            0 :                 let next_oid = buf.get_u32_le();
    2108            0 :                 if cp.nextOid != next_oid {
    2109            0 :                     cp.nextOid = next_oid;
    2110            0 :                     self.checkpoint_modified = true;
    2111            0 :                 }
    2112            0 :             } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
    2113            0 :                 || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    2114              :             {
    2115            0 :                 let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
    2116            0 :                 buf.copy_to_slice(&mut checkpoint_bytes);
    2117            0 :                 let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
    2118            0 :                 trace!(
    2119            0 :                     "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
    2120              :                     xlog_checkpoint.oldestXid,
    2121              :                     cp.oldestXid
    2122              :                 );
                                        // Wraparound-aware comparison: the signed 32-bit difference is
                                        // negative when our oldestXid precedes the record's, so oldestXid
                                        // only ever moves forward.
    2123            0 :                 if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
    2124            0 :                     cp.oldestXid = xlog_checkpoint.oldestXid;
    2125            0 :                 }
    2126            0 :                 trace!(
    2127            0 :                     "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
    2128              :                     xlog_checkpoint.oldestActiveXid,
    2129              :                     cp.oldestActiveXid
    2130              :                 );
    2131              : 
    2132              :                 // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
    2133              :                 // because at shutdown, all in-progress transactions will implicitly
    2134              :                 // end. Postgres startup code knows that, and allows hot standby to start
    2135              :                 // immediately from a shutdown checkpoint.
    2136              :                 //
    2137              :                 // In Neon, Postgres hot standby startup always behaves as if starting from
    2138              :                 // an online checkpoint. It needs a valid `oldestActiveXid` value, so
    2139              :                 // instead of overwriting self.checkpoint.oldestActiveXid with
    2140              :                 // InvalidTransactionId from the checkpoint WAL record, update it to a
    2141              :                 // proper value, knowing that there are no in-progress transactions at this
    2142              :                 // point, except for prepared transactions.
    2143              :                 //
    2144              :                 // See also the neon code changes in the InitWalRecovery() function.
    2145            0 :                 if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
    2146            0 :                     && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
    2147              :                 {
                                            // Start from nextXid and take the minimum over the xids reported
                                            // by list_twophase_files at this record's LSN.
    2148            0 :                     let oldest_active_xid = if pg_version >= 17 {
                                                // Version 17+: compare the values at full width with a plain
                                                // `<`, and narrow to 32 bits only at the end.
    2149            0 :                         let mut oldest_active_full_xid = cp.nextXid.value;
    2150            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    2151            0 :                             if xid < oldest_active_full_xid {
    2152            0 :                                 oldest_active_full_xid = xid;
    2153            0 :                             }
    2154              :                         }
    2155            0 :                         oldest_active_full_xid as u32
    2156              :                     } else {
                                                // Before version 17: narrow each xid to 32 bits first and
                                                // compare with wraparound-aware signed arithmetic.
    2157            0 :                         let mut oldest_active_xid = cp.nextXid.value as u32;
    2158            0 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
    2159            0 :                             let narrow_xid = xid as u32;
    2160            0 :                             if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
    2161            0 :                                 oldest_active_xid = narrow_xid;
    2162            0 :                             }
    2163              :                         }
    2164            0 :                         oldest_active_xid
    2165              :                     };
    2166            0 :                     cp.oldestActiveXid = oldest_active_xid;
    2167            0 :                 } else {
    2168            0 :                     cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
    2169            0 :                 }
    2170              : 
    2171              :                 // Write a new checkpoint key-value pair on every checkpoint record, even
    2172              :                 // if nothing really changed. Not strictly required, but it seems nice to
    2173              :                 // have some trace of the checkpoint records in the layer files at the same
    2174              :                 // LSNs.
    2175            0 :                 self.checkpoint_modified = true;
    2176            0 :             }
    2177              :         });
    2178              : 
    2179           30 :         Ok(())
    2180           30 :     }
    2181              : 
    2182           30 :     fn decode_xlog_record(
    2183           30 :         buf: &mut Bytes,
    2184           30 :         decoded: &DecodedWALRecord,
    2185           30 :         lsn: Lsn,
    2186           30 :         _pg_version: u32,
    2187           30 :     ) -> anyhow::Result<Option<XlogRecord>> {
                                // No payload decoding happens here: the masked info bits, the LSN and a
                                // clone of the buffer are wrapped as-is in a RawXlogRecord. The result is
                                // always `Some`, regardless of the info value.
    2188           30 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    2189           30 :         Ok(Some(XlogRecord::Raw(RawXlogRecord {
    2190           30 :             info,
    2191           30 :             lsn,
    2192           30 :             buf: buf.clone(),
    2193           30 :         })))
    2194           30 :     }
    2195              : 
    2196            0 :     async fn ingest_logical_message_put(
    2197            0 :         &mut self,
    2198            0 :         put: PutLogicalMessage,
    2199            0 :         modification: &mut DatadirModification<'_>,
    2200            0 :         ctx: &RequestContext,
    2201            0 :     ) -> Result<()> {
                                // Store the message payload under the path carried by the record.
    2202            0 :         let PutLogicalMessage { path, buf } = put;
    2203            0 :         modification.put_file(path.as_str(), &buf, ctx).await
    2204            0 :     }
    2205              : 
    2206            0 :     fn decode_logical_message_record(
    2207            0 :         buf: &mut Bytes,
    2208            0 :         decoded: &DecodedWALRecord,
    2209            0 :         _pg_version: u32,
    2210            0 :     ) -> anyhow::Result<Option<LogicalMessageRecord>> {
                                // Decode a logical message record. Only two prefixes produce a result:
                                //   - "neon-test" (builds with the "testing" feature only) -> Failpoint
                                //   - "neon-file:<path>"                                    -> Put
                                // Everything else yields Ok(None). A prefix that is not valid UTF-8 is
                                // returned as an error.
    2211            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    2212            0 :         if info == pg_constants::XLOG_LOGICAL_MESSAGE {
    2213            0 :             let xlrec = crate::walrecord::XlLogicalMessage::decode(buf);
                                    // The last byte of the prefix is excluded; presumably it is a
                                    // trailing NUL terminator — TODO confirm.
                                    // NOTE(review): `prefix_size - 1` would underflow if prefix_size were
                                    // 0, and this slice (like the payload slice below) panics if the
                                    // sizes exceed the buffer length — confirm upstream validation.
    2214            0 :             let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
    2215              : 
    2216              :             #[cfg(feature = "testing")]
    2217            0 :             if prefix == "neon-test" {
    2218            0 :                 return Ok(Some(LogicalMessageRecord::Failpoint));
    2219            0 :             }
    2220              : 
    2221            0 :             if let Some(path) = prefix.strip_prefix("neon-file:") {
                                        // The payload is the message_size bytes that follow the prefix.
    2222            0 :                 let buf_size = xlrec.prefix_size + xlrec.message_size;
    2223            0 :                 let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
    2224            0 :                 return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage {
    2225            0 :                     path: path.to_string(),
    2226            0 :                     buf,
    2227            0 :                 })));
    2228            0 :             }
    2229            0 :         }
    2230              : 
    2231            0 :         Ok(None)
    2232            0 :     }
    2233              : 
    2234           16 :     fn decode_standby_record(
    2235           16 :         buf: &mut Bytes,
    2236           16 :         decoded: &DecodedWALRecord,
    2237           16 :         _pg_version: u32,
    2238           16 :     ) -> anyhow::Result<Option<StandbyRecord>> {
                                // Only XLOG_RUNNING_XACTS is decoded, and of that record only
                                // `oldest_running_xid` is kept. Any other info value yields Ok(None).
    2239           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    2240           16 :         if info == pg_constants::XLOG_RUNNING_XACTS {
    2241            0 :             let xlrec = crate::walrecord::XlRunningXacts::decode(buf);
    2242            0 :             return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts {
    2243            0 :                 oldest_running_xid: xlrec.oldest_running_xid,
    2244            0 :             })));
    2245           16 :         }
    2246           16 : 
    2247           16 :         Ok(None)
    2248           16 :     }
    2249              : 
    2250            0 :     fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
                                // Copy the record's oldest running xid into the checkpoint's
                                // oldestActiveXid. The checkpoint is marked dirty unconditionally, even
                                // if the value did not change.
    2251            0 :         match record {
    2252            0 :             StandbyRecord::RunningXacts(running_xacts) => {
    2253            0 :                 enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    2254            0 :                     cp.oldestActiveXid = running_xacts.oldest_running_xid;
    2255            0 :                 });
    2256              : 
    2257            0 :                 self.checkpoint_modified = true;
    2258            0 :             }
    2259            0 :         }
    2260            0 : 
    2261            0 :         Ok(())
    2262            0 :     }
    2263              : 
    2264            0 :     fn decode_replorigin_record(
    2265            0 :         buf: &mut Bytes,
    2266            0 :         decoded: &DecodedWALRecord,
    2267            0 :         _pg_version: u32,
    2268            0 :     ) -> anyhow::Result<Option<ReploriginRecord>> {
                                // Decode a replication-origin record: XLOG_REPLORIGIN_SET -> Set,
                                // XLOG_REPLORIGIN_DROP -> Drop. Any other info value yields Ok(None).
    2269            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    2270            0 :         if info == pg_constants::XLOG_REPLORIGIN_SET {
    2271            0 :             let xlrec = crate::walrecord::XlReploriginSet::decode(buf);
    2272            0 :             return Ok(Some(ReploriginRecord::Set(xlrec)));
    2273            0 :         } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
    2274            0 :             let xlrec = crate::walrecord::XlReploriginDrop::decode(buf);
    2275            0 :             return Ok(Some(ReploriginRecord::Drop(xlrec)));
    2276            0 :         }
    2277            0 : 
    2278            0 :         Ok(None)
    2279            0 :     }
    2280              : 
    2281            0 :     async fn ingest_replorigin_record(
    2282            0 :         &mut self,
    2283            0 :         record: ReploriginRecord,
    2284            0 :         modification: &mut DatadirModification<'_>,
    2285            0 :     ) -> Result<()> {
                                // Apply a decoded replication-origin record to the modification:
                                // Set stores the origin's remote LSN, Drop removes the origin.
    2286            0 :         match record {
    2287            0 :             ReploriginRecord::Set(set) => {
    2288            0 :                 modification
    2289            0 :                     .set_replorigin(set.node_id, set.remote_lsn)
    2290            0 :                     .await?;
    2291              :             }
    2292            0 :             ReploriginRecord::Drop(drop) => {
    2293            0 :                 modification.drop_replorigin(drop.node_id).await?;
    2294              :             }
    2295              :         }
    2296              : 
    2297            0 :         Ok(())
    2298            0 :     }
    2299              : 
    2300           18 :     async fn put_rel_creation(
    2301           18 :         &mut self,
    2302           18 :         modification: &mut DatadirModification<'_>,
    2303           18 :         rel: RelTag,
    2304           18 :         ctx: &RequestContext,
    2305           18 :     ) -> Result<()> {
                                // Record the creation of `rel`, passing 0 as the second argument
                                // (handle_rel_extend describes the same call as creating the relation
                                // "with 0 size initially").
    2306           18 :         modification.put_rel_creation(rel, 0, ctx).await?;
    2307           18 :         Ok(())
    2308           18 :     }
    2309              : 
    2310       272426 :     async fn put_rel_page_image(
    2311       272426 :         &mut self,
    2312       272426 :         modification: &mut DatadirModification<'_>,
    2313       272426 :         rel: RelTag,
    2314       272426 :         blknum: BlockNumber,
    2315       272426 :         img: Bytes,
    2316       272426 :         ctx: &RequestContext,
    2317       272426 :     ) -> Result<(), PageReconstructError> {
                                // Store a full page image for block `blknum` of `rel`. The relation is
                                // first created and/or extended as needed so that it covers `blknum`.
    2318       272426 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    2319         5425 :             .await?;
    2320       272426 :         modification.put_rel_page_image(rel, blknum, img)?;
    2321       272426 :         Ok(())
    2322       272426 :     }
    2323              : 
    2324       145630 :     async fn put_rel_wal_record(
    2325       145630 :         &mut self,
    2326       145630 :         modification: &mut DatadirModification<'_>,
    2327       145630 :         rel: RelTag,
    2328       145630 :         blknum: BlockNumber,
    2329       145630 :         rec: NeonWalRecord,
    2330       145630 :         ctx: &RequestContext,
    2331       145630 :     ) -> Result<()> {
                                // Store a WAL record for block `blknum` of `rel`. The relation is first
                                // created and/or extended as needed so that it covers `blknum`.
    2332       145630 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    2333          279 :             .await?;
    2334       145630 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    2335       145630 :         Ok(())
    2336       145630 :     }
    2337              : 
    2338         6012 :     async fn put_rel_truncation(
    2339         6012 :         &mut self,
    2340         6012 :         modification: &mut DatadirModification<'_>,
    2341         6012 :         rel: RelTag,
    2342         6012 :         nblocks: BlockNumber,
    2343         6012 :         ctx: &RequestContext,
    2344         6012 :     ) -> anyhow::Result<()> {
                                // Thin wrapper: forwards the truncation of `rel` to `nblocks` blocks to
                                // the modification and propagates any error.
    2345         6012 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    2346         6012 :         Ok(())
    2347         6012 :     }
    2348              : 
    2349            2 :     async fn put_rel_drop(
    2350            2 :         &mut self,
    2351            2 :         modification: &mut DatadirModification<'_>,
    2352            2 :         rel: RelTag,
    2353            2 :         ctx: &RequestContext,
    2354            2 :     ) -> Result<()> {
                                // Thin wrapper: forwards the drop of `rel` to the modification and
                                // propagates any error.
    2355            2 :         modification.put_rel_drop(rel, ctx).await?;
    2356            2 :         Ok(())
    2357            2 :     }
    2358              : 
    2359       418056 :     async fn handle_rel_extend(
    2360       418056 :         &mut self,
    2361       418056 :         modification: &mut DatadirModification<'_>,
    2362       418056 :         rel: RelTag,
    2363       418056 :         blknum: BlockNumber,
    2364       418056 :         ctx: &RequestContext,
    2365       418056 :     ) -> Result<(), PageReconstructError> {
                                // Make sure `rel` exists and is at least `blknum + 1` blocks long. A
                                // missing relation is created, a too-short one is extended, and the
                                // blocks between the old end and `blknum` are filled with zero pages
                                // (only those belonging to this shard).
                                // NOTE(review): `blknum + 1` would overflow for the maximum BlockNumber
                                // value — confirm callers never pass it.
    2366       418056 :         let new_nblocks = blknum + 1;
    2367              :         // Check if the relation exists. We implicitly create relations on first
    2368              :         // record.
    2369              :         // TODO: would be nice to be more explicit about it
    2370              : 
    2371              :         // Get current size and put rel creation if rel doesn't exist
    2372              :         //
    2373              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    2374              :         //       check the cache too. This is because eagerly checking the cache results in
    2375              :         //       less work overall and 10% better performance. It's more work on cache miss
    2376              :         //       but cache miss is rare.
    2377       418056 :         let old_nblocks = if let Some(nblocks) = modification
    2378       418056 :             .tline
    2379       418056 :             .get_cached_rel_size(&rel, modification.get_lsn())
    2380              :         {
    2381       418046 :             nblocks
    2382           10 :         } else if !modification
    2383           10 :             .tline
    2384           10 :             .get_rel_exists(rel, Version::Modified(modification), ctx)
    2385            2 :             .await?
    2386              :         {
    2387              :             // create it with 0 size initially, the logic below will extend it
    2388           10 :             modification
    2389           10 :                 .put_rel_creation(rel, 0, ctx)
    2390            3 :                 .await
    2391           10 :                 .context("Relation Error")?;
    2392           10 :             0
    2393              :         } else {
                                    // Cache miss on an existing relation: look the size up.
    2394            0 :             modification
    2395            0 :                 .tline
    2396            0 :                 .get_rel_size(rel, Version::Modified(modification), ctx)
    2397            0 :                 .await?
    2398              :         };
    2399              : 
    2400       418056 :         if new_nblocks > old_nblocks {
    2401              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    2402       274788 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    2403              : 
                                    // The key is built once; only its block-number field (field6) is
                                    // overwritten per gap block to test which shard owns that block.
    2404       274788 :             let mut key = rel_block_to_key(rel, blknum);
    2405       274788 : 
    2406       274788 :             // fill the gap with zeros
    2407       274788 :             let mut gap_blocks_filled: u64 = 0;
    2408       274788 :             for gap_blknum in old_nblocks..blknum {
    2409         2998 :                 key.field6 = gap_blknum;
    2410         2998 : 
                                        // Skip gap blocks that map to a different shard.
    2411         2998 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    2412            0 :                     continue;
    2413         2998 :                 }
    2414         2998 : 
    2415         2998 :                 modification.put_rel_page_image_zero(rel, gap_blknum)?;
    2416         2998 :                 gap_blocks_filled += 1;
    2417              :             }
    2418              : 
                                    // Report how many zero pages this extension wrote.
    2419       274788 :             WAL_INGEST
    2420       274788 :                 .gap_blocks_zeroed_on_rel_extend
    2421       274788 :                 .inc_by(gap_blocks_filled);
    2422       143268 :         }
    2423       418056 :         Ok(())
    2424       418056 :     }
    2425              : 
    2426            0 :     async fn put_slru_page_image(
    2427            0 :         &mut self,
    2428            0 :         modification: &mut DatadirModification<'_>,
    2429            0 :         kind: SlruKind,
    2430            0 :         segno: u32,
    2431            0 :         blknum: BlockNumber,
    2432            0 :         img: Bytes,
    2433            0 :         ctx: &RequestContext,
    2434            0 :     ) -> Result<()> {
                                // Store a page image for block `blknum` of SLRU segment `segno`. The
                                // segment is first created and/or extended as needed so that it covers
                                // `blknum`.
    2435            0 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    2436            0 :             .await?;
    2437            0 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    2438            0 :         Ok(())
    2439            0 :     }
    2440              : 
    2441            0 :     async fn handle_slru_extend(
    2442            0 :         &mut self,
    2443            0 :         modification: &mut DatadirModification<'_>,
    2444            0 :         kind: SlruKind,
    2445            0 :         segno: u32,
    2446            0 :         blknum: BlockNumber,
    2447            0 :         ctx: &RequestContext,
    2448            0 :     ) -> anyhow::Result<()> {
                                // Make sure SLRU segment `segno` of `kind` exists and is at least
                                // `blknum + 1` blocks long. A missing segment is created, a too-short one
                                // is extended, and the blocks between the old end and `blknum` are filled
                                // with zero pages.
    2449            0 :         // We don't use a cache for this like we do for relations. SLRUs are explicitly
    2450            0 :         // extended with ZEROPAGE records, not with commit records, so it happens
    2451            0 :         // a lot less frequently.
    2452            0 : 
    2453            0 :         let new_nblocks = blknum + 1;
    2454              :         // Check if the SLRU segment exists. We implicitly create segments on first
    2455              :         // record.
    2456              :         // TODO: would be nice to be more explicit about it
    2457            0 :         let old_nblocks = if !modification
    2458            0 :             .tline
    2459            0 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    2460            0 :             .await?
    2461              :         {
    2462              :             // create it with 0 size initially, the logic below will extend it
    2463            0 :             modification
    2464            0 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    2465            0 :                 .await?;
    2466            0 :             0
    2467              :         } else {
    2468            0 :             modification
    2469            0 :                 .tline
    2470            0 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    2471            0 :                 .await?
    2472              :         };
    2473              : 
    2474            0 :         if new_nblocks > old_nblocks {
    2475            0 :             trace!(
    2476            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    2477              :                 kind,
    2478              :                 segno,
    2479              :                 old_nblocks,
    2480              :                 new_nblocks
    2481              :             );
    2482            0 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    2483              : 
    2484              :             // fill the gap with zeros
                                    // Unlike handle_rel_extend, there is no per-shard filtering here:
                                    // every gap block is zero-filled.
    2485            0 :             for gap_blknum in old_nblocks..blknum {
    2486            0 :                 modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
    2487              :             }
    2488            0 :         }
    2489            0 :         Ok(())
    2490            0 :     }
    2491              : }
    2492              : 
    2493           12 : async fn get_relsize(
    2494           12 :     modification: &DatadirModification<'_>,
    2495           12 :     rel: RelTag,
    2496           12 :     ctx: &RequestContext,
    2497           12 : ) -> Result<BlockNumber, PageReconstructError> {
                            // Return the size of `rel` in blocks as seen through `modification`
                            // (Version::Modified). A relation that does not exist is reported as
                            // having 0 blocks rather than as an error.
    2498           12 :     let nblocks = if !modification
    2499           12 :         .tline
    2500           12 :         .get_rel_exists(rel, Version::Modified(modification), ctx)
    2501            0 :         .await?
    2502              :     {
    2503            0 :         0
    2504              :     } else {
    2505           12 :         modification
    2506           12 :             .tline
    2507           12 :             .get_rel_size(rel, Version::Modified(modification), ctx)
    2508            0 :             .await?
    2509              :     };
    2510           12 :     Ok(nblocks)
    2511           12 : }
    2512              : 
    2513              : #[allow(clippy::bool_assert_comparison)]
    2514              : #[cfg(test)]
    2515              : mod tests {
    2516              :     use super::*;
    2517              :     use crate::tenant::harness::*;
    2518              :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    2519              :     use postgres_ffi::RELSEG_SIZE;
    2520              : 
    2521              :     use crate::DEFAULT_PG_VERSION;
    2522              : 
    2523              :     /// Arbitrary relation tag, for testing.
    2524              :     const TESTREL_A: RelTag = RelTag {
    2525              :         spcnode: 0,
    2526              :         dbnode: 111,
    2527              :         relnode: 1000,
    2528              :         forknum: 0,
    2529              :     };
    2530              : 
    2531           12 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    2532           12 :         // TODO
    2533           12 :     }
    2534              : 
    2535              :     #[tokio::test]
    2536            2 :     async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
    2537            8 :         for i in 14..=16 {
    2538            6 :             dispatch_pgversion!(i, {
    2539            2 :                 pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
    2540            2 :             });
    2541            2 :         }
    2542            2 : 
    2543            2 :         Ok(())
    2544            2 :     }
    2545              : 
    2546            8 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    2547            8 :         let mut m = tline.begin_modification(Lsn(0x10));
    2548            8 :         m.put_checkpoint(dispatch_pgversion!(
    2549            8 :             tline.pg_version,
    2550            0 :             pgv::ZERO_CHECKPOINT.clone()
    2551            0 :         ))?;
    2552           16 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    2553            8 :         m.commit(ctx).await?;
    2554            8 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    2555              : 
    2556            8 :         Ok(walingest)
    2557            8 :     }
    2558              : 
    2559              :     #[tokio::test]
    2560            2 :     async fn test_relsize() -> Result<()> {
    2561           10 :         let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
    2562            2 :         let tline = tenant
    2563            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2564            4 :             .await?;
    2565            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2566            2 : 
    2567            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    2568            2 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    2569            2 :         walingest
    2570            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    2571            2 :             .await?;
    2572            2 :         m.on_record_end();
    2573            2 :         m.commit(&ctx).await?;
    2574            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    2575            2 :         walingest
    2576            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
    2577            2 :             .await?;
    2578            2 :         m.on_record_end();
    2579            2 :         m.commit(&ctx).await?;
    2580            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    2581            2 :         walingest
    2582            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
    2583            2 :             .await?;
    2584            2 :         m.on_record_end();
    2585            2 :         m.commit(&ctx).await?;
    2586            2 :         let mut m = tline.begin_modification(Lsn(0x50));
    2587            2 :         walingest
    2588            2 :             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
    2589            2 :             .await?;
    2590            2 :         m.on_record_end();
    2591            2 :         m.commit(&ctx).await?;
    2592            2 : 
    2593            2 :         assert_current_logical_size(&tline, Lsn(0x50));
    2594            2 : 
    2595            2 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
    2596            2 :         assert_eq!(
    2597            2 :             tline
    2598            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2599            2 :                 .await?,
    2600            2 :             false
    2601            2 :         );
    2602            2 :         assert!(tline
    2603            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2604            2 :             .await
    2605            2 :             .is_err());
    2606            2 :         assert_eq!(
    2607            2 :             tline
    2608            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2609            2 :                 .await?,
    2610            2 :             true
    2611            2 :         );
    2612            2 :         assert_eq!(
    2613            2 :             tline
    2614            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2615            2 :                 .await?,
    2616            2 :             1
    2617            2 :         );
    2618            2 :         assert_eq!(
    2619            2 :             tline
    2620            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2621            2 :                 .await?,
    2622            2 :             3
    2623            2 :         );
    2624            2 : 
    2625            2 :         // Check page contents at each LSN
    2626            2 :         assert_eq!(
    2627            2 :             tline
    2628            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
    2629            2 :                 .await?,
    2630            2 :             test_img("foo blk 0 at 2")
    2631            2 :         );
    2632            2 : 
    2633            2 :         assert_eq!(
    2634            2 :             tline
    2635            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
    2636            2 :                 .await?,
    2637            2 :             test_img("foo blk 0 at 3")
    2638            2 :         );
    2639            2 : 
    2640            2 :         assert_eq!(
    2641            2 :             tline
    2642            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
    2643            2 :                 .await?,
    2644            2 :             test_img("foo blk 0 at 3")
    2645            2 :         );
    2646            2 :         assert_eq!(
    2647            2 :             tline
    2648            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
    2649            2 :                 .await?,
    2650            2 :             test_img("foo blk 1 at 4")
    2651            2 :         );
    2652            2 : 
    2653            2 :         assert_eq!(
    2654            2 :             tline
    2655            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
    2656            2 :                 .await?,
    2657            2 :             test_img("foo blk 0 at 3")
    2658            2 :         );
    2659            2 :         assert_eq!(
    2660            2 :             tline
    2661            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
    2662            2 :                 .await?,
    2663            2 :             test_img("foo blk 1 at 4")
    2664            2 :         );
    2665            2 :         assert_eq!(
    2666            2 :             tline
    2667            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    2668            2 :                 .await?,
    2669            2 :             test_img("foo blk 2 at 5")
    2670            2 :         );
    2671            2 : 
    2672            2 :         // Truncate last block
    2673            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    2674            2 :         walingest
    2675            2 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    2676            2 :             .await?;
    2677            2 :         m.commit(&ctx).await?;
    2678            2 :         assert_current_logical_size(&tline, Lsn(0x60));
    2679            2 : 
    2680            2 :         // Check reported size and contents after truncation
    2681            2 :         assert_eq!(
    2682            2 :             tline
    2683            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    2684            2 :                 .await?,
    2685            2 :             2
    2686            2 :         );
    2687            2 :         assert_eq!(
    2688            2 :             tline
    2689            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
    2690            2 :                 .await?,
    2691            2 :             test_img("foo blk 0 at 3")
    2692            2 :         );
    2693            2 :         assert_eq!(
    2694            2 :             tline
    2695            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
    2696            2 :                 .await?,
    2697            2 :             test_img("foo blk 1 at 4")
    2698            2 :         );
    2699            2 : 
    2700            2 :         // should still see the truncated block with older LSN
    2701            2 :         assert_eq!(
    2702            2 :             tline
    2703            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2704            2 :                 .await?,
    2705            2 :             3
    2706            2 :         );
    2707            2 :         assert_eq!(
    2708            2 :             tline
    2709            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    2710            2 :                 .await?,
    2711            2 :             test_img("foo blk 2 at 5")
    2712            2 :         );
    2713            2 : 
    2714            2 :         // Truncate to zero length
    2715            2 :         let mut m = tline.begin_modification(Lsn(0x68));
    2716            2 :         walingest
    2717            2 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    2718            2 :             .await?;
    2719            2 :         m.commit(&ctx).await?;
    2720            2 :         assert_eq!(
    2721            2 :             tline
    2722            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
    2723            2 :                 .await?,
    2724            2 :             0
    2725            2 :         );
    2726            2 : 
    2727            2 :         // Extend from 0 to 2 blocks, leaving a gap
    2728            2 :         let mut m = tline.begin_modification(Lsn(0x70));
    2729            2 :         walingest
    2730            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
    2731            2 :             .await?;
    2732            2 :         m.on_record_end();
    2733            2 :         m.commit(&ctx).await?;
    2734            2 :         assert_eq!(
    2735            2 :             tline
    2736            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
    2737            2 :                 .await?,
    2738            2 :             2
    2739            2 :         );
    2740            2 :         assert_eq!(
    2741            2 :             tline
    2742            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
    2743            2 :                 .await?,
    2744            2 :             ZERO_PAGE
    2745            2 :         );
    2746            2 :         assert_eq!(
    2747            2 :             tline
    2748            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
    2749            2 :                 .await?,
    2750            2 :             test_img("foo blk 1")
    2751            2 :         );
    2752            2 : 
    2753            2 :         // Extend a lot more, leaving a big gap that spans across segments
    2754            2 :         let mut m = tline.begin_modification(Lsn(0x80));
    2755            2 :         walingest
    2756            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
    2757            2 :             .await?;
    2758            2 :         m.on_record_end();
    2759          190 :         m.commit(&ctx).await?;
    2760            2 :         assert_eq!(
    2761            2 :             tline
    2762            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2763            2 :                 .await?,
    2764            2 :             1501
    2765            2 :         );
    2766         2998 :         for blk in 2..1500 {
    2767         2996 :             assert_eq!(
    2768         2996 :                 tline
    2769         2996 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
    2770         1540 :                     .await?,
    2771         2996 :                 ZERO_PAGE
    2772            2 :             );
    2773            2 :         }
    2774            2 :         assert_eq!(
    2775            2 :             tline
    2776            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
    2777            2 :                 .await?,
    2778            2 :             test_img("foo blk 1500")
    2779            2 :         );
    2780            2 : 
    2781            2 :         Ok(())
    2782            2 :     }
    2783              : 
    2784              :     // Test what happens if we dropped a relation
    2785              :     // and then created it again within the same layer.
    2786              :     #[tokio::test]
    2787            2 :     async fn test_drop_extend() -> Result<()> {
    2788            2 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")
    2789            2 :             .await?
    2790            2 :             .load()
    2791           10 :             .await;
    2792            2 :         let tline = tenant
    2793            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2794            4 :             .await?;
    2795            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2796            2 : 
    2797            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    2798            2 :         walingest
    2799            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    2800            2 :             .await?;
    2801            2 :         m.commit(&ctx).await?;
    2802            2 : 
    2803            2 :         // Check that rel exists and size is correct
    2804            2 :         assert_eq!(
    2805            2 :             tline
    2806            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2807            2 :                 .await?,
    2808            2 :             true
    2809            2 :         );
    2810            2 :         assert_eq!(
    2811            2 :             tline
    2812            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2813            2 :                 .await?,
    2814            2 :             1
    2815            2 :         );
    2816            2 : 
    2817            2 :         // Drop rel
    2818            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    2819            2 :         walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
    2820            2 :         m.commit(&ctx).await?;
    2821            2 : 
    2822            2 :         // Check that rel is not visible anymore
    2823            2 :         assert_eq!(
    2824            2 :             tline
    2825            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
    2826            2 :                 .await?,
    2827            2 :             false
    2828            2 :         );
    2829            2 : 
    2830            2 :         // FIXME: should fail
    2831            2 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    2832            2 : 
    2833            2 :         // Re-create it
    2834            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    2835            2 :         walingest
    2836            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
    2837            2 :             .await?;
    2838            2 :         m.commit(&ctx).await?;
    2839            2 : 
    2840            2 :         // Check that rel exists and size is correct
    2841            2 :         assert_eq!(
    2842            2 :             tline
    2843            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2844            2 :                 .await?,
    2845            2 :             true
    2846            2 :         );
    2847            2 :         assert_eq!(
    2848            2 :             tline
    2849            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2850            2 :                 .await?,
    2851            2 :             1
    2852            2 :         );
    2853            2 : 
    2854            2 :         Ok(())
    2855            2 :     }
    2856              : 
    2857              :     // Test what happens if we truncated a relation
    2858              :     // so that one of its segments was dropped
    2859              :     // and then extended it again within the same layer.
    2860              :     #[tokio::test]
    2861            2 :     async fn test_truncate_extend() -> Result<()> {
    2862            2 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
    2863            2 :             .await?
    2864            2 :             .load()
    2865           10 :             .await;
    2866            2 :         let tline = tenant
    2867            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2868            4 :             .await?;
    2869            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2870            2 : 
    2871            2 :         // Create a 20 MB relation (the size is arbitrary)
    2872            2 :         let relsize = 20 * 1024 * 1024 / 8192;
    2873            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    2874         5120 :         for blkno in 0..relsize {
    2875         5120 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    2876         5120 :             walingest
    2877         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2878            2 :                 .await?;
    2879            2 :         }
    2880            2 :         m.commit(&ctx).await?;
    2881            2 : 
    2882            2 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
    2883            2 :         assert_eq!(
    2884            2 :             tline
    2885            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2886            2 :                 .await?,
    2887            2 :             false
    2888            2 :         );
    2889            2 :         assert!(tline
    2890            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2891            2 :             .await
    2892            2 :             .is_err());
    2893            2 : 
    2894            2 :         assert_eq!(
    2895            2 :             tline
    2896            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2897            2 :                 .await?,
    2898            2 :             true
    2899            2 :         );
    2900            2 :         assert_eq!(
    2901            2 :             tline
    2902            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2903            2 :                 .await?,
    2904            2 :             relsize
    2905            2 :         );
    2906            2 : 
    2907            2 :         // Check relation content
    2908         5120 :         for blkno in 0..relsize {
    2909         5120 :             let lsn = Lsn(0x20);
    2910         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2911         5120 :             assert_eq!(
    2912         5120 :                 tline
    2913         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
    2914         1803 :                     .await?,
    2915         5120 :                 test_img(&data)
    2916            2 :             );
    2917            2 :         }
    2918            2 : 
    2919            2 :         // Truncate relation so that second segment was dropped
    2920            2 :         // - only leave one page
    2921            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    2922            2 :         walingest
    2923            2 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2924            2 :             .await?;
    2925            2 :         m.commit(&ctx).await?;
    2926            2 : 
    2927            2 :         // Check reported size and contents after truncation
    2928            2 :         assert_eq!(
    2929            2 :             tline
    2930            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    2931            2 :                 .await?,
    2932            2 :             1
    2933            2 :         );
    2934            2 : 
    2935            4 :         for blkno in 0..1 {
    2936            2 :             let lsn = Lsn(0x20);
    2937            2 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2938            2 :             assert_eq!(
    2939            2 :                 tline
    2940            2 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
    2941            2 :                     .await?,
    2942            2 :                 test_img(&data)
    2943            2 :             );
    2944            2 :         }
    2945            2 : 
    2946            2 :         // should still see all blocks with older LSN
    2947            2 :         assert_eq!(
    2948            2 :             tline
    2949            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2950            2 :                 .await?,
    2951            2 :             relsize
    2952            2 :         );
    2953         5120 :         for blkno in 0..relsize {
    2954         5120 :             let lsn = Lsn(0x20);
    2955         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2956         5120 :             assert_eq!(
    2957         5120 :                 tline
    2958         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
    2959         1856 :                     .await?,
    2960         5120 :                 test_img(&data)
    2961            2 :             );
    2962            2 :         }
    2963            2 : 
    2964            2 :         // Extend relation again.
    2965            2 :         // Add enough blocks to create second segment
    2966            2 :         let lsn = Lsn(0x80);
    2967            2 :         let mut m = tline.begin_modification(lsn);
    2968         5120 :         for blkno in 0..relsize {
    2969         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2970         5120 :             walingest
    2971         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2972            2 :                 .await?;
    2973            2 :         }
    2974            3 :         m.commit(&ctx).await?;
    2975            2 : 
    2976            2 :         assert_eq!(
    2977            2 :             tline
    2978            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2979            2 :                 .await?,
    2980            2 :             true
    2981            2 :         );
    2982            2 :         assert_eq!(
    2983            2 :             tline
    2984            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2985            2 :                 .await?,
    2986            2 :             relsize
    2987            2 :         );
    2988            2 :         // Check relation content
    2989         5120 :         for blkno in 0..relsize {
    2990         5120 :             let lsn = Lsn(0x80);
    2991         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2992         5120 :             assert_eq!(
    2993         5120 :                 tline
    2994         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
    2995         1828 :                     .await?,
    2996         5120 :                 test_img(&data)
    2997            2 :             );
    2998            2 :         }
    2999            2 : 
    3000            2 :         Ok(())
    3001            2 :     }
    3002              : 
    3003              :     /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
    3004              :     /// split into multiple 1 GB segments in Postgres.
    3005              :     #[tokio::test]
    3006            2 :     async fn test_large_rel() -> Result<()> {
    3007           10 :         let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
    3008            2 :         let tline = tenant
    3009            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    3010            4 :             .await?;
    3011            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    3012            2 : 
    3013            2 :         let mut lsn = 0x10;
    3014       262146 :         for blknum in 0..RELSEG_SIZE + 1 {
    3015       262146 :             lsn += 0x10;
    3016       262146 :             let mut m = tline.begin_modification(Lsn(lsn));
    3017       262146 :             let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    3018       262146 :             walingest
    3019       262146 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    3020         5420 :                 .await?;
    3021       262146 :             m.commit(&ctx).await?;
    3022            2 :         }
    3023            2 : 
    3024            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    3025            2 : 
    3026            2 :         assert_eq!(
    3027            2 :             tline
    3028            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    3029            2 :                 .await?,
    3030            2 :             RELSEG_SIZE + 1
    3031            2 :         );
    3032            2 : 
    3033            2 :         // Truncate one block
    3034            2 :         lsn += 0x10;
    3035            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    3036            2 :         walingest
    3037            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    3038            2 :             .await?;
    3039            2 :         m.commit(&ctx).await?;
    3040            2 :         assert_eq!(
    3041            2 :             tline
    3042            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    3043            2 :                 .await?,
    3044            2 :             RELSEG_SIZE
    3045            2 :         );
    3046            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    3047            2 : 
    3048            2 :         // Truncate another block
    3049            2 :         lsn += 0x10;
    3050            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    3051            2 :         walingest
    3052            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    3053            2 :             .await?;
    3054            2 :         m.commit(&ctx).await?;
    3055            2 :         assert_eq!(
    3056            2 :             tline
    3057            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    3058            2 :                 .await?,
    3059            2 :             RELSEG_SIZE - 1
    3060            2 :         );
    3061            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    3062            2 : 
    3063            2 :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time
    3064            2 :         // This tests the behavior at segment boundaries
    3065            2 :         let mut size: i32 = 3000;
    3066         6004 :         while size >= 0 {
    3067         6002 :             lsn += 0x10;
    3068         6002 :             let mut m = tline.begin_modification(Lsn(lsn));
    3069         6002 :             walingest
    3070         6002 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    3071            2 :                 .await?;
    3072         6002 :             m.commit(&ctx).await?;
    3073         6002 :             assert_eq!(
    3074         6002 :                 tline
    3075         6002 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    3076            2 :                     .await?,
    3077         6002 :                 size as BlockNumber
    3078            2 :             );
    3079            2 : 
    3080         6002 :             size -= 1;
    3081            2 :         }
    3082            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    3083            2 : 
    3084            2 :         Ok(())
    3085            2 :     }
    3086              : 
    3087              :     /// Replay a WAL segment file taken directly from safekeepers.
    3088              :     ///
    3089              :     /// This test is useful for benchmarking since it allows us to profile only
    3090              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    3091              :     /// without waiting for unrelated steps. It asserts nothing about the result; it only panics if a step fails.
    3092              :     #[tokio::test]
    3093            2 :     async fn test_ingest_real_wal() {
    3094            2 :         use crate::tenant::harness::*;
    3095            2 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    3096            2 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    3097            2 : 
    3098            2 :         // Define test data path and constants.
    3099            2 :         //
    3100            2 :         // Steps to reconstruct the data, if needed:
    3101            2 :         // 1. Run the pgbench python test
    3102            2 :         // 2. Take the first WAL segment file from safekeeper
    3103            2 :         // 3. Compress it using `zstd --long input_file`
    3104            2 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    3105            2 :         // 5. Grep safekeeper (sk) logs for "restart decoder" to get startpoint
    3106            2 :         // 6. Run just the decoder from this test to get the endpoint.
    3107            2 :         //    It's the last LSN the decoder will output.
    3108            2 :         let pg_version = 15; // The test data was generated by pg15
    3109            2 :         let path = "test_data/sk_wal_segment_from_pgbench";
    3110            2 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    3111            2 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    3112            2 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    3113            2 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap(); // not used below; kept as a record of the endpoint from step 6
    3114            2 : 
    3115            2 :         let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
    3116            2 :         let span = harness
    3117            2 :             .span()
    3118            2 :             .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
    3119           10 :         let (tenant, ctx) = harness.load().await;
    3120            2 : 
    3121            2 :         let remote_initdb_path =
    3122            2 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    3123            2 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    3124            2 : 
    3125            2 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    3126            2 :             .expect("creating test dir should work");
    3127            2 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    3128            2 : 
    3129            2 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    3130            2 :         // it doesn't create a real checkpoint, and WalIngest::new tries to parse
    3131            2 :         // the garbage data.
    3132            2 :         let tline = tenant
    3133            2 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    3134        16662 :             .await
    3135            2 :             .unwrap();
    3136            2 : 
    3137            2 :         // We fully read and decompress this into memory before decoding
    3138            2 :         // to get a more accurate perf profile of the decoder.
    3139            2 :         let bytes = {
    3140            2 :             use async_compression::tokio::bufread::ZstdDecoder;
    3141            2 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    3142            2 :             let reader = tokio::io::BufReader::new(file);
    3143            2 :             let decoder = ZstdDecoder::new(reader);
    3144            2 :             let mut reader = tokio::io::BufReader::new(decoder);
    3145            2 :             let mut buffer = Vec::new();
    3146          224 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    3147            2 :             buffer
    3148            2 :         };
    3149            2 : 
    3150            2 :         // TODO start a profiler too
    3151            2 :         let started_at = std::time::Instant::now();
    3152            2 : 
    3153            2 :         // Initialize the WAL decoder and WalIngest, both starting at `startpoint`
    3154            2 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE); // decoding starts at bytes[xlogoff..], not at the start of the segment
    3155            2 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    3156            2 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    3157            5 :             .await
    3158            2 :             .unwrap();
    3159            2 :         let mut modification = tline.begin_modification(startpoint);
    3160            2 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    3161            2 : 
    3162            2 :         // Decode and ingest WAL. We process the WAL in chunks because
    3163            2 :         // that's what happens when we get bytes from safekeepers.
    3164       474686 :         for chunk in bytes[xlogoff..].chunks(50) {
    3165       474686 :             decoder.feed_bytes(chunk);
    3166       620536 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    3167       145850 :                 let mut decoded = DecodedWALRecord::default();
    3168       145850 :                 decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
    3169       145850 :                 walingest
    3170       145850 :                     .ingest_record(decoded, lsn, &mut modification, &ctx)
    3171       145850 :                     .instrument(span.clone())
    3172          296 :                     .await
    3173       145850 :                     .unwrap();
    3174            2 :             }
    3175       474686 :             modification.commit(&ctx).await.unwrap(); // one commit per 50-byte chunk, even when no record completed in it
    3176            2 :         }
    3177            2 : 
    3178            2 :         let duration = started_at.elapsed();
    3179            2 :         println!("done in {:?}", duration);
    3180            2 :     }
    3181              : }
        

Generated by: LCOV version 2.1-beta