LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions)
Test:      12c2fc96834f59604b8ade5b9add28f1dce41ec6.info
Test Date: 2024-07-03 15:33:13

                 Coverage    Total    Hit
    Lines:         53.0 %     1872    992
    Functions:     66.1 %       59     39

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  ->   WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9              : //! and decodes it to individual WAL records. It feeds the WAL records
      10              : //! to WalIngest, which parses them and stores them in the Repository.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
       13              : //! page images, or as WAL records. WalIngest::ingest_record() extracts
       14              : //! page images out of some WAL records, but stores most of them as WAL
      15              : //! records. If a WAL record modifies multiple pages, WalIngest
      16              : //! will call Repository::put_wal_record or put_page_image functions
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
       22              : //! bespoke Rust code.
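                       : //!
                       : //! For illustration, a caller such as the WAL receiver drives this module
                       : //! roughly as follows (a sketch, not compiled here; `begin_modification`,
                       : //! `commit`, and the record source are assumptions, not defined in this file):
                       : //!
                       : //!     let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
                       : //!     let mut modification = timeline.begin_modification(startpoint); // assumed constructor
                       : //!     let mut decoded = DecodedWALRecord::default();                  // assumed default-constructible
                       : //!     while let Some((lsn, recdata)) = next_wal_record().await {      // assumed WAL source
                       : //!         walingest
                       : //!             .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
                       : //!             .await?;
                       : //!         modification.commit(&ctx).await?; // flush into the Repository; see ingest_record()
                       : //!     }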
      23              : 
      24              : use pageserver_api::shard::ShardIdentity;
      25              : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
      26              : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
      27              : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      28              : 
      29              : use anyhow::{bail, Context, Result};
      30              : use bytes::{Buf, Bytes, BytesMut};
      31              : use tracing::*;
      32              : use utils::failpoint_support;
      33              : 
      34              : use crate::context::RequestContext;
      35              : use crate::metrics::WAL_INGEST;
      36              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      37              : use crate::tenant::PageReconstructError;
      38              : use crate::tenant::Timeline;
      39              : use crate::walrecord::*;
      40              : use crate::ZERO_PAGE;
      41              : use pageserver_api::key::rel_block_to_key;
      42              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      43              : use postgres_ffi::pg_constants;
      44              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      45              : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
      46              : use postgres_ffi::v14::xlog_utils::*;
      47              : use postgres_ffi::v14::CheckPoint;
      48              : use postgres_ffi::TransactionId;
      49              : use postgres_ffi::BLCKSZ;
      50              : use utils::lsn::Lsn;
      51              : 
      52              : pub struct WalIngest {
      53              :     shard: ShardIdentity,
      54              :     checkpoint: CheckPoint,
      55              :     checkpoint_modified: bool,
      56              : }
      57              : 
      58              : impl WalIngest {
      59           12 :     pub async fn new(
      60           12 :         timeline: &Timeline,
      61           12 :         startpoint: Lsn,
      62           12 :         ctx: &RequestContext,
      63           12 :     ) -> anyhow::Result<WalIngest> {
      64              :         // Fetch the latest checkpoint into memory, so that we can compare with it
      65              :         // quickly in `ingest_record` and update it when it changes.
      66           12 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
      67           12 :         let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
      68           12 :         trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
      69              : 
      70           12 :         Ok(WalIngest {
      71           12 :             shard: *timeline.get_shard_identity(),
      72           12 :             checkpoint,
      73           12 :             checkpoint_modified: false,
      74           12 :         })
      75           12 :     }
      76              : 
      77              :     ///
      78              :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
      79              :     ///
       80              :     /// This function updates the `lsn` field of `DatadirModification`
      81              :     ///
      82              :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
      83              :     /// relations/pages that the record affects.
      84              :     ///
      85              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
      86              :     ///
      87       145852 :     pub async fn ingest_record(
      88       145852 :         &mut self,
      89       145852 :         recdata: Bytes,
      90       145852 :         lsn: Lsn,
      91       145852 :         modification: &mut DatadirModification<'_>,
      92       145852 :         decoded: &mut DecodedWALRecord,
      93       145852 :         ctx: &RequestContext,
      94       145852 :     ) -> anyhow::Result<bool> {
      95       145852 :         WAL_INGEST.records_received.inc();
      96       145852 :         let pg_version = modification.tline.pg_version;
      97       145852 :         let prev_len = modification.len();
      98       145852 : 
      99       145852 :         modification.set_lsn(lsn)?;
     100       145852 :         decode_wal_record(recdata, decoded, pg_version)?;
     101              : 
     102       145852 :         let mut buf = decoded.record.clone();
     103       145852 :         buf.advance(decoded.main_data_offset);
     104       145852 : 
     105       145852 :         assert!(!self.checkpoint_modified);
     106       145852 :         if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
     107       145834 :             && self.checkpoint.update_next_xid(decoded.xl_xid)
     108            2 :         {
     109            2 :             self.checkpoint_modified = true;
     110       145850 :         }
     111              : 
     112       145852 :         failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
     113              : 
     114       145852 :         match decoded.xl_rmid {
     115              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
     116              :                 // Heap AM records need some special handling, because they modify VM pages
     117              :                 // without registering them with the standard mechanism.
     118       145474 :                 self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
     119            0 :                     .await?;
     120              :             }
     121              :             pg_constants::RM_NEON_ID => {
     122            0 :                 self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
     123            0 :                     .await?;
     124              :             }
     125              :             // Handle other special record types
     126              :             pg_constants::RM_SMGR_ID => {
     127           16 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     128           16 : 
     129           16 :                 if info == pg_constants::XLOG_SMGR_CREATE {
     130           16 :                     let create = XlSmgrCreate::decode(&mut buf);
     131           16 :                     self.ingest_xlog_smgr_create(modification, &create, ctx)
     132            5 :                         .await?;
     133            0 :                 } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     134            0 :                     let truncate = XlSmgrTruncate::decode(&mut buf);
     135            0 :                     self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
     136            0 :                         .await?;
     137            0 :                 }
     138              :             }
     139              :             pg_constants::RM_DBASE_ID => {
     140            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     141            0 :                 debug!(%info, %pg_version, "handle RM_DBASE_ID");
     142              : 
     143            0 :                 if pg_version == 14 {
     144            0 :                     if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     145            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     146            0 :                         debug!("XLOG_DBASE_CREATE v14");
     147              : 
     148            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     149            0 :                             .await?;
     150            0 :                     } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     151            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     152            0 :                         for tablespace_id in dropdb.tablespace_ids {
     153            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     154            0 :                             modification
     155            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     156            0 :                                 .await?;
     157              :                         }
     158            0 :                     }
     159            0 :                 } else if pg_version == 15 {
     160            0 :                     if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     161            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     162            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     163              :                         // The XLOG record was renamed between v14 and v15,
     164              :                         // but the record format is the same.
     165              :                         // So we can reuse XlCreateDatabase here.
     166            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     167            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     168            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     169            0 :                             .await?;
     170            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     171            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     172            0 :                         for tablespace_id in dropdb.tablespace_ids {
     173            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     174            0 :                             modification
     175            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     176            0 :                                 .await?;
     177              :                         }
     178            0 :                     }
     179            0 :                 } else if pg_version == 16 {
     180            0 :                     if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     181            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     182            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     183              :                         // The XLOG record was renamed between v14 and v15,
     184              :                         // but the record format is the same.
     185              :                         // So we can reuse XlCreateDatabase here.
     186            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     187            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     188            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     189            0 :                             .await?;
     190            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     191            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     192            0 :                         for tablespace_id in dropdb.tablespace_ids {
     193            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     194            0 :                             modification
     195            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     196            0 :                                 .await?;
     197              :                         }
     198            0 :                     }
     199            0 :                 }
     200              :             }
     201              :             pg_constants::RM_TBLSPC_ID => {
     202            0 :                 trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     203              :             }
     204              :             pg_constants::RM_CLOG_ID => {
     205            0 :                 let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     206            0 : 
     207            0 :                 if info == pg_constants::CLOG_ZEROPAGE {
     208            0 :                     let pageno = buf.get_u32_le();
     209            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     210            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     211            0 :                     self.put_slru_page_image(
     212            0 :                         modification,
     213            0 :                         SlruKind::Clog,
     214            0 :                         segno,
     215            0 :                         rpageno,
     216            0 :                         ZERO_PAGE.clone(),
     217            0 :                         ctx,
     218            0 :                     )
     219            0 :                     .await?;
     220              :                 } else {
     221            0 :                     assert!(info == pg_constants::CLOG_TRUNCATE);
     222            0 :                     let xlrec = XlClogTruncate::decode(&mut buf);
     223            0 :                     self.ingest_clog_truncate_record(modification, &xlrec, ctx)
     224            0 :                         .await?;
     225              :                 }
     226              :             }
     227              :             pg_constants::RM_XACT_ID => {
     228           24 :                 let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     229           24 : 
     230           24 :                 if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
     231            8 :                     let parsed_xact =
     232            8 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     233            8 :                     self.ingest_xact_record(
     234            8 :                         modification,
     235            8 :                         &parsed_xact,
     236            8 :                         info == pg_constants::XLOG_XACT_COMMIT,
     237            8 :                         decoded.origin_id,
     238            8 :                         ctx,
     239            8 :                     )
     240            0 :                     .await?;
     241           16 :                 } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
     242           16 :                     || info == pg_constants::XLOG_XACT_ABORT_PREPARED
     243              :                 {
     244            0 :                     let parsed_xact =
     245            0 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     246            0 :                     self.ingest_xact_record(
     247            0 :                         modification,
     248            0 :                         &parsed_xact,
     249            0 :                         info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
     250            0 :                         decoded.origin_id,
     251            0 :                         ctx,
     252            0 :                     )
     253            0 :                     .await?;
     254              :                     // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     255            0 :                     trace!(
     256            0 :                         "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     257              :                         decoded.xl_xid,
     258              :                         parsed_xact.xid,
     259              :                         lsn,
     260              :                     );
     261            0 :                     modification
     262            0 :                         .drop_twophase_file(parsed_xact.xid, ctx)
     263            0 :                         .await?;
     264           16 :                 } else if info == pg_constants::XLOG_XACT_PREPARE {
     265            0 :                     modification
     266            0 :                         .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
     267            0 :                         .await?;
     268           16 :                 }
     269              :             }
     270              :             pg_constants::RM_MULTIXACT_ID => {
     271            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     272            0 : 
     273            0 :                 if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
     274            0 :                     let pageno = buf.get_u32_le();
     275            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     276            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     277            0 :                     self.put_slru_page_image(
     278            0 :                         modification,
     279            0 :                         SlruKind::MultiXactOffsets,
     280            0 :                         segno,
     281            0 :                         rpageno,
     282            0 :                         ZERO_PAGE.clone(),
     283            0 :                         ctx,
     284            0 :                     )
     285            0 :                     .await?;
     286            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
     287            0 :                     let pageno = buf.get_u32_le();
     288            0 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     289            0 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     290            0 :                     self.put_slru_page_image(
     291            0 :                         modification,
     292            0 :                         SlruKind::MultiXactMembers,
     293            0 :                         segno,
     294            0 :                         rpageno,
     295            0 :                         ZERO_PAGE.clone(),
     296            0 :                         ctx,
     297            0 :                     )
     298            0 :                     .await?;
     299            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     300            0 :                     let xlrec = XlMultiXactCreate::decode(&mut buf);
     301            0 :                     self.ingest_multixact_create_record(modification, &xlrec)?;
     302            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     303            0 :                     let xlrec = XlMultiXactTruncate::decode(&mut buf);
     304            0 :                     self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
     305            0 :                         .await?;
     306            0 :                 }
     307              :             }
     308              :             pg_constants::RM_RELMAP_ID => {
     309            0 :                 let xlrec = XlRelmapUpdate::decode(&mut buf);
     310            0 :                 self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
     311            0 :                     .await?;
     312              :             }
     313              :             pg_constants::RM_XLOG_ID => {
     314           30 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     315           30 : 
     316           30 :                 if info == pg_constants::XLOG_NEXTOID {
     317            2 :                     let next_oid = buf.get_u32_le();
     318            2 :                     if self.checkpoint.nextOid != next_oid {
     319            2 :                         self.checkpoint.nextOid = next_oid;
     320            2 :                         self.checkpoint_modified = true;
     321            2 :                     }
     322           28 :                 } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
     323           28 :                     || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     324              :                 {
     325            2 :                     let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
     326            2 :                     buf.copy_to_slice(&mut checkpoint_bytes);
     327            2 :                     let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     328            2 :                     trace!(
     329            0 :                         "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
     330              :                         xlog_checkpoint.oldestXid,
     331              :                         self.checkpoint.oldestXid
     332              :                     );
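                       :                     // Wraparound-aware comparison: XIDs are compared modulo 2^32,
                       :                     // so a negative signed difference means the stored oldestXid
                       :                     // precedes the one in this checkpoint record, and we advance it.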
     333            2 :                     if (self
     334            2 :                         .checkpoint
     335            2 :                         .oldestXid
     336            2 :                         .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
     337            2 :                         < 0
     338            0 :                     {
     339            0 :                         self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
     340            2 :                     }
     341            2 :                     trace!(
     342            0 :                         "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
     343              :                         xlog_checkpoint.oldestActiveXid,
     344              :                         self.checkpoint.oldestActiveXid
     345              :                     );
     346              : 
      347              :                     // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
     348              :                     // because at shutdown, all in-progress transactions will implicitly
     349              :                     // end. Postgres startup code knows that, and allows hot standby to start
     350              :                     // immediately from a shutdown checkpoint.
     351              :                     //
     352              :                     // In Neon, Postgres hot standby startup always behaves as if starting from
     353              :                     // an online checkpoint. It needs a valid `oldestActiveXid` value, so
     354              :                     // instead of overwriting self.checkpoint.oldestActiveXid with
      355              :                     // InvalidTransactionId from the checkpoint WAL record, update it to a
     356              :                     // proper value, knowing that there are no in-progress transactions at this
     357              :                     // point, except for prepared transactions.
     358              :                     //
     359              :                     // See also the neon code changes in the InitWalRecovery() function.
     360            2 :                     if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
     361            2 :                         && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     362              :                     {
     363            2 :                         let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
     364            2 :                         for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
     365            0 :                             if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
     366            0 :                                 oldest_active_xid = xid;
     367            0 :                             }
     368              :                         }
     369            2 :                         self.checkpoint.oldestActiveXid = oldest_active_xid;
     370            0 :                     } else {
     371            0 :                         self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
     372            0 :                     }
     373              : 
     374              :                     // Write a new checkpoint key-value pair on every checkpoint record, even
     375              :                     // if nothing really changed. Not strictly required, but it seems nice to
     376              :                     // have some trace of the checkpoint records in the layer files at the same
     377              :                     // LSNs.
     378            2 :                     self.checkpoint_modified = true;
     379           26 :                 }
     380              :             }
     381              :             pg_constants::RM_LOGICALMSG_ID => {
     382            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     383            0 : 
     384            0 :                 if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     385            0 :                     let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
     386            0 :                     let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     387            0 :                     let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
     388            0 :                     if prefix == "neon-test" {
     389              :                         // This is a convenient way to make the WAL ingestion pause at
      390              :                         // a particular point in the WAL. For more fine-grained control,
     391              :                         // we could peek into the message and only pause if it contains
     392              :                         // a particular string, for example, but this is enough for now.
     393            0 :                         failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
     394            0 :                     } else if let Some(path) = prefix.strip_prefix("neon-file:") {
     395            0 :                         modification.put_file(path, message, ctx).await?;
     396            0 :                     }
     397            0 :                 }
     398              :             }
     399              :             pg_constants::RM_STANDBY_ID => {
     400           16 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     401           16 :                 if info == pg_constants::XLOG_RUNNING_XACTS {
     402            0 :                     let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
     403            0 :                     self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
     404            0 :                     self.checkpoint_modified = true;
     405           16 :                 }
     406              :             }
     407              :             pg_constants::RM_REPLORIGIN_ID => {
     408            0 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     409            0 :                 if info == pg_constants::XLOG_REPLORIGIN_SET {
     410            0 :                     let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
     411            0 :                     modification
     412            0 :                         .set_replorigin(xlrec.node_id, xlrec.remote_lsn)
     413            0 :                         .await?
     414            0 :                 } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
     415            0 :                     let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
     416            0 :                     modification.drop_replorigin(xlrec.node_id).await?
     417            0 :                 }
     418              :             }
     419          292 :             _x => {
     420          292 :                 // TODO: should probably log & fail here instead of blindly
     421          292 :                 // doing something without understanding the protocol
     422          292 :             }
     423              :         }
     424              : 
     425              :         // Iterate through all the blocks that the record modifies, and
     426              :         // "put" a separate copy of the record for each block.
     427       145852 :         for blk in decoded.blocks.iter() {
     428       145642 :             let rel = RelTag {
     429       145642 :                 spcnode: blk.rnode_spcnode,
     430       145642 :                 dbnode: blk.rnode_dbnode,
     431       145642 :                 relnode: blk.rnode_relnode,
     432       145642 :                 forknum: blk.forknum,
     433       145642 :             };
     434       145642 : 
     435       145642 :             let key = rel_block_to_key(rel, blk.blkno);
     436       145642 :             let key_is_local = self.shard.is_key_local(&key);
     437       145642 : 
     438       145642 :             tracing::debug!(
     439              :                 lsn=%lsn,
     440              :                 key=%key,
     441            0 :                 "ingest: shard decision {} (checkpoint={})",
     442            0 :                 if !key_is_local { "drop" } else { "keep" },
     443              :                 self.checkpoint_modified
     444              :             );
     445              : 
     446       145642 :             if !key_is_local {
     447            0 :                 if self.shard.is_shard_zero() {
     448              :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     449              :                     // its blkno in case it implicitly extends a relation.
     450            0 :                     self.observe_decoded_block(modification, blk, ctx).await?;
     451            0 :                 }
     452              : 
     453            0 :                 continue;
     454       145642 :             }
     455       145642 :             self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
     456           43 :                 .await?;
     457              :         }
     458              : 
     459              :         // If checkpoint data was updated, store the new version in the repository
     460       145852 :         if self.checkpoint_modified {
     461            6 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     462              : 
     463            6 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     464            6 :             self.checkpoint_modified = false;
     465       145846 :         }
     466              : 
     467              :         // Note that at this point this record is only cached in the modification
     468              :         // until commit() is called to flush the data into the repository and update
     469              :         // the latest LSN.
     470              : 
     471       145852 :         Ok(modification.len() > prev_len)
     472       145852 :     }
     473              : 
     474              :     /// Do not store this block, but observe it for the purposes of updating our relation size state.
     475            0 :     async fn observe_decoded_block(
     476            0 :         &mut self,
     477            0 :         modification: &mut DatadirModification<'_>,
     478            0 :         blk: &DecodedBkpBlock,
     479            0 :         ctx: &RequestContext,
     480            0 :     ) -> Result<(), PageReconstructError> {
     481            0 :         let rel = RelTag {
     482            0 :             spcnode: blk.rnode_spcnode,
     483            0 :             dbnode: blk.rnode_dbnode,
     484            0 :             relnode: blk.rnode_relnode,
     485            0 :             forknum: blk.forknum,
     486            0 :         };
     487            0 :         self.handle_rel_extend(modification, rel, blk.blkno, ctx)
     488            0 :             .await
     489            0 :     }
     490              : 
     491       145642 :     async fn ingest_decoded_block(
     492       145642 :         &mut self,
     493       145642 :         modification: &mut DatadirModification<'_>,
     494       145642 :         lsn: Lsn,
     495       145642 :         decoded: &DecodedWALRecord,
     496       145642 :         blk: &DecodedBkpBlock,
     497       145642 :         ctx: &RequestContext,
     498       145642 :     ) -> Result<(), PageReconstructError> {
     499       145642 :         let rel = RelTag {
     500       145642 :             spcnode: blk.rnode_spcnode,
     501       145642 :             dbnode: blk.rnode_dbnode,
     502       145642 :             relnode: blk.rnode_relnode,
     503       145642 :             forknum: blk.forknum,
     504       145642 :         };
     505       145642 : 
     506       145642 :         //
      507       145642 :         // Instead of storing a full-page-image WAL record,
      508       145642 :         // it is better to store the extracted image: we can skip wal-redo
      509       145642 :         // in this case. Also, some FPI records may contain multiple (up to 32) pages,
      510       145642 :         // so they would have to be copied multiple times.
     511       145642 :         //
     512       145642 :         if blk.apply_image
     513           60 :             && blk.has_image
     514           60 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     515           24 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     516            0 :                 || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     517              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     518           24 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
      519              :             // do not materialize null pages because they will most likely soon be replaced with real data
     520           24 :             && blk.bimg_len != 0
     521              :         {
     522              :             // Extract page image from FPI record
     523           24 :             let img_len = blk.bimg_len as usize;
     524           24 :             let img_offs = blk.bimg_offset as usize;
     525           24 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     526           24 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     527           24 : 
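                       :             // The FPI may omit the page's unused "hole"; if so, splice zeroes back in
                       :             // so the reconstructed image is a complete BLCKSZ-sized page.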
     528           24 :             if blk.hole_length != 0 {
     529            0 :                 let tail = image.split_off(blk.hole_offset as usize);
     530            0 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     531            0 :                 image.unsplit(tail);
     532           24 :             }
     533              :             //
     534              :             // Match the logic of XLogReadBufferForRedoExtended:
     535              :             // The page may be uninitialized. If so, we can't set the LSN because
     536              :             // that would corrupt the page.
     537              :             //
     538           24 :             if !page_is_new(&image) {
     539           18 :                 page_set_lsn(&mut image, lsn)
     540            6 :             }
     541           24 :             assert_eq!(image.len(), BLCKSZ as usize);
     542           24 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     543            0 :                 .await?;
     544              :         } else {
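                       :             // No image was extracted: store the raw WAL record. A record that either
                       :             // initializes the page or applies a full-page image can be redone without
                       :             // a previous page image, hence `will_init`.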
     545       145618 :             let rec = NeonWalRecord::Postgres {
     546       145618 :                 will_init: blk.will_init || blk.apply_image,
     547       145618 :                 rec: decoded.record.clone(),
     548       145618 :             };
     549       145618 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     550           43 :                 .await?;
     551              :         }
     552       145642 :         Ok(())
     553       145642 :     }
     554              : 
     555       145474 :     async fn ingest_heapam_record(
     556       145474 :         &mut self,
     557       145474 :         buf: &mut Bytes,
     558       145474 :         modification: &mut DatadirModification<'_>,
     559       145474 :         decoded: &DecodedWALRecord,
     560       145474 :         ctx: &RequestContext,
     561       145474 :     ) -> anyhow::Result<()> {
     562       145474 :         // Handle VM bit updates that are implicitly part of heap records.
     563       145474 : 
     564       145474 :         // First, look at the record to determine which VM bits need
     565       145474 :         // to be cleared. If either of these variables is set, we
     566       145474 :         // need to clear the corresponding bits in the visibility map.
     567       145474 :         let mut new_heap_blkno: Option<u32> = None;
     568       145474 :         let mut old_heap_blkno: Option<u32> = None;
     569       145474 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     570       145474 : 
     571       145474 :         match modification.tline.pg_version {
     572              :             14 => {
     573            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     574            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     575            0 : 
     576            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     577            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     578            0 :                         assert_eq!(0, buf.remaining());
     579            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     580            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     581            0 :                         }
     582            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     583            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     584            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     585            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     586            0 :                         }
     587            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     588            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     589              :                     {
     590            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     591            0 :                         // the size of tuple data is inferred from the size of the record.
     592            0 :                         // we can't validate the remaining number of bytes without parsing
     593            0 :                         // the tuple data.
     594            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     595            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     596            0 :                         }
     597            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     598            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      599            0 :                             // non-HOT update where the new tuple goes to a different page than
     600            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     601            0 :                             // set.
     602            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     603            0 :                         }
     604            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     605            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     606            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     607            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     608            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     609            0 :                         }
     610            0 :                     }
     611            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     612            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     613            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     614            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     615              : 
     616            0 :                         let offset_array_len =
     617            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     618              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     619            0 :                                 0
     620              :                             } else {
     621            0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     622              :                             };
     623            0 :                         assert_eq!(offset_array_len, buf.remaining());
     624              : 
     625            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     626            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     627            0 :                         }
     628            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     629            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     630            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     631            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     632            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     633            0 :                         }
     634            0 :                     }
     635              :                 } else {
     636            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     637              :                 }
     638              :             }
     639              :             15 => {
     640       145474 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     641       145286 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     642       145286 : 
     643       145286 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     644       145276 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     645       145276 :                         assert_eq!(0, buf.remaining());
     646       145276 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     647            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     648       145272 :                         }
     649           10 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     650            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     651            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     652            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     653            0 :                         }
     654           10 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     655            2 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     656              :                     {
     657            8 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     658            8 :                         // the size of tuple data is inferred from the size of the record.
     659            8 :                         // we can't validate the remaining number of bytes without parsing
     660            8 :                         // the tuple data.
     661            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     662            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     663            8 :                         }
     664            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     665            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      666            0 :                             // non-HOT update where the new tuple goes to a different page than
     667            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     668            0 :                             // set.
     669            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     670            8 :                         }
     671            2 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     672            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     673            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     674            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     675            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     676            0 :                         }
     677            2 :                     }
     678          188 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     679          188 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     680          188 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     681           42 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     682              : 
     683           42 :                         let offset_array_len =
     684           42 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     685              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     686            2 :                                 0
     687              :                             } else {
     688           40 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     689              :                             };
     690           42 :                         assert_eq!(offset_array_len, buf.remaining());
     691              : 
     692           42 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     693            8 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     694           34 :                         }
     695          146 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     696            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     697            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     698            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     699            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     700            0 :                         }
     701          146 :                     }
     702              :                 } else {
     703            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     704              :                 }
     705              :             }
     706              :             16 => {
     707            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     708            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     709            0 : 
     710            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     711            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     712            0 :                         assert_eq!(0, buf.remaining());
     713            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     714            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     715            0 :                         }
     716            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     717            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     718            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     719            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     720            0 :                         }
     721            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     722            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     723              :                     {
     724            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     725            0 :                         // the size of tuple data is inferred from the size of the record.
     726            0 :                         // we can't validate the remaining number of bytes without parsing
     727            0 :                         // the tuple data.
     728            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     729            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     730            0 :                         }
     731            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     732            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      733            0 :                             // non-HOT update where the new tuple goes to a different page than
     734            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     735            0 :                             // set.
     736            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     737            0 :                         }
     738            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     739            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     740            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     741            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     742            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     743            0 :                         }
     744            0 :                     }
     745            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     746            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     747            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     748            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     749              : 
     750            0 :                         let offset_array_len =
     751            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     752              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     753            0 :                                 0
     754              :                             } else {
     755            0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     756              :                             };
     757            0 :                         assert_eq!(offset_array_len, buf.remaining());
     758              : 
     759            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     760            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     761            0 :                         }
     762            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     763            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     764            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     765            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     766            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     767            0 :                         }
     768            0 :                     }
     769              :                 } else {
     770            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     771              :                 }
     772              :             }
     773            0 :             _ => {}
     774              :         }
     775              : 
     776              :         // Clear the VM bits if required.
     777       145474 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     778           12 :             let vm_rel = RelTag {
     779           12 :                 forknum: VISIBILITYMAP_FORKNUM,
     780           12 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     781           12 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     782           12 :                 relnode: decoded.blocks[0].rnode_relnode,
     783           12 :             };
     784           12 : 
     785           12 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     786           12 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     787              : 
     788              :             // Sometimes, Postgres seems to create heap WAL records with the
     789              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     790              :             // not set. In fact, it's possible that the VM page does not exist at all.
     791              :             // In that case, we don't want to store a record to clear the VM bit;
     792              :             // replaying it would fail to find the previous image of the page, because
     793              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
      794              :             // record if they don't.
     795           12 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     796           12 :             if let Some(blknum) = new_vm_blk {
     797           12 :                 if blknum >= vm_size {
     798            0 :                     new_vm_blk = None;
     799           12 :                 }
     800            0 :             }
     801           12 :             if let Some(blknum) = old_vm_blk {
     802            0 :                 if blknum >= vm_size {
     803            0 :                     old_vm_blk = None;
     804            0 :                 }
     805           12 :             }
     806              : 
     807           12 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     808           12 :                 if new_vm_blk == old_vm_blk {
      809              :                     // An UPDATE record that needs to clear the bits for both the old and the
     810              :                     // new page, both of which reside on the same VM page.
     811            0 :                     self.put_rel_wal_record(
     812            0 :                         modification,
     813            0 :                         vm_rel,
     814            0 :                         new_vm_blk.unwrap(),
     815            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     816            0 :                             new_heap_blkno,
     817            0 :                             old_heap_blkno,
     818            0 :                             flags,
     819            0 :                         },
     820            0 :                         ctx,
     821            0 :                     )
     822            0 :                     .await?;
     823              :                 } else {
     824              :                     // Clear VM bits for one heap page, or for two pages that reside on
     825              :                     // different VM pages.
     826           12 :                     if let Some(new_vm_blk) = new_vm_blk {
     827           12 :                         self.put_rel_wal_record(
     828           12 :                             modification,
     829           12 :                             vm_rel,
     830           12 :                             new_vm_blk,
     831           12 :                             NeonWalRecord::ClearVisibilityMapFlags {
     832           12 :                                 new_heap_blkno,
     833           12 :                                 old_heap_blkno: None,
     834           12 :                                 flags,
     835           12 :                             },
     836           12 :                             ctx,
     837           12 :                         )
     838            0 :                         .await?;
     839            0 :                     }
     840           12 :                     if let Some(old_vm_blk) = old_vm_blk {
     841            0 :                         self.put_rel_wal_record(
     842            0 :                             modification,
     843            0 :                             vm_rel,
     844            0 :                             old_vm_blk,
     845            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     846            0 :                                 new_heap_blkno: None,
     847            0 :                                 old_heap_blkno,
     848            0 :                                 flags,
     849            0 :                             },
     850            0 :                             ctx,
     851            0 :                         )
     852            0 :                         .await?;
     853           12 :                     }
     854              :                 }
     855            0 :             }
     856       145462 :         }
     857              : 
     858       145474 :         Ok(())
     859       145474 :     }
     860              : 
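                       :     /// Subroutine of ingest_record(), to handle a record of the Neon-specific
                       :     /// resource manager (RM_NEON_ID). These mirror the regular heap records and,
                       :     /// like them, may require clearing bits in the visibility map.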
     861            0 :     async fn ingest_neonrmgr_record(
     862            0 :         &mut self,
     863            0 :         buf: &mut Bytes,
     864            0 :         modification: &mut DatadirModification<'_>,
     865            0 :         decoded: &DecodedWALRecord,
     866            0 :         ctx: &RequestContext,
     867            0 :     ) -> anyhow::Result<()> {
     868            0 :         // Handle VM bit updates that are implicitly part of heap records.
     869            0 : 
     870            0 :         // First, look at the record to determine which VM bits need
     871            0 :         // to be cleared. If either of these variables is set, we
     872            0 :         // need to clear the corresponding bits in the visibility map.
     873            0 :         let mut new_heap_blkno: Option<u32> = None;
     874            0 :         let mut old_heap_blkno: Option<u32> = None;
     875            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     876            0 :         let pg_version = modification.tline.pg_version;
     877            0 : 
     878            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     879              : 
     880            0 :         match pg_version {
     881              :             16 => {
     882            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     883            0 : 
     884            0 :                 match info {
     885              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     886            0 :                         let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
     887            0 :                         assert_eq!(0, buf.remaining());
     888            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     889            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     890            0 :                         }
     891              :                     }
     892              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     893            0 :                         let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
     894            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     895            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     896            0 :                         }
     897              :                     }
     898              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     899              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     900            0 :                         let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
      901            0 :                         // The size of the tuple data is inferred from the size of the record.
      902            0 :                         // We can't validate the remaining number of bytes without parsing
      903            0 :                         // the tuple data.
     904            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     905            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     906            0 :                         }
     907            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     908            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
      909            0 :                             // non-HOT update where the new tuple goes to a different page than
     910            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     911            0 :                             // set.
     912            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     913            0 :                         }
     914              :                     }
     915              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     916            0 :                         let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     917              : 
     918            0 :                         let offset_array_len =
     919            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     920              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     921            0 :                                 0
     922              :                             } else {
     923            0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     924              :                             };
     925            0 :                         assert_eq!(offset_array_len, buf.remaining());
     926              : 
     927            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     928            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     929            0 :                         }
     930              :                     }
     931              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     932            0 :                         let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
     933            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     934            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     935            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     936            0 :                         }
     937              :                     }
     938            0 :                     info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
     939              :                 }
     940              :             }
     941            0 :             _ => bail!(
     942            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     943            0 :                 pg_version
     944            0 :             ),
     945              :         }
     946              : 
     947              :         // Clear the VM bits if required.
     948            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     949            0 :             let vm_rel = RelTag {
     950            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     951            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     952            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     953            0 :                 relnode: decoded.blocks[0].rnode_relnode,
     954            0 :             };
     955            0 : 
     956            0 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     957            0 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     958              : 
     959              :             // Sometimes, Postgres seems to create heap WAL records with the
     960              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     961              :             // not set. In fact, it's possible that the VM page does not exist at all.
     962              :             // In that case, we don't want to store a record to clear the VM bit;
     963              :             // replaying it would fail to find the previous image of the page, because
     964              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
      965              :             // record if they don't.
     966            0 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     967            0 :             if let Some(blknum) = new_vm_blk {
     968            0 :                 if blknum >= vm_size {
     969            0 :                     new_vm_blk = None;
     970            0 :                 }
     971            0 :             }
     972            0 :             if let Some(blknum) = old_vm_blk {
     973            0 :                 if blknum >= vm_size {
     974            0 :                     old_vm_blk = None;
     975            0 :                 }
     976            0 :             }
     977              : 
     978            0 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     979            0 :                 if new_vm_blk == old_vm_blk {
      980              :                     // An UPDATE record that needs to clear the bits for both the old and the
     981              :                     // new page, both of which reside on the same VM page.
     982            0 :                     self.put_rel_wal_record(
     983            0 :                         modification,
     984            0 :                         vm_rel,
     985            0 :                         new_vm_blk.unwrap(),
     986            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     987            0 :                             new_heap_blkno,
     988            0 :                             old_heap_blkno,
     989            0 :                             flags,
     990            0 :                         },
     991            0 :                         ctx,
     992            0 :                     )
     993            0 :                     .await?;
     994              :                 } else {
     995              :                     // Clear VM bits for one heap page, or for two pages that reside on
     996              :                     // different VM pages.
     997            0 :                     if let Some(new_vm_blk) = new_vm_blk {
     998            0 :                         self.put_rel_wal_record(
     999            0 :                             modification,
    1000            0 :                             vm_rel,
    1001            0 :                             new_vm_blk,
    1002            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1003            0 :                                 new_heap_blkno,
    1004            0 :                                 old_heap_blkno: None,
    1005            0 :                                 flags,
    1006            0 :                             },
    1007            0 :                             ctx,
    1008            0 :                         )
    1009            0 :                         .await?;
    1010            0 :                     }
    1011            0 :                     if let Some(old_vm_blk) = old_vm_blk {
    1012            0 :                         self.put_rel_wal_record(
    1013            0 :                             modification,
    1014            0 :                             vm_rel,
    1015            0 :                             old_vm_blk,
    1016            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
    1017            0 :                                 new_heap_blkno: None,
    1018            0 :                                 old_heap_blkno,
    1019            0 :                                 flags,
    1020            0 :                             },
    1021            0 :                             ctx,
    1022            0 :                         )
    1023            0 :                         .await?;
    1024            0 :                     }
    1025              :                 }
    1026            0 :             }
    1027            0 :         }
    1028              : 
    1029            0 :         Ok(())
    1030            0 :     }
    1031              : 
    1032              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
    1033            0 :     async fn ingest_xlog_dbase_create(
    1034            0 :         &mut self,
    1035            0 :         modification: &mut DatadirModification<'_>,
    1036            0 :         rec: &XlCreateDatabase,
    1037            0 :         ctx: &RequestContext,
    1038            0 :     ) -> anyhow::Result<()> {
    1039            0 :         let db_id = rec.db_id;
    1040            0 :         let tablespace_id = rec.tablespace_id;
    1041            0 :         let src_db_id = rec.src_db_id;
    1042            0 :         let src_tablespace_id = rec.src_tablespace_id;
    1043              : 
    1044            0 :         let rels = modification
    1045            0 :             .tline
    1046            0 :             .list_rels(
    1047            0 :                 src_tablespace_id,
    1048            0 :                 src_db_id,
    1049            0 :                 Version::Modified(modification),
    1050            0 :                 ctx,
    1051            0 :             )
    1052            0 :             .await?;
    1053              : 
    1054            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
    1055              : 
    1056              :         // Copy relfilemap
    1057            0 :         let filemap = modification
    1058            0 :             .tline
    1059            0 :             .get_relmap_file(
    1060            0 :                 src_tablespace_id,
    1061            0 :                 src_db_id,
    1062            0 :                 Version::Modified(modification),
    1063            0 :                 ctx,
    1064            0 :             )
    1065            0 :             .await?;
    1066            0 :         modification
    1067            0 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
    1068            0 :             .await?;
    1069              : 
    1070            0 :         let mut num_rels_copied = 0;
    1071            0 :         let mut num_blocks_copied = 0;
    1072            0 :         for src_rel in rels {
    1073            0 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
    1074            0 :             assert_eq!(src_rel.dbnode, src_db_id);
    1075              : 
    1076            0 :             let nblocks = modification
    1077            0 :                 .tline
    1078            0 :                 .get_rel_size(src_rel, Version::Modified(modification), ctx)
    1079            0 :                 .await?;
    1080            0 :             let dst_rel = RelTag {
    1081            0 :                 spcnode: tablespace_id,
    1082            0 :                 dbnode: db_id,
    1083            0 :                 relnode: src_rel.relnode,
    1084            0 :                 forknum: src_rel.forknum,
    1085            0 :             };
    1086            0 : 
    1087            0 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
    1088              : 
    1089              :             // Copy content
    1090            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
    1091            0 :             for blknum in 0..nblocks {
    1092              :                 // Sharding:
    1093              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
    1094              :                 //    dbNode is not included in the hash inputs for sharding.
    1095              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
    1096              :                 //    that belong to it.
    1097            0 :                 let src_key = rel_block_to_key(src_rel, blknum);
    1098            0 :                 if !self.shard.is_key_local(&src_key) {
    1099            0 :                     debug!(
    1100            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
    1101              :                         src_key
    1102              :                     );
    1103            0 :                     continue;
    1104            0 :                 }
    1105            0 :                 debug!(
    1106            0 :                     "copying block {} from {} ({}) to {}",
    1107              :                     blknum, src_rel, src_key, dst_rel
    1108              :                 );
    1109              : 
    1110            0 :                 let content = modification
    1111            0 :                     .tline
    1112            0 :                     .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
    1113            0 :                     .await?;
    1114            0 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
    1115            0 :                 num_blocks_copied += 1;
    1116              :             }
    1117              : 
    1118            0 :             num_rels_copied += 1;
    1119              :         }
    1120              : 
    1121            0 :         info!(
    1122            0 :             "Created database {}/{}, copied {} blocks in {} rels",
    1123              :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
    1124              :         );
    1125            0 :         Ok(())
    1126            0 :     }
    1127              : 
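                       :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_CREATE record.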
    1128           16 :     async fn ingest_xlog_smgr_create(
    1129           16 :         &mut self,
    1130           16 :         modification: &mut DatadirModification<'_>,
    1131           16 :         rec: &XlSmgrCreate,
    1132           16 :         ctx: &RequestContext,
    1133           16 :     ) -> anyhow::Result<()> {
    1134           16 :         let rel = RelTag {
    1135           16 :             spcnode: rec.rnode.spcnode,
    1136           16 :             dbnode: rec.rnode.dbnode,
    1137           16 :             relnode: rec.rnode.relnode,
    1138           16 :             forknum: rec.forknum,
    1139           16 :         };
    1140           16 :         self.put_rel_creation(modification, rel, ctx).await?;
    1141           16 :         Ok(())
    1142           16 :     }
    1143              : 
    1144              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    1145              :     ///
    1146              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
    1147            0 :     async fn ingest_xlog_smgr_truncate(
    1148            0 :         &mut self,
    1149            0 :         modification: &mut DatadirModification<'_>,
    1150            0 :         rec: &XlSmgrTruncate,
    1151            0 :         ctx: &RequestContext,
    1152            0 :     ) -> anyhow::Result<()> {
    1153            0 :         let spcnode = rec.rnode.spcnode;
    1154            0 :         let dbnode = rec.rnode.dbnode;
    1155            0 :         let relnode = rec.rnode.relnode;
    1156            0 : 
    1157            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
    1158            0 :             let rel = RelTag {
    1159            0 :                 spcnode,
    1160            0 :                 dbnode,
    1161            0 :                 relnode,
    1162            0 :                 forknum: MAIN_FORKNUM,
    1163            0 :             };
    1164            0 :             self.put_rel_truncation(modification, rel, rec.blkno, ctx)
    1165            0 :                 .await?;
    1166            0 :         }
    1167            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
    1168            0 :             let rel = RelTag {
    1169            0 :                 spcnode,
    1170            0 :                 dbnode,
    1171            0 :                 relnode,
    1172            0 :                 forknum: FSM_FORKNUM,
    1173            0 :             };
    1174            0 : 
    1175            0 :             let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
    1176            0 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
    1177            0 :             if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
     1178              :                 // The tail of the last remaining FSM page has to be zeroed.
     1179              :                 // We are not precise here: instead of digging into the FSM bitmap format, we just clear the whole page.
    1180            0 :                 modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
    1181            0 :                 fsm_physical_page_no += 1;
    1182            0 :             }
    1183            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1184            0 :             if nblocks > fsm_physical_page_no {
     1185              :                 // Check if there is anything to do: the FSM is larger than the truncate position.
    1186            0 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
    1187            0 :                     .await?;
    1188            0 :             }
    1189            0 :         }
    1190            0 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
    1191            0 :             let rel = RelTag {
    1192            0 :                 spcnode,
    1193            0 :                 dbnode,
    1194            0 :                 relnode,
    1195            0 :                 forknum: VISIBILITYMAP_FORKNUM,
    1196            0 :             };
    1197            0 : 
    1198            0 :             let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
    1199            0 :             if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
     1200              :                 // The tail of the last remaining VM page has to be zeroed.
     1201              :                 // We are not precise here: instead of digging into the VM bitmap format, we just clear the whole page.
    1202            0 :                 modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
    1203            0 :                 vm_page_no += 1;
    1204            0 :             }
    1205            0 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1206            0 :             if nblocks > vm_page_no {
     1207              :                 // Check if there is anything to do: the VM is larger than the truncate position.
    1208            0 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
    1209            0 :                     .await?;
    1210            0 :             }
    1211            0 :         }
    1212            0 :         Ok(())
    1213            0 :     }
    1214              : 
     1215              :     /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
    1216              :     ///
    1217            8 :     async fn ingest_xact_record(
    1218            8 :         &mut self,
    1219            8 :         modification: &mut DatadirModification<'_>,
    1220            8 :         parsed: &XlXactParsedRecord,
    1221            8 :         is_commit: bool,
    1222            8 :         origin_id: u16,
    1223            8 :         ctx: &RequestContext,
    1224            8 :     ) -> anyhow::Result<()> {
    1225            8 :         // Record update of CLOG pages
    1226            8 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
    1227            8 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1228            8 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1229            8 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
    1230              : 
    1231            8 :         for subxact in &parsed.subxacts {
    1232            0 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
    1233            0 :             if subxact_pageno != pageno {
    1234              :                 // This subxact goes to different page. Write the record
    1235              :                 // for all the XIDs on the previous page, and continue
    1236              :                 // accumulating XIDs on this new page.
    1237            0 :                 modification.put_slru_wal_record(
    1238            0 :                     SlruKind::Clog,
    1239            0 :                     segno,
    1240            0 :                     rpageno,
    1241            0 :                     if is_commit {
    1242            0 :                         NeonWalRecord::ClogSetCommitted {
    1243            0 :                             xids: page_xids,
    1244            0 :                             timestamp: parsed.xact_time,
    1245            0 :                         }
    1246              :                     } else {
    1247            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
    1248              :                     },
    1249            0 :                 )?;
    1250            0 :                 page_xids = Vec::new();
    1251            0 :             }
    1252            0 :             pageno = subxact_pageno;
    1253            0 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1254            0 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1255            0 :             page_xids.push(*subxact);
    1256              :         }
    1257            8 :         modification.put_slru_wal_record(
    1258            8 :             SlruKind::Clog,
    1259            8 :             segno,
    1260            8 :             rpageno,
    1261            8 :             if is_commit {
    1262            8 :                 NeonWalRecord::ClogSetCommitted {
    1263            8 :                     xids: page_xids,
    1264            8 :                     timestamp: parsed.xact_time,
    1265            8 :                 }
    1266              :             } else {
    1267            0 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
    1268              :             },
    1269            0 :         )?;
    1270              : 
    1271            8 :         for xnode in &parsed.xnodes {
    1272            0 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
    1273            0 :                 let rel = RelTag {
    1274            0 :                     forknum,
    1275            0 :                     spcnode: xnode.spcnode,
    1276            0 :                     dbnode: xnode.dbnode,
    1277            0 :                     relnode: xnode.relnode,
    1278            0 :                 };
    1279            0 :                 if modification
    1280            0 :                     .tline
    1281            0 :                     .get_rel_exists(rel, Version::Modified(modification), ctx)
    1282            0 :                     .await?
    1283              :                 {
    1284            0 :                     self.put_rel_drop(modification, rel, ctx).await?;
    1285            0 :                 }
    1286              :             }
    1287              :         }
    1288            8 :         if origin_id != 0 {
    1289            0 :             modification
    1290            0 :                 .set_replorigin(origin_id, parsed.origin_lsn)
    1291            0 :                 .await?;
    1292            8 :         }
    1293            8 :         Ok(())
    1294            8 :     }
    1295              : 
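                       :     /// Subroutine of ingest_record(), to handle a CLOG truncation record.
                       :     /// Updates oldestXid/oldestXidDB in the checkpoint and drops the CLOG SLRU
                       :     /// segments that are no longer needed.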
    1296            0 :     async fn ingest_clog_truncate_record(
    1297            0 :         &mut self,
    1298            0 :         modification: &mut DatadirModification<'_>,
    1299            0 :         xlrec: &XlClogTruncate,
    1300            0 :         ctx: &RequestContext,
    1301            0 :     ) -> anyhow::Result<()> {
    1302            0 :         info!(
    1303            0 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
    1304              :             xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
    1305              :         );
    1306              : 
    1307              :         // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
    1308              :         // truncated, but a checkpoint record with the updated values isn't written until
    1309              :         // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
    1310              :         // so we keep the oldestXid and oldestXidDB up-to-date.
    1311            0 :         self.checkpoint.oldestXid = xlrec.oldest_xid;
    1312            0 :         self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
    1313            0 :         self.checkpoint_modified = true;
    1314            0 : 
     1315            0 :         // TODO: Handle AdvanceOldestClogXid(), or write a comment explaining why we don't need it.
    1316            0 : 
    1317            0 :         let latest_page_number =
    1318            0 :             self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
    1319            0 : 
    1320            0 :         // Now delete all segments containing pages between xlrec.pageno
    1321            0 :         // and latest_page_number.
    1322            0 : 
    1323            0 :         // First, make an important safety check:
    1324            0 :         // the current endpoint page must not be eligible for removal.
    1325            0 :         // See SimpleLruTruncate() in slru.c
    1326            0 :         if clogpage_precedes(latest_page_number, xlrec.pageno) {
     1327            0 :             info!("could not truncate directory pg_xact: apparent wraparound");
    1328            0 :             return Ok(());
    1329            0 :         }
    1330              : 
     1331              :         // Iterate over SLRU CLOG segments and drop the segments that we're ready to truncate.
    1332              :         //
    1333              :         // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
    1334              :         // will block waiting for the last valid LSN to advance up to
    1335              :         // it. So we use the previous record's LSN in the get calls
    1336              :         // instead.
    1337            0 :         for segno in modification
    1338            0 :             .tline
    1339            0 :             .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
    1340            0 :             .await?
    1341              :         {
    1342            0 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
    1343            0 :             if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
    1344            0 :                 modification
    1345            0 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
    1346            0 :                     .await?;
    1347            0 :                 trace!("Drop CLOG segment {:>04X}", segno);
    1348            0 :             }
    1349              :         }
    1350              : 
    1351            0 :         Ok(())
    1352            0 :     }
    1353              : 
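                       :     /// Subroutine of ingest_record(), to handle a multixact-create record
                       :     /// (XlMultiXactCreate): writes the affected multixact-offsets and
                       :     /// multixact-members SLRU pages and advances the next-multi-xid and
                       :     /// next-offset values in the checkpoint.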
    1354            0 :     fn ingest_multixact_create_record(
    1355            0 :         &mut self,
    1356            0 :         modification: &mut DatadirModification,
    1357            0 :         xlrec: &XlMultiXactCreate,
    1358            0 :     ) -> Result<()> {
    1359            0 :         // Create WAL record for updating the multixact-offsets page
    1360            0 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1361            0 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1362            0 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1363            0 : 
    1364            0 :         modification.put_slru_wal_record(
    1365            0 :             SlruKind::MultiXactOffsets,
    1366            0 :             segno,
    1367            0 :             rpageno,
    1368            0 :             NeonWalRecord::MultixactOffsetCreate {
    1369            0 :                 mid: xlrec.mid,
    1370            0 :                 moff: xlrec.moff,
    1371            0 :             },
    1372            0 :         )?;
    1373              : 
    1374              :         // Create WAL records for the update of each affected multixact-members page
    1375            0 :         let mut members = xlrec.members.iter();
    1376            0 :         let mut offset = xlrec.moff;
    1377              :         loop {
    1378            0 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1379            0 : 
    1380            0 :             // How many members fit on this page?
    1381            0 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1382            0 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1383            0 : 
    1384            0 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1385            0 :             for _ in 0..page_remain {
    1386            0 :                 if let Some(m) = members.next() {
    1387            0 :                     this_page_members.push(m.clone());
    1388            0 :                 } else {
    1389            0 :                     break;
    1390              :                 }
    1391              :             }
    1392            0 :             if this_page_members.is_empty() {
    1393              :                 // all done
    1394            0 :                 break;
    1395            0 :             }
    1396            0 :             let n_this_page = this_page_members.len();
    1397            0 : 
    1398            0 :             modification.put_slru_wal_record(
    1399            0 :                 SlruKind::MultiXactMembers,
    1400            0 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1401            0 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1402            0 :                 NeonWalRecord::MultixactMembersCreate {
    1403            0 :                     moff: offset,
    1404            0 :                     members: this_page_members,
    1405            0 :                 },
    1406            0 :             )?;
    1407              : 
    1408              :             // Note: The multixact members can wrap around, even within one WAL record.
    1409            0 :             offset = offset.wrapping_add(n_this_page as u32);
    1410              :         }
    1411            0 :         let next_offset = offset;
    1412            0 :         assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
    1413              : 
    1414              :         // Update next-multi-xid and next-offset
    1415              :         //
    1416              :         // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
    1417              :         // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
    1418              :         // read it, like GetNewMultiXactId(). This is different from how nextXid is
     1419              :         // incremented! nextXid skips over values < FirstNormalTransactionId when the value
    1420              :         // is stored, so it's never 0 in a checkpoint.
    1421              :         //
     1422              :         // I don't know why it's done that way; it seems less error-prone to skip over 0
    1423              :         // when the value is stored rather than when it's read. But let's do it the same
    1424              :         // way here.
    1425            0 :         let next_multi_xid = xlrec.mid.wrapping_add(1);
    1426            0 : 
    1427            0 :         if self
    1428            0 :             .checkpoint
    1429            0 :             .update_next_multixid(next_multi_xid, next_offset)
    1430            0 :         {
    1431            0 :             self.checkpoint_modified = true;
    1432            0 :         }
    1433              : 
    1434              :         // Also update the next-xid with the highest member. According to the comments in
    1435              :         // multixact_redo(), this shouldn't be necessary, but let's do the same here.
    1436            0 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
    1437            0 :             if let Some(max_xid) = acc {
    1438            0 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
    1439            0 :                     Some(mbr.xid)
    1440              :                 } else {
    1441            0 :                     acc
    1442              :                 }
    1443              :             } else {
    1444            0 :                 Some(mbr.xid)
    1445              :             }
    1446            0 :         });
    1447              : 
    1448            0 :         if let Some(max_xid) = max_mbr_xid {
    1449            0 :             if self.checkpoint.update_next_xid(max_xid) {
    1450            0 :                 self.checkpoint_modified = true;
    1451            0 :             }
    1452            0 :         }
    1453            0 :         Ok(())
    1454            0 :     }
    1455              : 
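                       :     /// Subroutine of ingest_record(), to handle a multixact-truncate record
                       :     /// (XlMultiXactTruncate): updates oldestMulti/oldestMultiDB in the checkpoint
                       :     /// and drops the truncated multixact-members segments.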
    1456            0 :     async fn ingest_multixact_truncate_record(
    1457            0 :         &mut self,
    1458            0 :         modification: &mut DatadirModification<'_>,
    1459            0 :         xlrec: &XlMultiXactTruncate,
    1460            0 :         ctx: &RequestContext,
    1461            0 :     ) -> Result<()> {
    1462            0 :         self.checkpoint.oldestMulti = xlrec.end_trunc_off;
    1463            0 :         self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
    1464            0 :         self.checkpoint_modified = true;
    1465            0 : 
    1466            0 :         // PerformMembersTruncation
    1467            0 :         let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
    1468            0 :         let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1469            0 :         let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1470            0 :         let mut segment: i32 = startsegment;
    1471              : 
     1472              :         // Delete all the segments except the last one. The last segment may still
     1473              :         // contain valid data, possibly only partially.
    1474            0 :         while segment != endsegment {
    1475            0 :             modification
    1476            0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1477            0 :                 .await?;
    1478              : 
    1479              :             /* move to next segment, handling wraparound correctly */
    1480            0 :             if segment == maxsegment {
    1481            0 :                 segment = 0;
    1482            0 :             } else {
    1483            0 :                 segment += 1;
    1484            0 :             }
    1485              :         }
    1486              : 
    1487              :         // Truncate offsets
    1488              :         // FIXME: this did not handle wraparound correctly
    1489              : 
    1490            0 :         Ok(())
    1491            0 :     }
    1492              : 
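                       :     /// Subroutine of ingest_record(), to handle a relation map update record
                       :     /// (XlRelmapUpdate) by storing the new relmapper file contents.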
    1493            0 :     async fn ingest_relmap_page(
    1494            0 :         &mut self,
    1495            0 :         modification: &mut DatadirModification<'_>,
    1496            0 :         xlrec: &XlRelmapUpdate,
    1497            0 :         decoded: &DecodedWALRecord,
    1498            0 :         ctx: &RequestContext,
    1499            0 :     ) -> Result<()> {
    1500            0 :         let mut buf = decoded.record.clone();
    1501            0 :         buf.advance(decoded.main_data_offset);
    1502            0 :         // skip xl_relmap_update
    1503            0 :         buf.advance(12);
    1504            0 : 
    1505            0 :         modification
    1506            0 :             .put_relmap_file(
    1507            0 :                 xlrec.tsid,
    1508            0 :                 xlrec.dbid,
    1509            0 :                 Bytes::copy_from_slice(&buf[..]),
    1510            0 :                 ctx,
    1511            0 :             )
    1512            0 :             .await
    1513            0 :     }
    1514              : 
    1515           18 :     async fn put_rel_creation(
    1516           18 :         &mut self,
    1517           18 :         modification: &mut DatadirModification<'_>,
    1518           18 :         rel: RelTag,
    1519           18 :         ctx: &RequestContext,
    1520           18 :     ) -> Result<()> {
    1521           18 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1522           18 :         Ok(())
    1523           18 :     }
    1524              : 
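                       :     /// Store a page image for the given block, extending the relation first if
                       :     /// the block is beyond its current size.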
    1525       272426 :     async fn put_rel_page_image(
    1526       272426 :         &mut self,
    1527       272426 :         modification: &mut DatadirModification<'_>,
    1528       272426 :         rel: RelTag,
    1529       272426 :         blknum: BlockNumber,
    1530       272426 :         img: Bytes,
    1531       272426 :         ctx: &RequestContext,
    1532       272426 :     ) -> Result<(), PageReconstructError> {
    1533       272426 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1534         4073 :             .await?;
    1535       272426 :         modification.put_rel_page_image(rel, blknum, img)?;
    1536       272426 :         Ok(())
    1537       272426 :     }
    1538              : 
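                       :     /// Store a WAL record for the given block, extending the relation first if
                       :     /// the block is beyond its current size.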
    1539       145630 :     async fn put_rel_wal_record(
    1540       145630 :         &mut self,
    1541       145630 :         modification: &mut DatadirModification<'_>,
    1542       145630 :         rel: RelTag,
    1543       145630 :         blknum: BlockNumber,
    1544       145630 :         rec: NeonWalRecord,
    1545       145630 :         ctx: &RequestContext,
    1546       145630 :     ) -> Result<()> {
    1547       145630 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1548           43 :             .await?;
    1549       145630 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1550       145630 :         Ok(())
    1551       145630 :     }
    1552              : 
    1553         6012 :     async fn put_rel_truncation(
    1554         6012 :         &mut self,
    1555         6012 :         modification: &mut DatadirModification<'_>,
    1556         6012 :         rel: RelTag,
    1557         6012 :         nblocks: BlockNumber,
    1558         6012 :         ctx: &RequestContext,
    1559         6012 :     ) -> anyhow::Result<()> {
    1560         6012 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1561         6012 :         Ok(())
    1562         6012 :     }
    1563              : 
    1564            2 :     async fn put_rel_drop(
    1565            2 :         &mut self,
    1566            2 :         modification: &mut DatadirModification<'_>,
    1567            2 :         rel: RelTag,
    1568            2 :         ctx: &RequestContext,
    1569            2 :     ) -> Result<()> {
    1570            2 :         modification.put_rel_drop(rel, ctx).await?;
    1571            2 :         Ok(())
    1572            2 :     }
    1573              : 
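                       :     /// Make sure `rel` is at least `blknum + 1` blocks long: create the relation
                       :     /// if it doesn't exist yet, extend it if needed, and zero-fill any gap between
                       :     /// the old size and `blknum`, skipping gap blocks that belong to other shards.
                       :     /// For example, if the relation currently has 3 blocks and a record touches
                       :     /// block 7, blocks 3 through 6 are zero-filled and the relation is extended to
                       :     /// 8 blocks.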
    1574       418056 :     async fn handle_rel_extend(
    1575       418056 :         &mut self,
    1576       418056 :         modification: &mut DatadirModification<'_>,
    1577       418056 :         rel: RelTag,
    1578       418056 :         blknum: BlockNumber,
    1579       418056 :         ctx: &RequestContext,
    1580       418056 :     ) -> Result<(), PageReconstructError> {
    1581       418056 :         let new_nblocks = blknum + 1;
     1582              :         // Check if the relation exists. We implicitly create relations on the first
     1583              :         // record that touches them.
     1584              :         // TODO: it would be nice to be more explicit about it
    1585              : 
    1586              :         // Get current size and put rel creation if rel doesn't exist
    1587              :         //
    1588              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1589              :         //       check the cache too. This is because eagerly checking the cache results in
     1590              :         //       less work overall and 10% better performance. It's more work on a cache
     1591              :         //       miss, but cache misses are rare.
    1592       418056 :         let old_nblocks = if let Some(nblocks) = modification
    1593       418056 :             .tline
    1594       418056 :             .get_cached_rel_size(&rel, modification.get_lsn())
    1595              :         {
    1596       418046 :             nblocks
    1597           10 :         } else if !modification
    1598           10 :             .tline
    1599           10 :             .get_rel_exists(rel, Version::Modified(modification), ctx)
    1600            0 :             .await?
    1601              :         {
     1602              :             // Create it with 0 size initially; the logic below will extend it.
    1603           10 :             modification
    1604           10 :                 .put_rel_creation(rel, 0, ctx)
    1605            0 :                 .await
    1606           10 :                 .context("Relation Error")?;
    1607           10 :             0
    1608              :         } else {
    1609            0 :             modification
    1610            0 :                 .tline
    1611            0 :                 .get_rel_size(rel, Version::Modified(modification), ctx)
    1612            0 :                 .await?
    1613              :         };
    1614              : 
    1615       418056 :         if new_nblocks > old_nblocks {
    1616              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1617       274788 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1618              : 
    1619       274788 :             let mut key = rel_block_to_key(rel, blknum);
    1620              :             // fill the gap with zeros
    1621       274788 :             for gap_blknum in old_nblocks..blknum {
    1622         2998 :                 key.field6 = gap_blknum;
    1623         2998 : 
    1624         2998 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1625            0 :                     continue;
    1626         2998 :                 }
    1627         2998 : 
    1628         2998 :                 modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
    1629              :             }
    1630       143268 :         }
    1631       418056 :         Ok(())
    1632       418056 :     }
    1633              : 
    1634            0 :     async fn put_slru_page_image(
    1635            0 :         &mut self,
    1636            0 :         modification: &mut DatadirModification<'_>,
    1637            0 :         kind: SlruKind,
    1638            0 :         segno: u32,
    1639            0 :         blknum: BlockNumber,
    1640            0 :         img: Bytes,
    1641            0 :         ctx: &RequestContext,
    1642            0 :     ) -> Result<()> {
    1643            0 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1644            0 :             .await?;
    1645            0 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1646            0 :         Ok(())
    1647            0 :     }
    1648              : 
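                       :     /// Make sure the SLRU segment is at least `blknum + 1` blocks long, creating
                       :     /// it if needed and zero-filling any gap below `blknum`.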
    1649            0 :     async fn handle_slru_extend(
    1650            0 :         &mut self,
    1651            0 :         modification: &mut DatadirModification<'_>,
    1652            0 :         kind: SlruKind,
    1653            0 :         segno: u32,
    1654            0 :         blknum: BlockNumber,
    1655            0 :         ctx: &RequestContext,
    1656            0 :     ) -> anyhow::Result<()> {
     1657            0 :         // We don't use a cache for this like we do for relations. SLRUs are explicitly
    1658            0 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1659            0 :         // a lot less frequently.
    1660            0 : 
    1661            0 :         let new_nblocks = blknum + 1;
     1662              :         // Check if the segment exists. We implicitly create SLRU segments on the first
     1663              :         // record that touches them.
     1664              :         // TODO: it would be nice to be more explicit about it
    1665            0 :         let old_nblocks = if !modification
    1666            0 :             .tline
    1667            0 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1668            0 :             .await?
    1669              :         {
    1670            0 :             // Create it with 0 size initially; the logic below will extend it.
    1671            0 :             modification
    1672            0 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1673            0 :                 .await?;
    1674            0 :             0
    1675              :         } else {
    1676            0 :             modification
    1677            0 :                 .tline
    1678            0 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1679            0 :                 .await?
    1680              :         };
    1681              : 
    1682            0 :         if new_nblocks > old_nblocks {
    1683            0 :             trace!(
    1684            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1685              :                 kind,
    1686              :                 segno,
    1687              :                 old_nblocks,
    1688              :                 new_nblocks
    1689              :             );
    1690            0 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1691              : 
    1692              :             // fill the gap with zeros
    1693            0 :             for gap_blknum in old_nblocks..blknum {
    1694            0 :                 modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
    1695              :             }
    1696            0 :         }
    1697            0 :         Ok(())
    1698            0 :     }
    1699              : }
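For SLRU segments the same pattern is combined with implicit creation: an unknown segment is first registered with size 0 and then grown by the ordinary extension path, with the gap blocks zero-filled. The sketch below is a hypothetical in-memory analogue of that decision (the `SegKey`, `old_nblocks`, and `extend_to` names are made up and are not the pageserver API):

use std::collections::HashMap;

type SegKey = (u8 /* kind */, u32 /* segno */);

// In-memory analogue of "put_slru_segment_creation(kind, segno, 0)" followed
// by a size lookup: unknown segments start at size 0.
fn old_nblocks(sizes: &mut HashMap<SegKey, u32>, key: SegKey) -> u32 {
    *sizes.entry(key).or_insert(0)
}

fn extend_to(sizes: &mut HashMap<SegKey, u32>, key: SegKey, blknum: u32) -> std::ops::Range<u32> {
    let old = old_nblocks(sizes, key);
    let new = blknum + 1;
    if new > old {
        sizes.insert(key, new);
    }
    // The returned range is the set of gap blocks that must be zero-filled;
    // blknum itself receives the real page image.
    old.min(blknum)..blknum
}

fn main() {
    let mut sizes = HashMap::new();
    // First record ever seen for segment (kind 0, segno 5) touches block 3:
    // the segment is created implicitly and blocks 0..3 are the gap.
    let gap = extend_to(&mut sizes, (0, 5), 3);
    assert_eq!(gap, 0..3);
    assert_eq!(sizes[&(0, 5)], 4);
}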
    1700              : 
    1701           12 : async fn get_relsize(
    1702           12 :     modification: &DatadirModification<'_>,
    1703           12 :     rel: RelTag,
    1704           12 :     ctx: &RequestContext,
    1705           12 : ) -> anyhow::Result<BlockNumber> {
    1706           12 :     let nblocks = if !modification
    1707           12 :         .tline
    1708           12 :         .get_rel_exists(rel, Version::Modified(modification), ctx)
    1709            0 :         .await?
    1710              :     {
    1711            0 :         0
    1712              :     } else {
    1713           12 :         modification
    1714           12 :             .tline
    1715           12 :             .get_rel_size(rel, Version::Modified(modification), ctx)
    1716            0 :             .await?
    1717              :     };
    1718           12 :     Ok(nblocks)
    1719           12 : }
    1720              : 
    1721              : #[allow(clippy::bool_assert_comparison)]
    1722              : #[cfg(test)]
    1723              : mod tests {
    1724              :     use super::*;
    1725              :     use crate::tenant::harness::*;
    1726              :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    1727              :     use postgres_ffi::RELSEG_SIZE;
    1728              : 
    1729              :     use crate::DEFAULT_PG_VERSION;
    1730              : 
    1731              :     /// Arbitrary relation tag, for testing.
    1732              :     const TESTREL_A: RelTag = RelTag {
    1733              :         spcnode: 0,
    1734              :         dbnode: 111,
    1735              :         relnode: 1000,
    1736              :         forknum: 0,
    1737              :     };
    1738              : 
    1739           12 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1740           12 :         // TODO
    1741           12 :     }
    1742              : 
    1743              :     static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
    1744              : 
    1745            8 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    1746            8 :         let mut m = tline.begin_modification(Lsn(0x10));
    1747            8 :         m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1748           16 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1749            8 :         m.commit(ctx).await?;
    1750            8 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1751              : 
    1752            8 :         Ok(walingest)
    1753            8 :     }
    1754              : 
    1755              :     #[tokio::test]
    1756            2 :     async fn test_relsize() -> Result<()> {
    1757            8 :         let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
    1758            2 :         let tline = tenant
    1759            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1760            6 :             .await?;
    1761            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1762            2 : 
    1763            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1764            2 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1765            2 :         walingest
    1766            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1767            2 :             .await?;
    1768            2 :         m.commit(&ctx).await?;
    1769            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    1770            2 :         walingest
    1771            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
    1772            2 :             .await?;
    1773            2 :         m.commit(&ctx).await?;
    1774            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    1775            2 :         walingest
    1776            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
    1777            2 :             .await?;
    1778            2 :         m.commit(&ctx).await?;
    1779            2 :         let mut m = tline.begin_modification(Lsn(0x50));
    1780            2 :         walingest
    1781            2 :             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
    1782            2 :             .await?;
    1783            2 :         m.commit(&ctx).await?;
    1784            2 : 
    1785            2 :         assert_current_logical_size(&tline, Lsn(0x50));
    1786            2 : 
    1787            2 :         // The relation was created at LSN 0x20, not visible at LSN 0x10 yet.
    1788            2 :         assert_eq!(
    1789            2 :             tline
    1790            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1791            2 :                 .await?,
    1792            2 :             false
    1793            2 :         );
    1794            2 :         assert!(tline
    1795            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    1796            2 :             .await
    1797            2 :             .is_err());
    1798            2 :         assert_eq!(
    1799            2 :             tline
    1800            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1801            2 :                 .await?,
    1802            2 :             true
    1803            2 :         );
    1804            2 :         assert_eq!(
    1805            2 :             tline
    1806            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1807            2 :                 .await?,
    1808            2 :             1
    1809            2 :         );
    1810            2 :         assert_eq!(
    1811            2 :             tline
    1812            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1813            2 :                 .await?,
    1814            2 :             3
    1815            2 :         );
    1816            2 : 
    1817            2 :         // Check page contents at each LSN
    1818            2 :         assert_eq!(
    1819            2 :             tline
    1820            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
    1821            2 :                 .await?,
    1822            2 :             test_img("foo blk 0 at 2")
    1823            2 :         );
    1824            2 : 
    1825            2 :         assert_eq!(
    1826            2 :             tline
    1827            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
    1828            2 :                 .await?,
    1829            2 :             test_img("foo blk 0 at 3")
    1830            2 :         );
    1831            2 : 
    1832            2 :         assert_eq!(
    1833            2 :             tline
    1834            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
    1835            2 :                 .await?,
    1836            2 :             test_img("foo blk 0 at 3")
    1837            2 :         );
    1838            2 :         assert_eq!(
    1839            2 :             tline
    1840            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
    1841            2 :                 .await?,
    1842            2 :             test_img("foo blk 1 at 4")
    1843            2 :         );
    1844            2 : 
    1845            2 :         assert_eq!(
    1846            2 :             tline
    1847            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
    1848            2 :                 .await?,
    1849            2 :             test_img("foo blk 0 at 3")
    1850            2 :         );
    1851            2 :         assert_eq!(
    1852            2 :             tline
    1853            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
    1854            2 :                 .await?,
    1855            2 :             test_img("foo blk 1 at 4")
    1856            2 :         );
    1857            2 :         assert_eq!(
    1858            2 :             tline
    1859            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    1860            2 :                 .await?,
    1861            2 :             test_img("foo blk 2 at 5")
    1862            2 :         );
    1863            2 : 
    1864            2 :         // Truncate last block
    1865            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    1866            2 :         walingest
    1867            2 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1868            2 :             .await?;
    1869            2 :         m.commit(&ctx).await?;
    1870            2 :         assert_current_logical_size(&tline, Lsn(0x60));
    1871            2 : 
    1872            2 :         // Check reported size and contents after truncation
    1873            2 :         assert_eq!(
    1874            2 :             tline
    1875            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    1876            2 :                 .await?,
    1877            2 :             2
    1878            2 :         );
    1879            2 :         assert_eq!(
    1880            2 :             tline
    1881            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
    1882            2 :                 .await?,
    1883            2 :             test_img("foo blk 0 at 3")
    1884            2 :         );
    1885            2 :         assert_eq!(
    1886            2 :             tline
    1887            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
    1888            2 :                 .await?,
    1889            2 :             test_img("foo blk 1 at 4")
    1890            2 :         );
    1891            2 : 
    1892            2 :         // Should still see the truncated block at an older LSN.
    1893            2 :         assert_eq!(
    1894            2 :             tline
    1895            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    1896            2 :                 .await?,
    1897            2 :             3
    1898            2 :         );
    1899            2 :         assert_eq!(
    1900            2 :             tline
    1901            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
    1902            2 :                 .await?,
    1903            2 :             test_img("foo blk 2 at 5")
    1904            2 :         );
    1905            2 : 
    1906            2 :         // Truncate to zero length
    1907            2 :         let mut m = tline.begin_modification(Lsn(0x68));
    1908            2 :         walingest
    1909            2 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1910            2 :             .await?;
    1911            2 :         m.commit(&ctx).await?;
    1912            2 :         assert_eq!(
    1913            2 :             tline
    1914            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
    1915            2 :                 .await?,
    1916            2 :             0
    1917            2 :         );
    1918            2 : 
    1919            2 :         // Extend from 0 to 2 blocks, leaving a gap
    1920            2 :         let mut m = tline.begin_modification(Lsn(0x70));
    1921            2 :         walingest
    1922            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
    1923            2 :             .await?;
    1924            2 :         m.commit(&ctx).await?;
    1925            2 :         assert_eq!(
    1926            2 :             tline
    1927            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
    1928            2 :                 .await?,
    1929            2 :             2
    1930            2 :         );
    1931            2 :         assert_eq!(
    1932            2 :             tline
    1933            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
    1934            2 :                 .await?,
    1935            2 :             ZERO_PAGE
    1936            2 :         );
    1937            2 :         assert_eq!(
    1938            2 :             tline
    1939            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
    1940            2 :                 .await?,
    1941            2 :             test_img("foo blk 1")
    1942            2 :         );
    1943            2 : 
    1944            2 :         // Extend a lot more, leaving a big gap that spans across segments
    1945            2 :         let mut m = tline.begin_modification(Lsn(0x80));
    1946            2 :         walingest
    1947            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
    1948            2 :             .await?;
    1949          214 :         m.commit(&ctx).await?;
    1950            2 :         assert_eq!(
    1951            2 :             tline
    1952            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    1953            2 :                 .await?,
    1954            2 :             1501
    1955            2 :         );
    1956         2998 :         for blk in 2..1500 {
    1957         2996 :             assert_eq!(
    1958         2996 :                 tline
    1959         2996 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
    1960         3016 :                     .await?,
    1961         2996 :                 ZERO_PAGE
    1962            2 :             );
    1963            2 :         }
    1964            2 :         assert_eq!(
    1965            2 :             tline
    1966            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
    1967            2 :                 .await?,
    1968            2 :             test_img("foo blk 1500")
    1969            2 :         );
    1970            2 : 
    1971            2 :         Ok(())
    1972            2 :     }
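The truncation assertions above rely on versioned reads: `put_rel_truncation` at LSN 0x60 records a new size, while `get_rel_size` and `get_rel_page_at_lsn` at LSN 0x50 still see the pre-truncation state. A minimal sketch of that lookup rule with a hypothetical `RelSizeHistory` type (the real Timeline keeps full page history, not just sizes):

use std::collections::BTreeMap;

struct RelSizeHistory {
    sizes: BTreeMap<u64, u32>, // lsn -> size in blocks recorded at that lsn
}

impl RelSizeHistory {
    fn put(&mut self, lsn: u64, nblocks: u32) {
        self.sizes.insert(lsn, nblocks);
    }
    fn get_at(&self, lsn: u64) -> Option<u32> {
        // Latest size recorded at or before `lsn`.
        self.sizes.range(..=lsn).next_back().map(|(_, &n)| n)
    }
}

fn main() {
    let mut h = RelSizeHistory { sizes: BTreeMap::new() };
    h.put(0x50, 3); // three blocks at LSN 0x50
    h.put(0x60, 2); // truncated to two blocks at LSN 0x60
    assert_eq!(h.get_at(0x60), Some(2));
    assert_eq!(h.get_at(0x50), Some(3)); // an older LSN still sees the old size
}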
    1973              : 
    1974              :     // Test what happens if we drop a relation
    1975              :     // and then re-create it within the same layer.
    1976              :     #[tokio::test]
    1977            2 :     async fn test_drop_extend() -> Result<()> {
    1978            8 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
    1979            2 :         let tline = tenant
    1980            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1981            6 :             .await?;
    1982            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1983            2 : 
    1984            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1985            2 :         walingest
    1986            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
    1987            2 :             .await?;
    1988            2 :         m.commit(&ctx).await?;
    1989            2 : 
    1990            2 :         // Check that rel exists and size is correct
    1991            2 :         assert_eq!(
    1992            2 :             tline
    1993            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    1994            2 :                 .await?,
    1995            2 :             true
    1996            2 :         );
    1997            2 :         assert_eq!(
    1998            2 :             tline
    1999            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2000            2 :                 .await?,
    2001            2 :             1
    2002            2 :         );
    2003            2 : 
    2004            2 :         // Drop rel
    2005            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    2006            2 :         walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
    2007            2 :         m.commit(&ctx).await?;
    2008            2 : 
    2009            2 :         // Check that rel is not visible anymore
    2010            2 :         assert_eq!(
    2011            2 :             tline
    2012            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
    2013            2 :                 .await?,
    2014            2 :             false
    2015            2 :         );
    2016            2 : 
    2017            2 :         // FIXME: should fail
    2018            2 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    2019            2 : 
    2020            2 :         // Re-create it
    2021            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    2022            2 :         walingest
    2023            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
    2024            2 :             .await?;
    2025            2 :         m.commit(&ctx).await?;
    2026            2 : 
    2027            2 :         // Check that rel exists and size is correct
    2028            2 :         assert_eq!(
    2029            2 :             tline
    2030            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2031            2 :                 .await?,
    2032            2 :             true
    2033            2 :         );
    2034            2 :         assert_eq!(
    2035            2 :             tline
    2036            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
    2037            2 :                 .await?,
    2038            2 :             1
    2039            2 :         );
    2040            2 : 
    2041            2 :         Ok(())
    2042            2 :     }
    2043              : 
    2044              :     // Test what happens if we truncate a relation
    2045              :     // so that one of its segments is dropped,
    2046              :     // and then extend it again within the same layer.
    2047              :     #[tokio::test]
    2048            2 :     async fn test_truncate_extend() -> Result<()> {
    2049            8 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
    2050            2 :         let tline = tenant
    2051            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2052            6 :             .await?;
    2053            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2054            2 : 
    2055            2 :         // Create a 20 MB relation (the size is arbitrary)
    2056            2 :         let relsize = 20 * 1024 * 1024 / 8192;
    2057            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    2058         5120 :         for blkno in 0..relsize {
    2059         5120 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    2060         5120 :             walingest
    2061         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2062            2 :                 .await?;
    2063            2 :         }
    2064           41 :         m.commit(&ctx).await?;
    2065            2 : 
    2066            2 :         // The relation was created at LSN 0x20, not visible at LSN 0x10 yet.
    2067            2 :         assert_eq!(
    2068            2 :             tline
    2069            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2070            2 :                 .await?,
    2071            2 :             false
    2072            2 :         );
    2073            2 :         assert!(tline
    2074            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
    2075            2 :             .await
    2076            2 :             .is_err());
    2077            2 : 
    2078            2 :         assert_eq!(
    2079            2 :             tline
    2080            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2081            2 :                 .await?,
    2082            2 :             true
    2083            2 :         );
    2084            2 :         assert_eq!(
    2085            2 :             tline
    2086            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
    2087            2 :                 .await?,
    2088            2 :             relsize
    2089            2 :         );
    2090            2 : 
    2091            2 :         // Check relation content
    2092         5120 :         for blkno in 0..relsize {
    2093         5120 :             let lsn = Lsn(0x20);
    2094         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2095         5120 :             assert_eq!(
    2096         5120 :                 tline
    2097         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
    2098          160 :                     .await?,
    2099         5120 :                 test_img(&data)
    2100            2 :             );
    2101            2 :         }
    2102            2 : 
    2103            2 :         // Truncate relation so that second segment was dropped
    2104            2 :         // - only leave one page
    2105            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    2106            2 :         walingest
    2107            2 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2108            2 :             .await?;
    2109            2 :         m.commit(&ctx).await?;
    2110            2 : 
    2111            2 :         // Check reported size and contents after truncation
    2112            2 :         assert_eq!(
    2113            2 :             tline
    2114            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
    2115            2 :                 .await?,
    2116            2 :             1
    2117            2 :         );
    2118            2 : 
    2119            4 :         for blkno in 0..1 {
    2120            2 :             let lsn = Lsn(0x20);
    2121            2 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2122            2 :             assert_eq!(
    2123            2 :                 tline
    2124            2 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
    2125            2 :                     .await?,
    2126            2 :                 test_img(&data)
    2127            2 :             );
    2128            2 :         }
    2129            2 : 
    2130            2 :         // Should still see all blocks at an older LSN.
    2131            2 :         assert_eq!(
    2132            2 :             tline
    2133            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
    2134            2 :                 .await?,
    2135            2 :             relsize
    2136            2 :         );
    2137         5120 :         for blkno in 0..relsize {
    2138         5120 :             let lsn = Lsn(0x20);
    2139         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2140         5120 :             assert_eq!(
    2141         5120 :                 tline
    2142         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
    2143          321 :                     .await?,
    2144         5120 :                 test_img(&data)
    2145            2 :             );
    2146            2 :         }
    2147            2 : 
    2148            2 :         // Extend relation again.
    2149            2 :         // Add enough blocks to create second segment
    2150            2 :         let lsn = Lsn(0x80);
    2151            2 :         let mut m = tline.begin_modification(lsn);
    2152         5120 :         for blkno in 0..relsize {
    2153         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2154         5120 :             walingest
    2155         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
    2156            2 :                 .await?;
    2157            2 :         }
    2158           42 :         m.commit(&ctx).await?;
    2159            2 : 
    2160            2 :         assert_eq!(
    2161            2 :             tline
    2162            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2163            2 :                 .await?,
    2164            2 :             true
    2165            2 :         );
    2166            2 :         assert_eq!(
    2167            2 :             tline
    2168            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
    2169            2 :                 .await?,
    2170            2 :             relsize
    2171            2 :         );
    2172            2 :         // Check relation content
    2173         5120 :         for blkno in 0..relsize {
    2174         5120 :             let lsn = Lsn(0x80);
    2175         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2176         5120 :             assert_eq!(
    2177         5120 :                 tline
    2178         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
    2179          160 :                     .await?,
    2180         5120 :                 test_img(&data)
    2181            2 :             );
    2182            2 :         }
    2183            2 : 
    2184            2 :         Ok(())
    2185            2 :     }
    2186              : 
    2187              :     /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
    2188              :     /// split into multiple 1 GB segments in Postgres.
    2189              :     #[tokio::test]
    2190            2 :     async fn test_large_rel() -> Result<()> {
    2191            8 :         let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
    2192            2 :         let tline = tenant
    2193            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2194            6 :             .await?;
    2195            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2196            2 : 
    2197            2 :         let mut lsn = 0x10;
    2198       262146 :         for blknum in 0..RELSEG_SIZE + 1 {
    2199       262146 :             lsn += 0x10;
    2200       262146 :             let mut m = tline.begin_modification(Lsn(lsn));
    2201       262146 :             let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2202       262146 :             walingest
    2203       262146 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2204         4073 :                 .await?;
    2205       262146 :             m.commit(&ctx).await?;
    2206            2 :         }
    2207            2 : 
    2208            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2209            2 : 
    2210            2 :         assert_eq!(
    2211            2 :             tline
    2212            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2213            2 :                 .await?,
    2214            2 :             RELSEG_SIZE + 1
    2215            2 :         );
    2216            2 : 
    2217            2 :         // Truncate one block
    2218            2 :         lsn += 0x10;
    2219            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    2220            2 :         walingest
    2221            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2222            2 :             .await?;
    2223            2 :         m.commit(&ctx).await?;
    2224            2 :         assert_eq!(
    2225            2 :             tline
    2226            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2227            2 :                 .await?,
    2228            2 :             RELSEG_SIZE
    2229            2 :         );
    2230            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2231            2 : 
    2232            2 :         // Truncate another block
    2233            2 :         lsn += 0x10;
    2234            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    2235            2 :         walingest
    2236            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2237            2 :             .await?;
    2238            2 :         m.commit(&ctx).await?;
    2239            2 :         assert_eq!(
    2240            2 :             tline
    2241            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2242            2 :                 .await?,
    2243            2 :             RELSEG_SIZE - 1
    2244            2 :         );
    2245            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2246            2 : 
    2247            2 :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time.
    2248            2 :         // This tests the behavior at segment boundaries
    2249            2 :         let mut size: i32 = 3000;
    2250         6004 :         while size >= 0 {
    2251         6002 :             lsn += 0x10;
    2252         6002 :             let mut m = tline.begin_modification(Lsn(lsn));
    2253         6002 :             walingest
    2254         6002 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2255           94 :                 .await?;
    2256         6002 :             m.commit(&ctx).await?;
    2257         6002 :             assert_eq!(
    2258         6002 :                 tline
    2259         6002 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
    2260            2 :                     .await?,
    2261         6002 :                 size as BlockNumber
    2262            2 :             );
    2263            2 : 
    2264         6002 :             size -= 1;
    2265            2 :         }
    2266            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2267            2 : 
    2268            2 :         Ok(())
    2269            2 :     }
    2270              : 
    2271              :     /// Replay a wal segment file taken directly from safekeepers.
    2272              :     ///
    2273              :     /// This test is useful for benchmarking since it allows us to profile only
    2274              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2275              :     /// without waiting for unrelated steps.
    2276              :     #[tokio::test]
    2277            2 :     async fn test_ingest_real_wal() {
    2278            2 :         use crate::tenant::harness::*;
    2279            2 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2280            2 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2281            2 : 
    2282            2 :         // Define test data path and constants.
    2283            2 :         //
    2284            2 :         // Steps to reconstruct the data, if needed:
    2285            2 :         // 1. Run the pgbench Python test
    2286            2 :         // 2. Take the first WAL segment file from a safekeeper
    2287            2 :         // 3. Compress it using `zstd --long input_file`
    2288            2 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2289            2 :         // 5. Grep the safekeeper logs for "restart decoder" to get the startpoint
    2290            2 :         // 6. Run just the decoder from this test to get the endpoint.
    2291            2 :         //    It's the last LSN the decoder will output.
    2292            2 :         let pg_version = 15; // The test data was generated by pg15
    2293            2 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2294            2 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2295            2 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2296            2 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2297            2 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2298            2 : 
    2299            2 :         let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
    2300            8 :         let (tenant, ctx) = harness.load().await;
    2301            2 : 
    2302            2 :         let remote_initdb_path =
    2303            2 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    2304            2 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2305            2 : 
    2306            2 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2307            2 :             .expect("creating test dir should work");
    2308            2 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2309            2 : 
    2310            2 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2311            2 :         // it doesn't create a real checkpoint, and WalIngest::new tries to parse
    2312            2 :         // the garbage data.
    2313            2 :         let tline = tenant
    2314            2 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2315        21895 :             .await
    2316            2 :             .unwrap();
    2317            2 : 
    2318            2 :         // We fully read and decompress this into memory before decoding
    2319            2 :         // to get a more accurate perf profile of the decoder.
    2320            2 :         let bytes = {
    2321            2 :             use async_compression::tokio::bufread::ZstdDecoder;
    2322            2 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2323            2 :             let reader = tokio::io::BufReader::new(file);
    2324            2 :             let decoder = ZstdDecoder::new(reader);
    2325            2 :             let mut reader = tokio::io::BufReader::new(decoder);
    2326            2 :             let mut buffer = Vec::new();
    2327          220 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2328            2 :             buffer
    2329            2 :         };
    2330            2 : 
    2331            2 :         // TODO start a profiler too
    2332            2 :         let started_at = std::time::Instant::now();
    2333            2 : 
    2334            2 :         // Initialize walingest
    2335            2 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    2336            2 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2337            2 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2338            5 :             .await
    2339            2 :             .unwrap();
    2340            2 :         let mut modification = tline.begin_modification(startpoint);
    2341            2 :         let mut decoded = DecodedWALRecord::default();
    2342            2 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2343            2 : 
    2344            2 :         // Decode and ingest the WAL. We process it in chunks because that is
    2345            2 :         // how the bytes arrive from the safekeepers.
    2346       474686 :         for chunk in bytes[xlogoff..].chunks(50) {
    2347       474686 :             decoder.feed_bytes(chunk);
    2348       620536 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    2349       145850 :                 walingest
    2350       145850 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
    2351           48 :                     .await
    2352       145850 :                     .unwrap();
    2353            2 :             }
    2354       474686 :             modification.commit(&ctx).await.unwrap();
    2355            2 :         }
    2356            2 : 
    2357            2 :         let duration = started_at.elapsed();
    2358            2 :         println!("done in {:?}", duration);
    2359            2 :     }
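The chunked feed/poll loop in the test above mirrors how WAL arrives from the safekeepers: bytes come in arbitrary pieces, and the decoder only emits a record once it has buffered a complete one. Below is a toy sketch of that pattern with an invented one-byte length-prefix framing (nothing like real WAL framing, and `ChunkedDecoder` is not `WalStreamDecoder`):

struct ChunkedDecoder {
    buf: Vec<u8>,
}

impl ChunkedDecoder {
    fn feed_bytes(&mut self, chunk: &[u8]) {
        self.buf.extend_from_slice(chunk);
    }
    fn poll_decode(&mut self) -> Option<Vec<u8>> {
        // Invented framing: the first byte is the record length.
        let len = *self.buf.first()? as usize;
        if self.buf.len() < 1 + len {
            return None; // need more bytes before the record is complete
        }
        let rec = self.buf[1..1 + len].to_vec();
        self.buf.drain(..1 + len);
        Some(rec)
    }
}

fn main() {
    // Two records: [3 bytes "abc"] and [2 bytes "xy"], streamed in 5-byte chunks.
    let stream: Vec<u8> = vec![3, b'a', b'b', b'c', 2, b'x', b'y'];
    let mut decoder = ChunkedDecoder { buf: Vec::new() };
    let mut records = Vec::new();
    for chunk in stream.chunks(5) {
        decoder.feed_bytes(chunk);
        while let Some(rec) = decoder.poll_decode() {
            records.push(rec);
        }
    }
    assert_eq!(records, vec![b"abc".to_vec(), b"xy".to_vec()]);
}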
    2360              : }
        

Generated by: LCOV version 2.1-beta