LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 81.0 % 1866 1512
Test Date: 2024-02-12 20:26:03 Functions: 69.9 % 83 58

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  ->   WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9              : //! and decodes it to individual WAL records. It feeds the WAL records
      10              : //! to WalIngest, which parses them and stores them in the Repository.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
      13              : //! page images, or a WAL records. WalIngest::ingest_record() extracts
      14              : //! page images out of some WAL records, but most it stores as WAL
      15              : //! records. If a WAL record modifies multiple pages, WalIngest
      16              : //! will call Repository::put_wal_record or put_page_image functions
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
      22              : //! bespoken Rust code.
      23              : 
      24              : use pageserver_api::shard::ShardIdentity;
      25              : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
      26              : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
      27              : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      28              : 
      29              : use anyhow::{bail, Context, Result};
      30              : use bytes::{Buf, Bytes, BytesMut};
      31              : use tracing::*;
      32              : use utils::failpoint_support;
      33              : 
      34              : use crate::context::RequestContext;
      35              : use crate::metrics::WAL_INGEST;
      36              : use crate::pgdatadir_mapping::{DatadirModification, Version};
      37              : use crate::tenant::PageReconstructError;
      38              : use crate::tenant::Timeline;
      39              : use crate::walrecord::*;
      40              : use crate::ZERO_PAGE;
      41              : use pageserver_api::key::rel_block_to_key;
      42              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      43              : use postgres_ffi::pg_constants;
      44              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      45              : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
      46              : use postgres_ffi::v14::xlog_utils::*;
      47              : use postgres_ffi::v14::CheckPoint;
      48              : use postgres_ffi::TransactionId;
      49              : use postgres_ffi::BLCKSZ;
      50              : use utils::lsn::Lsn;
      51              : 
      52              : pub struct WalIngest {
      53              :     shard: ShardIdentity,
      54              :     checkpoint: CheckPoint,
      55              :     checkpoint_modified: bool,
      56              : }
      57              : 
      58              : impl WalIngest {
      59         1338 :     pub async fn new(
      60         1338 :         timeline: &Timeline,
      61         1338 :         startpoint: Lsn,
      62         1338 :         ctx: &RequestContext,
      63         1338 :     ) -> anyhow::Result<WalIngest> {
      64              :         // Fetch the latest checkpoint into memory, so that we can compare with it
      65              :         // quickly in `ingest_record` and update it when it changes.
      66         1338 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
      67         1324 :         let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
      68            0 :         trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
      69              : 
      70         1324 :         Ok(WalIngest {
      71         1324 :             shard: *timeline.get_shard_identity(),
      72         1324 :             checkpoint,
      73         1324 :             checkpoint_modified: false,
      74         1324 :         })
      75         1338 :     }
      76              : 
      77              :     ///
      78              :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
      79              :     ///
      80              :     /// This function updates `lsn` field of `DatadirModification`
      81              :     ///
      82              :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
      83              :     /// relations/pages that the record affects.
      84              :     ///
      85              :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
      86              :     ///
      87     73047338 :     pub async fn ingest_record(
      88     73047338 :         &mut self,
      89     73047338 :         recdata: Bytes,
      90     73047338 :         lsn: Lsn,
      91     73047338 :         modification: &mut DatadirModification<'_>,
      92     73047338 :         decoded: &mut DecodedWALRecord,
      93     73047338 :         ctx: &RequestContext,
      94     73047349 :     ) -> anyhow::Result<bool> {
      95     73047349 :         WAL_INGEST.records_received.inc();
      96     73047349 :         let pg_version = modification.tline.pg_version;
      97     73047349 :         let prev_len = modification.len();
      98     73047349 : 
      99     73047349 :         modification.set_lsn(lsn)?;
     100     73047349 :         decode_wal_record(recdata, decoded, pg_version)?;
     101              : 
     102     73047349 :         let mut buf = decoded.record.clone();
     103     73047349 :         buf.advance(decoded.main_data_offset);
     104     73047349 : 
     105     73047349 :         assert!(!self.checkpoint_modified);
     106     73047349 :         if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
     107     71794365 :             && self.checkpoint.update_next_xid(decoded.xl_xid)
     108         8455 :         {
     109         8455 :             self.checkpoint_modified = true;
     110     73038894 :         }
     111              : 
     112     73047349 :         match decoded.xl_rmid {
     113              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
     114              :                 // Heap AM records need some special handling, because they modify VM pages
     115              :                 // without registering them with the standard mechanism.
     116     54762178 :                 self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
     117           53 :                     .await?;
     118              :             }
     119              :             pg_constants::RM_NEON_ID => {
     120            0 :                 self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
     121            0 :                     .await?;
     122              :             }
     123              :             // Handle other special record types
     124              :             pg_constants::RM_SMGR_ID => {
     125        75603 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     126        75603 : 
     127        75603 :                 if info == pg_constants::XLOG_SMGR_CREATE {
     128        75378 :                     let create = XlSmgrCreate::decode(&mut buf);
     129        75378 :                     self.ingest_xlog_smgr_create(modification, &create, ctx)
     130         1240 :                         .await?;
     131          225 :                 } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     132          225 :                     let truncate = XlSmgrTruncate::decode(&mut buf);
     133          225 :                     self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
     134            2 :                         .await?;
     135            0 :                 }
     136              :             }
     137              :             pg_constants::RM_DBASE_ID => {
     138           27 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     139            0 :                 debug!(%info, %pg_version, "handle RM_DBASE_ID");
     140              : 
     141           27 :                 if pg_version == 14 {
     142           27 :                     if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     143           24 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     144            0 :                         debug!("XLOG_DBASE_CREATE v14");
     145              : 
     146           24 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     147         1547 :                             .await?;
     148            3 :                     } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     149            3 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     150            6 :                         for tablespace_id in dropdb.tablespace_ids {
     151            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     152            3 :                             modification
     153            3 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     154            0 :                                 .await?;
     155              :                         }
     156            0 :                     }
     157            0 :                 } else if pg_version == 15 {
     158            0 :                     if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     159            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     160            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     161              :                         // The XLOG record was renamed between v14 and v15,
     162              :                         // but the record format is the same.
     163              :                         // So we can reuse XlCreateDatabase here.
     164            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     165            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     166            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     167            0 :                             .await?;
     168            0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     169            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     170            0 :                         for tablespace_id in dropdb.tablespace_ids {
     171            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     172            0 :                             modification
     173            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     174            0 :                                 .await?;
     175              :                         }
     176            0 :                     }
     177            0 :                 } else if pg_version == 16 {
     178            0 :                     if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     179            0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     180            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     181              :                         // The XLOG record was renamed between v14 and v15,
     182              :                         // but the record format is the same.
     183              :                         // So we can reuse XlCreateDatabase here.
     184            0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     185            0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     186            0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     187            0 :                             .await?;
     188            0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     189            0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     190            0 :                         for tablespace_id in dropdb.tablespace_ids {
     191            0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     192            0 :                             modification
     193            0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     194            0 :                                 .await?;
     195              :                         }
     196            0 :                     }
     197            0 :                 }
     198              :             }
     199              :             pg_constants::RM_TBLSPC_ID => {
     200            0 :                 trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     201              :             }
     202              :             pg_constants::RM_CLOG_ID => {
     203         1326 :                 let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     204         1326 : 
     205         1326 :                 if info == pg_constants::CLOG_ZEROPAGE {
     206         1323 :                     let pageno = buf.get_u32_le();
     207         1323 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     208         1323 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     209         1323 :                     self.put_slru_page_image(
     210         1323 :                         modification,
     211         1323 :                         SlruKind::Clog,
     212         1323 :                         segno,
     213         1323 :                         rpageno,
     214         1323 :                         ZERO_PAGE.clone(),
     215         1323 :                         ctx,
     216         1323 :                     )
     217          115 :                     .await?;
     218              :                 } else {
     219            3 :                     assert!(info == pg_constants::CLOG_TRUNCATE);
     220            3 :                     let xlrec = XlClogTruncate::decode(&mut buf);
     221            3 :                     self.ingest_clog_truncate_record(modification, &xlrec, ctx)
     222            0 :                         .await?;
     223              :                 }
     224              :             }
     225              :             pg_constants::RM_XACT_ID => {
     226      6538173 :                 let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     227      6538173 : 
     228      6538173 :                 if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
     229      6132815 :                     let parsed_xact =
     230      6132815 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     231      6132815 :                     self.ingest_xact_record(
     232      6132815 :                         modification,
     233      6132815 :                         &parsed_xact,
     234      6132815 :                         info == pg_constants::XLOG_XACT_COMMIT,
     235      6132815 :                         ctx,
     236      6132815 :                     )
     237         1361 :                     .await?;
     238       405358 :                 } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
     239       405357 :                     || info == pg_constants::XLOG_XACT_ABORT_PREPARED
     240              :                 {
     241            2 :                     let parsed_xact =
     242            2 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     243            2 :                     self.ingest_xact_record(
     244            2 :                         modification,
     245            2 :                         &parsed_xact,
     246            2 :                         info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
     247            2 :                         ctx,
     248            2 :                     )
     249            0 :                     .await?;
     250              :                     // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     251            0 :                     trace!(
     252            0 :                         "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     253            0 :                         decoded.xl_xid,
     254            0 :                         parsed_xact.xid,
     255            0 :                         lsn,
     256            0 :                     );
     257            2 :                     modification
     258            2 :                         .drop_twophase_file(parsed_xact.xid, ctx)
     259            0 :                         .await?;
     260       405356 :                 } else if info == pg_constants::XLOG_XACT_PREPARE {
     261            4 :                     modification
     262            4 :                         .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
     263            0 :                         .await?;
     264       405352 :                 }
     265              :             }
     266              :             pg_constants::RM_MULTIXACT_ID => {
     267        25139 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     268        25139 : 
     269        25139 :                 if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
     270           23 :                     let pageno = buf.get_u32_le();
     271           23 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     272           23 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     273           23 :                     self.put_slru_page_image(
     274           23 :                         modification,
     275           23 :                         SlruKind::MultiXactOffsets,
     276           23 :                         segno,
     277           23 :                         rpageno,
     278           23 :                         ZERO_PAGE.clone(),
     279           23 :                         ctx,
     280           23 :                     )
     281            0 :                     .await?;
     282        25116 :                 } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
     283          300 :                     let pageno = buf.get_u32_le();
     284          300 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     285          300 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     286          300 :                     self.put_slru_page_image(
     287          300 :                         modification,
     288          300 :                         SlruKind::MultiXactMembers,
     289          300 :                         segno,
     290          300 :                         rpageno,
     291          300 :                         ZERO_PAGE.clone(),
     292          300 :                         ctx,
     293          300 :                     )
     294            0 :                     .await?;
     295        24816 :                 } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     296        24816 :                     let xlrec = XlMultiXactCreate::decode(&mut buf);
     297        24816 :                     self.ingest_multixact_create_record(modification, &xlrec)?;
     298            0 :                 } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     299            0 :                     let xlrec = XlMultiXactTruncate::decode(&mut buf);
     300            0 :                     self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
     301            0 :                         .await?;
     302            0 :                 }
     303              :             }
     304              :             pg_constants::RM_RELMAP_ID => {
     305           62 :                 let xlrec = XlRelmapUpdate::decode(&mut buf);
     306           62 :                 self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
     307            2 :                     .await?;
     308              :             }
     309              :             pg_constants::RM_XLOG_ID => {
     310       240343 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     311       240343 : 
     312       240343 :                 if info == pg_constants::XLOG_NEXTOID {
     313          445 :                     let next_oid = buf.get_u32_le();
     314          445 :                     if self.checkpoint.nextOid != next_oid {
     315          445 :                         self.checkpoint.nextOid = next_oid;
     316          445 :                         self.checkpoint_modified = true;
     317          445 :                     }
     318       239898 :                 } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
     319       239817 :                     || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     320              :                 {
     321          677 :                     let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
     322          677 :                     buf.copy_to_slice(&mut checkpoint_bytes);
     323          677 :                     let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     324            0 :                     trace!(
     325            0 :                         "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
     326            0 :                         xlog_checkpoint.oldestXid,
     327            0 :                         self.checkpoint.oldestXid
     328            0 :                     );
     329          677 :                     if (self
     330          677 :                         .checkpoint
     331          677 :                         .oldestXid
     332          677 :                         .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
     333          677 :                         < 0
     334            0 :                     {
     335            0 :                         self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
     336          677 :                     }
     337              : 
     338              :                     // Write a new checkpoint key-value pair on every checkpoint record, even
     339              :                     // if nothing really changed. Not strictly required, but it seems nice to
     340              :                     // have some trace of the checkpoint records in the layer files at the same
     341              :                     // LSNs.
     342          677 :                     self.checkpoint_modified = true;
     343       239221 :                 }
     344              :             }
     345              :             pg_constants::RM_LOGICALMSG_ID => {
     346          129 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     347          129 : 
     348          129 :                 if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     349          129 :                     let xlrec = XlLogicalMessage::decode(&mut buf);
     350          129 :                     let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     351          129 :                     let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
     352          129 :                     if prefix == "neon-test" {
     353              :                         // This is a convenient way to make the WAL ingestion pause at
     354              :                         // particular point in the WAL. For more fine-grained control,
     355              :                         // we could peek into the message and only pause if it contains
     356              :                         // a particular string, for example, but this is enough for now.
     357            6 :                         failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
     358          125 :                     } else if let Some(path) = prefix.strip_prefix("neon-file:") {
     359          118 :                         modification.put_file(path, message, ctx).await?;
     360            7 :                     }
     361            0 :                 }
     362              :             }
     363     11404349 :             _x => {
     364     11404349 :                 // TODO: should probably log & fail here instead of blindly
     365     11404349 :                 // doing something without understanding the protocol
     366     11404349 :             }
     367              :         }
     368              : 
     369              :         // Iterate through all the blocks that the record modifies, and
     370              :         // "put" a separate copy of the record for each block.
     371     73047349 :         for blk in decoded.blocks.iter() {
     372     66986380 :             let rel = RelTag {
     373     66986380 :                 spcnode: blk.rnode_spcnode,
     374     66986380 :                 dbnode: blk.rnode_dbnode,
     375     66986380 :                 relnode: blk.rnode_relnode,
     376     66986380 :                 forknum: blk.forknum,
     377     66986380 :             };
     378     66986380 : 
     379     66986380 :             let key = rel_block_to_key(rel, blk.blkno);
     380     66986380 :             let key_is_local = self.shard.is_key_local(&key);
     381              : 
     382            0 :             tracing::debug!(
     383            0 :                 lsn=%lsn,
     384            0 :                 key=%key,
     385            0 :                 "ingest: shard decision {} (checkpoint={})",
     386            0 :                 if !key_is_local { "drop" } else { "keep" },
     387            0 :                 self.checkpoint_modified
     388            0 :             );
     389              : 
     390     66986380 :             if !key_is_local {
     391     13903887 :                 if self.shard.is_zero() {
     392              :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     393              :                     // its blkno in case it implicitly extends a relation.
     394      3520036 :                     self.observe_decoded_block(modification, blk, ctx).await?;
     395     10383851 :                 }
     396              : 
     397     13903887 :                 continue;
     398     53082493 :             }
     399     53082493 :             self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
     400         7535 :                 .await?;
     401              :         }
     402              : 
     403              :         // If checkpoint data was updated, store the new version in the repository
     404     73047346 :         if self.checkpoint_modified {
     405        34376 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     406              : 
     407        34376 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     408        34376 :             self.checkpoint_modified = false;
     409     73012970 :         }
     410              : 
     411              :         // Note that at this point this record is only cached in the modification
     412              :         // until commit() is called to flush the data into the repository and update
     413              :         // the latest LSN.
     414              : 
     415     73047346 :         Ok(modification.len() > prev_len)
     416     73047349 :     }
     417              : 
     418              :     /// Do not store this block, but observe it for the purposes of updating our relation size state.
     419      3520036 :     async fn observe_decoded_block(
     420      3520036 :         &mut self,
     421      3520036 :         modification: &mut DatadirModification<'_>,
     422      3520036 :         blk: &DecodedBkpBlock,
     423      3520036 :         ctx: &RequestContext,
     424      3520036 :     ) -> Result<(), PageReconstructError> {
     425      3520036 :         let rel = RelTag {
     426      3520036 :             spcnode: blk.rnode_spcnode,
     427      3520036 :             dbnode: blk.rnode_dbnode,
     428      3520036 :             relnode: blk.rnode_relnode,
     429      3520036 :             forknum: blk.forknum,
     430      3520036 :         };
     431      3520036 :         self.handle_rel_extend(modification, rel, blk.blkno, ctx)
     432          449 :             .await
     433      3520036 :     }
     434              : 
     435     53082477 :     async fn ingest_decoded_block(
     436     53082477 :         &mut self,
     437     53082477 :         modification: &mut DatadirModification<'_>,
     438     53082477 :         lsn: Lsn,
     439     53082477 :         decoded: &DecodedWALRecord,
     440     53082477 :         blk: &DecodedBkpBlock,
     441     53082477 :         ctx: &RequestContext,
     442     53082493 :     ) -> Result<(), PageReconstructError> {
     443     53082493 :         let rel = RelTag {
     444     53082493 :             spcnode: blk.rnode_spcnode,
     445     53082493 :             dbnode: blk.rnode_dbnode,
     446     53082493 :             relnode: blk.rnode_relnode,
     447     53082493 :             forknum: blk.forknum,
     448     53082493 :         };
     449     53082493 : 
     450     53082493 :         //
     451     53082493 :         // Instead of storing full-page-image WAL record,
     452     53082493 :         // it is better to store extracted image: we can skip wal-redo
     453     53082493 :         // in this case. Also some FPI records may contain multiple (up to 32) pages,
     454     53082493 :         // so them have to be copied multiple times.
     455     53082493 :         //
     456     53082493 :         if blk.apply_image
     457       189852 :             && blk.has_image
     458       189852 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     459       167676 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     460            0 :                 || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     461              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     462       167676 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
     463              :             // do not materialize null pages because them most likely be soon replaced with real data
     464       167676 :             && blk.bimg_len != 0
     465              :         {
     466              :             // Extract page image from FPI record
     467       167676 :             let img_len = blk.bimg_len as usize;
     468       167676 :             let img_offs = blk.bimg_offset as usize;
     469       167676 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     470       167676 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     471       167676 : 
     472       167676 :             if blk.hole_length != 0 {
     473       134535 :                 let tail = image.split_off(blk.hole_offset as usize);
     474       134535 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     475       134535 :                 image.unsplit(tail);
     476       134535 :             }
     477              :             //
     478              :             // Match the logic of XLogReadBufferForRedoExtended:
     479              :             // The page may be uninitialized. If so, we can't set the LSN because
     480              :             // that would corrupt the page.
     481              :             //
     482       167676 :             if !page_is_new(&image) {
     483       160496 :                 page_set_lsn(&mut image, lsn)
     484         7180 :             }
     485       167676 :             assert_eq!(image.len(), BLCKSZ as usize);
     486       167676 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     487          565 :                 .await?;
     488              :         } else {
     489     52914817 :             let rec = NeonWalRecord::Postgres {
     490     52914817 :                 will_init: blk.will_init || blk.apply_image,
     491     52914817 :                 rec: decoded.record.clone(),
     492     52914817 :             };
     493     52914817 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     494         6970 :                 .await?;
     495              :         }
     496     53082490 :         Ok(())
     497     53082493 :     }
     498              : 
     499     54762167 :     async fn ingest_heapam_record(
     500     54762167 :         &mut self,
     501     54762167 :         buf: &mut Bytes,
     502     54762167 :         modification: &mut DatadirModification<'_>,
     503     54762167 :         decoded: &DecodedWALRecord,
     504     54762167 :         ctx: &RequestContext,
     505     54762178 :     ) -> anyhow::Result<()> {
     506     54762178 :         // Handle VM bit updates that are implicitly part of heap records.
     507     54762178 : 
     508     54762178 :         // First, look at the record to determine which VM bits need
     509     54762178 :         // to be cleared. If either of these variables is set, we
     510     54762178 :         // need to clear the corresponding bits in the visibility map.
     511     54762178 :         let mut new_heap_blkno: Option<u32> = None;
     512     54762178 :         let mut old_heap_blkno: Option<u32> = None;
     513     54762178 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     514     54762178 : 
     515     54762178 :         match modification.tline.pg_version {
     516              :             14 => {
     517     54616704 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     518     50292894 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     519     50292894 : 
     520     50292894 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     521     41676372 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     522     41676372 :                         assert_eq!(0, buf.remaining());
     523     41676372 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     524         4871 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     525     41671501 :                         }
     526      8616522 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     527      2303981 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     528      2303981 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     529         2777 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     530      2301204 :                         }
     531      6312541 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     532      4806456 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     533              :                     {
     534      3450011 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     535      3450011 :                         // the size of tuple data is inferred from the size of the record.
     536      3450011 :                         // we can't validate the remaining number of bytes without parsing
     537      3450011 :                         // the tuple data.
     538      3450011 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     539        81531 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     540      3368480 :                         }
     541      3450011 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     542         2087 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     543         2087 :                             // non-HOT update where the new tuple goes to different page than
     544         2087 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     545         2087 :                             // set.
     546         2087 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     547      3447924 :                         }
     548      2862530 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     549      2760102 :                         let xlrec = v14::XlHeapLock::decode(buf);
     550      2760102 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     551          210 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     552          210 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     553      2759892 :                         }
     554       102428 :                     }
     555      4323810 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     556      4323810 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     557      4323810 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     558      1126139 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     559              : 
     560      1126139 :                         let offset_array_len =
     561      1126139 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     562              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     563       689082 :                                 0
     564              :                             } else {
     565       437057 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     566              :                             };
     567      1126139 :                         assert_eq!(offset_array_len, buf.remaining());
     568              : 
     569      1126139 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     570         3730 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     571      1122409 :                         }
     572      3197671 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     573         4146 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     574         4146 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     575            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     576            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     577         4146 :                         }
     578      3193525 :                     }
     579              :                 } else {
     580            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     581              :                 }
     582              :             }
     583              :             15 => {
     584       145474 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     585       145286 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     586       145286 : 
     587       145286 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     588       145276 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     589       145276 :                         assert_eq!(0, buf.remaining());
     590       145276 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     591            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     592       145272 :                         }
     593           10 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     594            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     595            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     596            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     597            0 :                         }
     598           10 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     599            2 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     600              :                     {
     601            8 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     602            8 :                         // the size of tuple data is inferred from the size of the record.
     603            8 :                         // we can't validate the remaining number of bytes without parsing
     604            8 :                         // the tuple data.
     605            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     606            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     607            8 :                         }
     608            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     609            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     610            0 :                             // non-HOT update where the new tuple goes to different page than
     611            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     612            0 :                             // set.
     613            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     614            8 :                         }
     615            2 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     616            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     617            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     618            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     619            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     620            0 :                         }
     621            2 :                     }
     622          188 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     623          188 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     624          188 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     625           42 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     626              : 
     627           42 :                         let offset_array_len =
     628           42 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     629              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     630            2 :                                 0
     631              :                             } else {
     632           40 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     633              :                             };
     634           42 :                         assert_eq!(offset_array_len, buf.remaining());
     635              : 
     636           42 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     637            8 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     638           34 :                         }
     639          146 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     640            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     641            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     642            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     643            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     644            0 :                         }
     645          146 :                     }
     646              :                 } else {
     647            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     648              :                 }
     649              :             }
     650              :             16 => {
     651            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     652            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     653            0 : 
     654            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     655            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     656            0 :                         assert_eq!(0, buf.remaining());
     657            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     658            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     659            0 :                         }
     660            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     661            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     662            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     663            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     664            0 :                         }
     665            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     666            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     667              :                     {
     668            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     669            0 :                         // the size of tuple data is inferred from the size of the record.
     670            0 :                         // we can't validate the remaining number of bytes without parsing
     671            0 :                         // the tuple data.
     672            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     673            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     674            0 :                         }
     675            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     676            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     677            0 :                             // non-HOT update where the new tuple goes to different page than
     678            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     679            0 :                             // set.
     680            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     681            0 :                         }
     682            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     683            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     684            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     685            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     686            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     687            0 :                         }
     688            0 :                     }
     689            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     690            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     691            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     692            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     693              : 
     694            0 :                         let offset_array_len =
     695            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     696              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     697            0 :                                 0
     698              :                             } else {
     699            0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     700              :                             };
     701            0 :                         assert_eq!(offset_array_len, buf.remaining());
     702              : 
     703            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     704            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     705            0 :                         }
     706            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     707            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     708            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     709            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     710            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     711            0 :                         }
     712            0 :                     }
     713              :                 } else {
     714            0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     715              :                 }
     716              :             }
     717            0 :             _ => {}
     718              :         }
     719              : 
     720              :         // Clear the VM bits if required.
     721     54762178 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     722        95121 :             let vm_rel = RelTag {
     723        95121 :                 forknum: VISIBILITYMAP_FORKNUM,
     724        95121 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     725        95121 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     726        95121 :                 relnode: decoded.blocks[0].rnode_relnode,
     727        95121 :             };
     728        95121 : 
     729        95121 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     730        95121 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     731              : 
     732              :             // Sometimes, Postgres seems to create heap WAL records with the
     733              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     734              :             // not set. In fact, it's possible that the VM page does not exist at all.
     735              :             // In that case, we don't want to store a record to clear the VM bit;
     736              :             // replaying it would fail to find the previous image of the page, because
     737              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     738              :             // record if it doesn't.
     739        95121 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     740        95121 :             if let Some(blknum) = new_vm_blk {
     741        13477 :                 if blknum >= vm_size {
     742         1947 :                     new_vm_blk = None;
     743        11530 :                 }
     744        81644 :             }
     745        95121 :             if let Some(blknum) = old_vm_blk {
     746        81741 :                 if blknum >= vm_size {
     747          589 :                     old_vm_blk = None;
     748        81152 :                 }
     749        13380 :             }
     750              : 
     751        95121 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     752        92599 :                 if new_vm_blk == old_vm_blk {
     753              :                     // An UPDATE record that needs to clear the bits for both old and the
     754              :                     // new page, both of which reside on the same VM page.
     755           83 :                     self.put_rel_wal_record(
     756           83 :                         modification,
     757           83 :                         vm_rel,
     758           83 :                         new_vm_blk.unwrap(),
     759           83 :                         NeonWalRecord::ClearVisibilityMapFlags {
     760           83 :                             new_heap_blkno,
     761           83 :                             old_heap_blkno,
     762           83 :                             flags,
     763           83 :                         },
     764           83 :                         ctx,
     765           83 :                     )
     766            0 :                     .await?;
     767              :                 } else {
     768              :                     // Clear VM bits for one heap page, or for two pages that reside on
     769              :                     // different VM pages.
     770        92516 :                     if let Some(new_vm_blk) = new_vm_blk {
     771        11447 :                         self.put_rel_wal_record(
     772        11447 :                             modification,
     773        11447 :                             vm_rel,
     774        11447 :                             new_vm_blk,
     775        11447 :                             NeonWalRecord::ClearVisibilityMapFlags {
     776        11447 :                                 new_heap_blkno,
     777        11447 :                                 old_heap_blkno: None,
     778        11447 :                                 flags,
     779        11447 :                             },
     780        11447 :                             ctx,
     781        11447 :                         )
     782            0 :                         .await?;
     783        81069 :                     }
     784        92516 :                     if let Some(old_vm_blk) = old_vm_blk {
     785        81069 :                         self.put_rel_wal_record(
     786        81069 :                             modification,
     787        81069 :                             vm_rel,
     788        81069 :                             old_vm_blk,
     789        81069 :                             NeonWalRecord::ClearVisibilityMapFlags {
     790        81069 :                                 new_heap_blkno: None,
     791        81069 :                                 old_heap_blkno,
     792        81069 :                                 flags,
     793        81069 :                             },
     794        81069 :                             ctx,
     795        81069 :                         )
     796            0 :                         .await?;
     797        11447 :                     }
     798              :                 }
     799         2522 :             }
     800     54667057 :         }
     801              : 
     802     54762178 :         Ok(())
     803     54762178 :     }
     804              : 
     805            0 :     async fn ingest_neonrmgr_record(
     806            0 :         &mut self,
     807            0 :         buf: &mut Bytes,
     808            0 :         modification: &mut DatadirModification<'_>,
     809            0 :         decoded: &DecodedWALRecord,
     810            0 :         ctx: &RequestContext,
     811            0 :     ) -> anyhow::Result<()> {
     812            0 :         // Handle VM bit updates that are implicitly part of heap records.
     813            0 : 
     814            0 :         // First, look at the record to determine which VM bits need
     815            0 :         // to be cleared. If either of these variables is set, we
     816            0 :         // need to clear the corresponding bits in the visibility map.
     817            0 :         let mut new_heap_blkno: Option<u32> = None;
     818            0 :         let mut old_heap_blkno: Option<u32> = None;
     819            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     820            0 :         let pg_version = modification.tline.pg_version;
     821            0 : 
     822            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     823              : 
     824            0 :         match pg_version {
     825              :             16 => {
     826            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     827            0 : 
     828            0 :                 match info {
     829              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     830            0 :                         let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
     831            0 :                         assert_eq!(0, buf.remaining());
     832            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     833            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     834            0 :                         }
     835              :                     }
     836              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     837            0 :                         let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
     838            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     839            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     840            0 :                         }
     841              :                     }
     842              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     843              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     844            0 :                         let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
     845            0 :                         // the size of tuple data is inferred from the size of the record.
     846            0 :                         // we can't validate the remaining number of bytes without parsing
     847            0 :                         // the tuple data.
     848            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     849            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     850            0 :                         }
     851            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     852            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     853            0 :                             // non-HOT update where the new tuple goes to different page than
     854            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     855            0 :                             // set.
     856            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     857            0 :                         }
     858              :                     }
     859              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     860            0 :                         let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     861              : 
     862            0 :                         let offset_array_len =
     863            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     864              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     865            0 :                                 0
     866              :                             } else {
     867            0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     868              :                             };
     869            0 :                         assert_eq!(offset_array_len, buf.remaining());
     870              : 
     871            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     872            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     873            0 :                         }
     874              :                     }
     875              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     876            0 :                         let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
     877            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     878            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     879            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     880            0 :                         }
     881              :                     }
     882            0 :                     info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
     883              :                 }
     884              :             }
     885            0 :             _ => bail!(
     886            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     887            0 :                 pg_version
     888            0 :             ),
     889              :         }
     890              : 
     891              :         // Clear the VM bits if required.
     892            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     893            0 :             let vm_rel = RelTag {
     894            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     895            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     896            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     897            0 :                 relnode: decoded.blocks[0].rnode_relnode,
     898            0 :             };
     899            0 : 
     900            0 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     901            0 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     902              : 
     903              :             // Sometimes, Postgres seems to create heap WAL records with the
     904              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     905              :             // not set. In fact, it's possible that the VM page does not exist at all.
     906              :             // In that case, we don't want to store a record to clear the VM bit;
     907              :             // replaying it would fail to find the previous image of the page, because
     908              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     909              :             // record if it doesn't.
     910            0 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     911            0 :             if let Some(blknum) = new_vm_blk {
     912            0 :                 if blknum >= vm_size {
     913            0 :                     new_vm_blk = None;
     914            0 :                 }
     915            0 :             }
     916            0 :             if let Some(blknum) = old_vm_blk {
     917            0 :                 if blknum >= vm_size {
     918            0 :                     old_vm_blk = None;
     919            0 :                 }
     920            0 :             }
     921              : 
     922            0 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     923            0 :                 if new_vm_blk == old_vm_blk {
     924              :                     // An UPDATE record that needs to clear the bits for both old and the
     925              :                     // new page, both of which reside on the same VM page.
     926            0 :                     self.put_rel_wal_record(
     927            0 :                         modification,
     928            0 :                         vm_rel,
     929            0 :                         new_vm_blk.unwrap(),
     930            0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     931            0 :                             new_heap_blkno,
     932            0 :                             old_heap_blkno,
     933            0 :                             flags,
     934            0 :                         },
     935            0 :                         ctx,
     936            0 :                     )
     937            0 :                     .await?;
     938              :                 } else {
     939              :                     // Clear VM bits for one heap page, or for two pages that reside on
     940              :                     // different VM pages.
     941            0 :                     if let Some(new_vm_blk) = new_vm_blk {
     942            0 :                         self.put_rel_wal_record(
     943            0 :                             modification,
     944            0 :                             vm_rel,
     945            0 :                             new_vm_blk,
     946            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     947            0 :                                 new_heap_blkno,
     948            0 :                                 old_heap_blkno: None,
     949            0 :                                 flags,
     950            0 :                             },
     951            0 :                             ctx,
     952            0 :                         )
     953            0 :                         .await?;
     954            0 :                     }
     955            0 :                     if let Some(old_vm_blk) = old_vm_blk {
     956            0 :                         self.put_rel_wal_record(
     957            0 :                             modification,
     958            0 :                             vm_rel,
     959            0 :                             old_vm_blk,
     960            0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     961            0 :                                 new_heap_blkno: None,
     962            0 :                                 old_heap_blkno,
     963            0 :                                 flags,
     964            0 :                             },
     965            0 :                             ctx,
     966            0 :                         )
     967            0 :                         .await?;
     968            0 :                     }
     969              :                 }
     970            0 :             }
     971            0 :         }
     972              : 
     973            0 :         Ok(())
     974            0 :     }
     975              : 
     976              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     977           24 :     async fn ingest_xlog_dbase_create(
     978           24 :         &mut self,
     979           24 :         modification: &mut DatadirModification<'_>,
     980           24 :         rec: &XlCreateDatabase,
     981           24 :         ctx: &RequestContext,
     982           24 :     ) -> anyhow::Result<()> {
     983           24 :         let db_id = rec.db_id;
     984           24 :         let tablespace_id = rec.tablespace_id;
     985           24 :         let src_db_id = rec.src_db_id;
     986           24 :         let src_tablespace_id = rec.src_tablespace_id;
     987              : 
     988           24 :         let rels = modification
     989           24 :             .tline
     990           24 :             .list_rels(
     991           24 :                 src_tablespace_id,
     992           24 :                 src_db_id,
     993           24 :                 Version::Modified(modification),
     994           24 :                 ctx,
     995           24 :             )
     996            0 :             .await?;
     997              : 
     998            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
     999              : 
    1000              :         // Copy relfilemap
    1001           24 :         let filemap = modification
    1002           24 :             .tline
    1003           24 :             .get_relmap_file(
    1004           24 :                 src_tablespace_id,
    1005           24 :                 src_db_id,
    1006           24 :                 Version::Modified(modification),
    1007           24 :                 ctx,
    1008           24 :             )
    1009            0 :             .await?;
    1010           24 :         modification
    1011           24 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
    1012            0 :             .await?;
    1013              : 
    1014           24 :         let mut num_rels_copied = 0;
    1015           24 :         let mut num_blocks_copied = 0;
    1016         7032 :         for src_rel in rels {
    1017         7008 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
    1018         7008 :             assert_eq!(src_rel.dbnode, src_db_id);
    1019              : 
    1020         7008 :             let nblocks = modification
    1021         7008 :                 .tline
    1022         7008 :                 .get_rel_size(src_rel, Version::Modified(modification), true, ctx)
    1023          196 :                 .await?;
    1024         7008 :             let dst_rel = RelTag {
    1025         7008 :                 spcnode: tablespace_id,
    1026         7008 :                 dbnode: db_id,
    1027         7008 :                 relnode: src_rel.relnode,
    1028         7008 :                 forknum: src_rel.forknum,
    1029         7008 :             };
    1030         7008 : 
    1031         7008 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
    1032              : 
    1033              :             // Copy content
    1034            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
    1035        24264 :             for blknum in 0..nblocks {
    1036              :                 // Sharding:
    1037              :                 //  - src and dst are always on the same shard, because they differ only by dbNode, and
    1038              :                 //    dbNode is not included in the hash inputs for sharding.
    1039              :                 //  - This WAL command is replayed on all shards, but each shard only copies the blocks
    1040              :                 //    that belong to it.
    1041        24264 :                 let src_key = rel_block_to_key(src_rel, blknum);
    1042        24264 :                 if !self.shard.is_key_local(&src_key) {
    1043            0 :                     debug!(
    1044            0 :                         "Skipping non-local key {} during XLOG_DBASE_CREATE",
    1045            0 :                         src_key
    1046            0 :                     );
    1047         9099 :                     continue;
    1048        15165 :                 }
    1049            0 :                 debug!(
    1050            0 :                     "copying block {} from {} ({}) to {}",
    1051            0 :                     blknum, src_rel, src_key, dst_rel
    1052            0 :                 );
    1053              : 
    1054        15165 :                 let content = modification
    1055        15165 :                     .tline
    1056        15165 :                     .get_rel_page_at_lsn(
    1057        15165 :                         src_rel,
    1058        15165 :                         blknum,
    1059        15165 :                         Version::Modified(modification),
    1060        15165 :                         true,
    1061        15165 :                         ctx,
    1062        15165 :                     )
    1063         1351 :                     .await?;
    1064        15165 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
    1065        15165 :                 num_blocks_copied += 1;
    1066              :             }
    1067              : 
    1068         7008 :             num_rels_copied += 1;
    1069              :         }
    1070              : 
    1071           24 :         info!(
    1072           24 :             "Created database {}/{}, copied {} blocks in {} rels",
    1073           24 :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
    1074           24 :         );
    1075           24 :         Ok(())
    1076           24 :     }
    1077              : 
    1078        75378 :     async fn ingest_xlog_smgr_create(
    1079        75378 :         &mut self,
    1080        75378 :         modification: &mut DatadirModification<'_>,
    1081        75378 :         rec: &XlSmgrCreate,
    1082        75378 :         ctx: &RequestContext,
    1083        75378 :     ) -> anyhow::Result<()> {
    1084        75378 :         let rel = RelTag {
    1085        75378 :             spcnode: rec.rnode.spcnode,
    1086        75378 :             dbnode: rec.rnode.dbnode,
    1087        75378 :             relnode: rec.rnode.relnode,
    1088        75378 :             forknum: rec.forknum,
    1089        75378 :         };
    1090        75378 :         self.put_rel_creation(modification, rel, ctx).await?;
    1091        75378 :         Ok(())
    1092        75378 :     }
    1093              : 
    1094              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    1095              :     ///
    1096              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
    1097          225 :     async fn ingest_xlog_smgr_truncate(
    1098          225 :         &mut self,
    1099          225 :         modification: &mut DatadirModification<'_>,
    1100          225 :         rec: &XlSmgrTruncate,
    1101          225 :         ctx: &RequestContext,
    1102          225 :     ) -> anyhow::Result<()> {
    1103          225 :         let spcnode = rec.rnode.spcnode;
    1104          225 :         let dbnode = rec.rnode.dbnode;
    1105          225 :         let relnode = rec.rnode.relnode;
    1106          225 : 
    1107          225 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
    1108          225 :             let rel = RelTag {
    1109          225 :                 spcnode,
    1110          225 :                 dbnode,
    1111          225 :                 relnode,
    1112          225 :                 forknum: MAIN_FORKNUM,
    1113          225 :             };
    1114          225 :             self.put_rel_truncation(modification, rel, rec.blkno, ctx)
    1115            2 :                 .await?;
    1116            0 :         }
    1117          225 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
    1118          225 :             let rel = RelTag {
    1119          225 :                 spcnode,
    1120          225 :                 dbnode,
    1121          225 :                 relnode,
    1122          225 :                 forknum: FSM_FORKNUM,
    1123          225 :             };
    1124          225 : 
    1125          225 :             let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
    1126          225 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
    1127          225 :             if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
    1128              :                 // Tail of last remaining FSM page has to be zeroed.
    1129              :                 // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
    1130          108 :                 modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
    1131          108 :                 fsm_physical_page_no += 1;
    1132          117 :             }
    1133          225 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1134          225 :             if nblocks > fsm_physical_page_no {
    1135              :                 // check if something to do: FSM is larger than truncate position
    1136           61 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
    1137            0 :                     .await?;
    1138          164 :             }
    1139            0 :         }
    1140          225 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
    1141          225 :             let rel = RelTag {
    1142          225 :                 spcnode,
    1143          225 :                 dbnode,
    1144          225 :                 relnode,
    1145          225 :                 forknum: VISIBILITYMAP_FORKNUM,
    1146          225 :             };
    1147          225 : 
    1148          225 :             let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
    1149          225 :             if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
    1150              :                 // Tail of last remaining vm page has to be zeroed.
    1151              :                 // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
    1152          108 :                 modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
    1153          108 :                 vm_page_no += 1;
    1154          117 :             }
    1155          225 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1156          225 :             if nblocks > vm_page_no {
    1157              :                 // check if something to do: VM is larger than truncate position
    1158           61 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
    1159            0 :                     .await?;
    1160          164 :             }
    1161            0 :         }
    1162          225 :         Ok(())
    1163          225 :     }
    1164              : 
    1165              :     /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
    1166              :     ///
    1167      6132817 :     async fn ingest_xact_record(
    1168      6132817 :         &mut self,
    1169      6132817 :         modification: &mut DatadirModification<'_>,
    1170      6132817 :         parsed: &XlXactParsedRecord,
    1171      6132817 :         is_commit: bool,
    1172      6132817 :         ctx: &RequestContext,
    1173      6132817 :     ) -> anyhow::Result<()> {
    1174      6132817 :         // Record update of CLOG pages
    1175      6132817 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
    1176      6132817 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1177      6132817 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1178      6132817 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
    1179              : 
    1180      6243062 :         for subxact in &parsed.subxacts {
    1181       110245 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
    1182       110245 :             if subxact_pageno != pageno {
    1183              :                 // This subxact goes to different page. Write the record
    1184              :                 // for all the XIDs on the previous page, and continue
    1185              :                 // accumulating XIDs on this new page.
    1186            4 :                 modification.put_slru_wal_record(
    1187            4 :                     SlruKind::Clog,
    1188            4 :                     segno,
    1189            4 :                     rpageno,
    1190            4 :                     if is_commit {
    1191            4 :                         NeonWalRecord::ClogSetCommitted {
    1192            4 :                             xids: page_xids,
    1193            4 :                             timestamp: parsed.xact_time,
    1194            4 :                         }
    1195              :                     } else {
    1196            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
    1197              :                     },
    1198            0 :                 )?;
    1199            4 :                 page_xids = Vec::new();
    1200       110241 :             }
    1201       110245 :             pageno = subxact_pageno;
    1202       110245 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1203       110245 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1204       110245 :             page_xids.push(*subxact);
    1205              :         }
    1206      6132817 :         modification.put_slru_wal_record(
    1207      6132817 :             SlruKind::Clog,
    1208      6132817 :             segno,
    1209      6132817 :             rpageno,
    1210      6132817 :             if is_commit {
    1211      6123947 :                 NeonWalRecord::ClogSetCommitted {
    1212      6123947 :                     xids: page_xids,
    1213      6123947 :                     timestamp: parsed.xact_time,
    1214      6123947 :                 }
    1215              :             } else {
    1216         8870 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
    1217              :             },
    1218            0 :         )?;
    1219              : 
    1220      6198253 :         for xnode in &parsed.xnodes {
    1221       327180 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
    1222       261744 :                 let rel = RelTag {
    1223       261744 :                     forknum,
    1224       261744 :                     spcnode: xnode.spcnode,
    1225       261744 :                     dbnode: xnode.dbnode,
    1226       261744 :                     relnode: xnode.relnode,
    1227       261744 :                 };
    1228       261744 :                 if modification
    1229       261744 :                     .tline
    1230       261744 :                     .get_rel_exists(rel, Version::Modified(modification), true, ctx)
    1231            1 :                     .await?
    1232              :                 {
    1233        67298 :                     self.put_rel_drop(modification, rel, ctx).await?;
    1234       194446 :                 }
    1235              :             }
    1236              :         }
    1237      6132817 :         Ok(())
    1238      6132817 :     }
    1239              : 
    1240            3 :     async fn ingest_clog_truncate_record(
    1241            3 :         &mut self,
    1242            3 :         modification: &mut DatadirModification<'_>,
    1243            3 :         xlrec: &XlClogTruncate,
    1244            3 :         ctx: &RequestContext,
    1245            3 :     ) -> anyhow::Result<()> {
    1246            3 :         info!(
    1247            3 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
    1248            3 :             xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
    1249            3 :         );
    1250              : 
    1251              :         // Here we treat oldestXid and oldestXidDB
    1252              :         // differently from postgres redo routines.
    1253              :         // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
    1254              :         // until checkpoint happens and updates the value.
    1255              :         // Here we can use the most recent value.
    1256              :         // It's just an optimization, though and can be deleted.
    1257              :         // TODO Figure out if there will be any issues with replica.
    1258            3 :         self.checkpoint.oldestXid = xlrec.oldest_xid;
    1259            3 :         self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
    1260            3 :         self.checkpoint_modified = true;
    1261            3 : 
    1262            3 :         // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
    1263            3 : 
    1264            3 :         let latest_page_number =
    1265            3 :             self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
    1266            3 : 
    1267            3 :         // Now delete all segments containing pages between xlrec.pageno
    1268            3 :         // and latest_page_number.
    1269            3 : 
    1270            3 :         // First, make an important safety check:
    1271            3 :         // the current endpoint page must not be eligible for removal.
    1272            3 :         // See SimpleLruTruncate() in slru.c
    1273            3 :         if clogpage_precedes(latest_page_number, xlrec.pageno) {
    1274            0 :             info!("could not truncate directory pg_xact apparent wraparound");
    1275            0 :             return Ok(());
    1276            3 :         }
    1277              : 
    1278              :         // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
    1279              :         //
    1280              :         // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
    1281              :         // will block waiting for the last valid LSN to advance up to
    1282              :         // it. So we use the previous record's LSN in the get calls
    1283              :         // instead.
    1284           23 :         for segno in modification
    1285            3 :             .tline
    1286            3 :             .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
    1287            0 :             .await?
    1288              :         {
    1289           23 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
    1290           23 :             if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
    1291           18 :                 modification
    1292           18 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
    1293            0 :                     .await?;
    1294            0 :                 trace!("Drop CLOG segment {:>04X}", segno);
    1295            5 :             }
    1296              :         }
    1297              : 
    1298            3 :         Ok(())
    1299            3 :     }
    1300              : 
    1301        24816 :     fn ingest_multixact_create_record(
    1302        24816 :         &mut self,
    1303        24816 :         modification: &mut DatadirModification,
    1304        24816 :         xlrec: &XlMultiXactCreate,
    1305        24816 :     ) -> Result<()> {
    1306        24816 :         // Create WAL record for updating the multixact-offsets page
    1307        24816 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1308        24816 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1309        24816 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1310        24816 : 
    1311        24816 :         modification.put_slru_wal_record(
    1312        24816 :             SlruKind::MultiXactOffsets,
    1313        24816 :             segno,
    1314        24816 :             rpageno,
    1315        24816 :             NeonWalRecord::MultixactOffsetCreate {
    1316        24816 :                 mid: xlrec.mid,
    1317        24816 :                 moff: xlrec.moff,
    1318        24816 :             },
    1319        24816 :         )?;
    1320              : 
    1321              :         // Create WAL records for the update of each affected multixact-members page
    1322        24816 :         let mut members = xlrec.members.iter();
    1323        24816 :         let mut offset = xlrec.moff;
    1324              :         loop {
    1325        49906 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1326        49906 : 
    1327        49906 :             // How many members fit on this page?
    1328        49906 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1329        49906 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1330        49906 : 
    1331        49906 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1332        49906 :             for _ in 0..page_remain {
    1333       524292 :                 if let Some(m) = members.next() {
    1334       474674 :                     this_page_members.push(m.clone());
    1335       474674 :                 } else {
    1336        49618 :                     break;
    1337              :                 }
    1338              :             }
    1339        49906 :             if this_page_members.is_empty() {
    1340              :                 // all done
    1341        24816 :                 break;
    1342        25090 :             }
    1343        25090 :             let n_this_page = this_page_members.len();
    1344        25090 : 
    1345        25090 :             modification.put_slru_wal_record(
    1346        25090 :                 SlruKind::MultiXactMembers,
    1347        25090 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1348        25090 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1349        25090 :                 NeonWalRecord::MultixactMembersCreate {
    1350        25090 :                     moff: offset,
    1351        25090 :                     members: this_page_members,
    1352        25090 :                 },
    1353        25090 :             )?;
    1354              : 
    1355              :             // Note: The multixact members can wrap around, even within one WAL record.
    1356        25090 :             offset = offset.wrapping_add(n_this_page as u32);
    1357              :         }
    1358        24816 :         if xlrec.mid >= self.checkpoint.nextMulti {
    1359        24816 :             self.checkpoint.nextMulti = xlrec.mid + 1;
    1360        24816 :             self.checkpoint_modified = true;
    1361        24816 :         }
    1362        24816 :         if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
    1363        24816 :             self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
    1364        24816 :             self.checkpoint_modified = true;
    1365        24816 :         }
    1366       474674 :         let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
    1367       474674 :             if let Some(max_xid) = acc {
    1368       449858 :                 if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
    1369       449633 :                     Some(mbr.xid)
    1370              :                 } else {
    1371          225 :                     acc
    1372              :                 }
    1373              :             } else {
    1374        24816 :                 Some(mbr.xid)
    1375              :             }
    1376       474674 :         });
    1377              : 
    1378        24816 :         if let Some(max_xid) = max_mbr_xid {
    1379        24816 :             if self.checkpoint.update_next_xid(max_xid) {
    1380            0 :                 self.checkpoint_modified = true;
    1381        24816 :             }
    1382            0 :         }
    1383        24816 :         Ok(())
    1384        24816 :     }
    1385              : 
    1386            0 :     async fn ingest_multixact_truncate_record(
    1387            0 :         &mut self,
    1388            0 :         modification: &mut DatadirModification<'_>,
    1389            0 :         xlrec: &XlMultiXactTruncate,
    1390            0 :         ctx: &RequestContext,
    1391            0 :     ) -> Result<()> {
    1392            0 :         self.checkpoint.oldestMulti = xlrec.end_trunc_off;
    1393            0 :         self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
    1394            0 :         self.checkpoint_modified = true;
    1395            0 : 
    1396            0 :         // PerformMembersTruncation
    1397            0 :         let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
    1398            0 :         let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1399            0 :         let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1400            0 :         let mut segment: i32 = startsegment;
    1401              : 
    1402              :         // Delete all the segments except the last one. The last segment can still
    1403              :         // contain, possibly partially, valid data.
    1404            0 :         while segment != endsegment {
    1405            0 :             modification
    1406            0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1407            0 :                 .await?;
    1408              : 
    1409              :             /* move to next segment, handling wraparound correctly */
    1410            0 :             if segment == maxsegment {
    1411            0 :                 segment = 0;
    1412            0 :             } else {
    1413            0 :                 segment += 1;
    1414            0 :             }
    1415              :         }
    1416              : 
    1417              :         // Truncate offsets
    1418              :         // FIXME: this did not handle wraparound correctly
    1419              : 
    1420            0 :         Ok(())
    1421            0 :     }
    1422              : 
    1423           62 :     async fn ingest_relmap_page(
    1424           62 :         &mut self,
    1425           62 :         modification: &mut DatadirModification<'_>,
    1426           62 :         xlrec: &XlRelmapUpdate,
    1427           62 :         decoded: &DecodedWALRecord,
    1428           62 :         ctx: &RequestContext,
    1429           62 :     ) -> Result<()> {
    1430           62 :         let mut buf = decoded.record.clone();
    1431           62 :         buf.advance(decoded.main_data_offset);
    1432           62 :         // skip xl_relmap_update
    1433           62 :         buf.advance(12);
    1434           62 : 
    1435           62 :         modification
    1436           62 :             .put_relmap_file(
    1437           62 :                 xlrec.tsid,
    1438           62 :                 xlrec.dbid,
    1439           62 :                 Bytes::copy_from_slice(&buf[..]),
    1440           62 :                 ctx,
    1441           62 :             )
    1442            2 :             .await
    1443           62 :     }
    1444              : 
    1445        75380 :     async fn put_rel_creation(
    1446        75380 :         &mut self,
    1447        75380 :         modification: &mut DatadirModification<'_>,
    1448        75380 :         rel: RelTag,
    1449        75380 :         ctx: &RequestContext,
    1450        75380 :     ) -> Result<()> {
    1451        75380 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1452        75380 :         Ok(())
    1453        75380 :     }
    1454              : 
    1455       440078 :     async fn put_rel_page_image(
    1456       440078 :         &mut self,
    1457       440078 :         modification: &mut DatadirModification<'_>,
    1458       440078 :         rel: RelTag,
    1459       440078 :         blknum: BlockNumber,
    1460       440078 :         img: Bytes,
    1461       440078 :         ctx: &RequestContext,
    1462       440078 :     ) -> Result<(), PageReconstructError> {
    1463       440078 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1464         6747 :             .await?;
    1465       440078 :         modification.put_rel_page_image(rel, blknum, img)?;
    1466       440078 :         Ok(())
    1467       440078 :     }
    1468              : 
    1469     53007400 :     async fn put_rel_wal_record(
    1470     53007400 :         &mut self,
    1471     53007400 :         modification: &mut DatadirModification<'_>,
    1472     53007400 :         rel: RelTag,
    1473     53007400 :         blknum: BlockNumber,
    1474     53007400 :         rec: NeonWalRecord,
    1475     53007400 :         ctx: &RequestContext,
    1476     53007416 :     ) -> Result<()> {
    1477     53007416 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1478         6970 :             .await?;
    1479     53007413 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1480     53007413 :         Ok(())
    1481     53007416 :     }
    1482              : 
    1483         6359 :     async fn put_rel_truncation(
    1484         6359 :         &mut self,
    1485         6359 :         modification: &mut DatadirModification<'_>,
    1486         6359 :         rel: RelTag,
    1487         6359 :         nblocks: BlockNumber,
    1488         6359 :         ctx: &RequestContext,
    1489         6359 :     ) -> anyhow::Result<()> {
    1490         6359 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1491         6359 :         Ok(())
    1492         6359 :     }
    1493              : 
    1494        67300 :     async fn put_rel_drop(
    1495        67300 :         &mut self,
    1496        67300 :         modification: &mut DatadirModification<'_>,
    1497        67300 :         rel: RelTag,
    1498        67300 :         ctx: &RequestContext,
    1499        67300 :     ) -> Result<()> {
    1500        67300 :         modification.put_rel_drop(rel, ctx).await?;
    1501        67300 :         Ok(())
    1502        67300 :     }
    1503              : 
    1504     56967514 :     async fn handle_rel_extend(
    1505     56967514 :         &mut self,
    1506     56967514 :         modification: &mut DatadirModification<'_>,
    1507     56967514 :         rel: RelTag,
    1508     56967514 :         blknum: BlockNumber,
    1509     56967514 :         ctx: &RequestContext,
    1510     56967530 :     ) -> Result<(), PageReconstructError> {
    1511     56967530 :         let new_nblocks = blknum + 1;
    1512              :         // Check if the relation exists. We implicitly create relations on first
    1513              :         // record.
    1514              :         // TODO: would be nice if to be more explicit about it
    1515              : 
    1516              :         // Get current size and put rel creation if rel doesn't exist
    1517              :         //
    1518              :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1519              :         //       check the cache too. This is because eagerly checking the cache results in
    1520              :         //       less work overall and 10% better performance. It's more work on cache miss
    1521              :         //       but cache miss is rare.
    1522     56967530 :         let old_nblocks = if let Some(nblocks) = modification
    1523     56967530 :             .tline
    1524     56967530 :             .get_cached_rel_size(&rel, modification.get_lsn())
    1525              :         {
    1526     56963032 :             nblocks
    1527         4498 :         } else if !modification
    1528         4498 :             .tline
    1529         4498 :             .get_rel_exists(rel, Version::Modified(modification), true, ctx)
    1530           58 :             .await?
    1531              :         {
    1532              :             // create it with 0 size initially, the logic below will extend it
    1533         3656 :             modification
    1534         3656 :                 .put_rel_creation(rel, 0, ctx)
    1535          357 :                 .await
    1536         3656 :                 .context("Relation Error")?;
    1537         3656 :             0
    1538              :         } else {
    1539          842 :             modification
    1540          842 :                 .tline
    1541          842 :                 .get_rel_size(rel, Version::Modified(modification), true, ctx)
    1542          152 :                 .await?
    1543              :         };
    1544              : 
    1545     56967530 :         if new_nblocks > old_nblocks {
    1546              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1547      1264419 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1548              : 
    1549      1264416 :             let mut key = rel_block_to_key(rel, blknum);
    1550              :             // fill the gap with zeros
    1551      1264416 :             for gap_blknum in old_nblocks..blknum {
    1552       236715 :                 key.field6 = gap_blknum;
    1553       236715 : 
    1554       236715 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1555         1359 :                     continue;
    1556       235356 :                 }
    1557       235356 : 
    1558       235356 :                 modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
    1559              :             }
    1560     55703111 :         }
    1561     56967527 :         Ok(())
    1562     56967530 :     }
    1563              : 
    1564         1646 :     async fn put_slru_page_image(
    1565         1646 :         &mut self,
    1566         1646 :         modification: &mut DatadirModification<'_>,
    1567         1646 :         kind: SlruKind,
    1568         1646 :         segno: u32,
    1569         1646 :         blknum: BlockNumber,
    1570         1646 :         img: Bytes,
    1571         1646 :         ctx: &RequestContext,
    1572         1646 :     ) -> Result<()> {
    1573         1646 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1574          115 :             .await?;
    1575         1646 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1576         1646 :         Ok(())
    1577         1646 :     }
    1578              : 
    1579         1646 :     async fn handle_slru_extend(
    1580         1646 :         &mut self,
    1581         1646 :         modification: &mut DatadirModification<'_>,
    1582         1646 :         kind: SlruKind,
    1583         1646 :         segno: u32,
    1584         1646 :         blknum: BlockNumber,
    1585         1646 :         ctx: &RequestContext,
    1586         1646 :     ) -> anyhow::Result<()> {
    1587         1646 :         // we don't use a cache for this like we do for relations. SLRUS are explcitly
    1588         1646 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1589         1646 :         // a lot less frequently.
    1590         1646 : 
    1591         1646 :         let new_nblocks = blknum + 1;
    1592              :         // Check if the relation exists. We implicitly create relations on first
    1593              :         // record.
    1594              :         // TODO: would be nice if to be more explicit about it
    1595         1646 :         let old_nblocks = if !modification
    1596         1646 :             .tline
    1597         1646 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1598           97 :             .await?
    1599              :         {
    1600              :             // create it with 0 size initially, the logic below will extend it
    1601           44 :             modification
    1602           44 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1603            3 :                 .await?;
    1604           44 :             0
    1605              :         } else {
    1606         1602 :             modification
    1607         1602 :                 .tline
    1608         1602 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1609           15 :                 .await?
    1610              :         };
    1611              : 
    1612         1646 :         if new_nblocks > old_nblocks {
    1613            0 :             trace!(
    1614            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1615            0 :                 kind,
    1616            0 :                 segno,
    1617            0 :                 old_nblocks,
    1618            0 :                 new_nblocks
    1619            0 :             );
    1620         1622 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1621              : 
    1622              :             // fill the gap with zeros
    1623         1622 :             for gap_blknum in old_nblocks..blknum {
    1624            0 :                 modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
    1625              :             }
    1626           24 :         }
    1627         1646 :         Ok(())
    1628         1646 :     }
    1629              : }
    1630              : 
    1631        95571 : async fn get_relsize(
    1632        95571 :     modification: &DatadirModification<'_>,
    1633        95571 :     rel: RelTag,
    1634        95571 :     ctx: &RequestContext,
    1635        95571 : ) -> anyhow::Result<BlockNumber> {
    1636        95571 :     let nblocks = if !modification
    1637        95571 :         .tline
    1638        95571 :         .get_rel_exists(rel, Version::Modified(modification), true, ctx)
    1639           50 :         .await?
    1640              :     {
    1641         2654 :         0
    1642              :     } else {
    1643        92917 :         modification
    1644        92917 :             .tline
    1645        92917 :             .get_rel_size(rel, Version::Modified(modification), true, ctx)
    1646            3 :             .await?
    1647              :     };
    1648        95571 :     Ok(nblocks)
    1649        95571 : }
    1650              : 
    1651              : #[allow(clippy::bool_assert_comparison)]
    1652              : #[cfg(test)]
    1653              : mod tests {
    1654              :     use super::*;
    1655              :     use crate::tenant::harness::*;
    1656              :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    1657              :     use crate::tenant::Timeline;
    1658              :     use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
    1659              :     use postgres_ffi::RELSEG_SIZE;
    1660              : 
    1661              :     use crate::DEFAULT_PG_VERSION;
    1662              : 
    1663              :     /// Arbitrary relation tag, for testing.
    1664              :     const TESTREL_A: RelTag = RelTag {
    1665              :         spcnode: 0,
    1666              :         dbnode: 111,
    1667              :         relnode: 1000,
    1668              :         forknum: 0,
    1669              :     };
    1670              : 
    1671           12 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1672           12 :         // TODO
    1673           12 :     }
    1674              : 
    1675              :     static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
    1676              : 
    1677            8 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    1678            8 :         let mut m = tline.begin_modification(Lsn(0x10));
    1679            8 :         m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1680           16 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1681            8 :         m.commit(ctx).await?;
    1682            8 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1683              : 
    1684            8 :         Ok(walingest)
    1685            8 :     }
    1686              : 
    1687            2 :     #[tokio::test]
    1688            2 :     async fn test_relsize() -> Result<()> {
    1689            2 :         let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
    1690            2 :         let tline = tenant
    1691            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1692            6 :             .await?;
    1693            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1694            2 : 
    1695            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1696            2 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1697            2 :         walingest
    1698            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
    1699            2 :             .await?;
    1700            2 :         m.commit(&ctx).await?;
    1701            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    1702            2 :         walingest
    1703            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
    1704            2 :             .await?;
    1705            2 :         m.commit(&ctx).await?;
    1706            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    1707            2 :         walingest
    1708            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
    1709            2 :             .await?;
    1710            2 :         m.commit(&ctx).await?;
    1711            2 :         let mut m = tline.begin_modification(Lsn(0x50));
    1712            2 :         walingest
    1713            2 :             .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
    1714            2 :             .await?;
    1715            2 :         m.commit(&ctx).await?;
    1716            2 : 
    1717            2 :         assert_current_logical_size(&tline, Lsn(0x50));
    1718            2 : 
    1719            2 :         // The relation was created at LSN 2, not visible at LSN 1 yet.
    1720            2 :         assert_eq!(
    1721            2 :             tline
    1722            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    1723            2 :                 .await?,
    1724            2 :             false
    1725            2 :         );
    1726            2 :         assert!(tline
    1727            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    1728            2 :             .await
    1729            2 :             .is_err());
    1730            2 :         assert_eq!(
    1731            2 :             tline
    1732            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1733            2 :                 .await?,
    1734            2 :             true
    1735            2 :         );
    1736            2 :         assert_eq!(
    1737            2 :             tline
    1738            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1739            2 :                 .await?,
    1740            2 :             1
    1741            2 :         );
    1742            2 :         assert_eq!(
    1743            2 :             tline
    1744            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
    1745            2 :                 .await?,
    1746            2 :             3
    1747            2 :         );
    1748            2 : 
    1749            2 :         // Check page contents at each LSN
    1750            2 :         assert_eq!(
    1751            2 :             tline
    1752            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
    1753            2 :                 .await?,
    1754            2 :             TEST_IMG("foo blk 0 at 2")
    1755            2 :         );
    1756            2 : 
    1757            2 :         assert_eq!(
    1758            2 :             tline
    1759            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
    1760            2 :                 .await?,
    1761            2 :             TEST_IMG("foo blk 0 at 3")
    1762            2 :         );
    1763            2 : 
    1764            2 :         assert_eq!(
    1765            2 :             tline
    1766            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
    1767            2 :                 .await?,
    1768            2 :             TEST_IMG("foo blk 0 at 3")
    1769            2 :         );
    1770            2 :         assert_eq!(
    1771            2 :             tline
    1772            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
    1773            2 :                 .await?,
    1774            2 :             TEST_IMG("foo blk 1 at 4")
    1775            2 :         );
    1776            2 : 
    1777            2 :         assert_eq!(
    1778            2 :             tline
    1779            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
    1780            2 :                 .await?,
    1781            2 :             TEST_IMG("foo blk 0 at 3")
    1782            2 :         );
    1783            2 :         assert_eq!(
    1784            2 :             tline
    1785            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
    1786            2 :                 .await?,
    1787            2 :             TEST_IMG("foo blk 1 at 4")
    1788            2 :         );
    1789            2 :         assert_eq!(
    1790            2 :             tline
    1791            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
    1792            2 :                 .await?,
    1793            2 :             TEST_IMG("foo blk 2 at 5")
    1794            2 :         );
    1795            2 : 
    1796            2 :         // Truncate last block
    1797            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    1798            2 :         walingest
    1799            2 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1800            2 :             .await?;
    1801            2 :         m.commit(&ctx).await?;
    1802            2 :         assert_current_logical_size(&tline, Lsn(0x60));
    1803            2 : 
    1804            2 :         // Check reported size and contents after truncation
    1805            2 :         assert_eq!(
    1806            2 :             tline
    1807            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
    1808            2 :                 .await?,
    1809            2 :             2
    1810            2 :         );
    1811            2 :         assert_eq!(
    1812            2 :             tline
    1813            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
    1814            2 :                 .await?,
    1815            2 :             TEST_IMG("foo blk 0 at 3")
    1816            2 :         );
    1817            2 :         assert_eq!(
    1818            2 :             tline
    1819            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
    1820            2 :                 .await?,
    1821            2 :             TEST_IMG("foo blk 1 at 4")
    1822            2 :         );
    1823            2 : 
    1824            2 :         // should still see the truncated block with older LSN
    1825            2 :         assert_eq!(
    1826            2 :             tline
    1827            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
    1828            2 :                 .await?,
    1829            2 :             3
    1830            2 :         );
    1831            2 :         assert_eq!(
    1832            2 :             tline
    1833            2 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
    1834            2 :                 .await?,
    1835            2 :             TEST_IMG("foo blk 2 at 5")
    1836            2 :         );
    1837            2 : 
    1838            2 :         // Truncate to zero length
    1839            2 :         let mut m = tline.begin_modification(Lsn(0x68));
    1840            2 :         walingest
    1841            2 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1842            2 :             .await?;
    1843            2 :         m.commit(&ctx).await?;
    1844            2 :         assert_eq!(
    1845            2 :             tline
    1846            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
    1847            2 :                 .await?,
    1848            2 :             0
    1849            2 :         );
    1850            2 : 
    1851            2 :         // Extend from 0 to 2 blocks, leaving a gap
    1852            2 :         let mut m = tline.begin_modification(Lsn(0x70));
    1853            2 :         walingest
    1854            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
    1855            2 :             .await?;
    1856            2 :         m.commit(&ctx).await?;
    1857            2 :         assert_eq!(
    1858            2 :             tline
    1859            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
    1860            2 :                 .await?,
    1861            2 :             2
    1862            2 :         );
    1863            2 :         assert_eq!(
    1864            2 :             tline
    1865            2 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
    1866            2 :                 .await?,
    1867            2 :             ZERO_PAGE
    1868            2 :         );
    1869            2 :         assert_eq!(
    1870            2 :             tline
    1871            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
    1872            2 :                 .await?,
    1873            2 :             TEST_IMG("foo blk 1")
    1874            2 :         );
    1875            2 : 
    1876            2 :         // Extend a lot more, leaving a big gap that spans across segments
    1877            2 :         let mut m = tline.begin_modification(Lsn(0x80));
    1878            2 :         walingest
    1879            2 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
    1880            2 :             .await?;
    1881           71 :         m.commit(&ctx).await?;
    1882            2 :         assert_eq!(
    1883            2 :             tline
    1884            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
    1885            2 :                 .await?,
    1886            2 :             1501
    1887            2 :         );
    1888         2998 :         for blk in 2..1500 {
    1889         2996 :             assert_eq!(
    1890         2996 :                 tline
    1891         2996 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
    1892         3037 :                     .await?,
    1893         2996 :                 ZERO_PAGE
    1894            2 :             );
    1895            2 :         }
    1896            2 :         assert_eq!(
    1897            2 :             tline
    1898            2 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
    1899            2 :                 .await?,
    1900            2 :             TEST_IMG("foo blk 1500")
    1901            2 :         );
    1902            2 : 
    1903            2 :         Ok(())
    1904            2 :     }
    1905              : 
    1906              :     // Test what happens if we dropped a relation
    1907              :     // and then created it again within the same layer.
    1908            2 :     #[tokio::test]
    1909            2 :     async fn test_drop_extend() -> Result<()> {
    1910            2 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
    1911            2 :         let tline = tenant
    1912            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1913            6 :             .await?;
    1914            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1915            2 : 
    1916            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1917            2 :         walingest
    1918            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
    1919            2 :             .await?;
    1920            2 :         m.commit(&ctx).await?;
    1921            2 : 
    1922            2 :         // Check that rel exists and size is correct
    1923            2 :         assert_eq!(
    1924            2 :             tline
    1925            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1926            2 :                 .await?,
    1927            2 :             true
    1928            2 :         );
    1929            2 :         assert_eq!(
    1930            2 :             tline
    1931            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1932            2 :                 .await?,
    1933            2 :             1
    1934            2 :         );
    1935            2 : 
    1936            2 :         // Drop rel
    1937            2 :         let mut m = tline.begin_modification(Lsn(0x30));
    1938            2 :         walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
    1939            2 :         m.commit(&ctx).await?;
    1940            2 : 
    1941            2 :         // Check that rel is not visible anymore
    1942            2 :         assert_eq!(
    1943            2 :             tline
    1944            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
    1945            2 :                 .await?,
    1946            2 :             false
    1947            2 :         );
    1948            2 : 
    1949            2 :         // FIXME: should fail
    1950            2 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    1951            2 : 
    1952            2 :         // Re-create it
    1953            2 :         let mut m = tline.begin_modification(Lsn(0x40));
    1954            2 :         walingest
    1955            2 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
    1956            2 :             .await?;
    1957            2 :         m.commit(&ctx).await?;
    1958            2 : 
    1959            2 :         // Check that rel exists and size is correct
    1960            2 :         assert_eq!(
    1961            2 :             tline
    1962            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
    1963            2 :                 .await?,
    1964            2 :             true
    1965            2 :         );
    1966            2 :         assert_eq!(
    1967            2 :             tline
    1968            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
    1969            2 :                 .await?,
    1970            2 :             1
    1971            2 :         );
    1972            2 : 
    1973            2 :         Ok(())
    1974            2 :     }
    1975              : 
    1976              :     // Test what happens if we truncated a relation
    1977              :     // so that one of its segments was dropped
    1978              :     // and then extended it again within the same layer.
    1979            2 :     #[tokio::test]
    1980            2 :     async fn test_truncate_extend() -> Result<()> {
    1981            2 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
    1982            2 :         let tline = tenant
    1983            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1984            6 :             .await?;
    1985            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1986            2 : 
    1987            2 :         // Create a 20 MB relation (the size is arbitrary)
    1988            2 :         let relsize = 20 * 1024 * 1024 / 8192;
    1989            2 :         let mut m = tline.begin_modification(Lsn(0x20));
    1990         5120 :         for blkno in 0..relsize {
    1991         5120 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    1992         5120 :             walingest
    1993         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    1994            2 :                 .await?;
    1995            2 :         }
    1996            2 :         m.commit(&ctx).await?;
    1997            2 : 
    1998            2 :         // The relation was created at LSN 20, not visible at LSN 1 yet.
    1999            2 :         assert_eq!(
    2000            2 :             tline
    2001            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    2002            2 :                 .await?,
    2003            2 :             false
    2004            2 :         );
    2005            2 :         assert!(tline
    2006            2 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    2007            2 :             .await
    2008            2 :             .is_err());
    2009            2 : 
    2010            2 :         assert_eq!(
    2011            2 :             tline
    2012            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    2013            2 :                 .await?,
    2014            2 :             true
    2015            2 :         );
    2016            2 :         assert_eq!(
    2017            2 :             tline
    2018            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    2019            2 :                 .await?,
    2020            2 :             relsize
    2021            2 :         );
    2022            2 : 
    2023            2 :         // Check relation content
    2024         5120 :         for blkno in 0..relsize {
    2025         5120 :             let lsn = Lsn(0x20);
    2026         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2027         5120 :             assert_eq!(
    2028         5120 :                 tline
    2029         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
    2030          201 :                     .await?,
    2031         5120 :                 TEST_IMG(&data)
    2032            2 :             );
    2033            2 :         }
    2034            2 : 
    2035            2 :         // Truncate relation so that second segment was dropped
    2036            2 :         // - only leave one page
    2037            2 :         let mut m = tline.begin_modification(Lsn(0x60));
    2038            2 :         walingest
    2039            2 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2040            2 :             .await?;
    2041            2 :         m.commit(&ctx).await?;
    2042            2 : 
    2043            2 :         // Check reported size and contents after truncation
    2044            2 :         assert_eq!(
    2045            2 :             tline
    2046            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
    2047            2 :                 .await?,
    2048            2 :             1
    2049            2 :         );
    2050            2 : 
    2051            4 :         for blkno in 0..1 {
    2052            2 :             let lsn = Lsn(0x20);
    2053            2 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2054            2 :             assert_eq!(
    2055            2 :                 tline
    2056            2 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
    2057            2 :                     .await?,
    2058            2 :                 TEST_IMG(&data)
    2059            2 :             );
    2060            2 :         }
    2061            2 : 
    2062            2 :         // should still see all blocks with older LSN
    2063            2 :         assert_eq!(
    2064            2 :             tline
    2065            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
    2066            2 :                 .await?,
    2067            2 :             relsize
    2068            2 :         );
    2069         5120 :         for blkno in 0..relsize {
    2070         5120 :             let lsn = Lsn(0x20);
    2071         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2072         5120 :             assert_eq!(
    2073         5120 :                 tline
    2074         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
    2075          400 :                     .await?,
    2076         5120 :                 TEST_IMG(&data)
    2077            2 :             );
    2078            2 :         }
    2079            2 : 
    2080            2 :         // Extend relation again.
    2081            2 :         // Add enough blocks to create second segment
    2082            2 :         let lsn = Lsn(0x80);
    2083            2 :         let mut m = tline.begin_modification(lsn);
    2084         5120 :         for blkno in 0..relsize {
    2085         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2086         5120 :             walingest
    2087         5120 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    2088            2 :                 .await?;
    2089            2 :         }
    2090            2 :         m.commit(&ctx).await?;
    2091            2 : 
    2092            2 :         assert_eq!(
    2093            2 :             tline
    2094            2 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
    2095            2 :                 .await?,
    2096            2 :             true
    2097            2 :         );
    2098            2 :         assert_eq!(
    2099            2 :             tline
    2100            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
    2101            2 :                 .await?,
    2102            2 :             relsize
    2103            2 :         );
    2104            2 :         // Check relation content
    2105         5120 :         for blkno in 0..relsize {
    2106         5120 :             let lsn = Lsn(0x80);
    2107         5120 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2108         5120 :             assert_eq!(
    2109         5120 :                 tline
    2110         5120 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
    2111          201 :                     .await?,
    2112         5120 :                 TEST_IMG(&data)
    2113            2 :             );
    2114            2 :         }
    2115            2 : 
    2116            2 :         Ok(())
    2117            2 :     }
    2118              : 
    2119              :     /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
    2120              :     /// split into multiple 1 GB segments in Postgres.
    2121            2 :     #[tokio::test]
    2122            2 :     async fn test_large_rel() -> Result<()> {
    2123            2 :         let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
    2124            2 :         let tline = tenant
    2125            2 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2126            7 :             .await?;
    2127            5 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2128            2 : 
    2129            2 :         let mut lsn = 0x10;
    2130       262146 :         for blknum in 0..RELSEG_SIZE + 1 {
    2131       262146 :             lsn += 0x10;
    2132       262146 :             let mut m = tline.begin_modification(Lsn(lsn));
    2133       262146 :             let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2134       262146 :             walingest
    2135       262146 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2136         6182 :                 .await?;
    2137       262146 :             m.commit(&ctx).await?;
    2138            2 :         }
    2139            2 : 
    2140            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2141            2 : 
    2142            2 :         assert_eq!(
    2143            2 :             tline
    2144            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2145            2 :                 .await?,
    2146            2 :             RELSEG_SIZE + 1
    2147            2 :         );
    2148            2 : 
    2149            2 :         // Truncate one block
    2150            2 :         lsn += 0x10;
    2151            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    2152            2 :         walingest
    2153            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2154            2 :             .await?;
    2155            2 :         m.commit(&ctx).await?;
    2156            2 :         assert_eq!(
    2157            2 :             tline
    2158            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2159            2 :                 .await?,
    2160            2 :             RELSEG_SIZE
    2161            2 :         );
    2162            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2163            2 : 
    2164            2 :         // Truncate another block
    2165            2 :         lsn += 0x10;
    2166            2 :         let mut m = tline.begin_modification(Lsn(lsn));
    2167            2 :         walingest
    2168            2 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2169            2 :             .await?;
    2170            2 :         m.commit(&ctx).await?;
    2171            2 :         assert_eq!(
    2172            2 :             tline
    2173            2 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2174            2 :                 .await?,
    2175            2 :             RELSEG_SIZE - 1
    2176            2 :         );
    2177            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2178            2 : 
    2179            2 :         // Truncate to 1500, and then truncate all the way down to 0, one block at a time
    2180            2 :         // This tests the behavior at segment boundaries
    2181            2 :         let mut size: i32 = 3000;
    2182         6004 :         while size >= 0 {
    2183         6002 :             lsn += 0x10;
    2184         6002 :             let mut m = tline.begin_modification(Lsn(lsn));
    2185         6002 :             walingest
    2186         6002 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2187          142 :                 .await?;
    2188         6002 :             m.commit(&ctx).await?;
    2189         6002 :             assert_eq!(
    2190         6002 :                 tline
    2191         6002 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2192            2 :                     .await?,
    2193         6002 :                 size as BlockNumber
    2194            2 :             );
    2195            2 : 
    2196         6002 :             size -= 1;
    2197            2 :         }
    2198            2 :         assert_current_logical_size(&tline, Lsn(lsn));
    2199            2 : 
    2200            2 :         Ok(())
    2201            2 :     }
    2202              : 
    2203              :     /// Replay a wal segment file taken directly from safekeepers.
    2204              :     ///
    2205              :     /// This test is useful for benchmarking since it allows us to profile only
    2206              :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2207              :     /// without waiting for unrelated steps.
    2208            2 :     #[tokio::test]
    2209            2 :     async fn test_ingest_real_wal() {
    2210            2 :         use crate::tenant::harness::*;
    2211            2 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2212            2 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2213            2 : 
    2214            2 :         // Define test data path and constants.
    2215            2 :         //
    2216            2 :         // Steps to reconstruct the data, if needed:
    2217            2 :         // 1. Run the pgbench python test
    2218            2 :         // 2. Take the first wal segment file from safekeeper
    2219            2 :         // 3. Compress it using `zstd --long input_file`
    2220            2 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2221            2 :         // 5. Grep sk logs for "restart decoder" to get startpoint
    2222            2 :         // 6. Run just the decoder from this test to get the endpoint.
    2223            2 :         //    It's the last LSN the decoder will output.
    2224            2 :         let pg_version = 15; // The test data was generated by pg15
    2225            2 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2226            2 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2227            2 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2228            2 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2229            2 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2230            2 : 
    2231            2 :         let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
    2232            2 :         let (tenant, ctx) = harness.load().await;
    2233            2 : 
    2234            2 :         let remote_initdb_path =
    2235            2 :             remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
    2236            2 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2237            2 : 
    2238            2 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2239            2 :             .expect("creating test dir should work");
    2240            2 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2241            2 : 
    2242            2 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2243            2 :         // it doesn't create a real checkpoint, and Walingest::new tries to parse
    2244            2 :         // the garbage data.
    2245            2 :         let tline = tenant
    2246            2 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2247        20533 :             .await
    2248            2 :             .unwrap();
    2249            2 : 
    2250            2 :         // We fully read and decompress this into memory before decoding
    2251            2 :         // to get a more accurate perf profile of the decoder.
    2252            2 :         let bytes = {
    2253            2 :             use async_compression::tokio::bufread::ZstdDecoder;
    2254            2 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2255            2 :             let reader = tokio::io::BufReader::new(file);
    2256            2 :             let decoder = ZstdDecoder::new(reader);
    2257            2 :             let mut reader = tokio::io::BufReader::new(decoder);
    2258            2 :             let mut buffer = Vec::new();
    2259          224 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2260            2 :             buffer
    2261            2 :         };
    2262            2 : 
    2263            2 :         // TODO start a profiler too
    2264            2 :         let started_at = std::time::Instant::now();
    2265            2 : 
    2266            2 :         // Initialize walingest
    2267            2 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    2268            2 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2269            2 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2270            5 :             .await
    2271            2 :             .unwrap();
    2272            2 :         let mut modification = tline.begin_modification(startpoint);
    2273            2 :         let mut decoded = DecodedWALRecord::default();
    2274            2 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2275            2 : 
    2276            2 :         // Decode and ingest wal. We process the wal in chunks because
    2277            2 :         // that's what happens when we get bytes from safekeepers.
    2278       474686 :         for chunk in bytes[xlogoff..].chunks(50) {
    2279       474686 :             decoder.feed_bytes(chunk);
    2280       620536 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    2281       145850 :                 walingest
    2282       145850 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
    2283          100 :                     .await
    2284       145850 :                     .unwrap();
    2285            2 :             }
    2286       474686 :             modification.commit(&ctx).await.unwrap();
    2287            2 :         }
    2288            2 : 
    2289            2 :         let duration = started_at.elapsed();
    2290            2 :         println!("done in {:?}", duration);
    2291            2 :     }
    2292              : }
        

Generated by: LCOV version 2.1-beta