LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions)
Test: 2453312769e0b6b061a2008879e6693300d0b938.info
Test Date: 2024-09-06 16:40:18
Coverage: Lines: 53.7 % (1040 of 1936 hit) | Functions: 65.6 % (42 of 64 hit)

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  ->   WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9              : //! and decodes it to individual WAL records. It feeds the WAL records
      10              : //! to WalIngest, which parses them and stores them in the Repository.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
       13              : //! page images, or as WAL records. WalIngest::ingest_record() extracts
       14              : //! page images out of some WAL records, but most of them it stores as WAL
      15              : //! records. If a WAL record modifies multiple pages, WalIngest
      16              : //! will call Repository::put_wal_record or put_page_image functions
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
       22              : //! bespoke Rust code.
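                       : //!
                       : //! A minimal usage sketch (illustrative only, not part of this file). It assumes
                       : //! a `Timeline::begin_modification` helper, a `DatadirModification::commit` method,
                       : //! and a stream of already-decoded records:
                       : //!
                       : //!     let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
                       : //!     let mut modification = timeline.begin_modification(startpoint);
                       : //!     for (lsn, decoded) in decoded_records {
                       : //!         walingest.ingest_record(decoded, lsn, &mut modification, &ctx).await?;
                       : //!     }
                       : //!     modification.commit(&ctx).await?;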
      23              : 
      24              : use std::time::Duration;
      25              : use std::time::SystemTime;
      26              : 
      27              : use pageserver_api::shard::ShardIdentity;
      28              : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
      29              : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
      30              : use postgres_ffi::TimestampTz;
      31              : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      32              : 
      33              : use anyhow::{bail, Context, Result};
      34              : use bytes::{Buf, Bytes, BytesMut};
      35              : use tracing::*;
      36              : use utils::failpoint_support;
      37              : use utils::rate_limit::RateLimit;
      38              : 
      39              : use crate::context::RequestContext;
      40              : use crate::metrics::WAL_INGEST;
      41              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      42              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      43              : use crate::tenant::PageReconstructError;
      44              : use crate::tenant::Timeline;
      45              : use crate::walrecord::*;
      46              : use crate::ZERO_PAGE;
      47              : use pageserver_api::key::rel_block_to_key;
      48              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      49              : use postgres_ffi::pg_constants;
      50              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      51              : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
      52              : use postgres_ffi::v14::xlog_utils::*;
      53              : use postgres_ffi::v14::CheckPoint;
      54              : use postgres_ffi::TransactionId;
      55              : use postgres_ffi::BLCKSZ;
      56              : use utils::lsn::Lsn;
      57              : 
      58              : pub struct WalIngest {
      59              :     shard: ShardIdentity,
      60              :     pg_version: u32,
      61              :     checkpoint: CheckPoint,
      62              :     checkpoint_modified: bool,
      63              :     warn_ingest_lag: WarnIngestLag,
      64              : }
      65              : 
      66              : struct WarnIngestLag {
      67              :     lag_msg_ratelimit: RateLimit,
      68              :     future_lsn_msg_ratelimit: RateLimit,
      69              :     timestamp_invalid_msg_ratelimit: RateLimit,
      70              : }
      71              : 
      72              : impl WalIngest {
      73           36 :     pub async fn new(
      74           36 :         timeline: &Timeline,
      75           36 :         startpoint: Lsn,
      76           36 :         ctx: &RequestContext,
      77           36 :     ) -> anyhow::Result<WalIngest> {
      78              :         // Fetch the latest checkpoint into memory, so that we can compare with it
      79              :         // quickly in `ingest_record` and update it when it changes.
      80           36 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
      81           36 :         let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
      82           36 :         trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
      83              : 
      84           36 :         Ok(WalIngest {
      85           36 :             shard: *timeline.get_shard_identity(),
      86           36 :             pg_version: timeline.pg_version,
      87           36 :             checkpoint,
      88           36 :             checkpoint_modified: false,
      89           36 :             warn_ingest_lag: WarnIngestLag {
      90           36 :                 lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
      91           36 :                 future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
      92           36 :                 timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
      93           36 :             },
      94           36 :         })
      95           36 :     }
      96              : 
      97              :     ///
      98              :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
      99              :     ///
      100              :     /// This function updates the `lsn` field of the `DatadirModification`.
     101              :     ///
     102              :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
     103              :     /// relations/pages that the record affects.
     104              :     ///
     105              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
     106              :     ///
     107       437556 :     pub async fn ingest_record(
     108       437556 :         &mut self,
     109       437556 :         decoded: DecodedWALRecord,
     110       437556 :         lsn: Lsn,
     111       437556 :         modification: &mut DatadirModification<'_>,
     112       437556 :         ctx: &RequestContext,
     113       437556 :     ) -> anyhow::Result<bool> {
     114       437556 :         WAL_INGEST.records_received.inc();
     115       437556 :         let pg_version = modification.tline.pg_version;
     116       437556 :         let prev_len = modification.len();
     117       437556 : 
     118       437556 :         modification.set_lsn(lsn)?;
     119              : 
     120       437556 :         if decoded.is_dbase_create_copy(self.pg_version) {
     121              :             // Records of this type should always be preceded by a commit(), as they
     122              :             // rely on reading data pages back from the Timeline.
     123            0 :             assert!(!modification.has_dirty_data_pages());
     124       437556 :         }
     125              : 
     126       437556 :         let mut buf = decoded.record.clone();
     127       437556 :         buf.advance(decoded.main_data_offset);
     128       437556 : 
     129       437556 :         assert!(!self.checkpoint_modified);
     130       437556 :         if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
     131       437502 :             && self.checkpoint.update_next_xid(decoded.xl_xid)
     132            6 :         {
     133            6 :             self.checkpoint_modified = true;
     134       437550 :         }
     135              : 
     136       437556 :         failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
     137              : 
     138       437556 :         match decoded.xl_rmid {
     139              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
     140              :                 // Heap AM records need some special handling, because they modify VM pages
     141              :                 // without registering them with the standard mechanism.
     142       436422 :                 self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
     143            0 :                     .await?;
     144              :             }
     145              :             pg_constants::RM_NEON_ID => {
     146            0 :                 self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
     147            0 :                     .await?;
     148              :             }
     149              :             // Handle other special record types
     150              :             pg_constants::RM_SMGR_ID => {
     151           48 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     152           48 : 
     153           48 :                 if info == pg_constants::XLOG_SMGR_CREATE {
     154           48 :                     let create = XlSmgrCreate::decode(&mut buf);
     155           48 :                     self.ingest_xlog_smgr_create(modification, &create, ctx)
     156           36 :                         .await?;
     157            0 :                 } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     158            0 :                     let truncate = XlSmgrTruncate::decode(&mut buf);
     159            0 :                     self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
     160            0 :                         .await?;
     161            0 :                 }
     162              :             }
     163              :             pg_constants::RM_DBASE_ID => {
     164            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     165            0 :                 debug!(%info, %pg_version, "handle RM_DBASE_ID");
     166              : 
     167            0 :                 if pg_version == 14 {
     168            0 :                     if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     169            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     170            0 :                         debug!("XLOG_DBASE_CREATE v14");
     171              : 
     172            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     173            0 :                             .await?;
     174            0 :                     } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     175            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     176            0 :                         for tablespace_id in dropdb.tablespace_ids {
     177            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     178            0 :                             modification
     179            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     180            0 :                                 .await?;
     181              :                         }
     182            0 :                     }
     183            0 :                 } else if pg_version == 15 {
     184            0 :                     if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     185            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     186            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     187              :                         // The XLOG record was renamed between v14 and v15,
     188              :                         // but the record format is the same.
     189              :                         // So we can reuse XlCreateDatabase here.
     190            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     191            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     192            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     193            0 :                             .await?;
     194            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     195            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     196            0 :                         for tablespace_id in dropdb.tablespace_ids {
     197            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     198            0 :                             modification
     199            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     200            0 :                                 .await?;
     201              :                         }
     202            0 :                     }
     203            0 :                 } else if pg_version == 16 {
     204            0 :                     if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     205            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     206            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     207              :                         // The XLOG record was renamed between v14 and v15,
     208              :                         // but the record format is the same.
     209              :                         // So we can reuse XlCreateDatabase here.
     210            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     211            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     212            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     213            0 :                             .await?;
     214            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     215            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     216            0 :                         for tablespace_id in dropdb.tablespace_ids {
     217            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     218            0 :                             modification
     219            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     220            0 :                                 .await?;
     221              :                         }
     222            0 :                     }
     223            0 :                 }
     224              :             }
     225              :             pg_constants::RM_TBLSPC_ID => {
     226            0 :                 trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     227              :             }
     228              :             pg_constants::RM_CLOG_ID => {
     229            0 :                 let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     230            0 : 
     231            0 :                 if info == pg_constants::CLOG_ZEROPAGE {
     232            0 :                     let pageno = buf.get_u32_le();
     233            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     234            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     235            0 :                     self.put_slru_page_image(
     236            0 :                         modification,
     237            0 :                         SlruKind::Clog,
     238            0 :                         segno,
     239            0 :                         rpageno,
     240            0 :                         ZERO_PAGE.clone(),
     241            0 :                         ctx,
     242            0 :                     )
     243            0 :                     .await?;
     244              :                 } else {
     245            0 :                     assert!(info == pg_constants::CLOG_TRUNCATE);
     246            0 :                     let xlrec = XlClogTruncate::decode(&mut buf);
     247            0 :                     self.ingest_clog_truncate_record(modification, &xlrec, ctx)
     248            0 :                         .await?;
     249              :                 }
     250              :             }
     251              :             pg_constants::RM_XACT_ID => {
     252           72 :                 let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     253           72 : 
     254           72 :                 if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
     255           24 :                     let parsed_xact =
     256           24 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     257           24 :                     self.ingest_xact_record(
     258           24 :                         modification,
     259           24 :                         &parsed_xact,
     260           24 :                         info == pg_constants::XLOG_XACT_COMMIT,
     261           24 :                         decoded.origin_id,
     262           24 :                         ctx,
     263           24 :                     )
     264            0 :                     .await?;
     265           48 :                 } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
     266           48 :                     || info == pg_constants::XLOG_XACT_ABORT_PREPARED
     267              :                 {
     268            0 :                     let parsed_xact =
     269            0 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     270            0 :                     self.ingest_xact_record(
     271            0 :                         modification,
     272            0 :                         &parsed_xact,
     273            0 :                         info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
     274            0 :                         decoded.origin_id,
     275            0 :                         ctx,
     276            0 :                     )
     277            0 :                     .await?;
      278              :                     // Remove the twophase file. See RemoveTwoPhaseFile() in the Postgres code.
     279            0 :                     trace!(
     280            0 :                         "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     281              :                         decoded.xl_xid,
     282              :                         parsed_xact.xid,
     283              :                         lsn,
     284              :                     );
     285            0 :                     modification
     286            0 :                         .drop_twophase_file(parsed_xact.xid, ctx)
     287            0 :                         .await?;
     288           48 :                 } else if info == pg_constants::XLOG_XACT_PREPARE {
     289            0 :                     modification
     290            0 :                         .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
     291            0 :                         .await?;
     292           48 :                 }
     293              :             }
     294              :             pg_constants::RM_MULTIXACT_ID => {
     295            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     296            0 : 
     297            0 :                 if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
     298            0 :                     let pageno = buf.get_u32_le();
     299            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     300            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     301            0 :                     self.put_slru_page_image(
     302            0 :                         modification,
     303            0 :                         SlruKind::MultiXactOffsets,
     304            0 :                         segno,
     305            0 :                         rpageno,
     306            0 :                         ZERO_PAGE.clone(),
     307            0 :                         ctx,
     308            0 :                     )
     309            0 :                     .await?;
     310            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
     311            0 :                     let pageno = buf.get_u32_le();
     312            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     313            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     314            0 :                     self.put_slru_page_image(
     315            0 :                         modification,
     316            0 :                         SlruKind::MultiXactMembers,
     317            0 :                         segno,
     318            0 :                         rpageno,
     319            0 :                         ZERO_PAGE.clone(),
     320            0 :                         ctx,
     321            0 :                     )
     322            0 :                     .await?;
     323            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     324            0 :                     let xlrec = XlMultiXactCreate::decode(&mut buf);
     325            0 :                     self.ingest_multixact_create_record(modification, &xlrec)?;
     326            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     327            0 :                     let xlrec = XlMultiXactTruncate::decode(&mut buf);
     328            0 :                     self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
     329            0 :                         .await?;
     330            0 :                 }
     331              :             }
     332              :             pg_constants::RM_RELMAP_ID => {
     333            0 :                 let xlrec = XlRelmapUpdate::decode(&mut buf);
     334            0 :                 self.ingest_relmap_page(modification, &xlrec, &decoded, ctx)
     335            0 :                     .await?;
     336              :             }
     337              :             pg_constants::RM_XLOG_ID => {
     338           90 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     339           90 : 
     340           90 :                 if info == pg_constants::XLOG_NEXTOID {
     341            6 :                     let next_oid = buf.get_u32_le();
     342            6 :                     if self.checkpoint.nextOid != next_oid {
     343            6 :                         self.checkpoint.nextOid = next_oid;
     344            6 :                         self.checkpoint_modified = true;
     345            6 :                     }
     346           84 :                 } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
     347           84 :                     || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     348              :                 {
     349            6 :                     let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
     350            6 :                     buf.copy_to_slice(&mut checkpoint_bytes);
     351            6 :                     let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     352            6 :                     trace!(
     353            0 :                         "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
     354              :                         xlog_checkpoint.oldestXid,
     355              :                         self.checkpoint.oldestXid
     356              :                     );
     357            6 :                     if (self
     358            6 :                         .checkpoint
     359            6 :                         .oldestXid
     360            6 :                         .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
     361            6 :                         < 0
     362            0 :                     {
     363            0 :                         self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
     364            6 :                     }
     365            6 :                     trace!(
     366            0 :                         "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
     367              :                         xlog_checkpoint.oldestActiveXid,
     368              :                         self.checkpoint.oldestActiveXid
     369              :                     );
     370              : 
      371              :                     // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
     372              :                     // because at shutdown, all in-progress transactions will implicitly
     373              :                     // end. Postgres startup code knows that, and allows hot standby to start
     374              :                     // immediately from a shutdown checkpoint.
     375              :                     //
     376              :                     // In Neon, Postgres hot standby startup always behaves as if starting from
     377              :                     // an online checkpoint. It needs a valid `oldestActiveXid` value, so
     378              :                     // instead of overwriting self.checkpoint.oldestActiveXid with
      379              :                     // InvalidTransactionId from the checkpoint WAL record, update it to a
     380              :                     // proper value, knowing that there are no in-progress transactions at this
     381              :                     // point, except for prepared transactions.
     382              :                     //
     383              :                     // See also the neon code changes in the InitWalRecovery() function.
     384            6 :                     if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
     385            6 :                         && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     386              :                     {
     387            6 :                         let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
     388            6 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
     389            0 :                             if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
     390            0 :                                 oldest_active_xid = xid;
     391            0 :                             }
     392              :                         }
     393            6 :                         self.checkpoint.oldestActiveXid = oldest_active_xid;
     394            0 :                     } else {
     395            0 :                         self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
     396            0 :                     }
     397              : 
     398              :                     // Write a new checkpoint key-value pair on every checkpoint record, even
     399              :                     // if nothing really changed. Not strictly required, but it seems nice to
     400              :                     // have some trace of the checkpoint records in the layer files at the same
     401              :                     // LSNs.
     402            6 :                     self.checkpoint_modified = true;
     403           78 :                 }
     404              :             }
     405              :             pg_constants::RM_LOGICALMSG_ID => {
     406            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     407            0 : 
     408            0 :                 if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     409            0 :                     let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
     410            0 :                     let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     411            0 :                     let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
     412            0 :                     if prefix == "neon-test" {
     413              :                         // This is a convenient way to make the WAL ingestion pause at
      414              :                         // a particular point in the WAL. For more fine-grained control,
     415              :                         // we could peek into the message and only pause if it contains
     416              :                         // a particular string, for example, but this is enough for now.
     417            0 :                         failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
     418            0 :                     } else if let Some(path) = prefix.strip_prefix("neon-file:") {
     419            0 :                         modification.put_file(path, message, ctx).await?;
     420            0 :                     }
     421            0 :                 }
     422              :             }
     423              :             pg_constants::RM_STANDBY_ID => {
     424           48 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     425           48 :                 if info == pg_constants::XLOG_RUNNING_XACTS {
     426            0 :                     let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
     427            0 :                     self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
     428            0 :                     self.checkpoint_modified = true;
     429           48 :                 }
     430              :             }
     431              :             pg_constants::RM_REPLORIGIN_ID => {
     432            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     433            0 :                 if info == pg_constants::XLOG_REPLORIGIN_SET {
     434            0 :                     let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
     435            0 :                     modification
     436            0 :                         .set_replorigin(xlrec.node_id, xlrec.remote_lsn)
     437            0 :                         .await?
     438            0 :                 } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
     439            0 :                     let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
     440            0 :                     modification.drop_replorigin(xlrec.node_id).await?
     441            0 :                 }
     442              :             }
     443          876 :             _x => {
     444          876 :                 // TODO: should probably log & fail here instead of blindly
     445          876 :                 // doing something without understanding the protocol
     446          876 :             }
     447              :         }
     448              : 
     449              :         // Iterate through all the blocks that the record modifies, and
     450              :         // "put" a separate copy of the record for each block.
     451       437556 :         for blk in decoded.blocks.iter() {
     452       436926 :             let rel = RelTag {
     453       436926 :                 spcnode: blk.rnode_spcnode,
     454       436926 :                 dbnode: blk.rnode_dbnode,
     455       436926 :                 relnode: blk.rnode_relnode,
     456       436926 :                 forknum: blk.forknum,
     457       436926 :             };
     458       436926 : 
     459       436926 :             let key = rel_block_to_key(rel, blk.blkno);
     460       436926 :             let key_is_local = self.shard.is_key_local(&key);
     461       436926 : 
     462       436926 :             tracing::debug!(
     463              :                 lsn=%lsn,
     464              :                 key=%key,
     465            0 :                 "ingest: shard decision {} (checkpoint={})",
     466            0 :                 if !key_is_local { "drop" } else { "keep" },
     467              :                 self.checkpoint_modified
     468              :             );
     469              : 
     470       436926 :             if !key_is_local {
     471            0 :                 if self.shard.is_shard_zero() {
     472              :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     473              :                     // its blkno in case it implicitly extends a relation.
     474            0 :                     self.observe_decoded_block(modification, blk, ctx).await?;
     475            0 :                 }
     476              : 
     477            0 :                 continue;
     478       436926 :             }
     479       436926 :             self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx)
     480          852 :                 .await?;
     481              :         }
     482              : 
     483              :         // If checkpoint data was updated, store the new version in the repository
     484       437556 :         if self.checkpoint_modified {
     485           18 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     486              : 
     487           18 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     488           18 :             self.checkpoint_modified = false;
     489       437538 :         }
     490              : 
     491              :         // Note that at this point this record is only cached in the modification
     492              :         // until commit() is called to flush the data into the repository and update
     493              :         // the latest LSN.
     494              : 
     495       437556 :         modification.on_record_end();
     496       437556 : 
     497       437556 :         Ok(modification.len() > prev_len)
     498       437556 :     }
     499              : 
     500              :     /// Do not store this block, but observe it for the purposes of updating our relation size state.
     501            0 :     async fn observe_decoded_block(
     502            0 :         &mut self,
     503            0 :         modification: &mut DatadirModification<'_>,
     504            0 :         blk: &DecodedBkpBlock,
     505            0 :         ctx: &RequestContext,
     506            0 :     ) -> Result<(), PageReconstructError> {
     507            0 :         let rel = RelTag {
     508            0 :             spcnode: blk.rnode_spcnode,
     509            0 :             dbnode: blk.rnode_dbnode,
     510            0 :             relnode: blk.rnode_relnode,
     511            0 :             forknum: blk.forknum,
     512            0 :         };
     513            0 :         self.handle_rel_extend(modification, rel, blk.blkno, ctx)
     514            0 :             .await
     515            0 :     }
     516              : 
     517       436926 :     async fn ingest_decoded_block(
     518       436926 :         &mut self,
     519       436926 :         modification: &mut DatadirModification<'_>,
     520       436926 :         lsn: Lsn,
     521       436926 :         decoded: &DecodedWALRecord,
     522       436926 :         blk: &DecodedBkpBlock,
     523       436926 :         ctx: &RequestContext,
     524       436926 :     ) -> Result<(), PageReconstructError> {
     525       436926 :         let rel = RelTag {
     526       436926 :             spcnode: blk.rnode_spcnode,
     527       436926 :             dbnode: blk.rnode_dbnode,
     528       436926 :             relnode: blk.rnode_relnode,
     529       436926 :             forknum: blk.forknum,
     530       436926 :         };
     531       436926 : 
     532       436926 :         //
     533       436926 :         // Instead of storing full-page-image WAL record,
     534       436926 :         // it is better to store extracted image: we can skip wal-redo
     535       436926 :         // in this case. Also some FPI records may contain multiple (up to 32) pages,
     536       436926 :         // so them have to be copied multiple times.
     537       436926 :         //
     538       436926 :         if blk.apply_image
     539          180 :             && blk.has_image
     540          180 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     541           72 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     542            0 :                 || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     543              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     544           72 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
     545              :             // do not materialize null pages because them most likely be soon replaced with real data
     546           72 :             && blk.bimg_len != 0
     547              :         {
     548              :             // Extract page image from FPI record
     549           72 :             let img_len = blk.bimg_len as usize;
     550           72 :             let img_offs = blk.bimg_offset as usize;
     551           72 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     552           72 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     553           72 : 
     554           72 :             if blk.hole_length != 0 {
     555            0 :                 let tail = image.split_off(blk.hole_offset as usize);
     556            0 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     557            0 :                 image.unsplit(tail);
     558           72 :             }
     559              :             //
     560              :             // Match the logic of XLogReadBufferForRedoExtended:
     561              :             // The page may be uninitialized. If so, we can't set the LSN because
     562              :             // that would corrupt the page.
     563              :             //
     564           72 :             if !page_is_new(&image) {
     565           54 :                 page_set_lsn(&mut image, lsn)
     566           18 :             }
     567           72 :             assert_eq!(image.len(), BLCKSZ as usize);
     568              : 
     569           72 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     570           15 :                 .await?;
     571              :         } else {
     572       436854 :             let rec = NeonWalRecord::Postgres {
     573       436854 :                 will_init: blk.will_init || blk.apply_image,
     574       436854 :                 rec: decoded.record.clone(),
     575       436854 :             };
     576       436854 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     577          837 :                 .await?;
     578              :         }
     579       436926 :         Ok(())
     580       436926 :     }
     581              : 
     582       436422 :     async fn ingest_heapam_record(
     583       436422 :         &mut self,
     584       436422 :         buf: &mut Bytes,
     585       436422 :         modification: &mut DatadirModification<'_>,
     586       436422 :         decoded: &DecodedWALRecord,
     587       436422 :         ctx: &RequestContext,
     588       436422 :     ) -> anyhow::Result<()> {
     589       436422 :         // Handle VM bit updates that are implicitly part of heap records.
     590       436422 : 
     591       436422 :         // First, look at the record to determine which VM bits need
     592       436422 :         // to be cleared. If either of these variables is set, we
     593       436422 :         // need to clear the corresponding bits in the visibility map.
     594       436422 :         let mut new_heap_blkno: Option<u32> = None;
     595       436422 :         let mut old_heap_blkno: Option<u32> = None;
     596       436422 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     597       436422 : 
     598       436422 :         match modification.tline.pg_version {
     599              :             14 => {
     600            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     601            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     602            0 : 
     603            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     604            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     605            0 :                         assert_eq!(0, buf.remaining());
     606            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     607            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     608            0 :                         }
     609            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     610            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     611            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     612            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     613            0 :                         }
     614            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     615            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     616              :                     {
     617            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     618            0 :                         // the size of tuple data is inferred from the size of the record.
     619            0 :                         // we can't validate the remaining number of bytes without parsing
     620            0 :                         // the tuple data.
     621            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     622            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     623            0 :                         }
     624            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     625            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      626            0 :                             // non-HOT update where the new tuple goes to a different page than
     627            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     628            0 :                             // set.
     629            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     630            0 :                         }
     631            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     632            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     633            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     634            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     635            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     636            0 :                         }
     637            0 :                     }
     638            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     639            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     640            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     641            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     642              : 
     643            0 :                         let offset_array_len =
     644            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     645              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     646            0 :                                 0
     647              :                             } else {
     648            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     649              :                             };
     650            0 :                         assert_eq!(offset_array_len, buf.remaining());
     651              : 
     652            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     653            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     654            0 :                         }
     655            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     656            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     657            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     658            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     659            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     660            0 :                         }
     661            0 :                     }
     662              :                 } else {
     663            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     664              :                 }
     665              :             }
     666              :             15 => {
     667       436422 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     668       435858 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     669       435858 : 
     670       435858 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     671       435828 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     672       435828 :                         assert_eq!(0, buf.remaining());
     673       435828 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     674           12 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     675       435816 :                         }
     676           30 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     677            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     678            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     679            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     680            0 :                         }
     681           30 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     682            6 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     683              :                     {
     684           24 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     685           24 :                         // the size of tuple data is inferred from the size of the record.
     686           24 :                         // we can't validate the remaining number of bytes without parsing
     687           24 :                         // the tuple data.
     688           24 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     689            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     690           24 :                         }
     691           24 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     692            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      693            0 :                             // non-HOT update where the new tuple goes to a different page than
     694            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     695            0 :                             // set.
     696            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     697           24 :                         }
     698            6 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     699            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     700            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     701            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     702            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     703            0 :                         }
     704            6 :                     }
     705          564 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     706          564 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     707          564 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     708          126 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     709              : 
     710          126 :                         let offset_array_len =
     711          126 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     712              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     713            6 :                                 0
     714              :                             } else {
     715          120 :                                 size_of::<u16>() * xlrec.ntuples as usize
     716              :                             };
     717          126 :                         assert_eq!(offset_array_len, buf.remaining());
     718              : 
     719          126 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     720           24 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     721          102 :                         }
     722          438 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     723            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     724            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     725            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     726            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     727            0 :                         }
     728          438 :                     }
     729              :                 } else {
     730            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     731              :                 }
     732              :             }
     733              :             16 => {
     734            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     735            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     736            0 : 
     737            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     738            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     739            0 :                         assert_eq!(0, buf.remaining());
     740            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     741            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     742            0 :                         }
     743            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     744            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     745            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     746            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     747            0 :                         }
     748            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     749            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     750              :                     {
     751            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     752            0 :                         // the size of tuple data is inferred from the size of the record.
     753            0 :                         // we can't validate the remaining number of bytes without parsing
     754            0 :                         // the tuple data.
     755            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     756            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     757            0 :                         }
     758            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     759            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      760            0 :                             // non-HOT update where the new tuple goes to a different page than
     761            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     762            0 :                             // set.
     763            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     764            0 :                         }
     765            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     766            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     767            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     768            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     769            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     770            0 :                         }
     771            0 :                     }
     772            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     773            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     774            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     775            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     776              : 
     777            0 :                         let offset_array_len =
     778            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     779              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     780            0 :                                 0
     781              :                             } else {
     782            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     783              :                             };
     784            0 :                         assert_eq!(offset_array_len, buf.remaining());
     785              : 
     786            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     787            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     788            0 :                         }
     789            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     790            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     791            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     792            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     793            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     794            0 :                         }
     795            0 :                     }
     796              :                 } else {
     797            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     798              :                 }
     799              :             }
     800            0 :             _ => {}
     801              :         }
     802              : 
     803              :         // Clear the VM bits if required.
     804       436422 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     805           36 :             let vm_rel = RelTag {
     806           36 :                 forknum: VISIBILITYMAP_FORKNUM,
     807           36 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     808           36 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     809           36 :                 relnode: decoded.blocks[0].rnode_relnode,
     810           36 :             };
     811           36 : 
     812           36 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     813           36 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     814              : 
     815              :             // Sometimes, Postgres seems to create heap WAL records with the
     816              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     817              :             // not set. In fact, it's possible that the VM page does not exist at all.
     818              :             // In that case, we don't want to store a record to clear the VM bit;
     819              :             // replaying it would fail to find the previous image of the page, because
     820              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
      821              :             // record if they don't.
     822           36 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     823           36 :             if let Some(blknum) = new_vm_blk {
     824           36 :                 if blknum >= vm_size {
     825            0 :                     new_vm_blk = None;
     826           36 :                 }
     827            0 :             }
     828           36 :             if let Some(blknum) = old_vm_blk {
     829            0 :                 if blknum >= vm_size {
     830            0 :                     old_vm_blk = None;
     831            0 :                 }
     832           36 :             }
     833              : 
     834           36 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     835           36 :                 if new_vm_blk == old_vm_blk {
     836              :                     // An UPDATE record that needs to clear the bits for both old and the
     837              :                     // new page, both of which reside on the same VM page.
     838            0 :                     self.put_rel_wal_record(
     839            0 :                         modification,
     840            0 :                         vm_rel,
     841            0 :                         new_vm_blk.unwrap(),
     842            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     843            0 :                             new_heap_blkno,
     844            0 :                             old_heap_blkno,
     845            0 :                             flags,
     846            0 :                         },
     847            0 :                         ctx,
     848            0 :                     )
     849            0 :                     .await?;
     850              :                 } else {
     851              :                     // Clear VM bits for one heap page, or for two pages that reside on
     852              :                     // different VM pages.
     853           36 :                     if let Some(new_vm_blk) = new_vm_blk {
     854           36 :                         self.put_rel_wal_record(
     855           36 :                             modification,
     856           36 :                             vm_rel,
     857           36 :                             new_vm_blk,
     858           36 :                             NeonWalRecord::ClearVisibilityMapFlags {
     859           36 :                                 new_heap_blkno,
     860           36 :                                 old_heap_blkno: None,
     861           36 :                                 flags,
     862           36 :                             },
     863           36 :                             ctx,
     864           36 :                         )
     865            0 :                         .await?;
     866            0 :                     }
     867           36 :                     if let Some(old_vm_blk) = old_vm_blk {
     868            0 :                         self.put_rel_wal_record(
     869            0 :                             modification,
     870            0 :                             vm_rel,
     871            0 :                             old_vm_blk,
     872            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     873            0 :                                 new_heap_blkno: None,
     874            0 :                                 old_heap_blkno,
     875            0 :                                 flags,
     876            0 :                             },
     877            0 :                             ctx,
     878            0 :                         )
     879            0 :                         .await?;
     880           36 :                     }
     881              :                 }
     882            0 :             }
     883       436386 :         }
     884              : 
     885       436422 :         Ok(())
     886       436422 :     }
     887              : 
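                       :     /// Subroutine of ingest_record(), to handle a record of the Neon-specific
                       :     /// resource manager (RM_NEON_ID). As with the heap records above, the only
                       :     /// side effect we care about here is clearing visibility map bits.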
     888            0 :     async fn ingest_neonrmgr_record(
     889            0 :         &mut self,
     890            0 :         buf: &mut Bytes,
     891            0 :         modification: &mut DatadirModification<'_>,
     892            0 :         decoded: &DecodedWALRecord,
     893            0 :         ctx: &RequestContext,
     894            0 :     ) -> anyhow::Result<()> {
     895            0 :         // Handle VM bit updates that are implicitly part of heap records.
     896            0 : 
     897            0 :         // First, look at the record to determine which VM bits need
     898            0 :         // to be cleared. If either of these variables is set, we
     899            0 :         // need to clear the corresponding bits in the visibility map.
     900            0 :         let mut new_heap_blkno: Option<u32> = None;
     901            0 :         let mut old_heap_blkno: Option<u32> = None;
     902            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     903            0 :         let pg_version = modification.tline.pg_version;
     904            0 : 
     905            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     906              : 
     907            0 :         match pg_version {
     908              :             16 => {
     909            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     910            0 : 
     911            0 :                 match info {
     912              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     913            0 :                         let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
     914            0 :                         assert_eq!(0, buf.remaining());
     915            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     916            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     917            0 :                         }
     918              :                     }
     919              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     920            0 :                         let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
     921            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     922            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     923            0 :                         }
     924              :                     }
     925              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     926              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     927            0 :                         let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
     928            0 :                         // the size of tuple data is inferred from the size of the record.
     929            0 :                         // we can't validate the remaining number of bytes without parsing
     930            0 :                         // the tuple data.
     931            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     932            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     933            0 :                         }
     934            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     935            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      936            0 :                             // non-HOT update where the new tuple goes to a different page than
     937            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     938            0 :                             // set.
     939            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     940            0 :                         }
     941              :                     }
     942              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     943            0 :                         let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     944              : 
     945            0 :                         let offset_array_len =
     946            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     947              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     948            0 :                                 0
     949              :                             } else {
     950            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     951              :                             };
     952            0 :                         assert_eq!(offset_array_len, buf.remaining());
     953              : 
     954            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     955            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     956            0 :                         }
     957              :                     }
     958              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     959            0 :                         let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
     960            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     961            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     962            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     963            0 :                         }
     964              :                     }
     965            0 :                     info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
     966              :                 }
     967              :             }
     968            0 :             _ => bail!(
     969            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     970            0 :                 pg_version
     971            0 :             ),
     972              :         }
     973              : 
     974              :         // Clear the VM bits if required.
     975            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     976            0 :             let vm_rel = RelTag {
     977            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     978            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     979            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     980            0 :                 relnode: decoded.blocks[0].rnode_relnode,
     981            0 :             };
     982            0 : 
     983            0 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     984            0 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     985              : 
     986              :             // Sometimes, Postgres seems to create heap WAL records with the
     987              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     988              :             // not set. In fact, it's possible that the VM page does not exist at all.
     989              :             // In that case, we don't want to store a record to clear the VM bit;
     990              :             // replaying it would fail to find the previous image of the page, because
     991              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
      992              :             // record if they don't.
     993            0 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     994            0 :             if let Some(blknum) = new_vm_blk {
     995            0 :                 if blknum >= vm_size {
     996            0 :                     new_vm_blk = None;
     997            0 :                 }
     998            0 :             }
     999            0 :             if let Some(blknum) = old_vm_blk {
    1000            0 :                 if blknum >= vm_size {
    1001            0 :                     old_vm_blk = None;
    1002            0 :                 }
    1003            0 :             }
    1004              : 
    1005            0 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
    1006            0 :                 if new_vm_blk == old_vm_blk {
    1007              :                     // An UPDATE record that needs to clear the bits for both old and the
    1008              :                     // new page, both of which reside on the same VM page.
    1009            0 :                     self.put_rel_wal_record(
    1010            0 :                         modification,
    1011            0 :                         vm_rel,
    1012            0 :                         new_vm_blk.unwrap(),
    1013            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
    1014            0 :                             new_heap_blkno,
    1015            0 :                             old_heap_blkno,
    1016            0 :                             flags,
    1017            0 :                         },
    1018            0 :                         ctx,
    1019            0 :                     )
    1020            0 :                     .await?;
    1021              :                 } else {
    1022              :                     // Clear VM bits for one heap page, or for two pages that reside on
    1023              :                     // different VM pages.
    1024            0 :                     if let Some(new_vm_blk) = new_vm_blk {
    1025            0 :                         self.put_rel_wal_record(
    1026            0 :                             modification,
    1027            0 :                             vm_rel,
    1028            0 :                             new_vm_blk,
    1029            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1030            0 :                                 new_heap_blkno,
    1031            0 :                                 old_heap_blkno: None,
    1032            0 :                                 flags,
    1033            0 :                             },
    1034            0 :                             ctx,
    1035            0 :                         )
    1036            0 :                         .await?;
    1037            0 :                     }
    1038            0 :                     if let Some(old_vm_blk) = old_vm_blk {
    1039            0 :                         self.put_rel_wal_record(
    1040            0 :                             modification,
    1041            0 :                             vm_rel,
    1042            0 :                             old_vm_blk,
    1043            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1044            0 :                                 new_heap_blkno: None,
    1045            0 :                                 old_heap_blkno,
    1046            0 :                                 flags,
    1047            0 :                             },
    1048            0 :                             ctx,
    1049            0 :                         )
    1050            0 :                         .await?;
    1051            0 :                     }
    1052              :                 }
    1053            0 :             }
    1054            0 :         }
    1055              : 
    1056            0 :         Ok(())
    1057            0 :     }
    1058              : 
    1059              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
    1060            0 :     async fn ingest_xlog_dbase_create(
    1061            0 :         &mut self,
    1062            0 :         modification: &mut DatadirModification<'_>,
    1063            0 :         rec: &XlCreateDatabase,
    1064            0 :         ctx: &RequestContext,
    1065            0 :     ) -> anyhow::Result<()> {
    1066            0 :         let db_id = rec.db_id;
    1067            0 :         let tablespace_id = rec.tablespace_id;
    1068            0 :         let src_db_id = rec.src_db_id;
    1069            0 :         let src_tablespace_id = rec.src_tablespace_id;
    1070              : 
    1071            0 :         let rels = modification
    1072            0 :             .tline
    1073            0 :             .list_rels(
    1074            0 :                 src_tablespace_id,
    1075            0 :                 src_db_id,
    1076            0 :                 Version::Modified(modification),
    1077            0 :                 ctx,
    1078            0 :             )
    1079            0 :             .await?;
    1080              : 
    1081            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
    1082              : 
    1083              :         // Copy relfilemap
    1084            0 :         let filemap = modification
    1085            0 :             .tline
    1086            0 :             .get_relmap_file(
    1087            0 :                 src_tablespace_id,
    1088            0 :                 src_db_id,
    1089            0 :                 Version::Modified(modification),
    1090            0 :                 ctx,
    1091            0 :             )
    1092            0 :             .await?;
    1093            0 :         modification
    1094            0 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
    1095            0 :             .await?;
    1096              : 
    1097            0 :         let mut num_rels_copied = 0;
    1098            0 :         let mut num_blocks_copied = 0;
    1099            0 :         for src_rel in rels {
    1100            0 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
    1101            0 :             assert_eq!(src_rel.dbnode, src_db_id);
    1102              : 
    1103            0 :             let nblocks = modification
    1104            0 :                 .tline
    1105            0 :                 .get_rel_size(src_rel, Version::Modified(modification), ctx)
    1106            0 :                 .await?;
    1107            0 :             let dst_rel = RelTag {
    1108            0 :                 spcnode: tablespace_id,
    1109            0 :                 dbnode: db_id,
    1110            0 :                 relnode: src_rel.relnode,
    1111            0 :                 forknum: src_rel.forknum,
    1112            0 :             };
    1113            0 : 
    1114            0 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
    1115              : 
    1116              :             // Copy content
    1117            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
    1118            0 :             for blknum in 0..nblocks {
    1119              :                 // Sharding:
    1120              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
    1121              :                 //    dbNode is not included in the hash inputs for sharding.
    1122              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
    1123              :                 //    that belong to it.
    1124            0 :                 let src_key = rel_block_to_key(src_rel, blknum);
    1125            0 :                 if !self.shard.is_key_local(&src_key) {
    1126            0 :                     debug!(
    1127            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
    1128              :                         src_key
    1129              :                     );
    1130            0 :                     continue;
    1131            0 :                 }
    1132            0 :                 debug!(
    1133            0 :                     "copying block {} from {} ({}) to {}",
    1134              :                     blknum, src_rel, src_key, dst_rel
    1135              :                 );
    1136              : 
    1137            0 :                 let content = modification
    1138            0 :                     .tline
    1139            0 :                     .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
    1140            0 :                     .await?;
    1141            0 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
    1142            0 :                 num_blocks_copied += 1;
    1143              :             }
    1144              : 
    1145            0 :             num_rels_copied += 1;
    1146              :         }
    1147              : 
    1148            0 :         info!(
    1149            0 :             "Created database {}/{}, copied {} blocks in {} rels",
    1150              :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
    1151              :         );
    1152            0 :         Ok(())
    1153            0 :     }
    1154              : 
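                       :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_CREATE record.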
    1155           48 :     async fn ingest_xlog_smgr_create(
    1156           48 :         &mut self,
    1157           48 :         modification: &mut DatadirModification<'_>,
    1158           48 :         rec: &XlSmgrCreate,
    1159           48 :         ctx: &RequestContext,
    1160           48 :     ) -> anyhow::Result<()> {
    1161           48 :         let rel = RelTag {
    1162           48 :             spcnode: rec.rnode.spcnode,
    1163           48 :             dbnode: rec.rnode.dbnode,
    1164           48 :             relnode: rec.rnode.relnode,
    1165           48 :             forknum: rec.forknum,
    1166           48 :         };
    1167           48 :         self.put_rel_creation(modification, rel, ctx).await?;
    1168           48 :         Ok(())
    1169           48 :     }
    1170              : 
    1171              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    1172              :     ///
    1173              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
    1174            0 :     async fn ingest_xlog_smgr_truncate(
    1175            0 :         &mut self,
    1176            0 :         modification: &mut DatadirModification<'_>,
    1177            0 :         rec: &XlSmgrTruncate,
    1178            0 :         ctx: &RequestContext,
    1179            0 :     ) -> anyhow::Result<()> {
    1180            0 :         let spcnode = rec.rnode.spcnode;
    1181            0 :         let dbnode = rec.rnode.dbnode;
    1182            0 :         let relnode = rec.rnode.relnode;
    1183            0 : 
    1184            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
    1185            0 :             let rel = RelTag {
    1186            0 :                 spcnode,
    1187            0 :                 dbnode,
    1188            0 :                 relnode,
    1189            0 :                 forknum: MAIN_FORKNUM,
    1190            0 :             };
    1191            0 :             self.put_rel_truncation(modification, rel, rec.blkno, ctx)
    1192            0 :                 .await?;
    1193            0 :         }
    1194            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
    1195            0 :             let rel = RelTag {
    1196            0 :                 spcnode,
    1197            0 :                 dbnode,
    1198            0 :                 relnode,
    1199            0 :                 forknum: FSM_FORKNUM,
    1200            0 :             };
    1201            0 : 
    1202            0 :             let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
    1203            0 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
    1204            0 :             if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
     1205            0 :                 // The tail of the last remaining FSM page has to be zeroed.
     1206            0 :                 // We are not precise here: instead of digging into the FSM bitmap format, we just clear the whole page.
    1207            0 :                 modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
    1208            0 :                 fsm_physical_page_no += 1;
    1209            0 :             }
    1210            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1211            0 :             if nblocks > fsm_physical_page_no {
    1212              :                 // check if something to do: FSM is larger than truncate position
    1213            0 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
    1214            0 :                     .await?;
    1215            0 :             }
    1216            0 :         }
    1217            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
    1218            0 :             let rel = RelTag {
    1219            0 :                 spcnode,
    1220            0 :                 dbnode,
    1221            0 :                 relnode,
    1222            0 :                 forknum: VISIBILITYMAP_FORKNUM,
    1223            0 :             };
    1224            0 : 
    1225            0 :             let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
    1226            0 :             if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
     1227            0 :                 // The tail of the last remaining VM page has to be zeroed.
     1228            0 :                 // We are not precise here: instead of digging into the VM bitmap format, we just clear the whole page.
    1229            0 :                 modification.put_rel_page_image_zero(rel, vm_page_no);
    1230            0 :                 vm_page_no += 1;
    1231            0 :             }
    1232            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1233            0 :             if nblocks > vm_page_no {
    1234              :                 // check if something to do: VM is larger than truncate position
    1235            0 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
    1236            0 :                     .await?;
    1237            0 :             }
    1238            0 :         }
    1239            0 :         Ok(())
    1240            0 :     }
    1241              : 
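                       :     /// Emit a rate-limited warning if the timestamp of the record being ingested
                       :     /// lags behind the pageserver's clock by more than wait_lsn_timeout, or if it
                       :     /// comes from the future by more than a small ignored drift.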
    1242           24 :     fn warn_on_ingest_lag(
    1243           24 :         &mut self,
    1244           24 :         conf: &crate::config::PageServerConf,
     1245           24 :         wal_timestamp: TimestampTz,
    1246           24 :     ) {
    1247           24 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1248           24 :         let now = SystemTime::now();
    1249           24 :         let rate_limits = &mut self.warn_ingest_lag;
     1250           24 :         match try_from_pg_timestamp(wal_timestamp) {
    1251           24 :             Ok(ts) => {
    1252           24 :                 match now.duration_since(ts) {
    1253           24 :                     Ok(lag) => {
    1254           24 :                         if lag > conf.wait_lsn_timeout {
    1255           24 :                             rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
    1256            6 :                                 let lag = humantime::format_duration(lag);
    1257            6 :                                 warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
    1258           24 :                             })
    1259            0 :                         }
    1260              :                     },
    1261            0 :                     Err(e) => {
    1262            0 :                         let delta_t = e.duration();
    1263              :                         // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
    1264              :                         // => https://www.robustperception.io/time-metric-from-the-node-exporter/
    1265              :                         const IGNORED_DRIFT: Duration = Duration::from_millis(100);
    1266            0 :                         if delta_t > IGNORED_DRIFT {
    1267            0 :                             let delta_t = humantime::format_duration(delta_t);
    1268            0 :                             rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
    1269            0 :                                 warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
    1270            0 :                             })
    1271            0 :                         }
    1272              :                     }
    1273              :                 };
    1274              : 
    1275              :             }
    1276            0 :             Err(error) => {
    1277            0 :                 rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
    1278            0 :                     warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
    1279            0 :                 })
    1280              :             }
    1281              :         }
    1282           24 :     }
    1283              : 
    1284              :     /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
    1285              :     ///
    1286           24 :     async fn ingest_xact_record(
    1287           24 :         &mut self,
    1288           24 :         modification: &mut DatadirModification<'_>,
    1289           24 :         parsed: &XlXactParsedRecord,
    1290           24 :         is_commit: bool,
    1291           24 :         origin_id: u16,
    1292           24 :         ctx: &RequestContext,
    1293           24 :     ) -> anyhow::Result<()> {
    1294           24 :         // Record update of CLOG pages
    1295           24 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
    1296           24 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1297           24 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1298           24 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
    1299           24 : 
    1300           24 :         self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);
    1301              : 
    1302           24 :         for subxact in &parsed.subxacts {
    1303            0 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
    1304            0 :             if subxact_pageno != pageno {
    1305              :                 // This subxact goes to different page. Write the record
    1306              :                 // for all the XIDs on the previous page, and continue
    1307              :                 // accumulating XIDs on this new page.
    1308            0 :                 modification.put_slru_wal_record(
    1309            0 :                     SlruKind::Clog,
    1310            0 :                     segno,
    1311            0 :                     rpageno,
    1312            0 :                     if is_commit {
    1313            0 :                         NeonWalRecord::ClogSetCommitted {
    1314            0 :                             xids: page_xids,
    1315            0 :                             timestamp: parsed.xact_time,
    1316            0 :                         }
    1317              :                     } else {
    1318            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
    1319              :                     },
    1320            0 :                 )?;
    1321            0 :                 page_xids = Vec::new();
    1322            0 :             }
    1323            0 :             pageno = subxact_pageno;
    1324            0 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1325            0 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1326            0 :             page_xids.push(*subxact);
    1327              :         }
    1328           24 :         modification.put_slru_wal_record(
    1329           24 :             SlruKind::Clog,
    1330           24 :             segno,
    1331           24 :             rpageno,
    1332           24 :             if is_commit {
    1333           24 :                 NeonWalRecord::ClogSetCommitted {
    1334           24 :                     xids: page_xids,
    1335           24 :                     timestamp: parsed.xact_time,
    1336           24 :                 }
    1337              :             } else {
    1338            0 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
    1339              :             },
    1340            0 :         )?;
    1341              : 
    1342           24 :         for xnode in &parsed.xnodes {
    1343            0 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
    1344            0 :                 let rel = RelTag {
    1345            0 :                     forknum,
    1346            0 :                     spcnode: xnode.spcnode,
    1347            0 :                     dbnode: xnode.dbnode,
    1348            0 :                     relnode: xnode.relnode,
    1349            0 :                 };
    1350            0 :                 if modification
    1351            0 :                     .tline
    1352            0 :                     .get_rel_exists(rel, Version::Modified(modification), ctx)
    1353            0 :                     .await?
    1354              :                 {
    1355            0 :                     self.put_rel_drop(modification, rel, ctx).await?;
    1356            0 :                 }
    1357              :             }
    1358              :         }
    1359           24 :         if origin_id != 0 {
    1360            0 :             modification
    1361            0 :                 .set_replorigin(origin_id, parsed.origin_lsn)
    1362            0 :                 .await?;
    1363           24 :         }
    1364           24 :         Ok(())
    1365           24 :     }
    1366              : 
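                       :     /// Subroutine of ingest_record(), to handle a CLOG truncation record:
                       :     /// update oldestXid/oldestXidDB in the checkpoint and drop the CLOG SLRU
                       :     /// segments that precede the truncation point.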
    1367            0 :     async fn ingest_clog_truncate_record(
    1368            0 :         &mut self,
    1369            0 :         modification: &mut DatadirModification<'_>,
    1370            0 :         xlrec: &XlClogTruncate,
    1371            0 :         ctx: &RequestContext,
    1372            0 :     ) -> anyhow::Result<()> {
    1373            0 :         info!(
    1374            0 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
    1375              :             xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
    1376              :         );
    1377              : 
    1378              :         // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
    1379              :         // truncated, but a checkpoint record with the updated values isn't written until
    1380              :         // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
    1381              :         // so we keep the oldestXid and oldestXidDB up-to-date.
    1382            0 :         self.checkpoint.oldestXid = xlrec.oldest_xid;
    1383            0 :         self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
    1384            0 :         self.checkpoint_modified = true;
    1385            0 : 
     1386            0 :         // TODO: handle what AdvanceOldestClogXid() does, or write a comment explaining why we don't need it
    1387            0 : 
    1388            0 :         let latest_page_number =
    1389            0 :             self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
    1390            0 : 
    1391            0 :         // Now delete all segments containing pages between xlrec.pageno
    1392            0 :         // and latest_page_number.
    1393            0 : 
    1394            0 :         // First, make an important safety check:
    1395            0 :         // the current endpoint page must not be eligible for removal.
    1396            0 :         // See SimpleLruTruncate() in slru.c
    1397            0 :         if clogpage_precedes(latest_page_number, xlrec.pageno) {
     1398            0 :             info!("could not truncate directory pg_xact: apparent wraparound");
    1399            0 :             return Ok(());
    1400            0 :         }
    1401              : 
    1402              :         // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
    1403              :         //
     1404              :         // We cannot pass 'lsn' to Timeline::list_slru_segments(), or it
    1405              :         // will block waiting for the last valid LSN to advance up to
    1406              :         // it. So we use the previous record's LSN in the get calls
    1407              :         // instead.
    1408            0 :         for segno in modification
    1409            0 :             .tline
    1410            0 :             .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
    1411            0 :             .await?
    1412              :         {
    1413            0 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
    1414            0 :             if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
    1415            0 :                 modification
    1416            0 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
    1417            0 :                     .await?;
    1418            0 :                 trace!("Drop CLOG segment {:>04X}", segno);
    1419            0 :             }
    1420              :         }
    1421              : 
    1422            0 :         Ok(())
    1423            0 :     }
    1424              : 
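                       :     /// Subroutine of ingest_record(), to handle a multixact-create record
                       :     /// (XlMultiXactCreate): store updates to the multixact offsets and members
                       :     /// SLRUs and advance next-multi-xid/next-offset in the checkpoint.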
    1425            0 :     fn ingest_multixact_create_record(
    1426            0 :         &mut self,
    1427            0 :         modification: &mut DatadirModification,
    1428            0 :         xlrec: &XlMultiXactCreate,
    1429            0 :     ) -> Result<()> {
    1430            0 :         // Create WAL record for updating the multixact-offsets page
    1431            0 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1432            0 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1433            0 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1434            0 : 
    1435            0 :         modification.put_slru_wal_record(
    1436            0 :             SlruKind::MultiXactOffsets,
    1437            0 :             segno,
    1438            0 :             rpageno,
    1439            0 :             NeonWalRecord::MultixactOffsetCreate {
    1440            0 :                 mid: xlrec.mid,
    1441            0 :                 moff: xlrec.moff,
    1442            0 :             },
    1443            0 :         )?;
    1444              : 
    1445              :         // Create WAL records for the update of each affected multixact-members page
    1446            0 :         let mut members = xlrec.members.iter();
    1447            0 :         let mut offset = xlrec.moff;
    1448              :         loop {
    1449            0 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1450            0 : 
    1451            0 :             // How many members fit on this page?
    1452            0 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1453            0 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1454            0 : 
    1455            0 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1456            0 :             for _ in 0..page_remain {
    1457            0 :                 if let Some(m) = members.next() {
    1458            0 :                     this_page_members.push(m.clone());
    1459            0 :                 } else {
    1460            0 :                     break;
    1461              :                 }
    1462              :             }
    1463            0 :             if this_page_members.is_empty() {
    1464              :                 // all done
    1465            0 :                 break;
    1466            0 :             }
    1467            0 :             let n_this_page = this_page_members.len();
    1468            0 : 
    1469            0 :             modification.put_slru_wal_record(
    1470            0 :                 SlruKind::MultiXactMembers,
    1471            0 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1472            0 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1473            0 :                 NeonWalRecord::MultixactMembersCreate {
    1474            0 :                     moff: offset,
    1475            0 :                     members: this_page_members,
    1476            0 :                 },
    1477            0 :             )?;
    1478              : 
    1479              :             // Note: The multixact members can wrap around, even within one WAL record.
    1480            0 :             offset = offset.wrapping_add(n_this_page as u32);
    1481              :         }
    1482            0 :         let next_offset = offset;
    1483            0 :         assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
    1484              : 
    1485              :         // Update next-multi-xid and next-offset
    1486              :         //
    1487              :         // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
    1488              :         // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
    1489              :         // read it, like GetNewMultiXactId(). This is different from how nextXid is
     1490              :         // incremented! nextXid skips over values < FirstNormalTransactionId when the value
    1491              :         // is stored, so it's never 0 in a checkpoint.
    1492              :         //
     1493              :         // I don't know why it's done that way; it seems less error-prone to skip over 0
    1494              :         // when the value is stored rather than when it's read. But let's do it the same
    1495              :         // way here.
    1496            0 :         let next_multi_xid = xlrec.mid.wrapping_add(1);
    1497            0 : 
    1498            0 :         if self
    1499            0 :             .checkpoint
    1500            0 :             .update_next_multixid(next_multi_xid, next_offset)
    1501            0 :         {
    1502            0 :             self.checkpoint_modified = true;
    1503            0 :         }
    1504              : 
    1505              :         // Also update the next-xid with the highest member. According to the comments in
    1506              :         // multixact_redo(), this shouldn't be necessary, but let's do the same here.
    1507            0 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
    1508            0 :             if let Some(max_xid) = acc {
    1509            0 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
    1510            0 :                     Some(mbr.xid)
    1511              :                 } else {
    1512            0 :                     acc
    1513              :                 }
    1514              :             } else {
    1515            0 :                 Some(mbr.xid)
    1516              :             }
    1517            0 :         });
    1518              : 
    1519            0 :         if let Some(max_xid) = max_mbr_xid {
    1520            0 :             if self.checkpoint.update_next_xid(max_xid) {
    1521            0 :                 self.checkpoint_modified = true;
    1522            0 :             }
    1523            0 :         }
    1524            0 :         Ok(())
    1525            0 :     }
    1526              : 
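                       :     /// Subroutine of ingest_record(), to handle a multixact-truncate record
                       :     /// (XlMultiXactTruncate): update the oldest-multixact fields in the
                       :     /// checkpoint and drop the truncated members segments.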
    1527            0 :     async fn ingest_multixact_truncate_record(
    1528            0 :         &mut self,
    1529            0 :         modification: &mut DatadirModification<'_>,
    1530            0 :         xlrec: &XlMultiXactTruncate,
    1531            0 :         ctx: &RequestContext,
    1532            0 :     ) -> Result<()> {
    1533            0 :         self.checkpoint.oldestMulti = xlrec.end_trunc_off;
    1534            0 :         self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
    1535            0 :         self.checkpoint_modified = true;
    1536            0 : 
    1537            0 :         // PerformMembersTruncation
    1538            0 :         let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
    1539            0 :         let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1540            0 :         let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1541            0 :         let mut segment: i32 = startsegment;
    1542              : 
    1543              :         // Delete all the segments except the last one. The last segment can still
    1544              :         // contain, possibly partially, valid data.
    1545            0 :         while segment != endsegment {
    1546            0 :             modification
    1547            0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1548            0 :                 .await?;
    1549              : 
    1550              :             /* move to next segment, handling wraparound correctly */
    1551            0 :             if segment == maxsegment {
    1552            0 :                 segment = 0;
    1553            0 :             } else {
    1554            0 :                 segment += 1;
    1555            0 :             }
    1556              :         }
    1557              : 
    1558              :         // Truncate offsets
    1559              :         // FIXME: this did not handle wraparound correctly
    1560              : 
    1561            0 :         Ok(())
    1562            0 :     }
    1563              : 
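                       :     /// Subroutine of ingest_record(), to handle an XLOG_RELMAP_UPDATE record:
                       :     /// store the new relmapper file contents for the given database.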
    1564            0 :     async fn ingest_relmap_page(
    1565            0 :         &mut self,
    1566            0 :         modification: &mut DatadirModification<'_>,
    1567            0 :         xlrec: &XlRelmapUpdate,
    1568            0 :         decoded: &DecodedWALRecord,
    1569            0 :         ctx: &RequestContext,
    1570            0 :     ) -> Result<()> {
    1571            0 :         let mut buf = decoded.record.clone();
    1572            0 :         buf.advance(decoded.main_data_offset);
    1573            0 :         // skip xl_relmap_update
    1574            0 :         buf.advance(12);
    1575            0 : 
    1576            0 :         modification
    1577            0 :             .put_relmap_file(
    1578            0 :                 xlrec.tsid,
    1579            0 :                 xlrec.dbid,
    1580            0 :                 Bytes::copy_from_slice(&buf[..]),
    1581            0 :                 ctx,
    1582            0 :             )
    1583            0 :             .await
    1584            0 :     }
    1585              : 
    1586           54 :     async fn put_rel_creation(
    1587           54 :         &mut self,
    1588           54 :         modification: &mut DatadirModification<'_>,
    1589           54 :         rel: RelTag,
    1590           54 :         ctx: &RequestContext,
    1591           54 :     ) -> Result<()> {
    1592           54 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1593           54 :         Ok(())
    1594           54 :     }
    1595              : 
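                       :     /// Store a full page image, extending the relation first if the block is
                       :     /// beyond its current size.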
    1596       817278 :     async fn put_rel_page_image(
    1597       817278 :         &mut self,
    1598       817278 :         modification: &mut DatadirModification<'_>,
    1599       817278 :         rel: RelTag,
    1600       817278 :         blknum: BlockNumber,
    1601       817278 :         img: Bytes,
    1602       817278 :         ctx: &RequestContext,
    1603       817278 :     ) -> Result<(), PageReconstructError> {
    1604       817278 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1605        16275 :             .await?;
    1606       817278 :         modification.put_rel_page_image(rel, blknum, img)?;
    1607       817278 :         Ok(())
    1608       817278 :     }
    1609              : 
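                       :     /// Store a WAL record for one page, extending the relation first if the
                       :     /// block is beyond its current size.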
    1610       436890 :     async fn put_rel_wal_record(
    1611       436890 :         &mut self,
    1612       436890 :         modification: &mut DatadirModification<'_>,
    1613       436890 :         rel: RelTag,
    1614       436890 :         blknum: BlockNumber,
    1615       436890 :         rec: NeonWalRecord,
    1616       436890 :         ctx: &RequestContext,
    1617       436890 :     ) -> Result<()> {
    1618       436890 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1619          837 :             .await?;
    1620       436890 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1621       436890 :         Ok(())
    1622       436890 :     }
    1623              : 
    1624        18036 :     async fn put_rel_truncation(
    1625        18036 :         &mut self,
    1626        18036 :         modification: &mut DatadirModification<'_>,
    1627        18036 :         rel: RelTag,
    1628        18036 :         nblocks: BlockNumber,
    1629        18036 :         ctx: &RequestContext,
    1630        18036 :     ) -> anyhow::Result<()> {
    1631        18036 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1632        18036 :         Ok(())
    1633        18036 :     }
    1634              : 
    1635            6 :     async fn put_rel_drop(
    1636            6 :         &mut self,
    1637            6 :         modification: &mut DatadirModification<'_>,
    1638            6 :         rel: RelTag,
    1639            6 :         ctx: &RequestContext,
    1640            6 :     ) -> Result<()> {
    1641            6 :         modification.put_rel_drop(rel, ctx).await?;
    1642            6 :         Ok(())
    1643            6 :     }
    1644              : 
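                       :     /// Make sure the relation is at least `blknum + 1` blocks long: create it
                       :     /// implicitly if it doesn't exist yet, and zero-fill any gap blocks that
                       :     /// belong to this shard.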
    1645      1254168 :     async fn handle_rel_extend(
    1646      1254168 :         &mut self,
    1647      1254168 :         modification: &mut DatadirModification<'_>,
    1648      1254168 :         rel: RelTag,
    1649      1254168 :         blknum: BlockNumber,
    1650      1254168 :         ctx: &RequestContext,
    1651      1254168 :     ) -> Result<(), PageReconstructError> {
    1652      1254168 :         let new_nblocks = blknum + 1;
    1653              :         // Check if the relation exists. We implicitly create relations on first
    1654              :         // record.
     1655              :         // TODO: would be nice to be more explicit about it
    1656              : 
    1657              :         // Get current size and put rel creation if rel doesn't exist
    1658              :         //
    1659              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1660              :         //       check the cache too. This is because eagerly checking the cache results in
    1661              :         //       less work overall and 10% better performance. It's more work on cache miss
    1662              :         //       but cache miss is rare.
    1663      1254168 :         let old_nblocks = if let Some(nblocks) = modification
    1664      1254168 :             .tline
    1665      1254168 :             .get_cached_rel_size(&rel, modification.get_lsn())
    1666              :         {
    1667      1254138 :             nblocks
    1668           30 :         } else if !modification
    1669           30 :             .tline
    1670           30 :             .get_rel_exists(rel, Version::Modified(modification), ctx)
    1671            6 :             .await?
    1672              :         {
    1673              :             // create it with 0 size initially, the logic below will extend it
    1674           30 :             modification
    1675           30 :                 .put_rel_creation(rel, 0, ctx)
    1676            9 :                 .await
    1677           30 :                 .context("Relation Error")?;
    1678           30 :             0
    1679              :         } else {
    1680            0 :             modification
    1681            0 :                 .tline
    1682            0 :                 .get_rel_size(rel, Version::Modified(modification), ctx)
    1683            0 :                 .await?
    1684              :         };
    1685              : 
    1686      1254168 :         if new_nblocks > old_nblocks {
    1687              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1688       824364 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1689              : 
    1690       824364 :             let mut key = rel_block_to_key(rel, blknum);
    1691              :             // fill the gap with zeros
    1692       824364 :             for gap_blknum in old_nblocks..blknum {
    1693         8994 :                 key.field6 = gap_blknum;
    1694         8994 : 
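                      :                 // Gap pages are sharded like any other rel block: skip the ones owned
                      :                 // by a different shard; that shard zero-fills them during its own ingest.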
    1695         8994 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1696            0 :                     continue;
    1697         8994 :                 }
    1698         8994 : 
    1699         8994 :                 modification.put_rel_page_image_zero(rel, gap_blknum);
    1700              :             }
    1701       429804 :         }
    1702      1254168 :         Ok(())
    1703      1254168 :     }
    1704              : 
    1705            0 :     async fn put_slru_page_image(
    1706            0 :         &mut self,
    1707            0 :         modification: &mut DatadirModification<'_>,
    1708            0 :         kind: SlruKind,
    1709            0 :         segno: u32,
    1710            0 :         blknum: BlockNumber,
    1711            0 :         img: Bytes,
    1712            0 :         ctx: &RequestContext,
    1713            0 :     ) -> Result<()> {
    1714            0 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1715            0 :             .await?;
    1716            0 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1717            0 :         Ok(())
    1718            0 :     }
    1719              : 
    1720            0 :     async fn handle_slru_extend(
    1721            0 :         &mut self,
    1722            0 :         modification: &mut DatadirModification<'_>,
    1723            0 :         kind: SlruKind,
    1724            0 :         segno: u32,
    1725            0 :         blknum: BlockNumber,
    1726            0 :         ctx: &RequestContext,
    1727            0 :     ) -> anyhow::Result<()> {
    1728            0 :         // We don't use a cache for this like we do for relations. SLRUs are explicitly
    1729            0 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1730            0 :         // a lot less frequently.
    1731            0 : 
    1732            0 :         let new_nblocks = blknum + 1;
    1733              :         // Check if the SLRU segment exists. We implicitly create SLRU segments
    1734              :         // on the first record that touches them.
    1735              :         // TODO: would be nice to be more explicit about it
    1736            0 :         let old_nblocks = if !modification
    1737            0 :             .tline
    1738            0 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1739            0 :             .await?
    1740              :         {
    1741              :             // create it with 0 size initially, the logic below will extend it
    1742            0 :             modification
    1743            0 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1744            0 :                 .await?;
    1745            0 :             0
    1746              :         } else {
    1747            0 :             modification
    1748            0 :                 .tline
    1749            0 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1750            0 :                 .await?
    1751              :         };
    1752              : 
    1753            0 :         if new_nblocks > old_nblocks {
    1754            0 :             trace!(
    1755            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1756              :                 kind,
    1757              :                 segno,
    1758              :                 old_nblocks,
    1759              :                 new_nblocks
    1760              :             );
    1761            0 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1762              : 
    1763              :             // fill the gap with zeros
    1764            0 :             for gap_blknum in old_nblocks..blknum {
    1765            0 :                 modification.put_slru_page_image_zero(kind, segno, gap_blknum);
    1766            0 :             }
    1767            0 :         }
    1768            0 :         Ok(())
    1769            0 :     }
    1770              : }
    1771              : 
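                      : /// Look up the size of a relation at the modification's LSN, returning 0 (rather
                      : /// than an error) if the relation does not exist yet.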
    1772           36 : async fn get_relsize(
    1773           36 :     modification: &DatadirModification<'_>,
    1774           36 :     rel: RelTag,
    1775           36 :     ctx: &RequestContext,
    1776           36 : ) -> Result<BlockNumber, PageReconstructError> {
    1777           36 :     let nblocks = if !modification
    1778           36 :         .tline
    1779           36 :         .get_rel_exists(rel, Version::Modified(modification), ctx)
    1780            0 :         .await?
    1781              :     {
    1782            0 :         0
    1783              :     } else {
    1784           36 :         modification
    1785           36 :             .tline
    1786           36 :             .get_rel_size(rel, Version::Modified(modification), ctx)
    1787            0 :             .await?
    1788              :     };
    1789           36 :     Ok(nblocks)
    1790           36 : }
    1791              : 
    1792              : #[allow(clippy::bool_assert_comparison)]
    1793              : #[cfg(test)]
    1794              : mod tests {
    1795              :     use super::*;
    1796              :     use crate::tenant::harness::*;
    1797              :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    1798              :     use postgres_ffi::RELSEG_SIZE;
    1799              : 
    1800              :     use crate::DEFAULT_PG_VERSION;
    1801              : 
    1802              :     /// Arbitrary relation tag, for testing.
    1803              :     const TESTREL_A: RelTag = RelTag {
    1804              :         spcnode: 0,
    1805              :         dbnode: 111,
    1806              :         relnode: 1000,
    1807              :         forknum: 0,
    1808              :     };
    1809              : 
    1810           36 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1811           36 :         // TODO
    1812           36 :     }
    1813              : 
    1814              :     static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
    1815              : 
    1816           24 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    1817           24 :         let mut m = tline.begin_modification(Lsn(0x10));
    1818           24 :         m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1819           48 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1820           24 :         m.commit(ctx).await?;
    1821           24 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1822              : 
    1823           24 :         Ok(walingest)
    1824           24 :     }
    1825              : 
    1826              :     #[tokio::test]
    1827            6 :     async fn test_relsize() -> Result<()> {
    1828           24 :         let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
    1829            6 :         let tline = tenant
    1830            6 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1831           12 :             .await?;
    1832           15 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1833            6 : 
    1834            6 :         let mut m = tline.begin_modification(Lsn(0x20));
    1835            6 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1836            6 :         walingest
    1837            6 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1838            6 :             .await?;
    1839            6 :         m.on_record_end();
    1840            6 :         m.commit(&ctx).await?;
    1841            6 :         let mut m = tline.begin_modification(Lsn(0x30));
    1842            6 :         walingest
    1843            6 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
    1844            6 :             .await?;
    1845            6 :         m.on_record_end();
    1846            6 :         m.commit(&ctx).await?;
    1847            6 :         let mut m = tline.begin_modification(Lsn(0x40));
    1848            6 :         walingest
    1849            6 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
    1850            6 :             .await?;
    1851            6 :         m.on_record_end();
    1852            6 :         m.commit(&ctx).await?;
    1853            6 :         let mut m = tline.begin_modification(Lsn(0x50));
    1854            6 :         walingest
    1855            6 :             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
    1856            6 :             .await?;
    1857            6 :         m.on_record_end();
    1858            6 :         m.commit(&ctx).await?;
    1859            6 : 
    1860            6 :         assert_current_logical_size(&tline, Lsn(0x50));
    1861            6 : 
    1862            6 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
    1863            6 :         assert_eq!(
    1864            6 :             tline
    1865            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1866            6 :                 .await?,
    1867            6 :             false
    1868            6 :         );
    1869            6 :         assert!(tline
    1870            6 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1871            6 :             .await
    1872            6 :             .is_err());
    1873            6 :         assert_eq!(
    1874            6 :             tline
    1875            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1876            6 :                 .await?,
    1877            6 :             true
    1878            6 :         );
    1879            6 :         assert_eq!(
    1880            6 :             tline
    1881            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1882            6 :                 .await?,
    1883            6 :             1
    1884            6 :         );
    1885            6 :         assert_eq!(
    1886            6 :             tline
    1887            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1888            6 :                 .await?,
    1889            6 :             3
    1890            6 :         );
    1891            6 : 
    1892            6 :         // Check page contents at each LSN
    1893            6 :         assert_eq!(
    1894            6 :             tline
    1895            6 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
    1896            6 :                 .await?,
    1897            6 :             test_img("foo blk 0 at 2")
    1898            6 :         );
    1899            6 : 
    1900            6 :         assert_eq!(
    1901            6 :             tline
    1902            6 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
    1903            6 :                 .await?,
    1904            6 :             test_img("foo blk 0 at 3")
    1905            6 :         );
    1906            6 : 
    1907            6 :         assert_eq!(
    1908            6 :             tline
    1909            6 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
    1910            6 :                 .await?,
    1911            6 :             test_img("foo blk 0 at 3")
    1912            6 :         );
    1913            6 :         assert_eq!(
    1914            6 :             tline
    1915            6 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
    1916            6 :                 .await?,
    1917            6 :             test_img("foo blk 1 at 4")
    1918            6 :         );
    1919            6 : 
    1920            6 :         assert_eq!(
    1921            6 :             tline
    1922            6 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
    1923            6 :                 .await?,
    1924            6 :             test_img("foo blk 0 at 3")
    1925            6 :         );
    1926            6 :         assert_eq!(
    1927            6 :             tline
    1928            6 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
    1929            6 :                 .await?,
    1930            6 :             test_img("foo blk 1 at 4")
    1931            6 :         );
    1932            6 :         assert_eq!(
    1933            6 :             tline
    1934            6 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    1935            6 :                 .await?,
    1936            6 :             test_img("foo blk 2 at 5")
    1937            6 :         );
    1938            6 : 
    1939            6 :         // Truncate last block
    1940            6 :         let mut m = tline.begin_modification(Lsn(0x60));
    1941            6 :         walingest
    1942            6 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1943            6 :             .await?;
    1944            6 :         m.commit(&ctx).await?;
    1945            6 :         assert_current_logical_size(&tline, Lsn(0x60));
    1946            6 : 
    1947            6 :         // Check reported size and contents after truncation
    1948            6 :         assert_eq!(
    1949            6 :             tline
    1950            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    1951            6 :                 .await?,
    1952            6 :             2
    1953            6 :         );
    1954            6 :         assert_eq!(
    1955            6 :             tline
    1956            6 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
    1957            6 :                 .await?,
    1958            6 :             test_img("foo blk 0 at 3")
    1959            6 :         );
    1960            6 :         assert_eq!(
    1961            6 :             tline
    1962            6 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
    1963            6 :                 .await?,
    1964            6 :             test_img("foo blk 1 at 4")
    1965            6 :         );
    1966            6 : 
    1967            6 :         // should still see the truncated block with older LSN
    1968            6 :         assert_eq!(
    1969            6 :             tline
    1970            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1971            6 :                 .await?,
    1972            6 :             3
    1973            6 :         );
    1974            6 :         assert_eq!(
    1975            6 :             tline
    1976            6 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    1977            6 :                 .await?,
    1978            6 :             test_img("foo blk 2 at 5")
    1979            6 :         );
    1980            6 : 
    1981            6 :         // Truncate to zero length
    1982            6 :         let mut m = tline.begin_modification(Lsn(0x68));
    1983            6 :         walingest
    1984            6 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1985            6 :             .await?;
    1986            6 :         m.commit(&ctx).await?;
    1987            6 :         assert_eq!(
    1988            6 :             tline
    1989            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
    1990            6 :                 .await?,
    1991            6 :             0
    1992            6 :         );
    1993            6 : 
    1994            6 :         // Extend from 0 to 2 blocks, leaving a gap
    1995            6 :         let mut m = tline.begin_modification(Lsn(0x70));
    1996            6 :         walingest
    1997            6 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
    1998            6 :             .await?;
    1999            6 :         m.on_record_end();
    2000            6 :         m.commit(&ctx).await?;
    2001            6 :         assert_eq!(
    2002            6 :             tline
    2003            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
    2004            6 :                 .await?,
    2005            6 :             2
    2006            6 :         );
    2007            6 :         assert_eq!(
    2008            6 :             tline
    2009            6 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
    2010            6 :                 .await?,
    2011            6 :             ZERO_PAGE
    2012            6 :         );
    2013            6 :         assert_eq!(
    2014            6 :             tline
    2015            6 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
    2016            6 :                 .await?,
    2017            6 :             test_img("foo blk 1")
    2018            6 :         );
    2019            6 : 
    2020            6 :         // Extend a lot more, leaving a big gap that spans across segments
    2021            6 :         let mut m = tline.begin_modification(Lsn(0x80));
    2022            6 :         walingest
    2023            6 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
    2024            6 :             .await?;
    2025            6 :         m.on_record_end();
    2026          570 :         m.commit(&ctx).await?;
    2027            6 :         assert_eq!(
    2028            6 :             tline
    2029            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2030            6 :                 .await?,
    2031            6 :             1501
    2032            6 :         );
    2033         8994 :         for blk in 2..1500 {
    2034         8988 :             assert_eq!(
    2035         8988 :                 tline
    2036         8988 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
    2037         4620 :                     .await?,
    2038         8988 :                 ZERO_PAGE
    2039            6 :             );
    2040            6 :         }
    2041            6 :         assert_eq!(
    2042            6 :             tline
    2043            6 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
    2044            6 :                 .await?,
    2045            6 :             test_img("foo blk 1500")
    2046            6 :         );
    2047            6 : 
    2048            6 :         Ok(())
    2049            6 :     }
    2050              : 
    2051              :     // Test what happens if we dropped a relation
    2052              :     // and then created it again within the same layer.
    2053              :     #[tokio::test]
    2054            6 :     async fn test_drop_extend() -> Result<()> {
    2055            6 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")
    2056            6 :             .await?
    2057            6 :             .load()
    2058           24 :             .await;
    2059            6 :         let tline = tenant
    2060            6 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2061           12 :             .await?;
    2062           15 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2063            6 : 
    2064            6 :         let mut m = tline.begin_modification(Lsn(0x20));
    2065            6 :         walingest
    2066            6 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    2067            6 :             .await?;
    2068            6 :         m.commit(&ctx).await?;
    2069            6 : 
    2070            6 :         // Check that rel exists and size is correct
    2071            6 :         assert_eq!(
    2072            6 :             tline
    2073            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2074            6 :                 .await?,
    2075            6 :             true
    2076            6 :         );
    2077            6 :         assert_eq!(
    2078            6 :             tline
    2079            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2080            6 :                 .await?,
    2081            6 :             1
    2082            6 :         );
    2083            6 : 
    2084            6 :         // Drop rel
    2085            6 :         let mut m = tline.begin_modification(Lsn(0x30));
    2086            6 :         walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
    2087            6 :         m.commit(&ctx).await?;
    2088            6 : 
    2089            6 :         // Check that rel is not visible anymore
    2090            6 :         assert_eq!(
    2091            6 :             tline
    2092            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
    2093            6 :                 .await?,
    2094            6 :             false
    2095            6 :         );
    2096            6 : 
    2097            6 :         // FIXME: should fail
    2098            6 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    2099            6 : 
    2100            6 :         // Re-create it
    2101            6 :         let mut m = tline.begin_modification(Lsn(0x40));
    2102            6 :         walingest
    2103            6 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
    2104            6 :             .await?;
    2105            6 :         m.commit(&ctx).await?;
    2106            6 : 
    2107            6 :         // Check that rel exists and size is correct
    2108            6 :         assert_eq!(
    2109            6 :             tline
    2110            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2111            6 :                 .await?,
    2112            6 :             true
    2113            6 :         );
    2114            6 :         assert_eq!(
    2115            6 :             tline
    2116            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2117            6 :                 .await?,
    2118            6 :             1
    2119            6 :         );
    2120            6 : 
    2121            6 :         Ok(())
    2122            6 :     }
    2123              : 
    2124              :     // Test what happens if we truncated a relation
    2125              :     // so that one of its segments was dropped
    2126              :     // and then extended it again within the same layer.
    2127              :     #[tokio::test]
    2128            6 :     async fn test_truncate_extend() -> Result<()> {
    2129            6 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
    2130            6 :             .await?
    2131            6 :             .load()
    2132           24 :             .await;
    2133            6 :         let tline = tenant
    2134            6 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2135           12 :             .await?;
    2136           15 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2137            6 : 
    2138            6 :         // Create a 20 MB relation (the size is arbitrary)
    2139            6 :         let relsize = 20 * 1024 * 1024 / 8192;
    2140            6 :         let mut m = tline.begin_modification(Lsn(0x20));
    2141        15360 :         for blkno in 0..relsize {
    2142        15360 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    2143        15360 :             walingest
    2144        15360 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2145            6 :                 .await?;
    2146            6 :         }
    2147            6 :         m.commit(&ctx).await?;
    2148            6 : 
    2149            6 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
    2150            6 :         assert_eq!(
    2151            6 :             tline
    2152            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2153            6 :                 .await?,
    2154            6 :             false
    2155            6 :         );
    2156            6 :         assert!(tline
    2157            6 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2158            6 :             .await
    2159            6 :             .is_err());
    2160            6 : 
    2161            6 :         assert_eq!(
    2162            6 :             tline
    2163            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2164            6 :                 .await?,
    2165            6 :             true
    2166            6 :         );
    2167            6 :         assert_eq!(
    2168            6 :             tline
    2169            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2170            6 :                 .await?,
    2171            6 :             relsize
    2172            6 :         );
    2173            6 : 
    2174            6 :         // Check relation content
    2175        15360 :         for blkno in 0..relsize {
    2176        15360 :             let lsn = Lsn(0x20);
    2177        15360 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2178        15360 :             assert_eq!(
    2179        15360 :                 tline
    2180        15360 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
    2181         5409 :                     .await?,
    2182        15360 :                 test_img(&data)
    2183            6 :             );
    2184            6 :         }
    2185            6 : 
    2186            6 :         // Truncate relation so that second segment was dropped
    2187            6 :         // - only leave one page
    2188            6 :         let mut m = tline.begin_modification(Lsn(0x60));
    2189            6 :         walingest
    2190            6 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2191            6 :             .await?;
    2192            6 :         m.commit(&ctx).await?;
    2193            6 : 
    2194            6 :         // Check reported size and contents after truncation
    2195            6 :         assert_eq!(
    2196            6 :             tline
    2197            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    2198            6 :                 .await?,
    2199            6 :             1
    2200            6 :         );
    2201            6 : 
    2202           12 :         for blkno in 0..1 {
    2203            6 :             let lsn = Lsn(0x20);
    2204            6 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2205            6 :             assert_eq!(
    2206            6 :                 tline
    2207            6 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
    2208            6 :                     .await?,
    2209            6 :                 test_img(&data)
    2210            6 :             );
    2211            6 :         }
    2212            6 : 
    2213            6 :         // should still see all blocks with older LSN
    2214            6 :         assert_eq!(
    2215            6 :             tline
    2216            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2217            6 :                 .await?,
    2218            6 :             relsize
    2219            6 :         );
    2220        15360 :         for blkno in 0..relsize {
    2221        15360 :             let lsn = Lsn(0x20);
    2222        15360 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2223        15360 :             assert_eq!(
    2224        15360 :                 tline
    2225        15360 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
    2226         5568 :                     .await?,
    2227        15360 :                 test_img(&data)
    2228            6 :             );
    2229            6 :         }
    2230            6 : 
    2231            6 :         // Extend relation again.
    2232            6 :         // Add enough blocks to create second segment
    2233            6 :         let lsn = Lsn(0x80);
    2234            6 :         let mut m = tline.begin_modification(lsn);
    2235        15360 :         for blkno in 0..relsize {
    2236        15360 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2237        15360 :             walingest
    2238        15360 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2239            6 :                 .await?;
    2240            6 :         }
    2241            9 :         m.commit(&ctx).await?;
    2242            6 : 
    2243            6 :         assert_eq!(
    2244            6 :             tline
    2245            6 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2246            6 :                 .await?,
    2247            6 :             true
    2248            6 :         );
    2249            6 :         assert_eq!(
    2250            6 :             tline
    2251            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2252            6 :                 .await?,
    2253            6 :             relsize
    2254            6 :         );
    2255            6 :         // Check relation content
    2256        15360 :         for blkno in 0..relsize {
    2257        15360 :             let lsn = Lsn(0x80);
    2258        15360 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2259        15360 :             assert_eq!(
    2260        15360 :                 tline
    2261        15360 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
    2262         5484 :                     .await?,
    2263        15360 :                 test_img(&data)
    2264            6 :             );
    2265            6 :         }
    2266            6 : 
    2267            6 :         Ok(())
    2268            6 :     }
    2269              : 
    2270              :     /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
    2271              :     /// split into multiple 1 GB segments in Postgres.
    2272              :     #[tokio::test]
    2273            6 :     async fn test_large_rel() -> Result<()> {
    2274           21 :         let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
    2275            6 :         let tline = tenant
    2276            6 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2277           12 :             .await?;
    2278           15 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2279            6 : 
    2280            6 :         let mut lsn = 0x10;
    2281       786438 :         for blknum in 0..RELSEG_SIZE + 1 {
    2282       786438 :             lsn += 0x10;
    2283       786438 :             let mut m = tline.begin_modification(Lsn(lsn));
    2284       786438 :             let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2285       786438 :             walingest
    2286       786438 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2287        16260 :                 .await?;
    2288       786438 :             m.commit(&ctx).await?;
    2289            6 :         }
    2290            6 : 
    2291            6 :         assert_current_logical_size(&tline, Lsn(lsn));
    2292            6 : 
    2293            6 :         assert_eq!(
    2294            6 :             tline
    2295            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2296            6 :                 .await?,
    2297            6 :             RELSEG_SIZE + 1
    2298            6 :         );
    2299            6 : 
    2300            6 :         // Truncate one block
    2301            6 :         lsn += 0x10;
    2302            6 :         let mut m = tline.begin_modification(Lsn(lsn));
    2303            6 :         walingest
    2304            6 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2305            6 :             .await?;
    2306            6 :         m.commit(&ctx).await?;
    2307            6 :         assert_eq!(
    2308            6 :             tline
    2309            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2310            6 :                 .await?,
    2311            6 :             RELSEG_SIZE
    2312            6 :         );
    2313            6 :         assert_current_logical_size(&tline, Lsn(lsn));
    2314            6 : 
    2315            6 :         // Truncate another block
    2316            6 :         lsn += 0x10;
    2317            6 :         let mut m = tline.begin_modification(Lsn(lsn));
    2318            6 :         walingest
    2319            6 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2320            6 :             .await?;
    2321            6 :         m.commit(&ctx).await?;
    2322            6 :         assert_eq!(
    2323            6 :             tline
    2324            6 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2325            6 :                 .await?,
    2326            6 :             RELSEG_SIZE - 1
    2327            6 :         );
    2328            6 :         assert_current_logical_size(&tline, Lsn(lsn));
    2329            6 : 
    2330            6 :         // Truncate to 1500, and then truncate all the way down to 0, one block at a time
    2331            6 :         // This tests the behavior at segment boundaries
    2332            6 :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time
    2333        18012 :         while size >= 0 {
    2334        18006 :             lsn += 0x10;
    2335        18006 :             let mut m = tline.begin_modification(Lsn(lsn));
    2336        18006 :             walingest
    2337        18006 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2338            6 :                 .await?;
    2339        18006 :             m.commit(&ctx).await?;
    2340        18006 :             assert_eq!(
    2341        18006 :                 tline
    2342        18006 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2343            6 :                     .await?,
    2344        18006 :                 size as BlockNumber
    2345            6 :             );
    2346            6 : 
    2347        18006 :             size -= 1;
    2348            6 :         }
    2349            6 :         assert_current_logical_size(&tline, Lsn(lsn));
    2350            6 : 
    2351            6 :         Ok(())
    2352            6 :     }
    2353              : 
    2354              :     /// Replay a wal segment file taken directly from safekeepers.
    2355              :     ///
    2356              :     /// This test is useful for benchmarking since it allows us to profile only
    2357              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2358              :     /// without waiting for unrelated steps.
    2359              :     #[tokio::test]
    2360            6 :     async fn test_ingest_real_wal() {
    2361            6 :         use crate::tenant::harness::*;
    2362            6 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2363            6 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2364            6 : 
    2365            6 :         // Define test data path and constants.
    2366            6 :         //
    2367            6 :         // Steps to reconstruct the data, if needed:
    2368            6 :         // 1. Run the pgbench python test
    2369            6 :         // 2. Take the first wal segment file from safekeeper
    2370            6 :         // 3. Compress it using `zstd --long input_file`
    2371            6 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2372            6 :         // 5. Grep sk logs for "restart decoder" to get startpoint
    2373            6 :         // 6. Run just the decoder from this test to get the endpoint.
    2374            6 :         //    It's the last LSN the decoder will output.
    2375            6 :         let pg_version = 15; // The test data was generated by pg15
    2376            6 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2377            6 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2378            6 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2379            6 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2380            6 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2381            6 : 
    2382            6 :         let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
    2383            6 :         let span = harness
    2384            6 :             .span()
    2385            6 :             .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
    2386           21 :         let (tenant, ctx) = harness.load().await;
    2387            6 : 
    2388            6 :         let remote_initdb_path =
    2389            6 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    2390            6 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2391            6 : 
    2392            6 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2393            6 :             .expect("creating test dir should work");
    2394            6 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2395            6 : 
    2396            6 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2397            6 :         // it doesn't create a real checkpoint, and WalIngest::new tries to parse
    2398            6 :         // the garbage data.
    2399            6 :         let tline = tenant
    2400            6 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2401        60641 :             .await
    2402            6 :             .unwrap();
    2403            6 : 
    2404            6 :         // We fully read and decompress this into memory before decoding
    2405            6 :         // to get a more accurate perf profile of the decoder.
    2406            6 :         let bytes = {
    2407            6 :             use async_compression::tokio::bufread::ZstdDecoder;
    2408            6 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2409            6 :             let reader = tokio::io::BufReader::new(file);
    2410            6 :             let decoder = ZstdDecoder::new(reader);
    2411            6 :             let mut reader = tokio::io::BufReader::new(decoder);
    2412            6 :             let mut buffer = Vec::new();
    2413          663 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2414            6 :             buffer
    2415            6 :         };
    2416            6 : 
    2417            6 :         // TODO start a profiler too
    2418            6 :         let started_at = std::time::Instant::now();
    2419            6 : 
    2420            6 :         // Initialize walingest
    2421            6 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    2422            6 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2423            6 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2424           15 :             .await
    2425            6 :             .unwrap();
    2426            6 :         let mut modification = tline.begin_modification(startpoint);
    2427            6 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2428            6 : 
    2429            6 :         // Decode and ingest wal. We process the wal in chunks because
    2430            6 :         // that's what happens when we get bytes from safekeepers.
    2431      1424058 :         for chunk in bytes[xlogoff..].chunks(50) {
    2432      1424058 :             decoder.feed_bytes(chunk);
    2433      1861608 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    2434       437550 :                 let mut decoded = DecodedWALRecord::default();
    2435       437550 :                 decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap();
    2436       437550 :                 walingest
    2437       437550 :                     .ingest_record(decoded, lsn, &mut modification, &ctx)
    2438       437550 :                     .instrument(span.clone())
    2439          888 :                     .await
    2440       437550 :                     .unwrap();
    2441            6 :             }
    2442      1424058 :             modification.commit(&ctx).await.unwrap();
    2443            6 :         }
    2444            6 : 
    2445            6 :         let duration = started_at.elapsed();
    2446            6 :         println!("done in {:?}", duration);
    2447            6 :     }
    2448              : }
        

Generated by: LCOV version 2.1-beta