LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions)
Test:         b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info
Test Date:    2024-10-22 22:13:45
Coverage:     Lines: 49.8 % (1041 of 2090)    Functions: 64.8 % (46 of 71)

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  ->   WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9              : //! and decodes it to individual WAL records. It feeds the WAL records
      10              : //! to WalIngest, which parses them and stores them in the Repository.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
      13              : //! page images, or as WAL records. WalIngest::ingest_record() extracts
      14              : //! page images out of some WAL records, but stores most of them as
      15              : //! WAL records. If a WAL record modifies multiple pages, WalIngest
      16              : //! will call Repository::put_wal_record or put_page_image functions
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
      22              : //! bespoke Rust code.
      23              : 
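
The module comment above describes the WAL receiver -> WalIngest -> Repository pipeline at a high level. As a rough, hypothetical sketch of how a caller might drive it (not the actual walreceiver code): the only real pieces below are `WalIngest::new` and `WalIngest::ingest_record`, whose signatures appear later in this file; `decode_next_record` and `begin_modification` are made-up stand-ins for the decode step and for obtaining a `DatadirModification`.

    async fn ingest_loop(
        timeline: &Timeline,
        startpoint: Lsn,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // Load the latest checkpoint at `startpoint` (see WalIngest::new below).
        let mut walingest = WalIngest::new(timeline, startpoint, ctx).await?;

        // Hypothetical: the WAL receiver's decode step, yielding one DecodedWALRecord
        // and its end LSN at a time.
        while let Some((decoded, lsn)) = decode_next_record().await? {
            // Hypothetical: obtain a DatadirModification for this timeline.
            let mut modification = begin_modification(timeline);

            // Parse the record and buffer page images / WAL records for each block it touches.
            walingest.ingest_record(decoded, lsn, &mut modification, ctx).await?;

            // The buffered changes only reach the Repository once the caller commits the
            // modification (see the note near the end of ingest_record()).
        }
        Ok(())
    }
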
      24              : use std::sync::Arc;
      25              : use std::sync::OnceLock;
      26              : use std::time::Duration;
      27              : use std::time::Instant;
      28              : use std::time::SystemTime;
      29              : 
      30              : use pageserver_api::shard::ShardIdentity;
      31              : use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
      32              : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      33              : 
      34              : use anyhow::{bail, Context, Result};
      35              : use bytes::{Buf, Bytes, BytesMut};
      36              : use tracing::*;
      37              : use utils::failpoint_support;
      38              : use utils::rate_limit::RateLimit;
      39              : 
      40              : use crate::context::RequestContext;
      41              : use crate::metrics::WAL_INGEST;
      42              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      43              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      44              : use crate::tenant::PageReconstructError;
      45              : use crate::tenant::Timeline;
      46              : use crate::walrecord::*;
      47              : use crate::ZERO_PAGE;
      48              : use pageserver_api::key::rel_block_to_key;
      49              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      50              : use postgres_ffi::pg_constants;
      51              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      52              : use postgres_ffi::TransactionId;
      53              : use postgres_ffi::BLCKSZ;
      54              : use utils::bin_ser::SerializeError;
      55              : use utils::lsn::Lsn;
      56              : 
      57              : enum_pgversion! {CheckPoint, pgv::CheckPoint}
      58              : 
      59              : impl CheckPoint {
      60            6 :     fn encode(&self) -> Result<Bytes, SerializeError> {
      61            6 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
      62            6 :     }
      63              : 
      64       145834 :     fn update_next_xid(&mut self, xid: u32) -> bool {
      65       145834 :         enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
      66       145834 :     }
      67              : 
      68            0 :     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
      69            0 :         enum_pgversion_dispatch!(self, CheckPoint, cp, {
      70            0 :             cp.update_next_multixid(multi_xid, multi_offset)
      71              :         })
      72            0 :     }
      73              : }
      74              : 
      75              : /// Temporary limitation of WAL lag warnings after attach
      76              : ///
      77              : /// After tenant attach, we want to limit WAL lag warnings because
      78              : /// we don't look at the WAL until the attach is complete, which
      79              : /// might take a while.
      80              : pub struct WalLagCooldown {
      81              :     /// Until when should this limitation apply at all
      82              :     /// Until when this limitation should apply at all
      83              :     /// The maximum lag to suppress. Lags above this limit get reported anyway.
      84              :     max_lag: Duration,
      85              : }
      86              : 
      87              : impl WalLagCooldown {
      88            0 :     pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
      89            0 :         Self {
      90            0 :             active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
      91            0 :             max_lag: attach_duration * 2 + Duration::from_secs(60),
      92            0 :         }
      93            0 :     }
      94              : }
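
To make the cooldown window above concrete (illustrative numbers only, not from the source): for an attach that took 10 minutes, `new()` gives `active_until = attach_start + 3 * 10 min + 120 s = attach_start + 32 min` and `max_lag = 2 * 10 min + 60 s = 21 min`, so for 32 minutes after the attach started, only WAL lag warnings above roughly 21 minutes would still be reported.
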
      95              : 
      96              : pub struct WalIngest {
      97              :     attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
      98              :     shard: ShardIdentity,
      99              :     checkpoint: CheckPoint,
     100              :     checkpoint_modified: bool,
     101              :     warn_ingest_lag: WarnIngestLag,
     102              : }
     103              : 
     104              : struct WarnIngestLag {
     105              :     lag_msg_ratelimit: RateLimit,
     106              :     future_lsn_msg_ratelimit: RateLimit,
     107              :     timestamp_invalid_msg_ratelimit: RateLimit,
     108              : }
     109              : 
     110              : impl WalIngest {
     111           12 :     pub async fn new(
     112           12 :         timeline: &Timeline,
     113           12 :         startpoint: Lsn,
     114           12 :         ctx: &RequestContext,
     115           12 :     ) -> anyhow::Result<WalIngest> {
     116              :         // Fetch the latest checkpoint into memory, so that we can compare with it
     117              :         // quickly in `ingest_record` and update it when it changes.
     118           12 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
     119           12 :         let pgversion = timeline.pg_version;
     120              : 
     121           12 :         let checkpoint = dispatch_pgversion!(pgversion, {
     122            0 :             let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
     123            0 :             trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
     124            0 :             <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
     125              :         });
     126              : 
     127           12 :         Ok(WalIngest {
     128           12 :             shard: *timeline.get_shard_identity(),
     129           12 :             checkpoint,
     130           12 :             checkpoint_modified: false,
     131           12 :             attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
     132           12 :             warn_ingest_lag: WarnIngestLag {
     133           12 :                 lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     134           12 :                 future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     135           12 :                 timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
     136           12 :             },
     137           12 :         })
     138           12 :     }
     139              : 
     140              :     ///
     141              :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
     142              :     ///
     143              :     /// This function updates the `lsn` field of `DatadirModification`.
     144              :     ///
     145              :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
     146              :     /// relations/pages that the record affects.
     147              :     ///
     148              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
     149              :     ///
     150       145852 :     pub async fn ingest_record(
     151       145852 :         &mut self,
     152       145852 :         decoded: DecodedWALRecord,
     153       145852 :         lsn: Lsn,
     154       145852 :         modification: &mut DatadirModification<'_>,
     155       145852 :         ctx: &RequestContext,
     156       145852 :     ) -> anyhow::Result<bool> {
     157       145852 :         WAL_INGEST.records_received.inc();
     158       145852 :         let pg_version = modification.tline.pg_version;
     159       145852 :         let prev_len = modification.len();
     160       145852 : 
     161       145852 :         modification.set_lsn(lsn)?;
     162              : 
     163       145852 :         if decoded.is_dbase_create_copy(pg_version) {
     164              :             // Records of this type should always be preceded by a commit(), as they
     165              :             // rely on reading data pages back from the Timeline.
     166            0 :             assert!(!modification.has_dirty_data_pages());
     167       145852 :         }
     168              : 
     169       145852 :         let mut buf = decoded.record.clone();
     170       145852 :         buf.advance(decoded.main_data_offset);
     171       145852 : 
     172       145852 :         assert!(!self.checkpoint_modified);
     173       145852 :         if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
     174       145834 :             && self.checkpoint.update_next_xid(decoded.xl_xid)
     175            2 :         {
     176            2 :             self.checkpoint_modified = true;
     177       145850 :         }
     178              : 
     179       145852 :         failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
     180              : 
     181       145852 :         match decoded.xl_rmid {
     182              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
     183              :                 // Heap AM records need some special handling, because they modify VM pages
     184              :                 // without registering them with the standard mechanism.
     185       145474 :                 self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
     186            0 :                     .await?;
     187              :             }
     188              :             pg_constants::RM_NEON_ID => {
     189            0 :                 self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
     190            0 :                     .await?;
     191              :             }
     192              :             // Handle other special record types
     193              :             pg_constants::RM_SMGR_ID => {
     194           16 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     195           16 : 
     196           16 :                 if info == pg_constants::XLOG_SMGR_CREATE {
     197           16 :                     let create = XlSmgrCreate::decode(&mut buf);
     198           16 :                     self.ingest_xlog_smgr_create(modification, &create, ctx)
     199           12 :                         .await?;
     200            0 :                 } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     201            0 :                     let truncate = XlSmgrTruncate::decode(&mut buf);
     202            0 :                     self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
     203            0 :                         .await?;
     204            0 :                 }
     205              :             }
     206              :             pg_constants::RM_DBASE_ID => {
     207            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     208            0 :                 debug!(%info, %pg_version, "handle RM_DBASE_ID");
     209              : 
     210            0 :                 if pg_version == 14 {
     211            0 :                     if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     212            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     213            0 :                         debug!("XLOG_DBASE_CREATE v14");
     214              : 
     215            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     216            0 :                             .await?;
     217            0 :                     } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     218            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     219            0 :                         for tablespace_id in dropdb.tablespace_ids {
     220            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     221            0 :                             modification
     222            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     223            0 :                                 .await?;
     224              :                         }
     225            0 :                     }
     226            0 :                 } else if pg_version == 15 {
     227            0 :                     if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     228            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     229            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     230              :                         // The XLOG record was renamed between v14 and v15,
     231              :                         // but the record format is the same.
     232              :                         // So we can reuse XlCreateDatabase here.
     233            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     234            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     235            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     236            0 :                             .await?;
     237            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     238            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     239            0 :                         for tablespace_id in dropdb.tablespace_ids {
     240            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     241            0 :                             modification
     242            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     243            0 :                                 .await?;
     244              :                         }
     245            0 :                     }
     246            0 :                 } else if pg_version == 16 {
     247            0 :                     if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     248            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     249            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     250              :                         // The XLOG record was renamed between v14 and v15,
     251              :                         // but the record format is the same.
     252              :                         // So we can reuse XlCreateDatabase here.
     253            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     254            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     255            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     256            0 :                             .await?;
     257            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     258            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     259            0 :                         for tablespace_id in dropdb.tablespace_ids {
     260            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     261            0 :                             modification
     262            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     263            0 :                                 .await?;
     264              :                         }
     265            0 :                     }
     266            0 :                 } else if pg_version == 17 {
     267            0 :                     if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     268            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     269            0 :                     } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     270              :                         // The XLOG record was renamed between v14 and v15,
     271              :                         // but the record format is the same.
     272              :                         // So we can reuse XlCreateDatabase here.
     273            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     274            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     275            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     276            0 :                             .await?;
     277            0 :                     } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
     278            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     279            0 :                         for tablespace_id in dropdb.tablespace_ids {
     280            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     281            0 :                             modification
     282            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     283            0 :                                 .await?;
     284              :                         }
     285            0 :                     }
     286            0 :                 }
     287              :             }
     288              :             pg_constants::RM_TBLSPC_ID => {
     289            0 :                 trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     290              :             }
     291              :             pg_constants::RM_CLOG_ID => {
     292            0 :                 let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     293            0 : 
     294            0 :                 if info == pg_constants::CLOG_ZEROPAGE {
     295            0 :                     let pageno = if pg_version < 17 {
     296            0 :                         buf.get_u32_le()
     297              :                     } else {
     298            0 :                         buf.get_u64_le() as u32
     299              :                     };
     300            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     301            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     302            0 :                     self.put_slru_page_image(
     303            0 :                         modification,
     304            0 :                         SlruKind::Clog,
     305            0 :                         segno,
     306            0 :                         rpageno,
     307            0 :                         ZERO_PAGE.clone(),
     308            0 :                         ctx,
     309            0 :                     )
     310            0 :                     .await?;
     311              :                 } else {
     312            0 :                     assert!(info == pg_constants::CLOG_TRUNCATE);
     313            0 :                     let xlrec = XlClogTruncate::decode(&mut buf, pg_version);
     314            0 :                     self.ingest_clog_truncate_record(modification, &xlrec, ctx)
     315            0 :                         .await?;
     316              :                 }
     317              :             }
     318              :             pg_constants::RM_XACT_ID => {
     319           24 :                 let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     320           24 : 
     321           24 :                 if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
     322            8 :                     let parsed_xact =
     323            8 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     324            8 :                     self.ingest_xact_record(
     325            8 :                         modification,
     326            8 :                         &parsed_xact,
     327            8 :                         info == pg_constants::XLOG_XACT_COMMIT,
     328            8 :                         decoded.origin_id,
     329            8 :                         ctx,
     330            8 :                     )
     331            0 :                     .await?;
     332           16 :                 } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
     333           16 :                     || info == pg_constants::XLOG_XACT_ABORT_PREPARED
     334              :                 {
     335            0 :                     let parsed_xact =
     336            0 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     337            0 :                     self.ingest_xact_record(
     338            0 :                         modification,
     339            0 :                         &parsed_xact,
     340            0 :                         info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
     341            0 :                         decoded.origin_id,
     342            0 :                         ctx,
     343            0 :                     )
     344            0 :                     .await?;
     345              :                     // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     346            0 :                     trace!(
     347            0 :                         "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     348              :                         decoded.xl_xid,
     349              :                         parsed_xact.xid,
     350              :                         lsn,
     351              :                     );
     352              : 
     353            0 :                     let xid: u64 = if pg_version >= 17 {
     354            0 :                         self.adjust_to_full_transaction_id(parsed_xact.xid)?
     355              :                     } else {
     356            0 :                         parsed_xact.xid as u64
     357              :                     };
     358            0 :                     modification.drop_twophase_file(xid, ctx).await?;
     359           16 :                 } else if info == pg_constants::XLOG_XACT_PREPARE {
     360            0 :                     let xid: u64 = if pg_version >= 17 {
     361            0 :                         self.adjust_to_full_transaction_id(decoded.xl_xid)?
     362              :                     } else {
     363            0 :                         decoded.xl_xid as u64
     364              :                     };
     365            0 :                     modification
     366            0 :                         .put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx)
     367            0 :                         .await?;
     368           16 :                 }
     369              :             }
     370              :             pg_constants::RM_MULTIXACT_ID => {
     371            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     372            0 : 
     373            0 :                 if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
     374            0 :                     let pageno = if pg_version < 17 {
     375            0 :                         buf.get_u32_le()
     376              :                     } else {
     377            0 :                         buf.get_u64_le() as u32
     378              :                     };
     379            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     380            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     381            0 :                     self.put_slru_page_image(
     382            0 :                         modification,
     383            0 :                         SlruKind::MultiXactOffsets,
     384            0 :                         segno,
     385            0 :                         rpageno,
     386            0 :                         ZERO_PAGE.clone(),
     387            0 :                         ctx,
     388            0 :                     )
     389            0 :                     .await?;
     390            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
     391            0 :                     let pageno = if pg_version < 17 {
     392            0 :                         buf.get_u32_le()
     393              :                     } else {
     394            0 :                         buf.get_u64_le() as u32
     395              :                     };
     396            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     397            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     398            0 :                     self.put_slru_page_image(
     399            0 :                         modification,
     400            0 :                         SlruKind::MultiXactMembers,
     401            0 :                         segno,
     402            0 :                         rpageno,
     403            0 :                         ZERO_PAGE.clone(),
     404            0 :                         ctx,
     405            0 :                     )
     406            0 :                     .await?;
     407            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     408            0 :                     let xlrec = XlMultiXactCreate::decode(&mut buf);
     409            0 :                     self.ingest_multixact_create_record(modification, &xlrec)?;
     410            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     411            0 :                     let xlrec = XlMultiXactTruncate::decode(&mut buf);
     412            0 :                     self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
     413            0 :                         .await?;
     414            0 :                 }
     415              :             }
     416              :             pg_constants::RM_RELMAP_ID => {
     417            0 :                 let xlrec = XlRelmapUpdate::decode(&mut buf);
     418            0 :                 self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
     419            0 :                     .await?;
     420              :             }
     421              :             pg_constants::RM_XLOG_ID => {
     422           30 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     423           30 : 
     424           30 :                 if info == pg_constants::XLOG_PARAMETER_CHANGE {
     425            2 :                     if let CheckPoint::V17(cp) = &mut self.checkpoint {
     426            0 :                         let rec = v17::XlParameterChange::decode(&mut buf);
     427            0 :                         cp.wal_level = rec.wal_level;
     428            0 :                         self.checkpoint_modified = true;
     429            2 :                     }
     430           28 :                 } else if info == pg_constants::XLOG_END_OF_RECOVERY {
     431            0 :                     if let CheckPoint::V17(cp) = &mut self.checkpoint {
     432            0 :                         let rec = v17::XlEndOfRecovery::decode(&mut buf);
     433            0 :                         cp.wal_level = rec.wal_level;
     434            0 :                         self.checkpoint_modified = true;
     435            0 :                     }
     436           28 :                 }
     437              : 
     438           30 :                 enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
     439            0 :                     if info == pg_constants::XLOG_NEXTOID {
     440            0 :                         let next_oid = buf.get_u32_le();
     441            0 :                         if cp.nextOid != next_oid {
     442            0 :                             cp.nextOid = next_oid;
     443            0 :                             self.checkpoint_modified = true;
     444            0 :                         }
     445            0 :                     } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
     446            0 :                         || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     447              :                     {
     448            0 :                         let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
     449            0 :                         buf.copy_to_slice(&mut checkpoint_bytes);
     450            0 :                         let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
     451            0 :                         trace!(
     452            0 :                             "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
     453              :                             xlog_checkpoint.oldestXid,
     454              :                             cp.oldestXid
     455              :                         );
     456            0 :                         if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
     457            0 :                             cp.oldestXid = xlog_checkpoint.oldestXid;
     458            0 :                         }
     459            0 :                         trace!(
     460            0 :                             "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
     461              :                             xlog_checkpoint.oldestActiveXid,
     462              :                             cp.oldestActiveXid
     463              :                         );
     464              : 
     465              :                         // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
     466              :                         // because at shutdown, all in-progress transactions will implicitly
     467              :                         // end. Postgres startup code knows that, and allows hot standby to start
     468              :                         // immediately from a shutdown checkpoint.
     469              :                         //
     470              :                         // In Neon, Postgres hot standby startup always behaves as if starting from
     471              :                         // an online checkpoint. It needs a valid `oldestActiveXid` value, so
     472              :                         // instead of overwriting self.checkpoint.oldestActiveXid with
     473              :                         // InvalidTransactionId from the checkpoint WAL record, update it to a
     474              :                         // proper value, knowing that there are no in-progress transactions at this
     475              :                         // point, except for prepared transactions.
     476              :                         //
     477              :                         // See also the neon code changes in the InitWalRecovery() function.
     478            0 :                         if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
     479            0 :                             && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     480              :                         {
     481            0 :                             let oldest_active_xid = if pg_version >= 17 {
     482            0 :                                 let mut oldest_active_full_xid = cp.nextXid.value;
     483            0 :                                 for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
     484            0 :                                     if xid < oldest_active_full_xid {
     485            0 :                                         oldest_active_full_xid = xid;
     486            0 :                                     }
     487              :                                 }
     488            0 :                                 oldest_active_full_xid as u32
     489              :                             } else {
     490            0 :                                 let mut oldest_active_xid = cp.nextXid.value as u32;
     491            0 :                                 for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
     492            0 :                                     let narrow_xid = xid as u32;
     493            0 :                                     if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
     494            0 :                                         oldest_active_xid = narrow_xid;
     495            0 :                                     }
     496              :                                 }
     497            0 :                                 oldest_active_xid
     498              :                             };
     499            0 :                             cp.oldestActiveXid = oldest_active_xid;
     500            0 :                         } else {
     501            0 :                             cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
     502            0 :                         }
     503              : 
     504              :                         // Write a new checkpoint key-value pair on every checkpoint record, even
     505              :                         // if nothing really changed. Not strictly required, but it seems nice to
     506              :                         // have some trace of the checkpoint records in the layer files at the same
     507              :                         // LSNs.
     508            0 :                         self.checkpoint_modified = true;
     509            0 :                     }
     510              :                 });
     511              :             }
     512              :             pg_constants::RM_LOGICALMSG_ID => {
     513            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     514            0 : 
     515            0 :                 if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     516            0 :                     let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
     517            0 :                     let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     518            0 :                     let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
     519            0 :                     if prefix == "neon-test" {
     520              :                         // This is a convenient way to make the WAL ingestion pause at
     521              :                         // a particular point in the WAL. For more fine-grained control,
     522              :                         // we could peek into the message and only pause if it contains
     523              :                         // a particular string, for example, but this is enough for now.
     524            0 :                         failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
     525            0 :                     } else if let Some(path) = prefix.strip_prefix("neon-file:") {
     526            0 :                         modification.put_file(path, message, ctx).await?;
     527            0 :                     }
     528            0 :                 }
     529              :             }
     530              :             pg_constants::RM_STANDBY_ID => {
     531           16 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     532           16 :                 if info == pg_constants::XLOG_RUNNING_XACTS {
     533            0 :                     let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
     534              : 
     535            0 :                     enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
     536            0 :                         cp.oldestActiveXid = xlrec.oldest_running_xid;
     537            0 :                     });
     538              : 
     539            0 :                     self.checkpoint_modified = true;
     540           16 :                 }
     541              :             }
     542              :             pg_constants::RM_REPLORIGIN_ID => {
     543            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     544            0 :                 if info == pg_constants::XLOG_REPLORIGIN_SET {
     545            0 :                     let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
     546            0 :                     modification
     547            0 :                         .set_replorigin(xlrec.node_id, xlrec.remote_lsn)
     548            0 :                         .await?
     549            0 :                 } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
     550            0 :                     let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
     551            0 :                     modification.drop_replorigin(xlrec.node_id).await?
     552            0 :                 }
     553              :             }
     554          292 :             _x => {
     555          292 :                 // TODO: should probably log & fail here instead of blindly
     556          292 :                 // doing something without understanding the protocol
     557          292 :             }
     558              :         }
     559              : 
     560              :         // Iterate through all the blocks that the record modifies, and
     561              :         // "put" a separate copy of the record for each block.
     562       145852 :         for blk in decoded.blocks.iter() {
     563       145642 :             let rel = RelTag {
     564       145642 :                 spcnode: blk.rnode_spcnode,
     565       145642 :                 dbnode: blk.rnode_dbnode,
     566       145642 :                 relnode: blk.rnode_relnode,
     567       145642 :                 forknum: blk.forknum,
     568       145642 :             };
     569       145642 : 
     570       145642 :             let key = rel_block_to_key(rel, blk.blkno);
     571       145642 :             let key_is_local = self.shard.is_key_local(&key);
     572       145642 : 
     573       145642 :             tracing::debug!(
     574              :                 lsn=%lsn,
     575              :                 key=%key,
     576            0 :                 "ingest: shard decision {} (checkpoint={})",
     577            0 :                 if !key_is_local { "drop" } else { "keep" },
     578              :                 self.checkpoint_modified
     579              :             );
     580              : 
     581       145642 :             if !key_is_local {
     582            0 :                 if self.shard.is_shard_zero() {
     583              :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     584              :                     // its blkno in case it implicitly extends a relation.
     585            0 :                     self.observe_decoded_block(modification, blk, ctx).await?;
     586            0 :                 }
     587              : 
     588            0 :                 continue;
     589       145642 :             }
     590       145642 :             self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
     591          284 :                 .await?;
     592              :         }
     593              : 
     594              :         // If checkpoint data was updated, store the new version in the repository
     595       145852 :         if self.checkpoint_modified {
     596            6 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     597              : 
     598            6 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     599            6 :             self.checkpoint_modified = false;
     600       145846 :         }
     601              : 
     602              :         // Note that at this point this record is only cached in the modification
     603              :         // until commit() is called to flush the data into the repository and update
     604              :         // the latest LSN.
     605              : 
     606       145852 :         modification.on_record_end();
     607       145852 : 
     608       145852 :         Ok(modification.len() > prev_len)
     609       145852 :     }
     610              : 
     611              :     /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
     612            0 :     fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
     613            0 :         let next_full_xid =
     614            0 :             enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
     615              : 
     616            0 :         let next_xid = (next_full_xid) as u32;
     617            0 :         let mut epoch = (next_full_xid >> 32) as u32;
     618            0 : 
     619            0 :         if xid > next_xid {
     620              :             // Wraparound occurred, must be from a prev epoch.
     621            0 :             if epoch == 0 {
     622            0 :                 bail!("apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}");
     623            0 :             }
     624            0 :             epoch -= 1;
     625            0 :         }
     626              : 
     627            0 :         Ok((epoch as u64) << 32 | xid as u64)
     628            0 :     }
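
A worked example of the epoch arithmetic above (illustrative values only): suppose the in-memory checkpoint has `nextXid = (5 << 32) | 100`, i.e. epoch 5 and 32-bit `next_xid` 100. An xid of 90 is not greater than 100, so the epoch is unchanged and the result is `(5 << 32) | 90`. An xid of 0xFFFF_FFF0 is greater than 100, so it must predate the wraparound into epoch 5; the epoch is decremented and the result is `(4 << 32) | 0xFFFF_FFF0`.
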
     629              : 
     630              :     /// Do not store this block, but observe it for the purposes of updating our relation size state.
     631            0 :     async fn observe_decoded_block(
     632            0 :         &mut self,
     633            0 :         modification: &mut DatadirModification<'_>,
     634            0 :         blk: &DecodedBkpBlock,
     635            0 :         ctx: &RequestContext,
     636            0 :     ) -> Result<(), PageReconstructError> {
     637            0 :         let rel = RelTag {
     638            0 :             spcnode: blk.rnode_spcnode,
     639            0 :             dbnode: blk.rnode_dbnode,
     640            0 :             relnode: blk.rnode_relnode,
     641            0 :             forknum: blk.forknum,
     642            0 :         };
     643            0 :         self.handle_rel_extend(modification, rel, blk.blkno, ctx)
     644            0 :             .await
     645            0 :     }
     646              : 
     647       145642 :     async fn ingest_decoded_block(
     648       145642 :         &mut self,
     649       145642 :         modification: &mut DatadirModification<'_>,
     650       145642 :         lsn: Lsn,
     651       145642 :         decoded: &DecodedWALRecord,
     652       145642 :         blk: &DecodedBkpBlock,
     653       145642 :         ctx: &RequestContext,
     654       145642 :     ) -> Result<(), PageReconstructError> {
     655       145642 :         let rel = RelTag {
     656       145642 :             spcnode: blk.rnode_spcnode,
     657       145642 :             dbnode: blk.rnode_dbnode,
     658       145642 :             relnode: blk.rnode_relnode,
     659       145642 :             forknum: blk.forknum,
     660       145642 :         };
     661       145642 : 
     662       145642 :         //
     663       145642 :         // Instead of storing full-page-image WAL record,
     664       145642 :         // it is better to store extracted image: we can skip wal-redo
     665       145642 :         // in this case. Also some FPI records may contain multiple (up to 32) pages,
     666       145642 :         // so them have to be copied multiple times.
     667       145642 :         //
     668       145642 :         if blk.apply_image
     669           60 :             && blk.has_image
     670           60 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     671           24 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     672            0 :             || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     673              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     674           24 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
     675              :             // do not materialize null pages because them most likely be soon replaced with real data
     676           24 :             && blk.bimg_len != 0
     677              :         {
     678              :             // Extract page image from FPI record
     679           24 :             let img_len = blk.bimg_len as usize;
     680           24 :             let img_offs = blk.bimg_offset as usize;
     681           24 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     682           24 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     683           24 : 
     684           24 :             if blk.hole_length != 0 {
     685            0 :                 let tail = image.split_off(blk.hole_offset as usize);
     686            0 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     687            0 :                 image.unsplit(tail);
     688           24 :             }
     689              :             //
     690              :             // Match the logic of XLogReadBufferForRedoExtended:
     691              :             // The page may be uninitialized. If so, we can't set the LSN because
     692              :             // that would corrupt the page.
     693              :             //
     694           24 :             if !page_is_new(&image) {
     695           18 :                 page_set_lsn(&mut image, lsn)
     696            6 :             }
     697           24 :             assert_eq!(image.len(), BLCKSZ as usize);
     698              : 
     699           24 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     700            5 :                 .await?;
     701              :         } else {
     702       145618 :             let rec = NeonWalRecord::Postgres {
     703       145618 :                 will_init: blk.will_init || blk.apply_image,
     704       145618 :                 rec: decoded.record.clone(),
     705       145618 :             };
     706       145618 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     707          279 :                 .await?;
     708              :         }
     709       145642 :         Ok(())
     710       145642 :     }
     711              : 
     712       145474 :     async fn ingest_heapam_record(
     713       145474 :         &mut self,
     714       145474 :         buf: &mut Bytes,
     715       145474 :         modification: &mut DatadirModification<'_>,
     716       145474 :         decoded: &DecodedWALRecord,
     717       145474 :         ctx: &RequestContext,
     718       145474 :     ) -> anyhow::Result<()> {
     719       145474 :         // Handle VM bit updates that are implicitly part of heap records.
     720       145474 : 
     721       145474 :         // First, look at the record to determine which VM bits need
     722       145474 :         // to be cleared. If either of these variables is set, we
     723       145474 :         // need to clear the corresponding bits in the visibility map.
     724       145474 :         let mut new_heap_blkno: Option<u32> = None;
     725       145474 :         let mut old_heap_blkno: Option<u32> = None;
     726       145474 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     727       145474 : 
     728       145474 :         match modification.tline.pg_version {
     729              :             14 => {
     730            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     731            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     732            0 : 
     733            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     734            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     735            0 :                         assert_eq!(0, buf.remaining());
     736            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     737            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     738            0 :                         }
     739            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     740            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     741            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     742            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     743            0 :                         }
     744            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     745            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     746              :                     {
     747            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     748            0 :                         // the size of tuple data is inferred from the size of the record.
     749            0 :                         // we can't validate the remaining number of bytes without parsing
     750            0 :                         // the tuple data.
     751            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     752            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     753            0 :                         }
     754            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     755            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     756            0 :                             // non-HOT update where the new tuple goes to a different page than
     757            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     758            0 :                             // set.
     759            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     760            0 :                         }
     761            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     762            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     763            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     764            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     765            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     766            0 :                         }
     767            0 :                     }
     768            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     769            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     770            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     771            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     772              : 
     773            0 :                         let offset_array_len =
     774            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     775              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     776            0 :                                 0
     777              :                             } else {
     778            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     779              :                             };
     780            0 :                         assert_eq!(offset_array_len, buf.remaining());
     781              : 
     782            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     783            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     784            0 :                         }
     785            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     786            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     787            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     788            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     789            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     790            0 :                         }
     791            0 :                     }
     792              :                 } else {
     793            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     794              :                 }
     795              :             }
     796              :             15 => {
     797       145474 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     798       145286 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     799       145286 : 
     800       145286 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     801       145276 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     802       145276 :                         assert_eq!(0, buf.remaining());
     803       145276 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     804            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     805       145272 :                         }
     806           10 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     807            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     808            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     809            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     810            0 :                         }
     811           10 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     812            2 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     813              :                     {
     814            8 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     815            8 :                         // the size of tuple data is inferred from the size of the record.
     816            8 :                         // we can't validate the remaining number of bytes without parsing
     817            8 :                         // the tuple data.
     818            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     819            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     820            8 :                         }
     821            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     822            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      823            0 :                             // non-HOT update where the new tuple goes to a different page than
     824            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     825            0 :                             // set.
     826            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     827            8 :                         }
     828            2 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     829            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     830            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     831            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     832            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     833            0 :                         }
     834            2 :                     }
     835          188 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     836          188 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     837          188 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     838           42 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     839              : 
     840           42 :                         let offset_array_len =
     841           42 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     842              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     843            2 :                                 0
     844              :                             } else {
     845           40 :                                 size_of::<u16>() * xlrec.ntuples as usize
     846              :                             };
     847           42 :                         assert_eq!(offset_array_len, buf.remaining());
     848              : 
     849           42 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     850            8 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     851           34 :                         }
     852          146 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     853            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     854            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     855            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     856            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     857            0 :                         }
     858          146 :                     }
     859              :                 } else {
     860            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     861              :                 }
     862              :             }
     863              :             16 => {
     864            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     865            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     866            0 : 
     867            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     868            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     869            0 :                         assert_eq!(0, buf.remaining());
     870            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     871            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     872            0 :                         }
     873            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     874            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     875            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     876            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     877            0 :                         }
     878            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     879            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     880              :                     {
     881            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     882            0 :                         // the size of tuple data is inferred from the size of the record.
     883            0 :                         // we can't validate the remaining number of bytes without parsing
     884            0 :                         // the tuple data.
     885            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     886            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     887            0 :                         }
     888            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     889            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      890            0 :                             // non-HOT update where the new tuple goes to a different page than
     891            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     892            0 :                             // set.
     893            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     894            0 :                         }
     895            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     896            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     897            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     898            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     899            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     900            0 :                         }
     901            0 :                     }
     902            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     903            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     904            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     905            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     906              : 
     907            0 :                         let offset_array_len =
     908            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     909              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     910            0 :                                 0
     911              :                             } else {
     912            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     913              :                             };
     914            0 :                         assert_eq!(offset_array_len, buf.remaining());
     915              : 
     916            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     917            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     918            0 :                         }
     919            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     920            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     921            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     922            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     923            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     924            0 :                         }
     925            0 :                     }
     926              :                 } else {
     927            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     928              :                 }
     929              :             }
     930              :             17 => {
     931            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     932            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     933            0 : 
     934            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     935            0 :                         let xlrec = v17::XlHeapInsert::decode(buf);
     936            0 :                         assert_eq!(0, buf.remaining());
     937            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     938            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     939            0 :                         }
     940            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     941            0 :                         let xlrec = v17::XlHeapDelete::decode(buf);
     942            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     943            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     944            0 :                         }
     945            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     946            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     947              :                     {
     948            0 :                         let xlrec = v17::XlHeapUpdate::decode(buf);
     949            0 :                         // the size of tuple data is inferred from the size of the record.
     950            0 :                         // we can't validate the remaining number of bytes without parsing
     951            0 :                         // the tuple data.
     952            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     953            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     954            0 :                         }
     955            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     956            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      957            0 :                             // non-HOT update where the new tuple goes to a different page than
     958            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     959            0 :                             // set.
     960            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     961            0 :                         }
     962            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     963            0 :                         let xlrec = v17::XlHeapLock::decode(buf);
     964            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     965            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     966            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     967            0 :                         }
     968            0 :                     }
     969            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     970            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     971            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     972            0 :                         let xlrec = v17::XlHeapMultiInsert::decode(buf);
     973              : 
     974            0 :                         let offset_array_len =
     975            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     976              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     977            0 :                                 0
     978              :                             } else {
     979            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     980              :                             };
     981            0 :                         assert_eq!(offset_array_len, buf.remaining());
     982              : 
     983            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     984            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     985            0 :                         }
     986            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     987            0 :                         let xlrec = v17::XlHeapLockUpdated::decode(buf);
     988            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     989            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     990            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     991            0 :                         }
     992            0 :                     }
     993              :                 } else {
     994            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     995              :                 }
     996              :             }
     997            0 :             _ => {}
     998              :         }
     999              : 
    1000              :         // Clear the VM bits if required.
    1001       145474 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
    1002           12 :             let vm_rel = RelTag {
    1003           12 :                 forknum: VISIBILITYMAP_FORKNUM,
    1004           12 :                 spcnode: decoded.blocks[0].rnode_spcnode,
    1005           12 :                 dbnode: decoded.blocks[0].rnode_dbnode,
    1006           12 :                 relnode: decoded.blocks[0].rnode_relnode,
    1007           12 :             };
    1008           12 : 
    1009           12 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
    1010           12 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
    1011              : 
    1012              :             // Sometimes, Postgres seems to create heap WAL records with the
    1013              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
    1014              :             // not set. In fact, it's possible that the VM page does not exist at all.
    1015              :             // In that case, we don't want to store a record to clear the VM bit;
    1016              :             // replaying it would fail to find the previous image of the page, because
    1017              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     1018              :             // record if they don't.
    1019           12 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
    1020           12 :             if let Some(blknum) = new_vm_blk {
    1021           12 :                 if blknum >= vm_size {
    1022            0 :                     new_vm_blk = None;
    1023           12 :                 }
    1024            0 :             }
    1025           12 :             if let Some(blknum) = old_vm_blk {
    1026            0 :                 if blknum >= vm_size {
    1027            0 :                     old_vm_blk = None;
    1028            0 :                 }
    1029           12 :             }
    1030              : 
    1031           12 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
    1032           12 :                 if new_vm_blk == old_vm_blk {
     1033              :                     // An UPDATE record that needs to clear the bits for both the old and the
    1034              :                     // new page, both of which reside on the same VM page.
    1035            0 :                     self.put_rel_wal_record(
    1036            0 :                         modification,
    1037            0 :                         vm_rel,
    1038            0 :                         new_vm_blk.unwrap(),
    1039            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
    1040            0 :                             new_heap_blkno,
    1041            0 :                             old_heap_blkno,
    1042            0 :                             flags,
    1043            0 :                         },
    1044            0 :                         ctx,
    1045            0 :                     )
    1046            0 :                     .await?;
    1047              :                 } else {
    1048              :                     // Clear VM bits for one heap page, or for two pages that reside on
    1049              :                     // different VM pages.
    1050           12 :                     if let Some(new_vm_blk) = new_vm_blk {
    1051           12 :                         self.put_rel_wal_record(
    1052           12 :                             modification,
    1053           12 :                             vm_rel,
    1054           12 :                             new_vm_blk,
    1055           12 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1056           12 :                                 new_heap_blkno,
    1057           12 :                                 old_heap_blkno: None,
    1058           12 :                                 flags,
    1059           12 :                             },
    1060           12 :                             ctx,
    1061           12 :                         )
    1062            0 :                         .await?;
    1063            0 :                     }
    1064           12 :                     if let Some(old_vm_blk) = old_vm_blk {
    1065            0 :                         self.put_rel_wal_record(
    1066            0 :                             modification,
    1067            0 :                             vm_rel,
    1068            0 :                             old_vm_blk,
    1069            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1070            0 :                                 new_heap_blkno: None,
    1071            0 :                                 old_heap_blkno,
    1072            0 :                                 flags,
    1073            0 :                             },
    1074            0 :                             ctx,
    1075            0 :                         )
    1076            0 :                         .await?;
    1077           12 :                     }
    1078              :                 }
    1079            0 :             }
    1080       145462 :         }
    1081              : 
    1082       145474 :         Ok(())
    1083       145474 :     }
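// Editorial sketch (not part of walingest.rs): how a heap block number maps to a
// visibility-map block, and why an UPDATE's old and new heap blocks sometimes
// share a single VM page (the `new_vm_blk == old_vm_blk` branch above). Each VM
// page covers a fixed number of heap blocks; the helper takes that number as a
// parameter instead of assuming the value of pg_constants::VM_HEAPBLOCKS_PER_PAGE,
// which is what HEAPBLK_TO_MAPBLOCK relies on in this codebase.
fn heapblk_to_mapblock(heap_blkno: u32, heap_blocks_per_vm_page: u32) -> u32 {
    heap_blkno / heap_blocks_per_vm_page
}

#[test]
fn vm_mapping_sketch() {
    // Purely illustrative value; the real constant depends on the page layout.
    let per_page = 4;
    // Heap blocks 5 and 6 land on VM page 1, so a single ClearVisibilityMapFlags
    // record can clear both bits; heap block 9 lands on VM page 2 and would need
    // a second record.
    assert_eq!(heapblk_to_mapblock(5, per_page), 1);
    assert_eq!(heapblk_to_mapblock(6, per_page), 1);
    assert_eq!(heapblk_to_mapblock(9, per_page), 2);
}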
    1084              : 
    1085            0 :     async fn ingest_neonrmgr_record(
    1086            0 :         &mut self,
    1087            0 :         buf: &mut Bytes,
    1088            0 :         modification: &mut DatadirModification<'_>,
    1089            0 :         decoded: &DecodedWALRecord,
    1090            0 :         ctx: &RequestContext,
    1091            0 :     ) -> anyhow::Result<()> {
    1092            0 :         // Handle VM bit updates that are implicitly part of heap records.
    1093            0 : 
    1094            0 :         // First, look at the record to determine which VM bits need
    1095            0 :         // to be cleared. If either of these variables is set, we
    1096            0 :         // need to clear the corresponding bits in the visibility map.
    1097            0 :         let mut new_heap_blkno: Option<u32> = None;
    1098            0 :         let mut old_heap_blkno: Option<u32> = None;
    1099            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
    1100            0 :         let pg_version = modification.tline.pg_version;
    1101            0 : 
    1102            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
    1103              : 
    1104            0 :         match pg_version {
    1105              :             16 | 17 => {
    1106            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1107            0 : 
    1108            0 :                 match info {
    1109              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
    1110            0 :                         let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
    1111            0 :                         assert_eq!(0, buf.remaining());
    1112            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
    1113            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1114            0 :                         }
    1115              :                     }
    1116              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
    1117            0 :                         let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
    1118            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
    1119            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1120            0 :                         }
    1121              :                     }
    1122              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
    1123              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
    1124            0 :                         let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
    1125            0 :                         // the size of tuple data is inferred from the size of the record.
    1126            0 :                         // we can't validate the remaining number of bytes without parsing
    1127            0 :                         // the tuple data.
    1128            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
    1129            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
    1130            0 :                         }
    1131            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
    1132            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     1133            0 :                             // non-HOT update where the new tuple goes to a different page than
    1134            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
    1135            0 :                             // set.
    1136            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1137            0 :                         }
    1138              :                     }
    1139              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
    1140            0 :                         let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
    1141              : 
    1142            0 :                         let offset_array_len =
    1143            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
    1144              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
    1145            0 :                                 0
    1146              :                             } else {
    1147            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
    1148              :                             };
    1149            0 :                         assert_eq!(offset_array_len, buf.remaining());
    1150              : 
    1151            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
    1152            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
    1153            0 :                         }
    1154              :                     }
    1155              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
    1156            0 :                         let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
    1157            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
    1158            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
    1159            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
    1160            0 :                         }
    1161              :                     }
    1162            0 :                     info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
    1163              :                 }
    1164              :             }
    1165            0 :             _ => bail!(
    1166            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
    1167            0 :                 pg_version
    1168            0 :             ),
    1169              :         }
    1170              : 
    1171              :         // Clear the VM bits if required.
    1172            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
    1173            0 :             let vm_rel = RelTag {
    1174            0 :                 forknum: VISIBILITYMAP_FORKNUM,
    1175            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
    1176            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
    1177            0 :                 relnode: decoded.blocks[0].rnode_relnode,
    1178            0 :             };
    1179            0 : 
    1180            0 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
    1181            0 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
    1182              : 
    1183              :             // Sometimes, Postgres seems to create heap WAL records with the
    1184              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
    1185              :             // not set. In fact, it's possible that the VM page does not exist at all.
    1186              :             // In that case, we don't want to store a record to clear the VM bit;
    1187              :             // replaying it would fail to find the previous image of the page, because
    1188              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     1189              :             // record if they don't.
    1190            0 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
    1191            0 :             if let Some(blknum) = new_vm_blk {
    1192            0 :                 if blknum >= vm_size {
    1193            0 :                     new_vm_blk = None;
    1194            0 :                 }
    1195            0 :             }
    1196            0 :             if let Some(blknum) = old_vm_blk {
    1197            0 :                 if blknum >= vm_size {
    1198            0 :                     old_vm_blk = None;
    1199            0 :                 }
    1200            0 :             }
    1201              : 
    1202            0 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
    1203            0 :                 if new_vm_blk == old_vm_blk {
     1204              :                     // An UPDATE record that needs to clear the bits for both the old and the
    1205              :                     // new page, both of which reside on the same VM page.
    1206            0 :                     self.put_rel_wal_record(
    1207            0 :                         modification,
    1208            0 :                         vm_rel,
    1209            0 :                         new_vm_blk.unwrap(),
    1210            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
    1211            0 :                             new_heap_blkno,
    1212            0 :                             old_heap_blkno,
    1213            0 :                             flags,
    1214            0 :                         },
    1215            0 :                         ctx,
    1216            0 :                     )
    1217            0 :                     .await?;
    1218              :                 } else {
    1219              :                     // Clear VM bits for one heap page, or for two pages that reside on
    1220              :                     // different VM pages.
    1221            0 :                     if let Some(new_vm_blk) = new_vm_blk {
    1222            0 :                         self.put_rel_wal_record(
    1223            0 :                             modification,
    1224            0 :                             vm_rel,
    1225            0 :                             new_vm_blk,
    1226            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1227            0 :                                 new_heap_blkno,
    1228            0 :                                 old_heap_blkno: None,
    1229            0 :                                 flags,
    1230            0 :                             },
    1231            0 :                             ctx,
    1232            0 :                         )
    1233            0 :                         .await?;
    1234            0 :                     }
    1235            0 :                     if let Some(old_vm_blk) = old_vm_blk {
    1236            0 :                         self.put_rel_wal_record(
    1237            0 :                             modification,
    1238            0 :                             vm_rel,
    1239            0 :                             old_vm_blk,
    1240            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1241            0 :                                 new_heap_blkno: None,
    1242            0 :                                 old_heap_blkno,
    1243            0 :                                 flags,
    1244            0 :                             },
    1245            0 :                             ctx,
    1246            0 :                         )
    1247            0 :                         .await?;
    1248            0 :                     }
    1249              :                 }
    1250            0 :             }
    1251            0 :         }
    1252              : 
    1253            0 :         Ok(())
    1254            0 :     }
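// Editorial sketch (not part of walingest.rs): what the `flags` value chosen by
// the two functions above selects. Most record types clear every valid VM bit,
// while heap lock records clear only the all-frozen bit. The bit values below
// follow PostgreSQL's visibilitymap.h (0x01 all-visible, 0x02 all-frozen) and are
// shown for illustration only; the actual clearing happens when the redo side
// replays NeonWalRecord::ClearVisibilityMapFlags.
const SKETCH_ALL_VISIBLE: u8 = 0x01;
const SKETCH_ALL_FROZEN: u8 = 0x02;
const SKETCH_VALID_BITS: u8 = SKETCH_ALL_VISIBLE | SKETCH_ALL_FROZEN;

fn clear_vm_bits(current: u8, flags: u8) -> u8 {
    current & !flags
}
// e.g. clear_vm_bits(0x03, SKETCH_VALID_BITS) == 0x00   (insert/delete/update records)
//      clear_vm_bits(0x03, SKETCH_ALL_FROZEN) == 0x01   (heap lock: all-visible kept)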
    1255              : 
    1256              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
    1257            0 :     async fn ingest_xlog_dbase_create(
    1258            0 :         &mut self,
    1259            0 :         modification: &mut DatadirModification<'_>,
    1260            0 :         rec: &XlCreateDatabase,
    1261            0 :         ctx: &RequestContext,
    1262            0 :     ) -> anyhow::Result<()> {
    1263            0 :         let db_id = rec.db_id;
    1264            0 :         let tablespace_id = rec.tablespace_id;
    1265            0 :         let src_db_id = rec.src_db_id;
    1266            0 :         let src_tablespace_id = rec.src_tablespace_id;
    1267              : 
    1268            0 :         let rels = modification
    1269            0 :             .tline
    1270            0 :             .list_rels(
    1271            0 :                 src_tablespace_id,
    1272            0 :                 src_db_id,
    1273            0 :                 Version::Modified(modification),
    1274            0 :                 ctx,
    1275            0 :             )
    1276            0 :             .await?;
    1277              : 
    1278            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
    1279              : 
    1280              :         // Copy relfilemap
    1281            0 :         let filemap = modification
    1282            0 :             .tline
    1283            0 :             .get_relmap_file(
    1284            0 :                 src_tablespace_id,
    1285            0 :                 src_db_id,
    1286            0 :                 Version::Modified(modification),
    1287            0 :                 ctx,
    1288            0 :             )
    1289            0 :             .await?;
    1290            0 :         modification
    1291            0 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
    1292            0 :             .await?;
    1293              : 
    1294            0 :         let mut num_rels_copied = 0;
    1295            0 :         let mut num_blocks_copied = 0;
    1296            0 :         for src_rel in rels {
    1297            0 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
    1298            0 :             assert_eq!(src_rel.dbnode, src_db_id);
    1299              : 
    1300            0 :             let nblocks = modification
    1301            0 :                 .tline
    1302            0 :                 .get_rel_size(src_rel, Version::Modified(modification), ctx)
    1303            0 :                 .await?;
    1304            0 :             let dst_rel = RelTag {
    1305            0 :                 spcnode: tablespace_id,
    1306            0 :                 dbnode: db_id,
    1307            0 :                 relnode: src_rel.relnode,
    1308            0 :                 forknum: src_rel.forknum,
    1309            0 :             };
    1310            0 : 
    1311            0 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
    1312              : 
    1313              :             // Copy content
    1314            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
    1315            0 :             for blknum in 0..nblocks {
    1316              :                 // Sharding:
    1317              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
    1318              :                 //    dbNode is not included in the hash inputs for sharding.
    1319              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
    1320              :                 //    that belong to it.
    1321            0 :                 let src_key = rel_block_to_key(src_rel, blknum);
    1322            0 :                 if !self.shard.is_key_local(&src_key) {
    1323            0 :                     debug!(
    1324            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
    1325              :                         src_key
    1326              :                     );
    1327            0 :                     continue;
    1328            0 :                 }
    1329            0 :                 debug!(
    1330            0 :                     "copying block {} from {} ({}) to {}",
    1331              :                     blknum, src_rel, src_key, dst_rel
    1332              :                 );
    1333              : 
    1334            0 :                 let content = modification
    1335            0 :                     .tline
    1336            0 :                     .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
    1337            0 :                     .await?;
    1338            0 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
    1339            0 :                 num_blocks_copied += 1;
    1340              :             }
    1341              : 
    1342            0 :             num_rels_copied += 1;
    1343              :         }
    1344              : 
    1345            0 :         info!(
    1346            0 :             "Created database {}/{}, copied {} blocks in {} rels",
    1347              :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
    1348              :         );
    1349            0 :         Ok(())
    1350            0 :     }
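// Editorial sketch (not part of walingest.rs): the shape of the block-by-block
// copy done by ingest_xlog_dbase_create above, with sharding reduced to a
// caller-supplied predicate. `is_local` and `copy_block` are stand-ins for
// ShardIdentity::is_key_local() and the get/put page calls, not the pageserver API.
fn copy_rel_blocks(
    nblocks: u32,
    is_local: impl Fn(u32) -> bool,
    mut copy_block: impl FnMut(u32),
) -> u32 {
    let mut copied = 0;
    for blknum in 0..nblocks {
        // Every shard replays the record, but each shard copies only the blocks
        // whose keys hash to it; src and dst always live on the same shard
        // because they differ only by dbNode.
        if !is_local(blknum) {
            continue;
        }
        copy_block(blknum);
        copied += 1;
    }
    copied
}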
    1351              : 
    1352           16 :     async fn ingest_xlog_smgr_create(
    1353           16 :         &mut self,
    1354           16 :         modification: &mut DatadirModification<'_>,
    1355           16 :         rec: &XlSmgrCreate,
    1356           16 :         ctx: &RequestContext,
    1357           16 :     ) -> anyhow::Result<()> {
    1358           16 :         let rel = RelTag {
    1359           16 :             spcnode: rec.rnode.spcnode,
    1360           16 :             dbnode: rec.rnode.dbnode,
    1361           16 :             relnode: rec.rnode.relnode,
    1362           16 :             forknum: rec.forknum,
    1363           16 :         };
    1364           16 :         self.put_rel_creation(modification, rel, ctx).await?;
    1365           16 :         Ok(())
    1366           16 :     }
    1367              : 
    1368              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    1369              :     ///
    1370              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
    1371            0 :     async fn ingest_xlog_smgr_truncate(
    1372            0 :         &mut self,
    1373            0 :         modification: &mut DatadirModification<'_>,
    1374            0 :         rec: &XlSmgrTruncate,
    1375            0 :         ctx: &RequestContext,
    1376            0 :     ) -> anyhow::Result<()> {
    1377            0 :         let spcnode = rec.rnode.spcnode;
    1378            0 :         let dbnode = rec.rnode.dbnode;
    1379            0 :         let relnode = rec.rnode.relnode;
    1380            0 : 
    1381            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
    1382            0 :             let rel = RelTag {
    1383            0 :                 spcnode,
    1384            0 :                 dbnode,
    1385            0 :                 relnode,
    1386            0 :                 forknum: MAIN_FORKNUM,
    1387            0 :             };
    1388            0 :             self.put_rel_truncation(modification, rel, rec.blkno, ctx)
    1389            0 :                 .await?;
    1390            0 :         }
    1391            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
    1392            0 :             let rel = RelTag {
    1393            0 :                 spcnode,
    1394            0 :                 dbnode,
    1395            0 :                 relnode,
    1396            0 :                 forknum: FSM_FORKNUM,
    1397            0 :             };
    1398            0 : 
    1399            0 :             let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
    1400            0 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
    1401            0 :             if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
     1402              :                 // The tail of the last remaining FSM page has to be zeroed.
     1403              :                 // We are not precise here: instead of digging into the FSM bitmap format, we just clear the whole page.
    1404            0 :                 modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
    1405            0 :                 fsm_physical_page_no += 1;
    1406            0 :             }
    1407            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1408            0 :             if nblocks > fsm_physical_page_no {
     1409              :                 // There is something to do: the FSM is larger than the truncate position.
    1410            0 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
    1411            0 :                     .await?;
    1412            0 :             }
    1413            0 :         }
    1414            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
    1415            0 :             let rel = RelTag {
    1416            0 :                 spcnode,
    1417            0 :                 dbnode,
    1418            0 :                 relnode,
    1419            0 :                 forknum: VISIBILITYMAP_FORKNUM,
    1420            0 :             };
    1421            0 : 
    1422            0 :             let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
    1423            0 :             if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
     1424              :                 // The tail of the last remaining VM page has to be zeroed.
     1425              :                 // We are not precise here: instead of digging into the VM bitmap format, we just clear the whole page.
    1426            0 :                 modification.put_rel_page_image_zero(rel, vm_page_no)?;
    1427            0 :                 vm_page_no += 1;
    1428            0 :             }
    1429            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1430            0 :             if nblocks > vm_page_no {
     1431              :                 // There is something to do: the VM is larger than the truncate position.
    1432            0 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
    1433            0 :                     .await?;
    1434            0 :             }
    1435            0 :         }
    1436            0 :         Ok(())
    1437            0 :     }
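// Editorial sketch (not part of walingest.rs): the rounding used above when
// truncating the FSM and VM forks. If the heap truncation point falls inside an
// FSM/VM page, that page is kept (the code above simply zeroes the whole page
// rather than decoding the bitmap) and fork truncation starts at the next page.
// `slots_per_page` stands in for SLOTS_PER_FSM_PAGE / VM_HEAPBLOCKS_PER_PAGE; the
// FSM path additionally maps the logical page number to a physical one via
// fsm_logical_to_physical() before this adjustment.
fn first_truncatable_page(trunc_blkno: u32, slots_per_page: u32) -> (u32, bool) {
    let mut page_no = trunc_blkno / slots_per_page;
    let partial_page_must_be_zeroed = trunc_blkno % slots_per_page != 0;
    if partial_page_must_be_zeroed {
        page_no += 1;
    }
    (page_no, partial_page_must_be_zeroed)
}
// e.g. with 100 slots per page, truncating the heap at block 250 zeroes page 2
// and truncates the fork from page 3 onward: first_truncatable_page(250, 100) == (3, true)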
    1438              : 
    1439            8 :     fn warn_on_ingest_lag(
    1440            8 :         &mut self,
    1441            8 :         conf: &crate::config::PageServerConf,
    1442            8 :         wal_timestamp: TimestampTz,
    1443            8 :     ) {
    1444            8 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1445            8 :         let now = SystemTime::now();
    1446            8 :         let rate_limits = &mut self.warn_ingest_lag;
    1447              : 
    1448            8 :         let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
    1449            0 :             pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
    1450              :         });
    1451              : 
    1452            8 :         match ts {
    1453            8 :             Ok(ts) => {
    1454            8 :                 match now.duration_since(ts) {
    1455            8 :                     Ok(lag) => {
    1456            8 :                         if lag > conf.wait_lsn_timeout {
    1457            8 :                             rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
    1458            2 :                                 if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
    1459            0 :                                     if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
    1460            0 :                                         return;
    1461            0 :                                     }
    1462            2 :                                 } else {
    1463            2 :                                     // Still loading? We shouldn't be here
    1464            2 :                                 }
    1465            2 :                                 let lag = humantime::format_duration(lag);
    1466            2 :                                 warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
    1467            8 :                             })
    1468            0 :                         }
    1469              :                     }
    1470            0 :                     Err(e) => {
    1471            0 :                         let delta_t = e.duration();
    1472              :                         // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
    1473              :                         // => https://www.robustperception.io/time-metric-from-the-node-exporter/
    1474              :                         const IGNORED_DRIFT: Duration = Duration::from_millis(100);
    1475            0 :                         if delta_t > IGNORED_DRIFT {
    1476            0 :                             let delta_t = humantime::format_duration(delta_t);
    1477            0 :                             rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
    1478            0 :                                 warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
    1479            0 :                             })
    1480            0 :                         }
    1481              :                     }
    1482              :                 };
    1483              :             }
    1484            0 :             Err(error) => {
    1485            0 :                 rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
    1486            0 :                     warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
    1487            0 :                 })
    1488              :             }
    1489              :         }
    1490            8 :     }
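// Editorial sketch (not part of walingest.rs): the two cases warn_on_ingest_lag
// distinguishes once the record's timestamp has been converted. A timestamp in
// the past yields an ingest lag; a timestamp in the future makes
// SystemTime::duration_since() return Err, which is only worth warning about
// beyond ordinary clock drift (IGNORED_DRIFT, 100 ms above).
use std::time::{Duration, SystemTime};

fn classify_record_timestamp(
    now: SystemTime,
    wal_ts: SystemTime,
    ignored_drift: Duration,
) -> String {
    match now.duration_since(wal_ts) {
        // Timestamp is in the past: plain ingest lag.
        Ok(lag) => format!("lagging by {:?}", lag),
        // Timestamp is in the future: the error carries how far ahead it is.
        Err(e) if e.duration() > ignored_drift => {
            format!("from the future by {:?}", e.duration())
        }
        Err(_) => "within clock-drift tolerance".to_string(),
    }
}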
    1491              : 
     1492              :     /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
    1493              :     ///
    1494            8 :     async fn ingest_xact_record(
    1495            8 :         &mut self,
    1496            8 :         modification: &mut DatadirModification<'_>,
    1497            8 :         parsed: &XlXactParsedRecord,
    1498            8 :         is_commit: bool,
    1499            8 :         origin_id: u16,
    1500            8 :         ctx: &RequestContext,
    1501            8 :     ) -> anyhow::Result<()> {
    1502            8 :         // Record update of CLOG pages
    1503            8 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
    1504            8 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1505            8 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1506            8 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
    1507            8 : 
    1508            8 :         self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
    1509              : 
    1510            8 :         for subxact in &parsed.subxacts {
    1511            0 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
    1512            0 :             if subxact_pageno != pageno {
     1513              :                 // This subxact goes to a different page. Write the record
    1514              :                 // for all the XIDs on the previous page, and continue
    1515              :                 // accumulating XIDs on this new page.
    1516            0 :                 modification.put_slru_wal_record(
    1517            0 :                     SlruKind::Clog,
    1518            0 :                     segno,
    1519            0 :                     rpageno,
    1520            0 :                     if is_commit {
    1521            0 :                         NeonWalRecord::ClogSetCommitted {
    1522            0 :                             xids: page_xids,
    1523            0 :                             timestamp: parsed.xact_time,
    1524            0 :                         }
    1525              :                     } else {
    1526            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
    1527              :                     },
    1528            0 :                 )?;
    1529            0 :                 page_xids = Vec::new();
    1530            0 :             }
    1531            0 :             pageno = subxact_pageno;
    1532            0 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1533            0 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1534            0 :             page_xids.push(*subxact);
    1535              :         }
    1536            8 :         modification.put_slru_wal_record(
    1537            8 :             SlruKind::Clog,
    1538            8 :             segno,
    1539            8 :             rpageno,
    1540            8 :             if is_commit {
    1541            8 :                 NeonWalRecord::ClogSetCommitted {
    1542            8 :                     xids: page_xids,
    1543            8 :                     timestamp: parsed.xact_time,
    1544            8 :                 }
    1545              :             } else {
    1546            0 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
    1547              :             },
    1548            0 :         )?;
    1549              : 
    1550            8 :         for xnode in &parsed.xnodes {
    1551            0 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
    1552            0 :                 let rel = RelTag {
    1553            0 :                     forknum,
    1554            0 :                     spcnode: xnode.spcnode,
    1555            0 :                     dbnode: xnode.dbnode,
    1556            0 :                     relnode: xnode.relnode,
    1557            0 :                 };
    1558            0 :                 if modification
    1559            0 :                     .tline
    1560            0 :                     .get_rel_exists(rel, Version::Modified(modification), ctx)
    1561            0 :                     .await?
    1562              :                 {
    1563            0 :                     self.put_rel_drop(modification, rel, ctx).await?;
    1564            0 :                 }
    1565              :             }
    1566              :         }
    1567            8 :         if origin_id != 0 {
    1568            0 :             modification
    1569            0 :                 .set_replorigin(origin_id, parsed.origin_lsn)
    1570            0 :                 .await?;
    1571            8 :         }
    1572            8 :         Ok(())
    1573            8 :     }
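// Editorial sketch (not part of walingest.rs): the XID -> CLOG page -> SLRU
// segment arithmetic used by ingest_xact_record above. The divisors are passed
// in here; the code uses pg_constants::CLOG_XACTS_PER_PAGE and
// pg_constants::SLRU_PAGES_PER_SEGMENT.
fn clog_position(xid: u32, xacts_per_page: u32, pages_per_segment: u32) -> (u32, u32, u32) {
    let pageno = xid / xacts_per_page;
    let segno = pageno / pages_per_segment;
    let rpageno = pageno % pages_per_segment;
    (pageno, segno, rpageno)
}
// Subtransaction XIDs are batched per CLOG page: while a subxact maps to the same
// pageno as the XIDs gathered so far it is appended to `page_xids`; when the page
// changes, one ClogSetCommitted/ClogSetAborted record is written for the previous
// page and accumulation restarts, exactly as in the loop above.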
    1574              : 
    1575            0 :     async fn ingest_clog_truncate_record(
    1576            0 :         &mut self,
    1577            0 :         modification: &mut DatadirModification<'_>,
    1578            0 :         xlrec: &XlClogTruncate,
    1579            0 :         ctx: &RequestContext,
    1580            0 :     ) -> anyhow::Result<()> {
    1581            0 :         info!(
    1582            0 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
    1583              :             xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
    1584              :         );
    1585              : 
    1586              :         // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
    1587              :         // truncated, but a checkpoint record with the updated values isn't written until
    1588              :         // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
    1589              :         // so we keep the oldestXid and oldestXidDB up-to-date.
    1590            0 :         enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1591            0 :             cp.oldestXid = xlrec.oldest_xid;
    1592            0 :             cp.oldestXidDB = xlrec.oldest_xid_db;
    1593            0 :         });
    1594            0 :         self.checkpoint_modified = true;
    1595              : 
     1596              :         // TODO: handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it
    1597              : 
    1598            0 :         let latest_page_number =
    1599            0 :             enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
    1600              :                 / pg_constants::CLOG_XACTS_PER_PAGE;
    1601              : 
    1602              :         // Now delete all segments containing pages between xlrec.pageno
    1603              :         // and latest_page_number.
    1604              : 
    1605              :         // First, make an important safety check:
    1606              :         // the current endpoint page must not be eligible for removal.
    1607              :         // See SimpleLruTruncate() in slru.c
    1608            0 :         if dispatch_pgversion!(modification.tline.pg_version, {
    1609            0 :             pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
    1610              :         }) {
    1611            0 :             info!("could not truncate directory pg_xact apparent wraparound");
    1612            0 :             return Ok(());
    1613            0 :         }
    1614              : 
    1615              :         // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
    1616              :         //
     1617              :         // We cannot pass 'lsn' to the Timeline::list_slru_segments() call below, or it
    1618              :         // will block waiting for the last valid LSN to advance up to
    1619              :         // it. So we use the previous record's LSN in the get calls
    1620              :         // instead.
    1621            0 :         for segno in modification
    1622            0 :             .tline
    1623            0 :             .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
    1624            0 :             .await?
    1625              :         {
    1626            0 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
    1627              : 
    1628            0 :             let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
    1629            0 :                 pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
    1630              :             });
    1631              : 
    1632            0 :             if may_delete {
    1633            0 :                 modification
    1634            0 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
    1635            0 :                     .await?;
    1636            0 :                 trace!("Drop CLOG segment {:>04X}", segno);
    1637            0 :             }
    1638              :         }
    1639              : 
    1640            0 :         Ok(())
    1641            0 :     }
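    // Illustrative sketch (added for this report, not part of the original source):
    // the CLOG page/segment arithmetic used in the truncation handler above, written
    // as standalone helpers. The function names are hypothetical, and the per-page /
    // per-segment divisors are taken as parameters rather than hard-coding the
    // pg_constants values.
    fn clog_page_for_xid(xid: u32, clog_xacts_per_page: u32) -> u32 {
        // Mirrors `nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE` above.
        xid / clog_xacts_per_page
    }
    fn first_page_of_clog_segment(segno: u32, slru_pages_per_segment: u32) -> u32 {
        // Mirrors `segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT` above.
        segno * slru_pages_per_segment
    }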
    1642              : 
    1643            0 :     fn ingest_multixact_create_record(
    1644            0 :         &mut self,
    1645            0 :         modification: &mut DatadirModification,
    1646            0 :         xlrec: &XlMultiXactCreate,
    1647            0 :     ) -> Result<()> {
    1648            0 :         // Create WAL record for updating the multixact-offsets page
    1649            0 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1650            0 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1651            0 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1652            0 : 
    1653            0 :         modification.put_slru_wal_record(
    1654            0 :             SlruKind::MultiXactOffsets,
    1655            0 :             segno,
    1656            0 :             rpageno,
    1657            0 :             NeonWalRecord::MultixactOffsetCreate {
    1658            0 :                 mid: xlrec.mid,
    1659            0 :                 moff: xlrec.moff,
    1660            0 :             },
    1661            0 :         )?;
    1662              : 
    1663              :         // Create WAL records for the update of each affected multixact-members page
    1664            0 :         let mut members = xlrec.members.iter();
    1665            0 :         let mut offset = xlrec.moff;
    1666              :         loop {
    1667            0 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1668            0 : 
    1669            0 :             // How many members fit on this page?
    1670            0 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1671            0 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1672            0 : 
    1673            0 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1674            0 :             for _ in 0..page_remain {
    1675            0 :                 if let Some(m) = members.next() {
    1676            0 :                     this_page_members.push(m.clone());
    1677            0 :                 } else {
    1678            0 :                     break;
    1679              :                 }
    1680              :             }
    1681            0 :             if this_page_members.is_empty() {
    1682              :                 // all done
    1683            0 :                 break;
    1684            0 :             }
    1685            0 :             let n_this_page = this_page_members.len();
    1686            0 : 
    1687            0 :             modification.put_slru_wal_record(
    1688            0 :                 SlruKind::MultiXactMembers,
    1689            0 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1690            0 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1691            0 :                 NeonWalRecord::MultixactMembersCreate {
    1692            0 :                     moff: offset,
    1693            0 :                     members: this_page_members,
    1694            0 :                 },
    1695            0 :             )?;
    1696              : 
    1697              :             // Note: The multixact members can wrap around, even within one WAL record.
    1698            0 :             offset = offset.wrapping_add(n_this_page as u32);
    1699              :         }
    1700            0 :         let next_offset = offset;
    1701            0 :         assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
    1702              : 
    1703              :         // Update next-multi-xid and next-offset
    1704              :         //
    1705              :         // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
    1706              :         // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
    1707              :         // read it, like GetNewMultiXactId(). This is different from how nextXid is
     1708              :         // incremented! nextXid skips over < FirstNormalTransactionId when the value
    1709              :         // is stored, so it's never 0 in a checkpoint.
    1710              :         //
    1711              :         // I don't know why it's done that way, it seems less error-prone to skip over 0
    1712              :         // when the value is stored rather than when it's read. But let's do it the same
    1713              :         // way here.
    1714            0 :         let next_multi_xid = xlrec.mid.wrapping_add(1);
    1715            0 : 
    1716            0 :         if self
    1717            0 :             .checkpoint
    1718            0 :             .update_next_multixid(next_multi_xid, next_offset)
    1719            0 :         {
    1720            0 :             self.checkpoint_modified = true;
    1721            0 :         }
    1722              : 
    1723              :         // Also update the next-xid with the highest member. According to the comments in
    1724              :         // multixact_redo(), this shouldn't be necessary, but let's do the same here.
    1725            0 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
    1726            0 :             if let Some(max_xid) = acc {
    1727            0 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
    1728            0 :                     Some(mbr.xid)
    1729              :                 } else {
    1730            0 :                     acc
    1731              :                 }
    1732              :             } else {
    1733            0 :                 Some(mbr.xid)
    1734              :             }
    1735            0 :         });
    1736              : 
    1737            0 :         if let Some(max_xid) = max_mbr_xid {
    1738            0 :             if self.checkpoint.update_next_xid(max_xid) {
    1739            0 :                 self.checkpoint_modified = true;
    1740            0 :             }
    1741            0 :         }
    1742            0 :         Ok(())
    1743            0 :     }
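    // Illustrative sketch (added for this report, not part of the original source):
    // how the loop above distributes the new multixact's members across member pages,
    // including the possible wraparound of the member offset within one WAL record.
    // The helper name is hypothetical; `members_per_page` stands in for
    // pg_constants::MULTIXACT_MEMBERS_PER_PAGE, and the returned (pageno, count)
    // pairs only describe the chunks instead of emitting NeonWalRecords.
    fn split_members_across_pages(
        start_offset: u32,
        nmembers: u32,
        members_per_page: u32,
    ) -> Vec<(u32, u32)> {
        let mut chunks = Vec::new();
        let mut offset = start_offset;
        let mut remaining = nmembers;
        while remaining > 0 {
            let pageno = offset / members_per_page;
            // How many members still fit on this page?
            let page_remain = members_per_page - offset % members_per_page;
            let n_this_page = remaining.min(page_remain);
            chunks.push((pageno, n_this_page));
            // The offset can wrap around, even within one WAL record.
            offset = offset.wrapping_add(n_this_page);
            remaining -= n_this_page;
        }
        chunks
    }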
    1744              : 
    1745            0 :     async fn ingest_multixact_truncate_record(
    1746            0 :         &mut self,
    1747            0 :         modification: &mut DatadirModification<'_>,
    1748            0 :         xlrec: &XlMultiXactTruncate,
    1749            0 :         ctx: &RequestContext,
    1750            0 :     ) -> Result<()> {
    1751            0 :         let (maxsegment, startsegment, endsegment) =
    1752            0 :             enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
    1753            0 :                 cp.oldestMulti = xlrec.end_trunc_off;
    1754            0 :                 cp.oldestMultiDB = xlrec.oldest_multi_db;
    1755            0 :                 let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
    1756            0 :                     pg_constants::MAX_MULTIXACT_OFFSET,
    1757            0 :                 );
    1758            0 :                 let startsegment: i32 =
    1759            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1760            0 :                 let endsegment: i32 =
    1761            0 :                     pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1762            0 :                 (maxsegment, startsegment, endsegment)
    1763              :             });
    1764              : 
    1765            0 :         self.checkpoint_modified = true;
    1766            0 : 
    1767            0 :         // PerformMembersTruncation
    1768            0 :         let mut segment: i32 = startsegment;
    1769              : 
    1770              :         // Delete all the segments except the last one. The last segment can still
    1771              :         // contain, possibly partially, valid data.
    1772            0 :         while segment != endsegment {
    1773            0 :             modification
    1774            0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1775            0 :                 .await?;
    1776              : 
    1777              :             /* move to next segment, handling wraparound correctly */
    1778            0 :             if segment == maxsegment {
    1779            0 :                 segment = 0;
    1780            0 :             } else {
    1781            0 :                 segment += 1;
    1782            0 :             }
    1783              :         }
    1784              : 
    1785              :         // Truncate offsets
    1786              :         // FIXME: this did not handle wraparound correctly
    1787              : 
    1788            0 :         Ok(())
    1789            0 :     }
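    // Illustrative sketch (added for this report, not part of the original source):
    // the wraparound-aware walk over multixact-members segments performed above,
    // from `startsegment` up to (but not including) `endsegment`, wrapping back to 0
    // after `maxsegment`. The helper name is hypothetical and it only collects the
    // segment numbers instead of dropping them.
    fn member_segments_to_drop(startsegment: i32, endsegment: i32, maxsegment: i32) -> Vec<i32> {
        let mut dropped = Vec::new();
        let mut segment = startsegment;
        while segment != endsegment {
            dropped.push(segment);
            // Move to the next segment, handling wraparound the same way as above.
            segment = if segment == maxsegment { 0 } else { segment + 1 };
        }
        dropped
    }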
    1790              : 
    1791            0 :     async fn ingest_relmap_page(
    1792            0 :         &mut self,
    1793            0 :         modification: &mut DatadirModification<'_>,
    1794            0 :         xlrec: &XlRelmapUpdate,
    1795            0 :         decoded: &DecodedWALRecord,
    1796            0 :         ctx: &RequestContext,
    1797            0 :     ) -> Result<()> {
    1798            0 :         let mut buf = decoded.record.clone();
    1799            0 :         buf.advance(decoded.main_data_offset);
    1800            0 :         // skip xl_relmap_update
    1801            0 :         buf.advance(12);
    1802            0 : 
    1803            0 :         modification
    1804            0 :             .put_relmap_file(
    1805            0 :                 xlrec.tsid,
    1806            0 :                 xlrec.dbid,
    1807            0 :                 Bytes::copy_from_slice(&buf[..]),
    1808            0 :                 ctx,
    1809            0 :             )
    1810            0 :             .await
    1811            0 :     }
    1812              : 
    1813           18 :     async fn put_rel_creation(
    1814           18 :         &mut self,
    1815           18 :         modification: &mut DatadirModification<'_>,
    1816           18 :         rel: RelTag,
    1817           18 :         ctx: &RequestContext,
    1818           18 :     ) -> Result<()> {
    1819           18 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1820           18 :         Ok(())
    1821           18 :     }
    1822              : 
    1823       272426 :     async fn put_rel_page_image(
    1824       272426 :         &mut self,
    1825       272426 :         modification: &mut DatadirModification<'_>,
    1826       272426 :         rel: RelTag,
    1827       272426 :         blknum: BlockNumber,
    1828       272426 :         img: Bytes,
    1829       272426 :         ctx: &RequestContext,
    1830       272426 :     ) -> Result<(), PageReconstructError> {
    1831       272426 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1832         5425 :             .await?;
    1833       272426 :         modification.put_rel_page_image(rel, blknum, img)?;
    1834       272426 :         Ok(())
    1835       272426 :     }
    1836              : 
    1837       145630 :     async fn put_rel_wal_record(
    1838       145630 :         &mut self,
    1839       145630 :         modification: &mut DatadirModification<'_>,
    1840       145630 :         rel: RelTag,
    1841       145630 :         blknum: BlockNumber,
    1842       145630 :         rec: NeonWalRecord,
    1843       145630 :         ctx: &RequestContext,
    1844       145630 :     ) -> Result<()> {
    1845       145630 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1846          279 :             .await?;
    1847       145630 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1848       145630 :         Ok(())
    1849       145630 :     }
    1850              : 
    1851         6012 :     async fn put_rel_truncation(
    1852         6012 :         &mut self,
    1853         6012 :         modification: &mut DatadirModification<'_>,
    1854         6012 :         rel: RelTag,
    1855         6012 :         nblocks: BlockNumber,
    1856         6012 :         ctx: &RequestContext,
    1857         6012 :     ) -> anyhow::Result<()> {
    1858         6012 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1859         6012 :         Ok(())
    1860         6012 :     }
    1861              : 
    1862            2 :     async fn put_rel_drop(
    1863            2 :         &mut self,
    1864            2 :         modification: &mut DatadirModification<'_>,
    1865            2 :         rel: RelTag,
    1866            2 :         ctx: &RequestContext,
    1867            2 :     ) -> Result<()> {
    1868            2 :         modification.put_rel_drop(rel, ctx).await?;
    1869            2 :         Ok(())
    1870            2 :     }
    1871              : 
    1872       418056 :     async fn handle_rel_extend(
    1873       418056 :         &mut self,
    1874       418056 :         modification: &mut DatadirModification<'_>,
    1875       418056 :         rel: RelTag,
    1876       418056 :         blknum: BlockNumber,
    1877       418056 :         ctx: &RequestContext,
    1878       418056 :     ) -> Result<(), PageReconstructError> {
    1879       418056 :         let new_nblocks = blknum + 1;
    1880              :         // Check if the relation exists. We implicitly create relations on first
    1881              :         // record.
     1882              :         // TODO: it would be nice to be more explicit about it
    1883              : 
    1884              :         // Get current size and put rel creation if rel doesn't exist
    1885              :         //
    1886              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1887              :         //       check the cache too. This is because eagerly checking the cache results in
    1888              :         //       less work overall and 10% better performance. It's more work on cache miss
    1889              :         //       but cache miss is rare.
    1890       418056 :         let old_nblocks = if let Some(nblocks) = modification
    1891       418056 :             .tline
    1892       418056 :             .get_cached_rel_size(&rel, modification.get_lsn())
    1893              :         {
    1894       418046 :             nblocks
    1895           10 :         } else if !modification
    1896           10 :             .tline
    1897           10 :             .get_rel_exists(rel, Version::Modified(modification), ctx)
    1898            2 :             .await?
    1899              :         {
     1900              :             // create it with 0 size initially; the logic below will extend it
    1901           10 :             modification
    1902           10 :                 .put_rel_creation(rel, 0, ctx)
    1903            3 :                 .await
    1904           10 :                 .context("Relation Error")?;
    1905           10 :             0
    1906              :         } else {
    1907            0 :             modification
    1908            0 :                 .tline
    1909            0 :                 .get_rel_size(rel, Version::Modified(modification), ctx)
    1910            0 :                 .await?
    1911              :         };
    1912              : 
    1913       418056 :         if new_nblocks > old_nblocks {
    1914              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1915       274788 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1916              : 
    1917       274788 :             let mut key = rel_block_to_key(rel, blknum);
    1918              :             // fill the gap with zeros
    1919       274788 :             for gap_blknum in old_nblocks..blknum {
    1920         2998 :                 key.field6 = gap_blknum;
    1921         2998 : 
    1922         2998 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1923            0 :                     continue;
    1924         2998 :                 }
    1925         2998 : 
    1926         2998 :                 modification.put_rel_page_image_zero(rel, gap_blknum)?;
    1927              :             }
    1928       143268 :         }
    1929       418056 :         Ok(())
    1930       418056 :     }
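    // Illustrative sketch (added for this report, not part of the original source):
    // which gap blocks handle_rel_extend() zero-fills when a write to `blknum` jumps
    // past the relation's current size `old_nblocks`. The helper name and the
    // `owned_by_this_shard` closure are hypothetical; the closure stands in for the
    // `self.shard.get_shard_number(&key) != self.shard.number` check above.
    fn gap_blocks_to_zero_fill(
        old_nblocks: BlockNumber,
        blknum: BlockNumber,
        owned_by_this_shard: impl Fn(BlockNumber) -> bool,
    ) -> Vec<BlockNumber> {
        // Empty when blknum <= old_nblocks, i.e. there is no gap to fill; the target
        // block itself is written by the caller, not zero-filled here.
        (old_nblocks..blknum)
            .filter(|gap_blknum| owned_by_this_shard(*gap_blknum))
            .collect()
    }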
    1931              : 
    1932            0 :     async fn put_slru_page_image(
    1933            0 :         &mut self,
    1934            0 :         modification: &mut DatadirModification<'_>,
    1935            0 :         kind: SlruKind,
    1936            0 :         segno: u32,
    1937            0 :         blknum: BlockNumber,
    1938            0 :         img: Bytes,
    1939            0 :         ctx: &RequestContext,
    1940            0 :     ) -> Result<()> {
    1941            0 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1942            0 :             .await?;
    1943            0 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1944            0 :         Ok(())
    1945            0 :     }
    1946              : 
    1947            0 :     async fn handle_slru_extend(
    1948            0 :         &mut self,
    1949            0 :         modification: &mut DatadirModification<'_>,
    1950            0 :         kind: SlruKind,
    1951            0 :         segno: u32,
    1952            0 :         blknum: BlockNumber,
    1953            0 :         ctx: &RequestContext,
    1954            0 :     ) -> anyhow::Result<()> {
     1955            0 :         // We don't use a cache for this like we do for relations. SLRUs are explicitly
    1956            0 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1957            0 :         // a lot less frequently.
    1958            0 : 
    1959            0 :         let new_nblocks = blknum + 1;
     1960              :         // Check if the SLRU segment exists. We implicitly create segments on the
     1961              :         // first record.
     1962              :         // TODO: it would be nice to be more explicit about it
    1963            0 :         let old_nblocks = if !modification
    1964            0 :             .tline
    1965            0 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1966            0 :             .await?
    1967              :         {
     1968              :             // create it with 0 size initially; the logic below will extend it
    1969            0 :             modification
    1970            0 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1971            0 :                 .await?;
    1972            0 :             0
    1973              :         } else {
    1974            0 :             modification
    1975            0 :                 .tline
    1976            0 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1977            0 :                 .await?
    1978              :         };
    1979              : 
    1980            0 :         if new_nblocks > old_nblocks {
    1981            0 :             trace!(
    1982            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1983              :                 kind,
    1984              :                 segno,
    1985              :                 old_nblocks,
    1986              :                 new_nblocks
    1987              :             );
    1988            0 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1989              : 
    1990              :             // fill the gap with zeros
    1991            0 :             for gap_blknum in old_nblocks..blknum {
    1992            0 :                 modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
    1993              :             }
    1994            0 :         }
    1995            0 :         Ok(())
    1996            0 :     }
    1997              : }
    1998              : 
    1999           12 : async fn get_relsize(
    2000           12 :     modification: &DatadirModification<'_>,
    2001           12 :     rel: RelTag,
    2002           12 :     ctx: &RequestContext,
    2003           12 : ) -> Result<BlockNumber, PageReconstructError> {
    2004           12 :     let nblocks = if !modification
    2005           12 :         .tline
    2006           12 :         .get_rel_exists(rel, Version::Modified(modification), ctx)
    2007            0 :         .await?
    2008              :     {
    2009            0 :         0
    2010              :     } else {
    2011           12 :         modification
    2012           12 :             .tline
    2013           12 :             .get_rel_size(rel, Version::Modified(modification), ctx)
    2014            0 :             .await?
    2015              :     };
    2016           12 :     Ok(nblocks)
    2017           12 : }
    2018              : 
    2019              : #[allow(clippy::bool_assert_comparison)]
    2020              : #[cfg(test)]
    2021              : mod tests {
    2022              :     use super::*;
    2023              :     use crate::tenant::harness::*;
    2024              :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    2025              :     use postgres_ffi::RELSEG_SIZE;
    2026              : 
    2027              :     use crate::DEFAULT_PG_VERSION;
    2028              : 
    2029              :     /// Arbitrary relation tag, for testing.
    2030              :     const TESTREL_A: RelTag = RelTag {
    2031              :         spcnode: 0,
    2032              :         dbnode: 111,
    2033              :         relnode: 1000,
    2034              :         forknum: 0,
    2035              :     };
    2036              : 
    2037           12 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    2038           12 :         // TODO
    2039           12 :     }
    2040              : 
    2041              :     #[tokio::test]
    2042            2 :     async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
    2043            8 :         for i in 14..=16 {
    2044            6 :             dispatch_pgversion!(i, {
    2045            2 :                 pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
    2046            2 :             });
    2047            2 :         }
    2048            2 : 
    2049            2 :         Ok(())
    2050            2 :     }
    2051              : 
    2052            8 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    2053            8 :         let mut m = tline.begin_modification(Lsn(0x10));
    2054            8 :         m.put_checkpoint(dispatch_pgversion!(
    2055            8 :             tline.pg_version,
    2056            0 :             pgv::ZERO_CHECKPOINT.clone()
    2057            0 :         ))?;
    2058           16 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    2059            8 :         m.commit(ctx).await?;
    2060            8 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    2061              : 
    2062            8 :         Ok(walingest)
    2063            8 :     }
    2064              : 
    2065              :     #[tokio::test]
    2066            2 :     async fn test_relsize() -> Result<()> {
    2067            8 :         let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
    2068            2 :         let tline = tenant
    2069            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2070            4 :             .await?;
    2071            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2072            2 : 
    2073            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    2074            2 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    2075            2 :         walingest
    2076            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    2077            2 :             .await?;
    2078            2 :         m.on_record_end();
    2079            2 :         m.commit(&ctx).await?;
    2080            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    2081            2 :         walingest
    2082            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
    2083            2 :             .await?;
    2084            2 :         m.on_record_end();
    2085            2 :         m.commit(&ctx).await?;
    2086            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    2087            2 :         walingest
    2088            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
    2089            2 :             .await?;
    2090            2 :         m.on_record_end();
    2091            2 :         m.commit(&ctx).await?;
    2092            2 :         let mut m = tline.begin_modification(Lsn(0x50));
    2093            2 :         walingest
    2094            2 :             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
    2095            2 :             .await?;
    2096            2 :         m.on_record_end();
    2097            2 :         m.commit(&ctx).await?;
    2098            2 : 
    2099            2 :         assert_current_logical_size(&tline, Lsn(0x50));
    2100            2 : 
    2101            2 :         // The relation was created at LSN 2, not visible at LSN 1 yet.
    2102            2 :         assert_eq!(
    2103            2 :             tline
    2104            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2105            2 :                 .await?,
    2106            2 :             false
    2107            2 :         );
    2108            2 :         assert!(tline
    2109            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2110            2 :             .await
    2111            2 :             .is_err());
    2112            2 :         assert_eq!(
    2113            2 :             tline
    2114            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2115            2 :                 .await?,
    2116            2 :             true
    2117            2 :         );
    2118            2 :         assert_eq!(
    2119            2 :             tline
    2120            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2121            2 :                 .await?,
    2122            2 :             1
    2123            2 :         );
    2124            2 :         assert_eq!(
    2125            2 :             tline
    2126            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2127            2 :                 .await?,
    2128            2 :             3
    2129            2 :         );
    2130            2 : 
    2131            2 :         // Check page contents at each LSN
    2132            2 :         assert_eq!(
    2133            2 :             tline
    2134            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
    2135            2 :                 .await?,
    2136            2 :             test_img("foo blk 0 at 2")
    2137            2 :         );
    2138            2 : 
    2139            2 :         assert_eq!(
    2140            2 :             tline
    2141            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
    2142            2 :                 .await?,
    2143            2 :             test_img("foo blk 0 at 3")
    2144            2 :         );
    2145            2 : 
    2146            2 :         assert_eq!(
    2147            2 :             tline
    2148            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
    2149            2 :                 .await?,
    2150            2 :             test_img("foo blk 0 at 3")
    2151            2 :         );
    2152            2 :         assert_eq!(
    2153            2 :             tline
    2154            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
    2155            2 :                 .await?,
    2156            2 :             test_img("foo blk 1 at 4")
    2157            2 :         );
    2158            2 : 
    2159            2 :         assert_eq!(
    2160            2 :             tline
    2161            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
    2162            2 :                 .await?,
    2163            2 :             test_img("foo blk 0 at 3")
    2164            2 :         );
    2165            2 :         assert_eq!(
    2166            2 :             tline
    2167            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
    2168            2 :                 .await?,
    2169            2 :             test_img("foo blk 1 at 4")
    2170            2 :         );
    2171            2 :         assert_eq!(
    2172            2 :             tline
    2173            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    2174            2 :                 .await?,
    2175            2 :             test_img("foo blk 2 at 5")
    2176            2 :         );
    2177            2 : 
    2178            2 :         // Truncate last block
    2179            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    2180            2 :         walingest
    2181            2 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    2182            2 :             .await?;
    2183            2 :         m.commit(&ctx).await?;
    2184            2 :         assert_current_logical_size(&tline, Lsn(0x60));
    2185            2 : 
    2186            2 :         // Check reported size and contents after truncation
    2187            2 :         assert_eq!(
    2188            2 :             tline
    2189            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    2190            2 :                 .await?,
    2191            2 :             2
    2192            2 :         );
    2193            2 :         assert_eq!(
    2194            2 :             tline
    2195            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
    2196            2 :                 .await?,
    2197            2 :             test_img("foo blk 0 at 3")
    2198            2 :         );
    2199            2 :         assert_eq!(
    2200            2 :             tline
    2201            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
    2202            2 :                 .await?,
    2203            2 :             test_img("foo blk 1 at 4")
    2204            2 :         );
    2205            2 : 
    2206            2 :         // should still see the truncated block with older LSN
    2207            2 :         assert_eq!(
    2208            2 :             tline
    2209            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2210            2 :                 .await?,
    2211            2 :             3
    2212            2 :         );
    2213            2 :         assert_eq!(
    2214            2 :             tline
    2215            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    2216            2 :                 .await?,
    2217            2 :             test_img("foo blk 2 at 5")
    2218            2 :         );
    2219            2 : 
    2220            2 :         // Truncate to zero length
    2221            2 :         let mut m = tline.begin_modification(Lsn(0x68));
    2222            2 :         walingest
    2223            2 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    2224            2 :             .await?;
    2225            2 :         m.commit(&ctx).await?;
    2226            2 :         assert_eq!(
    2227            2 :             tline
    2228            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
    2229            2 :                 .await?,
    2230            2 :             0
    2231            2 :         );
    2232            2 : 
    2233            2 :         // Extend from 0 to 2 blocks, leaving a gap
    2234            2 :         let mut m = tline.begin_modification(Lsn(0x70));
    2235            2 :         walingest
    2236            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
    2237            2 :             .await?;
    2238            2 :         m.on_record_end();
    2239            2 :         m.commit(&ctx).await?;
    2240            2 :         assert_eq!(
    2241            2 :             tline
    2242            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
    2243            2 :                 .await?,
    2244            2 :             2
    2245            2 :         );
    2246            2 :         assert_eq!(
    2247            2 :             tline
    2248            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
    2249            2 :                 .await?,
    2250            2 :             ZERO_PAGE
    2251            2 :         );
    2252            2 :         assert_eq!(
    2253            2 :             tline
    2254            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
    2255            2 :                 .await?,
    2256            2 :             test_img("foo blk 1")
    2257            2 :         );
    2258            2 : 
    2259            2 :         // Extend a lot more, leaving a big gap that spans across segments
    2260            2 :         let mut m = tline.begin_modification(Lsn(0x80));
    2261            2 :         walingest
    2262            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
    2263            2 :             .await?;
    2264            2 :         m.on_record_end();
    2265          190 :         m.commit(&ctx).await?;
    2266            2 :         assert_eq!(
    2267            2 :             tline
    2268            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2269            2 :                 .await?,
    2270            2 :             1501
    2271            2 :         );
    2272         2998 :         for blk in 2..1500 {
    2273         2996 :             assert_eq!(
    2274         2996 :                 tline
    2275         2996 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
    2276         1540 :                     .await?,
    2277         2996 :                 ZERO_PAGE
    2278            2 :             );
    2279            2 :         }
    2280            2 :         assert_eq!(
    2281            2 :             tline
    2282            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
    2283            2 :                 .await?,
    2284            2 :             test_img("foo blk 1500")
    2285            2 :         );
    2286            2 : 
    2287            2 :         Ok(())
    2288            2 :     }
    2289              : 
    2290              :     // Test what happens if we dropped a relation
    2291              :     // and then created it again within the same layer.
    2292              :     #[tokio::test]
    2293            2 :     async fn test_drop_extend() -> Result<()> {
    2294            2 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")
    2295            2 :             .await?
    2296            2 :             .load()
    2297            8 :             .await;
    2298            2 :         let tline = tenant
    2299            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2300            4 :             .await?;
    2301            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2302            2 : 
    2303            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    2304            2 :         walingest
    2305            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    2306            2 :             .await?;
    2307            2 :         m.commit(&ctx).await?;
    2308            2 : 
    2309            2 :         // Check that rel exists and size is correct
    2310            2 :         assert_eq!(
    2311            2 :             tline
    2312            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2313            2 :                 .await?,
    2314            2 :             true
    2315            2 :         );
    2316            2 :         assert_eq!(
    2317            2 :             tline
    2318            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2319            2 :                 .await?,
    2320            2 :             1
    2321            2 :         );
    2322            2 : 
    2323            2 :         // Drop rel
    2324            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    2325            2 :         walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
    2326            2 :         m.commit(&ctx).await?;
    2327            2 : 
    2328            2 :         // Check that rel is not visible anymore
    2329            2 :         assert_eq!(
    2330            2 :             tline
    2331            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
    2332            2 :                 .await?,
    2333            2 :             false
    2334            2 :         );
    2335            2 : 
    2336            2 :         // FIXME: should fail
    2337            2 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    2338            2 : 
    2339            2 :         // Re-create it
    2340            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    2341            2 :         walingest
    2342            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
    2343            2 :             .await?;
    2344            2 :         m.commit(&ctx).await?;
    2345            2 : 
    2346            2 :         // Check that rel exists and size is correct
    2347            2 :         assert_eq!(
    2348            2 :             tline
    2349            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2350            2 :                 .await?,
    2351            2 :             true
    2352            2 :         );
    2353            2 :         assert_eq!(
    2354            2 :             tline
    2355            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2356            2 :                 .await?,
    2357            2 :             1
    2358            2 :         );
    2359            2 : 
    2360            2 :         Ok(())
    2361            2 :     }
    2362              : 
    2363              :     // Test what happens if we truncated a relation
    2364              :     // so that one of its segments was dropped
    2365              :     // and then extended it again within the same layer.
    2366              :     #[tokio::test]
    2367            2 :     async fn test_truncate_extend() -> Result<()> {
    2368            2 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
    2369            2 :             .await?
    2370            2 :             .load()
    2371            5 :             .await;
    2372            2 :         let tline = tenant
    2373            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2374            4 :             .await?;
    2375            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2376            2 : 
    2377            2 :         // Create a 20 MB relation (the size is arbitrary)
    2378            2 :         let relsize = 20 * 1024 * 1024 / 8192;
    2379            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    2380         5120 :         for blkno in 0..relsize {
    2381         5120 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    2382         5120 :             walingest
    2383         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2384            2 :                 .await?;
    2385            2 :         }
    2386            2 :         m.commit(&ctx).await?;
    2387            2 : 
    2388            2 :         // The relation was created at LSN 20, not visible at LSN 1 yet.
    2389            2 :         assert_eq!(
    2390            2 :             tline
    2391            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2392            2 :                 .await?,
    2393            2 :             false
    2394            2 :         );
    2395            2 :         assert!(tline
    2396            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2397            2 :             .await
    2398            2 :             .is_err());
    2399            2 : 
    2400            2 :         assert_eq!(
    2401            2 :             tline
    2402            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2403            2 :                 .await?,
    2404            2 :             true
    2405            2 :         );
    2406            2 :         assert_eq!(
    2407            2 :             tline
    2408            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2409            2 :                 .await?,
    2410            2 :             relsize
    2411            2 :         );
    2412            2 : 
    2413            2 :         // Check relation content
    2414         5120 :         for blkno in 0..relsize {
    2415         5120 :             let lsn = Lsn(0x20);
    2416         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2417         5120 :             assert_eq!(
    2418         5120 :                 tline
    2419         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
    2420         1803 :                     .await?,
    2421         5120 :                 test_img(&data)
    2422            2 :             );
    2423            2 :         }
    2424            2 : 
    2425            2 :         // Truncate relation so that second segment was dropped
    2426            2 :         // - only leave one page
    2427            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    2428            2 :         walingest
    2429            2 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2430            2 :             .await?;
    2431            2 :         m.commit(&ctx).await?;
    2432            2 : 
    2433            2 :         // Check reported size and contents after truncation
    2434            2 :         assert_eq!(
    2435            2 :             tline
    2436            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    2437            2 :                 .await?,
    2438            2 :             1
    2439            2 :         );
    2440            2 : 
    2441            4 :         for blkno in 0..1 {
    2442            2 :             let lsn = Lsn(0x20);
    2443            2 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2444            2 :             assert_eq!(
    2445            2 :                 tline
    2446            2 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
    2447            2 :                     .await?,
    2448            2 :                 test_img(&data)
    2449            2 :             );
    2450            2 :         }
    2451            2 : 
    2452            2 :         // should still see all blocks with older LSN
    2453            2 :         assert_eq!(
    2454            2 :             tline
    2455            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2456            2 :                 .await?,
    2457            2 :             relsize
    2458            2 :         );
    2459         5120 :         for blkno in 0..relsize {
    2460         5120 :             let lsn = Lsn(0x20);
    2461         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2462         5120 :             assert_eq!(
    2463         5120 :                 tline
    2464         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
    2465         1856 :                     .await?,
    2466         5120 :                 test_img(&data)
    2467            2 :             );
    2468            2 :         }
    2469            2 : 
    2470            2 :         // Extend relation again.
    2471            2 :         // Add enough blocks to create second segment
    2472            2 :         let lsn = Lsn(0x80);
    2473            2 :         let mut m = tline.begin_modification(lsn);
    2474         5120 :         for blkno in 0..relsize {
    2475         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2476         5120 :             walingest
    2477         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2478            2 :                 .await?;
    2479            2 :         }
    2480            3 :         m.commit(&ctx).await?;
    2481            2 : 
    2482            2 :         assert_eq!(
    2483            2 :             tline
    2484            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2485            2 :                 .await?,
    2486            2 :             true
    2487            2 :         );
    2488            2 :         assert_eq!(
    2489            2 :             tline
    2490            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2491            2 :                 .await?,
    2492            2 :             relsize
    2493            2 :         );
    2494            2 :         // Check relation content
    2495         5120 :         for blkno in 0..relsize {
    2496         5120 :             let lsn = Lsn(0x80);
    2497         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2498         5120 :             assert_eq!(
    2499         5120 :                 tline
    2500         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
    2501         1828 :                     .await?,
    2502         5120 :                 test_img(&data)
    2503            2 :             );
    2504            2 :         }
    2505            2 : 
    2506            2 :         Ok(())
    2507            2 :     }
    2508              : 
    2509              :     /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
    2510              :     /// split into multiple 1 GB segments in Postgres.
    2511              :     #[tokio::test]
    2512            2 :     async fn test_large_rel() -> Result<()> {
    2513            8 :         let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
    2514            2 :         let tline = tenant
    2515            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2516            4 :             .await?;
    2517            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2518            2 : 
    2519            2 :         let mut lsn = 0x10;
    2520       262146 :         for blknum in 0..RELSEG_SIZE + 1 {
    2521       262146 :             lsn += 0x10;
    2522       262146 :             let mut m = tline.begin_modification(Lsn(lsn));
    2523       262146 :             let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2524       262146 :             walingest
    2525       262146 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2526         5420 :                 .await?;
    2527       262146 :             m.commit(&ctx).await?;
    2528            2 :         }
    2529            2 : 
    2530            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2531            2 : 
    2532            2 :         assert_eq!(
    2533            2 :             tline
    2534            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2535            2 :                 .await?,
    2536            2 :             RELSEG_SIZE + 1
    2537            2 :         );
    2538            2 : 
    2539            2 :         // Truncate one block
    2540            2 :         lsn += 0x10;
    2541            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    2542            2 :         walingest
    2543            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2544            2 :             .await?;
    2545            2 :         m.commit(&ctx).await?;
    2546            2 :         assert_eq!(
    2547            2 :             tline
    2548            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2549            2 :                 .await?,
    2550            2 :             RELSEG_SIZE
    2551            2 :         );
    2552            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2553            2 : 
    2554            2 :         // Truncate another block
    2555            2 :         lsn += 0x10;
    2556            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    2557            2 :         walingest
    2558            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2559            2 :             .await?;
    2560            2 :         m.commit(&ctx).await?;
    2561            2 :         assert_eq!(
    2562            2 :             tline
    2563            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2564            2 :                 .await?,
    2565            2 :             RELSEG_SIZE - 1
    2566            2 :         );
    2567            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2568            2 : 
    2569            2 :         // Truncate to 1500, and then truncate all the way down to 0, one block at a time
    2570            2 :         // This tests the behavior at segment boundaries
    2571            2 :         let mut size: i32 = 3000;
    2572         6004 :         while size >= 0 {
    2573         6002 :             lsn += 0x10;
    2574         6002 :             let mut m = tline.begin_modification(Lsn(lsn));
    2575         6002 :             walingest
    2576         6002 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2577            2 :                 .await?;
    2578         6002 :             m.commit(&ctx).await?;
    2579         6002 :             assert_eq!(
    2580         6002 :                 tline
    2581         6002 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2582            2 :                     .await?,
    2583         6002 :                 size as BlockNumber
    2584            2 :             );
    2585            2 : 
    2586         6002 :             size -= 1;
    2587            2 :         }
    2588            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2589            2 : 
    2590            2 :         Ok(())
    2591            2 :     }
    2592              : 
    2593              :     /// Replay a wal segment file taken directly from safekeepers.
    2594              :     ///
    2595              :     /// This test is useful for benchmarking since it allows us to profile only
    2596              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2597              :     /// without waiting for unrelated steps.
    2598              :     #[tokio::test]
    2599            2 :     async fn test_ingest_real_wal() {
    2600            2 :         use crate::tenant::harness::*;
    2601            2 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2602            2 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2603            2 : 
    2604            2 :         // Define test data path and constants.
    2605            2 :         //
    2606            2 :         // Steps to reconstruct the data, if needed:
    2607            2 :         // 1. Run the pgbench python test
    2608            2 :         // 2. Take the first wal segment file from safekeeper
    2609            2 :         // 3. Compress it using `zstd --long input_file`
    2610            2 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2611            2 :         // 5. Grep the safekeeper logs for "restart decoder" to get the startpoint
    2612            2 :         // 6. Run just the decoder from this test to get the endpoint;
    2613            2 :         //    it's the last LSN the decoder will output (see the decode-only sketch after this test).
    2614            2 :         let pg_version = 15; // The test data was generated by pg15
    2615            2 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2616            2 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2617            2 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2618            2 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2619            2 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2620            2 : 
    2621            2 :         let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
    2622            2 :         let span = harness
    2623            2 :             .span()
    2624            2 :             .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
    2625            8 :         let (tenant, ctx) = harness.load().await;
    2626            2 : 
    2627            2 :         let remote_initdb_path =
    2628            2 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    2629            2 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2630            2 : 
    2631            2 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2632            2 :             .expect("creating test dir should work");
    2633            2 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2634            2 : 
    2635            2 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2636            2 :         // it doesn't create a real checkpoint, and Walingest::new tries to parse
    2637            2 :         // the garbage data.
    2638            2 :         let tline = tenant
    2639            2 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2640        19957 :             .await
    2641            2 :             .unwrap();
    2642            2 : 
    2643            2 :         // We fully read and decompress this into memory before decoding
    2644            2 :         // to get a more accurate perf profile of the decoder.
    2645            2 :         let bytes = {
    2646            2 :             use async_compression::tokio::bufread::ZstdDecoder;
    2647            2 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2648            2 :             let reader = tokio::io::BufReader::new(file);
    2649            2 :             let decoder = ZstdDecoder::new(reader);
    2650            2 :             let mut reader = tokio::io::BufReader::new(decoder);
    2651            2 :             let mut buffer = Vec::new();
    2652          192 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2653            2 :             buffer
    2654            2 :         };
    2655            2 : 
    2656            2 :         // TODO start a profiler too
    2657            2 :         let started_at = std::time::Instant::now();
    2658            2 : 
    2659            2 :         // Initialize walingest
    2660            2 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    2661            2 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2662            2 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2663            5 :             .await
    2664            2 :             .unwrap();
    2665            2 :         let mut modification = tline.begin_modification(startpoint);
    2666            2 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2667            2 : 
    2668            2 :         // Decode and ingest wal. We process the wal in chunks because
    2669            2 :         // that's what happens when we get bytes from safekeepers.
    2670       474686 :         for chunk in bytes[xlogoff..].chunks(50) {
    2671       474686 :             decoder.feed_bytes(chunk);
    2672       620536 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    2673       145850 :                 let mut decoded = DecodedWALRecord::default();
    2674       145850 :                 decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
    2675       145850 :                 walingest
    2676       145850 :                     .ingest_record(decoded, lsn, &mut modification, &ctx)
    2677       145850 :                     .instrument(span.clone())
    2678          296 :                     .await
    2679       145850 :                     .unwrap();
    2680            2 :             }
    2681       474686 :             modification.commit(&ctx).await.unwrap();
    2682            2 :         }
    2683            2 : 
    2684            2 :         let duration = started_at.elapsed();
    2685            2 :         println!("done in {:?}", duration);
    2686            2 :     }
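                      : 
                      :     // A minimal, decode-only sketch of step 6 in the comment above: run just the
                      :     // WalStreamDecoder over the decompressed segment and report the last LSN it
                      :     // yields, which is how an endpoint like `_endpoint` can be obtained.
                      :     // `find_endpoint` is a hypothetical helper introduced only for illustration;
                      :     // it assumes the same `startpoint`, `pg_version`, and decompressed segment
                      :     // `bytes` as the test above, and it is not called by the tests.
                      :     #[allow(dead_code)]
                      :     fn find_endpoint(bytes: &[u8], startpoint: Lsn, pg_version: u32) -> Option<Lsn> {
                      :         use postgres_ffi::waldecoder::WalStreamDecoder;
                      :         use postgres_ffi::WAL_SEGMENT_SIZE;
                      : 
                      :         // Skip to the start offset within the segment, exactly as the test does.
                      :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
                      :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
                      :         decoder.feed_bytes(&bytes[xlogoff..]);
                      : 
                      :         // Drain every record the decoder can produce; the LSN of the last record
                      :         // is the endpoint.
                      :         let mut last_lsn = None;
                      :         while let Some((lsn, _recdata)) = decoder.poll_decode().unwrap() {
                      :             last_lsn = Some(lsn);
                      :         }
                      :         last_lsn
                      :     }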
    2687              : }
        

Generated by: LCOV version 2.1-beta