LCOV - differential code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions) Coverage Total Hit LBC UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 72.4 % 1583 1146 3 434 1146
Current Date: 2023-10-19 02:04:12 Functions: 70.1 % 77 54 23 1 53 1
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3                 : //!
       4                 : //! The pipeline for ingesting WAL looks like this:
       5                 : //!
       6                 : //! WAL receiver  ->   WalIngest  ->   Repository
       7                 : //!
       8                 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9                 : //! and decodes it to individual WAL records. It feeds the WAL records
      10                 : //! to WalIngest, which parses them and stores them in the Repository.
      11                 : //!
      12                 : //! The neon Repository can store page versions in two formats: as
      13                 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
      14                 : //! page images out of some WAL records, but most it stores as WAL
      15                 : //! records. If a WAL record modifies multiple pages, WalIngest
      16                 : //! will call Repository::put_wal_record or put_page_image functions
      17                 : //! separately for each modified page.
      18                 : //!
      19                 : //! To reconstruct a page using a WAL record, the Repository calls the
      20                 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21                 : //! redo Postgres process, but some records it can handle directly with
      22                 : //! bespoke Rust code.
      23                 : 
      24                 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
      25                 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
      26                 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      27                 : 
      28                 : use anyhow::{bail, Context, Result};
      29                 : use bytes::{Buf, Bytes, BytesMut};
      30                 : use tracing::*;
      31                 : 
      32                 : use crate::context::RequestContext;
      33                 : use crate::pgdatadir_mapping::*;
      34                 : use crate::tenant::PageReconstructError;
      35                 : use crate::tenant::Timeline;
      36                 : use crate::walrecord::*;
      37                 : use crate::ZERO_PAGE;
      38                 : use pageserver_api::reltag::{RelTag, SlruKind};
      39                 : use postgres_ffi::pg_constants;
      40                 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      41                 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
      42                 : use postgres_ffi::v14::xlog_utils::*;
      43                 : use postgres_ffi::v14::CheckPoint;
      44                 : use postgres_ffi::TransactionId;
      45                 : use postgres_ffi::BLCKSZ;
      46                 : use utils::lsn::Lsn;
      47                 : 
      48                 : pub struct WalIngest<'a> {
      49                 :     timeline: &'a Timeline,
      50                 : 
      51                 :     checkpoint: CheckPoint,
      52                 :     checkpoint_modified: bool,
      53                 : }
      54                 : 
      55                 : impl<'a> WalIngest<'a> {
      56 CBC        1257 :     pub async fn new(
      57            1257 :         timeline: &'a Timeline,
      58            1257 :         startpoint: Lsn,
      59            1257 :         ctx: &'_ RequestContext,
      60            1257 :     ) -> anyhow::Result<WalIngest<'a>> {
      61                 :         // Fetch the latest checkpoint into memory, so that we can compare with it
      62                 :         // quickly in `ingest_record` and update it when it changes.
      63            1257 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
      64            1248 :         let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
      65 UBC           0 :         trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
      66                 : 
      67 CBC        1248 :         Ok(WalIngest {
      68            1248 :             timeline,
      69            1248 :             checkpoint,
      70            1248 :             checkpoint_modified: false,
      71            1248 :         })
      72            1257 :     }
      73                 : 
      74                 :     ///
      75                 :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
      76                 :     ///
      77                 :     /// This function updates `lsn` field of `DatadirModification`
      78                 :     ///
      79                 :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
      80                 :     /// relations/pages that the record affects.
      81                 :     ///
      82        68588857 :     pub async fn ingest_record(
      83        68588857 :         &mut self,
      84        68588857 :         recdata: Bytes,
      85        68588857 :         lsn: Lsn,
      86        68588857 :         modification: &mut DatadirModification<'_>,
      87        68588857 :         decoded: &mut DecodedWALRecord,
      88        68588857 :         ctx: &RequestContext,
      89        68588857 :     ) -> anyhow::Result<()> {
      90        68588717 :         modification.lsn = lsn;
      91        68588717 :         decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
      92                 : 
      93        68588717 :         let mut buf = decoded.record.clone();
      94        68588717 :         buf.advance(decoded.main_data_offset);
      95        68588717 : 
      96        68588717 :         assert!(!self.checkpoint_modified);
      97        68588717 :         if self.checkpoint.update_next_xid(decoded.xl_xid) {
      98            3622 :             self.checkpoint_modified = true;
      99        68585095 :         }
     100                 : 
     101                 :         // Heap AM records need some special handling, because they modify VM pages
     102                 :         // without registering them with the standard mechanism.
     103        68588717 :         if decoded.xl_rmid == pg_constants::RM_HEAP_ID
     104        15564608 :             || decoded.xl_rmid == pg_constants::RM_HEAP2_ID
     105                 :         {
     106        55203024 :             self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
     107              16 :                 .await?;
     108        13385693 :         }
     109        68588717 :         if decoded.xl_rmid == pg_constants::RM_NEON_ID {
     110 UBC           0 :             self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
     111               0 :                 .await?;
     112 CBC    68588717 :         }
     113                 :         // Handle other special record types
     114        68588717 :         if decoded.xl_rmid == pg_constants::RM_SMGR_ID
     115           21661 :             && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     116           21661 :                 == pg_constants::XLOG_SMGR_CREATE
     117                 :         {
     118           21617 :             let create = XlSmgrCreate::decode(&mut buf);
     119           21617 :             self.ingest_xlog_smgr_create(modification, &create, ctx)
     120            1124 :                 .await?;
     121        68567100 :         } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
     122              44 :             && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     123              44 :                 == pg_constants::XLOG_SMGR_TRUNCATE
     124                 :         {
     125              44 :             let truncate = XlSmgrTruncate::decode(&mut buf);
     126              44 :             self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
     127               2 :                 .await?;
     128        68567056 :         } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
     129 UBC           0 :             debug!(
     130               0 :                 "handle RM_DBASE_ID for Postgres version {:?}",
     131               0 :                 self.timeline.pg_version
     132               0 :             );
     133 CBC          16 :             if self.timeline.pg_version == 14 {
     134              16 :                 if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     135              16 :                     == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
     136                 :                 {
     137              13 :                     let createdb = XlCreateDatabase::decode(&mut buf);
     138 UBC           0 :                     debug!("XLOG_DBASE_CREATE v14");
     139                 : 
     140 CBC          13 :                     self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     141            1430 :                         .await?;
     142               3 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     143               3 :                     == postgres_ffi::v14::bindings::XLOG_DBASE_DROP
     144                 :                 {
     145               3 :                     let dropdb = XlDropDatabase::decode(&mut buf);
     146               6 :                     for tablespace_id in dropdb.tablespace_ids {
     147 UBC           0 :                         trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     148 CBC           3 :                         modification
     149               3 :                             .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     150 UBC           0 :                             .await?;
     151                 :                     }
     152               0 :                 }
     153               0 :             } else if self.timeline.pg_version == 15 {
     154               0 :                 if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     155               0 :                     == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
     156                 :                 {
     157               0 :                     debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     158               0 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     159               0 :                     == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
     160                 :                 {
     161                 :                     // The XLOG record was renamed between v14 and v15,
     162                 :                     // but the record format is the same.
     163                 :                     // So we can reuse XlCreateDatabase here.
     164               0 :                     debug!("XLOG_DBASE_CREATE_FILE_COPY");
     165               0 :                     let createdb = XlCreateDatabase::decode(&mut buf);
     166               0 :                     self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     167               0 :                         .await?;
     168               0 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     169               0 :                     == postgres_ffi::v15::bindings::XLOG_DBASE_DROP
     170                 :                 {
     171               0 :                     let dropdb = XlDropDatabase::decode(&mut buf);
     172               0 :                     for tablespace_id in dropdb.tablespace_ids {
     173               0 :                         trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     174               0 :                         modification
     175               0 :                             .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     176               0 :                             .await?;
     177                 :                     }
     178               0 :                 }
     179               0 :             } else if self.timeline.pg_version == 16 {
     180               0 :                 if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     181               0 :                     == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG
     182                 :                 {
     183               0 :                     debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     184               0 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     185               0 :                     == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY
     186                 :                 {
     187                 :                     // The XLOG record was renamed between v14 and v15,
     188                 :                     // but the record format is the same.
     189                 :                     // So we can reuse XlCreateDatabase here.
     190               0 :                     debug!("XLOG_DBASE_CREATE_FILE_COPY");
     191               0 :                     let createdb = XlCreateDatabase::decode(&mut buf);
     192               0 :                     self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     193               0 :                         .await?;
     194               0 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     195               0 :                     == postgres_ffi::v16::bindings::XLOG_DBASE_DROP
     196                 :                 {
     197               0 :                     let dropdb = XlDropDatabase::decode(&mut buf);
     198               0 :                     for tablespace_id in dropdb.tablespace_ids {
     199               0 :                         trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     200               0 :                         modification
     201               0 :                             .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     202               0 :                             .await?;
     203                 :                     }
     204               0 :                 }
     205               0 :             }
     206 CBC    68567040 :         } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
     207 UBC           0 :             trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     208 CBC    68567036 :         } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID {
     209             363 :             let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     210             363 :             if info == pg_constants::CLOG_ZEROPAGE {
     211             362 :                 let pageno = buf.get_u32_le();
     212             362 :                 let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     213             362 :                 let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     214             362 :                 self.put_slru_page_image(
     215             362 :                     modification,
     216             362 :                     SlruKind::Clog,
     217             362 :                     segno,
     218             362 :                     rpageno,
     219             362 :                     ZERO_PAGE.clone(),
     220             362 :                     ctx,
     221             362 :                 )
     222              28 :                 .await?;
     223                 :             } else {
     224               1 :                 assert!(info == pg_constants::CLOG_TRUNCATE);
     225               1 :                 let xlrec = XlClogTruncate::decode(&mut buf);
     226               1 :                 self.ingest_clog_truncate_record(modification, &xlrec, ctx)
     227 UBC           0 :                     .await?;
     228                 :             }
     229 CBC    68566673 :         } else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
     230         2059356 :             let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     231         2059356 :             if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
     232         1950128 :                 let parsed_xact =
     233         1950128 :                     XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     234         1950128 :                 self.ingest_xact_record(
     235         1950128 :                     modification,
     236         1950128 :                     &parsed_xact,
     237         1950128 :                     info == pg_constants::XLOG_XACT_COMMIT,
     238         1950128 :                     ctx,
     239         1950128 :                 )
     240            4063 :                 .await?;
     241          109228 :             } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
     242          109227 :                 || info == pg_constants::XLOG_XACT_ABORT_PREPARED
     243                 :             {
     244               2 :                 let parsed_xact =
     245               2 :                     XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     246               2 :                 self.ingest_xact_record(
     247               2 :                     modification,
     248               2 :                     &parsed_xact,
     249               2 :                     info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
     250               2 :                     ctx,
     251               2 :                 )
     252 UBC           0 :                 .await?;
     253                 :                 // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     254               0 :                 trace!(
     255               0 :                     "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     256               0 :                     decoded.xl_xid,
     257               0 :                     parsed_xact.xid,
     258               0 :                     lsn,
     259               0 :                 );
     260 CBC           2 :                 modification
     261               2 :                     .drop_twophase_file(parsed_xact.xid, ctx)
     262 UBC           0 :                     .await?;
     263 CBC      109226 :             } else if info == pg_constants::XLOG_XACT_PREPARE {
     264               4 :                 modification
     265               4 :                     .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
     266 LBC         (1) :                     .await?;
     267 CBC      109222 :             }
     268        66507317 :         } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
     269           24332 :             let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     270           24332 : 
     271           24332 :             if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
     272              14 :                 let pageno = buf.get_u32_le();
     273              14 :                 let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     274              14 :                 let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     275              14 :                 self.put_slru_page_image(
     276              14 :                     modification,
     277              14 :                     SlruKind::MultiXactOffsets,
     278              14 :                     segno,
     279              14 :                     rpageno,
     280              14 :                     ZERO_PAGE.clone(),
     281              14 :                     ctx,
     282              14 :                 )
     283 UBC           0 :                 .await?;
     284 CBC       24318 :             } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
     285             291 :                 let pageno = buf.get_u32_le();
     286             291 :                 let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     287             291 :                 let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     288             291 :                 self.put_slru_page_image(
     289             291 :                     modification,
     290             291 :                     SlruKind::MultiXactMembers,
     291             291 :                     segno,
     292             291 :                     rpageno,
     293             291 :                     ZERO_PAGE.clone(),
     294             291 :                     ctx,
     295             291 :                 )
     296               3 :                 .await?;
     297           24027 :             } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     298           24027 :                 let xlrec = XlMultiXactCreate::decode(&mut buf);
     299           24027 :                 self.ingest_multixact_create_record(modification, &xlrec)?;
     300 UBC           0 :             } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     301               0 :                 let xlrec = XlMultiXactTruncate::decode(&mut buf);
     302               0 :                 self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
     303               0 :                     .await?;
     304               0 :             }
     305 CBC    66482985 :         } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
     306              54 :             let xlrec = XlRelmapUpdate::decode(&mut buf);
     307              54 :             self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
     308               5 :                 .await?;
     309        66482931 :         } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
     310          133074 :             let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     311          133074 :             if info == pg_constants::XLOG_NEXTOID {
     312             412 :                 let next_oid = buf.get_u32_le();
     313             412 :                 if self.checkpoint.nextOid != next_oid {
     314             412 :                     self.checkpoint.nextOid = next_oid;
     315             412 :                     self.checkpoint_modified = true;
     316             412 :                 }
     317          132662 :             } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
     318          132602 :                 || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     319                 :             {
     320             634 :                 let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
     321             634 :                 buf.copy_to_slice(&mut checkpoint_bytes);
     322             634 :                 let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     323 UBC           0 :                 trace!(
     324               0 :                     "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
     325               0 :                     xlog_checkpoint.oldestXid,
     326               0 :                     self.checkpoint.oldestXid
     327               0 :                 );
     328 CBC         634 :                 if (self
     329             634 :                     .checkpoint
     330             634 :                     .oldestXid
     331             634 :                     .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
     332             634 :                     < 0
     333 UBC           0 :                 {
     334               0 :                     self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
     335               0 :                     self.checkpoint_modified = true;
     336 CBC         634 :                 }
     337          132028 :             }
     338        66349857 :         } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
     339             109 :             let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     340             109 :             if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     341             109 :                 let xlrec = XlLogicalMessage::decode(&mut buf);
     342             109 :                 let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     343             109 :                 let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
     344             109 :                 if prefix == "neon-test" {
     345                 :                     // This is a convenient way to make the WAL ingestion pause at
     346                 :                     // particular point in the WAL. For more fine-grained control,
     347                 :                     // we could peek into the message and only pause if it contains
     348                 :                     // a particular string, for example, but this is enough for now.
     349              18 :                     crate::failpoint_support::sleep_millis_async!(
     350              18 :                         "wal-ingest-logical-message-sleep"
     351              18 :                     );
     352              97 :                 } else if let Some(path) = prefix.strip_prefix("neon-file:") {
     353              94 :                     modification.put_file(path, message, ctx).await?;
     354               3 :                 }
     355 UBC           0 :             }
     356 CBC    66349748 :         }
     357                 : 
     358                 :         // Iterate through all the blocks that the record modifies, and
     359                 :         // "put" a separate copy of the record for each block.
     360        70056431 :         for blk in decoded.blocks.iter() {
     361        70056431 :             self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
     362           23659 :                 .await?;
     363                 :         }
     364                 : 
     365                 :         // If checkpoint data was updated, store the new version in the repository
     366        68588715 :         if self.checkpoint_modified {
     367           28042 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     368                 : 
     369           28042 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     370           28042 :             self.checkpoint_modified = false;
     371        68560673 :         }
     372                 : 
     373                 :         // Now that this record has been fully handled, including updating the
     374                 :         // checkpoint data, let the repository know that it is up-to-date to this LSN
     375        68588715 :         modification.commit(ctx).await?;
     376                 : 
     377        68588709 :         Ok(())
     378        68588709 :     }
     379                 : 
     380        70056619 :     async fn ingest_decoded_block(
     381        70056619 :         &mut self,
     382        70056619 :         modification: &mut DatadirModification<'_>,
     383        70056619 :         lsn: Lsn,
     384        70056619 :         decoded: &DecodedWALRecord,
     385        70056619 :         blk: &DecodedBkpBlock,
     386        70056619 :         ctx: &RequestContext,
     387        70056619 :     ) -> Result<(), PageReconstructError> {
     388        70056433 :         let rel = RelTag {
     389        70056433 :             spcnode: blk.rnode_spcnode,
     390        70056433 :             dbnode: blk.rnode_dbnode,
     391        70056433 :             relnode: blk.rnode_relnode,
     392        70056433 :             forknum: blk.forknum,
     393        70056433 :         };
     394        70056433 : 
     395        70056433 :         //
     396        70056433 :         // Instead of storing full-page-image WAL record,
     397        70056433 :         // it is better to store extracted image: we can skip wal-redo
     398        70056433 :         // in this case. Also some FPI records may contain multiple (up to 32) pages,
     399        70056433 :         // so they have to be copied multiple times.
     400        70056433 :         //
     401        70056433 :         if blk.apply_image
     402          172031 :             && blk.has_image
     403          172031 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     404          133267 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     405 UBC           0 :                 || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     406                 :         // compression of WAL is not yet supported: fall back to storing the original WAL record
     407 CBC      133267 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
     408                 :         {
     409                 :             // Extract page image from FPI record
     410          133267 :             let img_len = blk.bimg_len as usize;
     411          133267 :             let img_offs = blk.bimg_offset as usize;
     412          133267 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     413          133267 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     414          133267 : 
     415          133267 :             if blk.hole_length != 0 {
     416          109519 :                 let tail = image.split_off(blk.hole_offset as usize);
     417          109519 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     418          109519 :                 image.unsplit(tail);
     419          109519 :             }
     420                 :             //
     421                 :             // Match the logic of XLogReadBufferForRedoExtended:
     422                 :             // The page may be uninitialized. If so, we can't set the LSN because
     423                 :             // that would corrupt the page.
     424                 :             //
     425          133267 :             if !page_is_new(&image) {
     426          128411 :                 page_set_lsn(&mut image, lsn)
     427            4856 :             }
     428          133267 :             assert_eq!(image.len(), BLCKSZ as usize);
     429          133267 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     430            3260 :                 .await?;
     431                 :         } else {
     432        69923166 :             let rec = NeonWalRecord::Postgres {
     433        69923166 :                 will_init: blk.will_init || blk.apply_image,
     434        69923166 :                 rec: decoded.record.clone(),
     435        69923166 :             };
     436        69923166 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     437           20399 :                 .await?;
     438                 :         }
     439        70056433 :         Ok(())
     440        70056433 :     }
     441                 : 
     442        55203122 :     async fn ingest_heapam_record(
     443        55203122 :         &mut self,
     444        55203122 :         buf: &mut Bytes,
     445        55203122 :         modification: &mut DatadirModification<'_>,
     446        55203122 :         decoded: &mut DecodedWALRecord,
     447        55203122 :         ctx: &RequestContext,
     448        55203122 :     ) -> anyhow::Result<()> {
     449        55203025 :         // Handle VM bit updates that are implicitly part of heap records.
     450        55203025 : 
     451        55203025 :         // First, look at the record to determine which VM bits need
     452        55203025 :         // to be cleared. If either of these variables is set, we
     453        55203025 :         // need to clear the corresponding bits in the visibility map.
     454        55203025 :         let mut new_heap_blkno: Option<u32> = None;
     455        55203025 :         let mut old_heap_blkno: Option<u32> = None;
     456        55203025 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     457        55203025 : 
     458        55203025 :         match self.timeline.pg_version {
     459                 :             14 => {
     460        55203025 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     461        53024110 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     462        53024110 : 
     463        53024110 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     464        42667615 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     465        42667615 :                         assert_eq!(0, buf.remaining());
     466        42667615 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     467            2586 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     468        42665029 :                         }
     469        10356495 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     470          535608 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     471          535608 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     472             732 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     473          534876 :                         }
     474         9820887 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     475         6341229 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     476                 :                     {
     477         5096400 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     478         5096400 :                         // the size of tuple data is inferred from the size of the record.
     479         5096400 :                         // we can't validate the remaining number of bytes without parsing
     480         5096400 :                         // the tuple data.
     481         5096400 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     482          113991 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     483         4982409 :                         }
     484         5096400 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     485            9335 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     486            9335 :                             // non-HOT update where the new tuple goes to different page than
     487            9335 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     488            9335 :                             // set.
     489            9335 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     490         5087064 :                         }
     491         4724487 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     492         4668326 :                         let xlrec = v14::XlHeapLock::decode(buf);
     493         4668326 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     494              52 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     495              52 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     496         4668274 :                         }
     497           56161 :                     }
     498         2178915 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     499         2178915 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     500         2178915 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     501          710747 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     502                 : 
     503          710747 :                         let offset_array_len =
     504          710747 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     505                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     506          563808 :                                 0
     507                 :                             } else {
     508          146939 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     509                 :                             };
     510          710747 :                         assert_eq!(offset_array_len, buf.remaining());
     511                 : 
     512          710747 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     513            1934 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     514          708813 :                         }
     515         1468168 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     516            3898 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     517            3898 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     518 UBC           0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     519               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     520 CBC        3898 :                         }
     521         1464270 :                     }
     522                 :                 } else {
     523 UBC           0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     524                 :                 }
     525                 :             }
     526                 :             15 => {
     527               0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     528               0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     529               0 : 
     530               0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     531               0 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     532               0 :                         assert_eq!(0, buf.remaining());
     533               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     534               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     535               0 :                         }
     536               0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     537               0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     538               0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     539               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     540               0 :                         }
     541               0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     542               0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     543                 :                     {
     544               0 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     545               0 :                         // the size of tuple data is inferred from the size of the record.
     546               0 :                         // we can't validate the remaining number of bytes without parsing
     547               0 :                         // the tuple data.
     548               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     549               0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     550               0 :                         }
     551               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     552               0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     553               0 :                             // non-HOT update where the new tuple goes to different page than
     554               0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     555               0 :                             // set.
     556               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     557               0 :                         }
     558               0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     559               0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     560               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     561               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     562               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     563               0 :                         }
     564               0 :                     }
     565               0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     566               0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     567               0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     568               0 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     569                 : 
     570               0 :                         let offset_array_len =
     571               0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     572                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     573               0 :                                 0
     574                 :                             } else {
     575               0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     576                 :                             };
     577               0 :                         assert_eq!(offset_array_len, buf.remaining());
     578                 : 
     579               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     580               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     581               0 :                         }
     582               0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     583               0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     584               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     585               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     586               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     587               0 :                         }
     588               0 :                     }
     589                 :                 } else {
     590               0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     591                 :                 }
     592                 :             }
     593                 :             16 => {
     594               0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     595               0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     596               0 : 
     597               0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     598               0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     599               0 :                         assert_eq!(0, buf.remaining());
     600               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     601               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     602               0 :                         }
     603               0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     604               0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     605               0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     606               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     607               0 :                         }
     608               0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     609               0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     610                 :                     {
     611               0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     612               0 :                         // the size of tuple data is inferred from the size of the record.
     613               0 :                         // we can't validate the remaining number of bytes without parsing
     614               0 :                         // the tuple data.
     615               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     616               0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     617               0 :                         }
     618               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     619               0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     620               0 :                             // non-HOT update where the new tuple goes to different page than
     621               0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     622               0 :                             // set.
     623               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     624               0 :                         }
     625               0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     626               0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     627               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     628               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     629               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     630               0 :                         }
     631               0 :                     }
     632               0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     633               0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     634               0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     635               0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     636                 : 
     637               0 :                         let offset_array_len =
     638               0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     639                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     640               0 :                                 0
     641                 :                             } else {
     642               0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     643                 :                             };
     644               0 :                         assert_eq!(offset_array_len, buf.remaining());
     645                 : 
     646               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     647               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     648               0 :                         }
     649               0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     650               0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     651               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     652               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     653               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     654               0 :                         }
     655               0 :                     }
     656                 :                 } else {
     657               0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     658                 :                 }
     659                 :             }
     660               0 :             _ => {}
     661                 :         }
     662                 : 
     663                 :         // Clear the VM bits if required.
     664 CBC    55203024 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     665          128572 :             let vm_rel = RelTag {
     666          128572 :                 forknum: VISIBILITYMAP_FORKNUM,
     667          128572 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     668          128572 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     669          128572 :                 relnode: decoded.blocks[0].rnode_relnode,
     670          128572 :             };
     671          128572 : 
     672          128572 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     673          128572 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     674                 : 
     675                 :             // Sometimes, Postgres seems to create heap WAL records with the
     676                 :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     677                 :             // not set. In fact, it's possible that the VM page does not exist at all.
     678                 :             // In that case, we don't want to store a record to clear the VM bit;
     679                 :             // replaying it would fail to find the previous image of the page, because
     680                 :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     681                 :             // record if it doesn't.
     682          128572 :             let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
     683          128572 :             if let Some(blknum) = new_vm_blk {
     684           14587 :                 if blknum >= vm_size {
     685 UBC           0 :                     new_vm_blk = None;
     686 CBC       14587 :                 }
     687          113985 :             }
     688          128572 :             if let Some(blknum) = old_vm_blk {
     689          114043 :                 if blknum >= vm_size {
     690 UBC           0 :                     old_vm_blk = None;
     691 CBC      114043 :                 }
     692           14529 :             }
     693                 : 
     694          128572 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     695          128572 :                 if new_vm_blk == old_vm_blk {
     696                 :                     // An UPDATE record that needs to clear the bits for both old and the
     697                 :                     // new page, both of which reside on the same VM page.
     698              58 :                     self.put_rel_wal_record(
     699              58 :                         modification,
     700              58 :                         vm_rel,
     701              58 :                         new_vm_blk.unwrap(),
     702              58 :                         NeonWalRecord::ClearVisibilityMapFlags {
     703              58 :                             new_heap_blkno,
     704              58 :                             old_heap_blkno,
     705              58 :                             flags,
     706              58 :                         },
     707              58 :                         ctx,
     708              58 :                     )
     709 UBC           0 :                     .await?;
     710                 :                 } else {
     711                 :                     // Clear VM bits for one heap page, or for two pages that reside on
     712                 :                     // different VM pages.
     713 CBC      128514 :                     if let Some(new_vm_blk) = new_vm_blk {
     714           14529 :                         self.put_rel_wal_record(
     715           14529 :                             modification,
     716           14529 :                             vm_rel,
     717           14529 :                             new_vm_blk,
     718           14529 :                             NeonWalRecord::ClearVisibilityMapFlags {
     719           14529 :                                 new_heap_blkno,
     720           14529 :                                 old_heap_blkno: None,
     721           14529 :                                 flags,
     722           14529 :                             },
     723           14529 :                             ctx,
     724           14529 :                         )
     725 UBC           0 :                         .await?;
     726 CBC      113985 :                     }
     727          128514 :                     if let Some(old_vm_blk) = old_vm_blk {
     728          113985 :                         self.put_rel_wal_record(
     729          113985 :                             modification,
     730          113985 :                             vm_rel,
     731          113985 :                             old_vm_blk,
     732          113985 :                             NeonWalRecord::ClearVisibilityMapFlags {
     733          113985 :                                 new_heap_blkno: None,
     734          113985 :                                 old_heap_blkno,
     735          113985 :                                 flags,
     736          113985 :                             },
     737          113985 :                             ctx,
     738          113985 :                         )
     739 UBC           0 :                         .await?;
     740 CBC       14529 :                     }
     741                 :                 }
     742 UBC           0 :             }
     743 CBC    55074452 :         }
     744                 : 
     745        55203024 :         Ok(())
     746        55203024 :     }
     747                 : 
     748 UBC           0 :     async fn ingest_neonrmgr_record(
     749               0 :         &mut self,
     750               0 :         buf: &mut Bytes,
     751               0 :         modification: &mut DatadirModification<'_>,
     752               0 :         decoded: &mut DecodedWALRecord,
     753               0 :         ctx: &RequestContext,
     754               0 :     ) -> anyhow::Result<()> {
     755               0 :         // Handle VM bit updates that are implicitly part of heap records.
     756               0 : 
     757               0 :         // First, look at the record to determine which VM bits need
     758               0 :         // to be cleared. If either of these variables is set, we
     759               0 :         // need to clear the corresponding bits in the visibility map.
     760               0 :         let mut new_heap_blkno: Option<u32> = None;
     761               0 :         let mut old_heap_blkno: Option<u32> = None;
     762               0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     763               0 : 
     764               0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     765                 : 
     766               0 :         match self.timeline.pg_version {
     767                 :             16 => {
     768               0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     769               0 : 
     770               0 :                 match info {
     771                 :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     772               0 :                         let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
     773               0 :                         assert_eq!(0, buf.remaining());
     774               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     775               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     776               0 :                         }
     777                 :                     }
     778                 :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     779               0 :                         let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
     780               0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     781               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     782               0 :                         }
     783                 :                     }
     784                 :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     785                 :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     786               0 :                         let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
     787               0 :                         // the size of tuple data is inferred from the size of the record.
     788               0 :                         // we can't validate the remaining number of bytes without parsing
     789               0 :                         // the tuple data.
     790               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     791               0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     792               0 :                         }
     793               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     794               0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     795               0 :                             // non-HOT update where the new tuple goes to different page than
     796               0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     797               0 :                             // set.
     798               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     799               0 :                         }
     800                 :                     }
     801                 :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     802               0 :                         let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     803                 : 
     804               0 :                         let offset_array_len =
     805               0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     806                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     807               0 :                                 0
     808                 :                             } else {
     809               0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     810                 :                             };
     811               0 :                         assert_eq!(offset_array_len, buf.remaining());
     812                 : 
     813               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     814               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     815               0 :                         }
     816                 :                     }
     817                 :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     818               0 :                         let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
     819               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     820               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     821               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     822               0 :                         }
     823                 :                     }
     824               0 :                     info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
     825                 :                 }
     826                 :             }
     827               0 :             _ => bail!(
     828               0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     829               0 :                 self.timeline.pg_version
     830               0 :             ),
     831                 :         }
     832                 : 
     833                 :         // Clear the VM bits if required.
     834               0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     835               0 :             let vm_rel = RelTag {
     836               0 :                 forknum: VISIBILITYMAP_FORKNUM,
     837               0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     838               0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     839               0 :                 relnode: decoded.blocks[0].rnode_relnode,
     840               0 :             };
     841               0 : 
     842               0 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     843               0 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     844                 : 
     845                 :             // Sometimes, Postgres seems to create heap WAL records with the
     846                 :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     847                 :             // not set. In fact, it's possible that the VM page does not exist at all.
     848                 :             // In that case, we don't want to store a record to clear the VM bit;
     849                 :             // replaying it would fail to find the previous image of the page, because
     850                 :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     851                 :             // record if they don't.
     852               0 :             let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
     853               0 :             if let Some(blknum) = new_vm_blk {
     854               0 :                 if blknum >= vm_size {
     855               0 :                     new_vm_blk = None;
     856               0 :                 }
     857               0 :             }
     858               0 :             if let Some(blknum) = old_vm_blk {
     859               0 :                 if blknum >= vm_size {
     860               0 :                     old_vm_blk = None;
     861               0 :                 }
     862               0 :             }
     863                 : 
     864               0 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     865               0 :                 if new_vm_blk == old_vm_blk {
     866                 :                     // An UPDATE record that needs to clear the bits for both old and the
     867                 :                     // new page, both of which reside on the same VM page.
     868               0 :                     self.put_rel_wal_record(
     869               0 :                         modification,
     870               0 :                         vm_rel,
     871               0 :                         new_vm_blk.unwrap(),
     872               0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     873               0 :                             new_heap_blkno,
     874               0 :                             old_heap_blkno,
     875               0 :                             flags,
     876               0 :                         },
     877               0 :                         ctx,
     878               0 :                     )
     879               0 :                     .await?;
     880                 :                 } else {
     881                 :                     // Clear VM bits for one heap page, or for two pages that reside on
     882                 :                     // different VM pages.
     883               0 :                     if let Some(new_vm_blk) = new_vm_blk {
     884               0 :                         self.put_rel_wal_record(
     885               0 :                             modification,
     886               0 :                             vm_rel,
     887               0 :                             new_vm_blk,
     888               0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     889               0 :                                 new_heap_blkno,
     890               0 :                                 old_heap_blkno: None,
     891               0 :                                 flags,
     892               0 :                             },
     893               0 :                             ctx,
     894               0 :                         )
     895               0 :                         .await?;
     896               0 :                     }
     897               0 :                     if let Some(old_vm_blk) = old_vm_blk {
     898               0 :                         self.put_rel_wal_record(
     899               0 :                             modification,
     900               0 :                             vm_rel,
     901               0 :                             old_vm_blk,
     902               0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     903               0 :                                 new_heap_blkno: None,
     904               0 :                                 old_heap_blkno,
     905               0 :                                 flags,
     906               0 :                             },
     907               0 :                             ctx,
     908               0 :                         )
     909               0 :                         .await?;
     910               0 :                     }
     911                 :                 }
     912               0 :             }
     913               0 :         }
     914                 : 
     915               0 :         Ok(())
     916               0 :     }
     917                 : 
     918                 :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     919 CBC          13 :     async fn ingest_xlog_dbase_create(
     920              13 :         &mut self,
     921              13 :         modification: &mut DatadirModification<'_>,
     922              13 :         rec: &XlCreateDatabase,
     923              13 :         ctx: &RequestContext,
     924              13 :     ) -> anyhow::Result<()> {
     925              13 :         let db_id = rec.db_id;
     926              13 :         let tablespace_id = rec.tablespace_id;
     927              13 :         let src_db_id = rec.src_db_id;
     928              13 :         let src_tablespace_id = rec.src_tablespace_id;
     929              13 : 
     930              13 :         // Creating a database is implemented by copying the template (aka. source) database.
     931              13 :         // To copy all the relations, we need to ask for the state as of the same LSN, but we
     932              13 :         // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for
     933              13 :         // the last valid LSN to advance up to it. So we use the previous record's LSN in the
     934              13 :         // get calls instead.
     935              13 :         let req_lsn = modification.tline.get_last_record_lsn();
     936                 : 
     937              13 :         let rels = modification
     938              13 :             .tline
     939              13 :             .list_rels(src_tablespace_id, src_db_id, req_lsn, ctx)
     940 UBC           0 :             .await?;
     941                 : 
     942               0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
     943                 : 
     944                 :         // Copy relfilemap
     945 CBC          13 :         let filemap = modification
     946              13 :             .tline
     947              13 :             .get_relmap_file(src_tablespace_id, src_db_id, req_lsn, ctx)
     948 UBC           0 :             .await?;
     949 CBC          13 :         modification
     950              13 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
     951 UBC           0 :             .await?;
     952                 : 
     953 CBC          13 :         let mut num_rels_copied = 0;
     954              13 :         let mut num_blocks_copied = 0;
     955            3809 :         for src_rel in rels {
     956            3796 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
     957            3796 :             assert_eq!(src_rel.dbnode, src_db_id);
     958                 : 
     959            3796 :             let nblocks = modification
     960            3796 :                 .tline
     961            3796 :                 .get_rel_size(src_rel, req_lsn, true, ctx)
     962             296 :                 .await?;
     963            3796 :             let dst_rel = RelTag {
     964            3796 :                 spcnode: tablespace_id,
     965            3796 :                 dbnode: db_id,
     966            3796 :                 relnode: src_rel.relnode,
     967            3796 :                 forknum: src_rel.forknum,
     968            3796 :             };
     969            3796 : 
     970            3796 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
     971                 : 
     972                 :             // Copy content
     973 UBC           0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
     974 CBC       13130 :             for blknum in 0..nblocks {
     975 UBC           0 :                 debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
     976                 : 
     977 CBC       13130 :                 let content = modification
     978           13130 :                     .tline
     979           13130 :                     .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true, ctx)
     980            1134 :                     .await?;
     981           13130 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
     982           13130 :                 num_blocks_copied += 1;
     983                 :             }
     984                 : 
     985            3796 :             num_rels_copied += 1;
     986                 :         }
     987                 : 
     988              13 :         info!(
     989              13 :             "Created database {}/{}, copied {} blocks in {} rels",
     990              13 :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
     991              13 :         );
     992              13 :         Ok(())
     993              13 :     }
     994                 : 
     995           21617 :     async fn ingest_xlog_smgr_create(
     996           21617 :         &mut self,
     997           21617 :         modification: &mut DatadirModification<'_>,
     998           21617 :         rec: &XlSmgrCreate,
     999           21617 :         ctx: &RequestContext,
    1000           21617 :     ) -> anyhow::Result<()> {
    1001           21617 :         let rel = RelTag {
    1002           21617 :             spcnode: rec.rnode.spcnode,
    1003           21617 :             dbnode: rec.rnode.dbnode,
    1004           21617 :             relnode: rec.rnode.relnode,
    1005           21617 :             forknum: rec.forknum,
    1006           21617 :         };
    1007           21617 :         self.put_rel_creation(modification, rel, ctx).await?;
    1008           21617 :         Ok(())
    1009           21617 :     }
    1010                 : 
    1011                 :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    1012                 :     ///
    1013                 :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
    1014              44 :     async fn ingest_xlog_smgr_truncate(
    1015              44 :         &mut self,
    1016              44 :         modification: &mut DatadirModification<'_>,
    1017              44 :         rec: &XlSmgrTruncate,
    1018              44 :         ctx: &RequestContext,
    1019              44 :     ) -> anyhow::Result<()> {
    1020              44 :         let spcnode = rec.rnode.spcnode;
    1021              44 :         let dbnode = rec.rnode.dbnode;
    1022              44 :         let relnode = rec.rnode.relnode;
    1023              44 : 
    1024              44 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
    1025              44 :             let rel = RelTag {
    1026              44 :                 spcnode,
    1027              44 :                 dbnode,
    1028              44 :                 relnode,
    1029              44 :                 forknum: MAIN_FORKNUM,
    1030              44 :             };
    1031              44 :             self.put_rel_truncation(modification, rel, rec.blkno, ctx)
    1032               1 :                 .await?;
    1033 UBC           0 :         }
    1034 CBC          44 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
    1035              44 :             let rel = RelTag {
    1036              44 :                 spcnode,
    1037              44 :                 dbnode,
    1038              44 :                 relnode,
    1039              44 :                 forknum: FSM_FORKNUM,
    1040              44 :             };
    1041              44 : 
    1042              44 :             let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
    1043              44 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
    1044              44 :             if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
    1045                 :                 // Tail of last remaining FSM page has to be zeroed.
    1046                 :                 // We are not precise here: instead of digging into the FSM bitmap format, we just clear the whole page.
    1047              22 :                 modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
    1048              22 :                 fsm_physical_page_no += 1;
    1049              22 :             }
    1050              44 :             let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
    1051              44 :             if nblocks > fsm_physical_page_no {
    1052                 :                 // check if something to do: FSM is larger than truncate position
    1053              22 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
    1054 LBC         (2) :                     .await?;
    1055 CBC          22 :             }
    1056 UBC           0 :         }
    1057 CBC          44 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
    1058              44 :             let rel = RelTag {
    1059              44 :                 spcnode,
    1060              44 :                 dbnode,
    1061              44 :                 relnode,
    1062              44 :                 forknum: VISIBILITYMAP_FORKNUM,
    1063              44 :             };
    1064              44 : 
    1065              44 :             let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
    1066              44 :             if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
    1067                 :                 // Tail of last remaining vm page has to be zeroed.
    1068                 :                 // We are not precise here: instead of digging into the VM bitmap format, we just clear the whole page.
    1069              22 :                 modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
    1070              22 :                 vm_page_no += 1;
    1071              22 :             }
    1072              44 :             let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
    1073              44 :             if nblocks > vm_page_no {
    1074                 :                 // check if something to do: VM is larger than truncate position
    1075              22 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
    1076               1 :                     .await?;
    1077              22 :             }
    1078 UBC           0 :         }
    1079 CBC          44 :         Ok(())
    1080              44 :     }
    1081                 : 
    1082                 :     /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
    1083                 :     ///
    1084         1950130 :     async fn ingest_xact_record(
    1085         1950130 :         &mut self,
    1086         1950130 :         modification: &mut DatadirModification<'_>,
    1087         1950130 :         parsed: &XlXactParsedRecord,
    1088         1950130 :         is_commit: bool,
    1089         1950130 :         ctx: &RequestContext,
    1090         1950130 :     ) -> anyhow::Result<()> {
    1091         1950130 :         // Record update of CLOG pages
    1092         1950130 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
    1093         1950130 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1094         1950130 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1095         1950130 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
    1096                 : 
    1097         2150179 :         for subxact in &parsed.subxacts {
    1098          200049 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
    1099          200049 :             if subxact_pageno != pageno {
    1100                 :                 // This subxact goes to a different page. Write the record
    1101                 :                 // for all the XIDs on the previous page, and continue
    1102                 :                 // accumulating XIDs on this new page.
    1103               6 :                 modification.put_slru_wal_record(
    1104               6 :                     SlruKind::Clog,
    1105               6 :                     segno,
    1106               6 :                     rpageno,
    1107               6 :                     if is_commit {
    1108               6 :                         NeonWalRecord::ClogSetCommitted {
    1109               6 :                             xids: page_xids,
    1110               6 :                             timestamp: parsed.xact_time,
    1111               6 :                         }
    1112                 :                     } else {
    1113 UBC           0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
    1114                 :                     },
    1115               0 :                 )?;
    1116 CBC           6 :                 page_xids = Vec::new();
    1117          200043 :             }
    1118          200049 :             pageno = subxact_pageno;
    1119          200049 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1120          200049 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1121          200049 :             page_xids.push(*subxact);
    1122                 :         }
    1123         1950130 :         modification.put_slru_wal_record(
    1124         1950130 :             SlruKind::Clog,
    1125         1950130 :             segno,
    1126         1950130 :             rpageno,
    1127         1950130 :             if is_commit {
    1128         1948337 :                 NeonWalRecord::ClogSetCommitted {
    1129         1948337 :                     xids: page_xids,
    1130         1948337 :                     timestamp: parsed.xact_time,
    1131         1948337 :                 }
    1132                 :             } else {
    1133            1793 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
    1134                 :             },
    1135 UBC           0 :         )?;
    1136                 : 
    1137 CBC     1966894 :         for xnode in &parsed.xnodes {
    1138           83820 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
    1139           67056 :                 let rel = RelTag {
    1140           67056 :                     forknum,
    1141           67056 :                     spcnode: xnode.spcnode,
    1142           67056 :                     dbnode: xnode.dbnode,
    1143           67056 :                     relnode: xnode.relnode,
    1144           67056 :                 };
    1145           67056 :                 let last_lsn = self.timeline.get_last_record_lsn();
    1146           67056 :                 if modification
    1147           67056 :                     .tline
    1148           67056 :                     .get_rel_exists(rel, last_lsn, true, ctx)
    1149            2719 :                     .await?
    1150                 :                 {
    1151           17672 :                     self.put_rel_drop(modification, rel, ctx).await?;
    1152           49384 :                 }
    1153                 :             }
    1154                 :         }
    1155         1950130 :         Ok(())
    1156         1950130 :     }
    1157                 : 
    1158               1 :     async fn ingest_clog_truncate_record(
    1159               1 :         &mut self,
    1160               1 :         modification: &mut DatadirModification<'_>,
    1161               1 :         xlrec: &XlClogTruncate,
    1162               1 :         ctx: &RequestContext,
    1163               1 :     ) -> anyhow::Result<()> {
    1164               1 :         info!(
    1165               1 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
    1166               1 :             xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
    1167               1 :         );
    1168                 : 
    1169                 :         // Here we treat oldestXid and oldestXidDB
    1170                 :         // differently from postgres redo routines.
    1171                 :         // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
    1172                 :         // until checkpoint happens and updates the value.
    1173                 :         // Here we can use the most recent value.
    1174                 :         // It's just an optimization, though, and can be deleted.
    1175                 :         // TODO Figure out if there will be any issues with replica.
    1176               1 :         self.checkpoint.oldestXid = xlrec.oldest_xid;
    1177               1 :         self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
    1178               1 :         self.checkpoint_modified = true;
    1179               1 : 
    1180               1 :         // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
    1181               1 : 
    1182               1 :         let latest_page_number =
    1183               1 :             self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
    1184               1 : 
    1185               1 :         // Now delete all segments containing pages between xlrec.pageno
    1186               1 :         // and latest_page_number.
    1187               1 : 
    1188               1 :         // First, make an important safety check:
    1189               1 :         // the current endpoint page must not be eligible for removal.
    1190               1 :         // See SimpleLruTruncate() in slru.c
    1191               1 :         if clogpage_precedes(latest_page_number, xlrec.pageno) {
    1192 UBC           0 :             info!("could not truncate directory pg_xact apparent wraparound");
    1193               0 :             return Ok(());
    1194 CBC           1 :         }
    1195               1 : 
    1196               1 :         // Iterate over the SLRU CLOG segments and drop the ones that we're ready to truncate
    1197               1 :         //
    1198               1 :         // We cannot pass 'lsn' to Timeline.list_slru_segments(), or it
    1199               1 :         // will block waiting for the last valid LSN to advance up to
    1200               1 :         // it. So we use the previous record's LSN in the get calls
    1201               1 :         // instead.
    1202               1 :         let req_lsn = modification.tline.get_last_record_lsn();
    1203              10 :         for segno in modification
    1204               1 :             .tline
    1205               1 :             .list_slru_segments(SlruKind::Clog, req_lsn, ctx)
    1206 UBC           0 :             .await?
    1207                 :         {
    1208 CBC          10 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
    1209              10 :             if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
    1210               9 :                 modification
    1211               9 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
    1212 UBC           0 :                     .await?;
    1213               0 :                 trace!("Drop CLOG segment {:>04X}", segno);
    1214 CBC           1 :             }
    1215                 :         }
    1216                 : 
    1217               1 :         Ok(())
    1218               1 :     }
    1219                 : 
    1220           24027 :     fn ingest_multixact_create_record(
    1221           24027 :         &mut self,
    1222           24027 :         modification: &mut DatadirModification,
    1223           24027 :         xlrec: &XlMultiXactCreate,
    1224           24027 :     ) -> Result<()> {
    1225           24027 :         // Create WAL record for updating the multixact-offsets page
    1226           24027 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1227           24027 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1228           24027 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1229           24027 : 
    1230           24027 :         modification.put_slru_wal_record(
    1231           24027 :             SlruKind::MultiXactOffsets,
    1232           24027 :             segno,
    1233           24027 :             rpageno,
    1234           24027 :             NeonWalRecord::MultixactOffsetCreate {
    1235           24027 :                 mid: xlrec.mid,
    1236           24027 :                 moff: xlrec.moff,
    1237           24027 :             },
    1238           24027 :         )?;
    1239                 : 
    1240                 :         // Create WAL records for the update of each affected multixact-members page
    1241           24027 :         let mut members = xlrec.members.iter();
    1242           24027 :         let mut offset = xlrec.moff;
    1243                 :         loop {
    1244           48328 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1245           48328 : 
    1246           48328 :             // How many members fit on this page?
    1247           48328 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1248           48328 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1249           48328 : 
    1250           48328 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1251           48328 :             for _ in 0..page_remain {
    1252          520988 :                 if let Some(m) = members.next() {
    1253          472948 :                     this_page_members.push(m.clone());
    1254          472948 :                 } else {
    1255           48040 :                     break;
    1256                 :                 }
    1257                 :             }
    1258           48328 :             if this_page_members.is_empty() {
    1259                 :                 // all done
    1260           24027 :                 break;
    1261           24301 :             }
    1262           24301 :             let n_this_page = this_page_members.len();
    1263           24301 : 
    1264           24301 :             modification.put_slru_wal_record(
    1265           24301 :                 SlruKind::MultiXactMembers,
    1266           24301 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1267           24301 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1268           24301 :                 NeonWalRecord::MultixactMembersCreate {
    1269           24301 :                     moff: offset,
    1270           24301 :                     members: this_page_members,
    1271           24301 :                 },
    1272           24301 :             )?;
    1273                 : 
    1274                 :             // Note: The multixact members can wrap around, even within one WAL record.
    1275           24301 :             offset = offset.wrapping_add(n_this_page as u32);
    1276                 :         }
    1277           24027 :         if xlrec.mid >= self.checkpoint.nextMulti {
    1278           24027 :             self.checkpoint.nextMulti = xlrec.mid + 1;
    1279           24027 :             self.checkpoint_modified = true;
    1280           24027 :         }
    1281           24027 :         if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
    1282           24027 :             self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
    1283           24027 :             self.checkpoint_modified = true;
    1284           24027 :         }
    1285          472948 :         let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
    1286          472948 :             if mbr.xid.wrapping_sub(acc) as i32 > 0 {
    1287          472903 :                 mbr.xid
    1288                 :             } else {
    1289              45 :                 acc
    1290                 :             }
    1291          472948 :         });
    1292           24027 : 
    1293           24027 :         if self.checkpoint.update_next_xid(max_mbr_xid) {
    1294 UBC           0 :             self.checkpoint_modified = true;
    1295 CBC       24027 :         }
    1296           24027 :         Ok(())
    1297           24027 :     }
    1298                 : 
    1299 UBC           0 :     async fn ingest_multixact_truncate_record(
    1300               0 :         &mut self,
    1301               0 :         modification: &mut DatadirModification<'_>,
    1302               0 :         xlrec: &XlMultiXactTruncate,
    1303               0 :         ctx: &RequestContext,
    1304               0 :     ) -> Result<()> {
    1305               0 :         self.checkpoint.oldestMulti = xlrec.end_trunc_off;
    1306               0 :         self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
    1307               0 :         self.checkpoint_modified = true;
    1308               0 : 
    1309               0 :         // PerformMembersTruncation
    1310               0 :         let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
    1311               0 :         let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1312               0 :         let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1313               0 :         let mut segment: i32 = startsegment;
    1314                 : 
    1315                 :         // Delete all the segments except the last one. The last segment can still
    1316                 :         // contain, possibly partially, valid data.
    1317               0 :         while segment != endsegment {
    1318               0 :             modification
    1319               0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1320               0 :                 .await?;
    1321                 : 
    1322                 :             /* move to next segment, handling wraparound correctly */
    1323               0 :             if segment == maxsegment {
    1324               0 :                 segment = 0;
    1325               0 :             } else {
    1326               0 :                 segment += 1;
    1327               0 :             }
    1328                 :         }
    1329                 : 
    1330                 :         // Truncate offsets
    1331                 :         // FIXME: this did not handle wraparound correctly
    1332                 : 
    1333               0 :         Ok(())
    1334               0 :     }
    1335                 : 
    1336 CBC          54 :     async fn ingest_relmap_page(
    1337              54 :         &mut self,
    1338              54 :         modification: &mut DatadirModification<'_>,
    1339              54 :         xlrec: &XlRelmapUpdate,
    1340              54 :         decoded: &DecodedWALRecord,
    1341              54 :         ctx: &RequestContext,
    1342              54 :     ) -> Result<()> {
    1343              54 :         let mut buf = decoded.record.clone();
    1344              54 :         buf.advance(decoded.main_data_offset);
    1345              54 :         // skip xl_relmap_update
    1346              54 :         buf.advance(12);
    1347              54 : 
    1348              54 :         modification
    1349              54 :             .put_relmap_file(
    1350              54 :                 xlrec.tsid,
    1351              54 :                 xlrec.dbid,
    1352              54 :                 Bytes::copy_from_slice(&buf[..]),
    1353              54 :                 ctx,
    1354              54 :             )
    1355               5 :             .await
    1356              54 :     }
    1357                 : 
    1358           21618 :     async fn put_rel_creation(
    1359           21618 :         &mut self,
    1360           21618 :         modification: &mut DatadirModification<'_>,
    1361           21618 :         rel: RelTag,
    1362           21618 :         ctx: &RequestContext,
    1363           21618 :     ) -> Result<()> {
    1364           21618 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1365           21618 :         Ok(())
    1366           21618 :     }
    1367                 : 
    1368          269468 :     async fn put_rel_page_image(
    1369          269468 :         &mut self,
    1370          269468 :         modification: &mut DatadirModification<'_>,
    1371          269468 :         rel: RelTag,
    1372          269468 :         blknum: BlockNumber,
    1373          269468 :         img: Bytes,
    1374          269468 :         ctx: &RequestContext,
    1375          269468 :     ) -> Result<(), PageReconstructError> {
    1376          269468 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1377            6219 :             .await?;
    1378          269468 :         modification.put_rel_page_image(rel, blknum, img)?;
    1379          269468 :         Ok(())
    1380          269468 :     }
    1381                 : 
    1382        70051924 :     async fn put_rel_wal_record(
    1383        70051924 :         &mut self,
    1384        70051924 :         modification: &mut DatadirModification<'_>,
    1385        70051924 :         rel: RelTag,
    1386        70051924 :         blknum: BlockNumber,
    1387        70051924 :         rec: NeonWalRecord,
    1388        70051924 :         ctx: &RequestContext,
    1389        70051924 :     ) -> Result<()> {
    1390        70051738 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1391           20399 :             .await?;
    1392        70051738 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1393        70051738 :         Ok(())
    1394        70051738 :     }
    1395                 : 
    1396            3094 :     async fn put_rel_truncation(
    1397            3094 :         &mut self,
    1398            3094 :         modification: &mut DatadirModification<'_>,
    1399            3094 :         rel: RelTag,
    1400            3094 :         nblocks: BlockNumber,
    1401            3094 :         ctx: &RequestContext,
    1402            3094 :     ) -> anyhow::Result<()> {
    1403            3094 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1404            3094 :         Ok(())
    1405            3094 :     }
    1406                 : 
    1407           17673 :     async fn put_rel_drop(
    1408           17673 :         &mut self,
    1409           17673 :         modification: &mut DatadirModification<'_>,
    1410           17673 :         rel: RelTag,
    1411           17673 :         ctx: &RequestContext,
    1412           17673 :     ) -> Result<()> {
    1413           17673 :         modification.put_rel_drop(rel, ctx).await?;
    1414           17673 :         Ok(())
    1415           17673 :     }
    1416                 : 
    1417          128660 :     async fn get_relsize(
    1418          128660 :         &mut self,
    1419          128660 :         rel: RelTag,
    1420          128660 :         lsn: Lsn,
    1421          128660 :         ctx: &RequestContext,
    1422          128660 :     ) -> anyhow::Result<BlockNumber> {
    1423          128660 :         let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true, ctx).await? {
    1424 LBC         (4) :             0
    1425                 :         } else {
    1426 CBC      128660 :             self.timeline.get_rel_size(rel, lsn, true, ctx).await?
    1427                 :         };
    1428          128660 :         Ok(nblocks)
    1429          128660 :     }
    1430                 : 
    1431        70321392 :     async fn handle_rel_extend(
    1432        70321392 :         &mut self,
    1433        70321392 :         modification: &mut DatadirModification<'_>,
    1434        70321392 :         rel: RelTag,
    1435        70321392 :         blknum: BlockNumber,
    1436        70321392 :         ctx: &RequestContext,
    1437        70321392 :     ) -> Result<(), PageReconstructError> {
    1438        70321206 :         let new_nblocks = blknum + 1;
    1439        70321206 :         // Check if the relation exists. We implicitly create relations on first
    1440        70321206 :         // record.
    1441        70321206 :         // TODO: would be nice to be more explicit about it
    1442        70321206 :         let last_lsn = modification.lsn;
    1443        70321206 :         let old_nblocks = if !self
    1444        70321206 :             .timeline
    1445        70321206 :             .get_rel_exists(rel, last_lsn, true, ctx)
    1446              82 :             .await?
    1447                 :         {
    1448                 :             // create it with 0 size initially, the logic below will extend it
    1449            2057 :             modification
    1450            2057 :                 .put_rel_creation(rel, 0, ctx)
    1451             368 :                 .await
    1452            2057 :                 .context("Relation Error")?;
    1453            2057 :             0
    1454                 :         } else {
    1455        70319149 :             self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?
    1456                 :         };
    1457                 : 
    1458        70321206 :         if new_nblocks > old_nblocks {
    1459                 :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1460         1026813 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1461                 : 
    1462                 :             // fill the gap with zeros
    1463         1026813 :             for gap_blknum in old_nblocks..blknum {
    1464          232812 :                 modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
    1465                 :             }
    1466        69294393 :         }
    1467        70321206 :         Ok(())
    1468        70321206 :     }
    1469                 : 
    1470             667 :     async fn put_slru_page_image(
    1471             667 :         &mut self,
    1472             667 :         modification: &mut DatadirModification<'_>,
    1473             667 :         kind: SlruKind,
    1474             667 :         segno: u32,
    1475             667 :         blknum: BlockNumber,
    1476             667 :         img: Bytes,
    1477             667 :         ctx: &RequestContext,
    1478             667 :     ) -> Result<()> {
    1479             667 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1480              31 :             .await?;
    1481             667 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1482             667 :         Ok(())
    1483             667 :     }
    1484                 : 
    1485             667 :     async fn handle_slru_extend(
    1486             667 :         &mut self,
    1487             667 :         modification: &mut DatadirModification<'_>,
    1488             667 :         kind: SlruKind,
    1489             667 :         segno: u32,
    1490             667 :         blknum: BlockNumber,
    1491             667 :         ctx: &RequestContext,
    1492             667 :     ) -> anyhow::Result<()> {
    1493             667 :         // we don't use a cache for this like we do for relations. SLRUs are explicitly
    1494             667 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1495             667 :         // a lot less frequently.
    1496             667 : 
    1497             667 :         let new_nblocks = blknum + 1;
    1498             667 :         // Check if the SLRU segment exists. We implicitly create segments on first
    1499             667 :         // record.
    1500             667 :         // TODO: would be nice to be more explicit about it
    1501             667 :         let last_lsn = self.timeline.get_last_record_lsn();
    1502             667 :         let old_nblocks = if !self
    1503             667 :             .timeline
    1504             667 :             .get_slru_segment_exists(kind, segno, last_lsn, ctx)
    1505              17 :             .await?
    1506                 :         {
    1507                 :             // create it with 0 size initially, the logic below will extend it
    1508              18 :             modification
    1509              18 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1510 UBC           0 :                 .await?;
    1511 CBC          18 :             0
    1512                 :         } else {
    1513             649 :             self.timeline
    1514             649 :                 .get_slru_segment_size(kind, segno, last_lsn, ctx)
    1515              14 :                 .await?
    1516                 :         };
    1517                 : 
    1518             667 :         if new_nblocks > old_nblocks {
    1519 UBC           0 :             trace!(
    1520               0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1521               0 :                 kind,
    1522               0 :                 segno,
    1523               0 :                 old_nblocks,
    1524               0 :                 new_nblocks
    1525               0 :             );
    1526 CBC         661 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1527                 : 
    1528                 :             // fill the gap with zeros
    1529             661 :             for gap_blknum in old_nblocks..blknum {
    1530 UBC           0 :                 modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
    1531                 :             }
    1532 CBC           6 :         }
    1533             667 :         Ok(())
    1534             667 :     }
    1535                 : }
    1536                 : 
    1537                 : #[allow(clippy::bool_assert_comparison)]
    1538                 : #[cfg(test)]
    1539                 : mod tests {
    1540                 :     use super::*;
    1541                 :     use crate::tenant::harness::*;
    1542                 :     use crate::tenant::Timeline;
    1543                 :     use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
    1544                 :     use postgres_ffi::RELSEG_SIZE;
    1545                 : 
    1546                 :     use crate::DEFAULT_PG_VERSION;
    1547                 : 
    1548                 :     /// Arbitrary relation tag, for testing.
    1549                 :     const TESTREL_A: RelTag = RelTag {
    1550                 :         spcnode: 0,
    1551                 :         dbnode: 111,
    1552                 :         relnode: 1000,
    1553                 :         forknum: 0,
    1554                 :     };
    1555                 : 
    1556               6 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1557               6 :         // TODO
    1558               6 :     }
    1559                 : 
    1560                 :     static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
    1561                 : 
    1562               4 :     async fn init_walingest_test<'a>(
    1563               4 :         tline: &'a Timeline,
    1564               4 :         ctx: &RequestContext,
    1565               4 :     ) -> Result<WalIngest<'a>> {
    1566               4 :         let mut m = tline.begin_modification(Lsn(0x10));
    1567               4 :         m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1568               4 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1569               4 :         m.commit(ctx).await?;
    1570               4 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1571                 : 
    1572               4 :         Ok(walingest)
    1573               4 :     }
    1574                 : 
    1575               1 :     #[tokio::test]
    1576               1 :     async fn test_relsize() -> Result<()> {
    1577               1 :         let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
    1578               1 :         let tline = tenant
    1579               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1580               2 :             .await?;
    1581               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1582                 : 
    1583               1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1584               1 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1585               1 :         walingest
    1586               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
    1587 UBC           0 :             .await?;
    1588 CBC           1 :         m.commit(&ctx).await?;
    1589               1 :         let mut m = tline.begin_modification(Lsn(0x30));
    1590               1 :         walingest
    1591               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
    1592 UBC           0 :             .await?;
    1593 CBC           1 :         m.commit(&ctx).await?;
    1594               1 :         let mut m = tline.begin_modification(Lsn(0x40));
    1595               1 :         walingest
    1596               1 :             .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
    1597 UBC           0 :             .await?;
    1598 CBC           1 :         m.commit(&ctx).await?;
    1599               1 :         let mut m = tline.begin_modification(Lsn(0x50));
    1600               1 :         walingest
    1601               1 :             .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
    1602 UBC           0 :             .await?;
    1603 CBC           1 :         m.commit(&ctx).await?;
    1604                 : 
    1605               1 :         assert_current_logical_size(&tline, Lsn(0x50));
    1606                 : 
    1607                 :         // The relation was created at LSN 0x20, not visible at LSN 0x10 yet.
    1608               1 :         assert_eq!(
    1609               1 :             tline
    1610               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
    1611 UBC           0 :                 .await?,
    1612                 :             false
    1613                 :         );
    1614 CBC           1 :         assert!(tline
    1615               1 :             .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
    1616 UBC           0 :             .await
    1617 CBC           1 :             .is_err());
    1618               1 :         assert_eq!(
    1619               1 :             tline
    1620               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
    1621 UBC           0 :                 .await?,
    1622                 :             true
    1623                 :         );
    1624 CBC           1 :         assert_eq!(
    1625               1 :             tline
    1626               1 :                 .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
    1627 UBC           0 :                 .await?,
    1628                 :             1
    1629                 :         );
    1630 CBC           1 :         assert_eq!(
    1631               1 :             tline
    1632               1 :                 .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
    1633 UBC           0 :                 .await?,
    1634                 :             3
    1635                 :         );
    1636                 : 
    1637                 :         // Check page contents at each LSN
    1638 CBC           1 :         assert_eq!(
    1639               1 :             tline
    1640               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false, &ctx)
    1641 UBC           0 :                 .await?,
    1642 CBC           1 :             TEST_IMG("foo blk 0 at 2")
    1643                 :         );
    1644                 : 
    1645               1 :         assert_eq!(
    1646               1 :             tline
    1647               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false, &ctx)
    1648 UBC           0 :                 .await?,
    1649 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1650                 :         );
    1651                 : 
    1652               1 :         assert_eq!(
    1653               1 :             tline
    1654               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false, &ctx)
    1655 UBC           0 :                 .await?,
    1656 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1657                 :         );
    1658               1 :         assert_eq!(
    1659               1 :             tline
    1660               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false, &ctx)
    1661 UBC           0 :                 .await?,
    1662 CBC           1 :             TEST_IMG("foo blk 1 at 4")
    1663                 :         );
    1664                 : 
    1665               1 :         assert_eq!(
    1666               1 :             tline
    1667               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false, &ctx)
    1668 UBC           0 :                 .await?,
    1669 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1670                 :         );
    1671               1 :         assert_eq!(
    1672               1 :             tline
    1673               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false, &ctx)
    1674 UBC           0 :                 .await?,
    1675 CBC           1 :             TEST_IMG("foo blk 1 at 4")
    1676                 :         );
    1677               1 :         assert_eq!(
    1678               1 :             tline
    1679               1 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
    1680 UBC           0 :                 .await?,
    1681 CBC           1 :             TEST_IMG("foo blk 2 at 5")
    1682                 :         );
    1683                 : 
    1684                 :         // Truncate last block
    1685               1 :         let mut m = tline.begin_modification(Lsn(0x60));
    1686               1 :         walingest
    1687               1 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1688 UBC           0 :             .await?;
    1689 CBC           1 :         m.commit(&ctx).await?;
    1690               1 :         assert_current_logical_size(&tline, Lsn(0x60));
    1691                 : 
    1692                 :         // Check reported size and contents after truncation
    1693               1 :         assert_eq!(
    1694               1 :             tline
    1695               1 :                 .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
    1696 UBC           0 :                 .await?,
    1697                 :             2
    1698                 :         );
    1699 CBC           1 :         assert_eq!(
    1700               1 :             tline
    1701               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false, &ctx)
    1702 UBC           0 :                 .await?,
    1703 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1704                 :         );
    1705               1 :         assert_eq!(
    1706               1 :             tline
    1707               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false, &ctx)
    1708 UBC           0 :                 .await?,
    1709 CBC           1 :             TEST_IMG("foo blk 1 at 4")
    1710                 :         );
    1711                 : 
    1712                 :         // should still see the truncated block with older LSN
    1713               1 :         assert_eq!(
    1714               1 :             tline
    1715               1 :                 .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
    1716 UBC           0 :                 .await?,
    1717                 :             3
    1718                 :         );
    1719 CBC           1 :         assert_eq!(
    1720               1 :             tline
    1721               1 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
    1722 UBC           0 :                 .await?,
    1723 CBC           1 :             TEST_IMG("foo blk 2 at 5")
    1724                 :         );
    1725                 : 
    1726                 :         // Truncate to zero length
    1727               1 :         let mut m = tline.begin_modification(Lsn(0x68));
    1728               1 :         walingest
    1729               1 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1730 UBC           0 :             .await?;
    1731 CBC           1 :         m.commit(&ctx).await?;
    1732               1 :         assert_eq!(
    1733               1 :             tline
    1734               1 :                 .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
    1735 UBC           0 :                 .await?,
    1736                 :             0
    1737                 :         );
    1738                 : 
    1739                 :         // Extend from 0 to 2 blocks, leaving a gap
    1740 CBC           1 :         let mut m = tline.begin_modification(Lsn(0x70));
    1741               1 :         walingest
    1742               1 :             .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
    1743               1 :             .await?;
    1744               1 :         m.commit(&ctx).await?;
    1745               1 :         assert_eq!(
    1746               1 :             tline
    1747               1 :                 .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
    1748 UBC           0 :                 .await?,
    1749                 :             2
    1750                 :         );
    1751 CBC           1 :         assert_eq!(
    1752               1 :             tline
    1753               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false, &ctx)
    1754 UBC           0 :                 .await?,
    1755 CBC           1 :             ZERO_PAGE
    1756                 :         );
    1757               1 :         assert_eq!(
    1758               1 :             tline
    1759               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false, &ctx)
    1760 UBC           0 :                 .await?,
    1761 CBC           1 :             TEST_IMG("foo blk 1")
    1762                 :         );
    1763                 : 
    1764                 :         // Extend a lot more, leaving a big gap that spans across segments
    1765               1 :         let mut m = tline.begin_modification(Lsn(0x80));
    1766               1 :         walingest
    1767               1 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
    1768 UBC           0 :             .await?;
    1769 CBC          35 :         m.commit(&ctx).await?;
    1770               1 :         assert_eq!(
    1771               1 :             tline
    1772               1 :                 .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
    1773 UBC           0 :                 .await?,
    1774                 :             1501
    1775                 :         );
    1776 CBC        1499 :         for blk in 2..1500 {
    1777            1498 :             assert_eq!(
    1778            1498 :                 tline
    1779            1498 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false, &ctx)
    1780              59 :                     .await?,
    1781            1498 :                 ZERO_PAGE
    1782                 :             );
    1783                 :         }
    1784               1 :         assert_eq!(
    1785               1 :             tline
    1786               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false, &ctx)
    1787 UBC           0 :                 .await?,
    1788 CBC           1 :             TEST_IMG("foo blk 1500")
    1789                 :         );
    1790                 : 
    1791               1 :         Ok(())
    1792                 :     }
    1793                 : 
    1794                 :     // Test what happens if we dropped a relation
    1795                 :     // and then created it again within the same layer.
    1796               1 :     #[tokio::test]
    1797               1 :     async fn test_drop_extend() -> Result<()> {
    1798               1 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
    1799               1 :         let tline = tenant
    1800               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1801               2 :             .await?;
    1802               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1803                 : 
    1804               1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1805               1 :         walingest
    1806               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
    1807 UBC           0 :             .await?;
    1808 CBC           1 :         m.commit(&ctx).await?;
    1809                 : 
    1810                 :         // Check that rel exists and size is correct
    1811               1 :         assert_eq!(
    1812               1 :             tline
    1813               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
    1814 UBC           0 :                 .await?,
    1815                 :             true
    1816                 :         );
    1817 CBC           1 :         assert_eq!(
    1818               1 :             tline
    1819               1 :                 .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
    1820 UBC           0 :                 .await?,
    1821                 :             1
    1822                 :         );
    1823                 : 
    1824                 :         // Drop rel
    1825 CBC           1 :         let mut m = tline.begin_modification(Lsn(0x30));
    1826               1 :         walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
    1827               1 :         m.commit(&ctx).await?;
    1828                 : 
    1829                 :         // Check that rel is not visible anymore
    1830               1 :         assert_eq!(
    1831               1 :             tline
    1832               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x30), false, &ctx)
    1833 UBC           0 :                 .await?,
    1834                 :             false
    1835                 :         );
    1836                 : 
    1837                 :         // FIXME: should fail
    1838                 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    1839                 : 
    1840                 :         // Re-create it
    1841 CBC           1 :         let mut m = tline.begin_modification(Lsn(0x40));
    1842               1 :         walingest
    1843               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
    1844 UBC           0 :             .await?;
    1845 CBC           1 :         m.commit(&ctx).await?;
    1846                 : 
    1847                 :         // Check that rel exists and size is correct
    1848               1 :         assert_eq!(
    1849               1 :             tline
    1850               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x40), false, &ctx)
    1851 UBC           0 :                 .await?,
    1852                 :             true
    1853                 :         );
    1854 CBC           1 :         assert_eq!(
    1855               1 :             tline
    1856               1 :                 .get_rel_size(TESTREL_A, Lsn(0x40), false, &ctx)
    1857 UBC           0 :                 .await?,
    1858                 :             1
    1859                 :         );
    1860                 : 
    1861 CBC           1 :         Ok(())
    1862                 :     }
    1863                 : 
    1864                 :     // Test what happens if we truncated a relation
    1865                 :     // so that one of its segments was dropped
    1866                 :     // and then extended it again within the same layer.
    1867               1 :     #[tokio::test]
    1868               1 :     async fn test_truncate_extend() -> Result<()> {
    1869               1 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
    1870               1 :         let tline = tenant
    1871               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1872               2 :             .await?;
    1873               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1874                 : 
    1875                 :         // Create a 20 MB relation (the size is arbitrary)
    1876               1 :         let relsize = 20 * 1024 * 1024 / 8192;
    1877               1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1878            2560 :         for blkno in 0..relsize {
    1879            2560 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    1880            2560 :             walingest
    1881            2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    1882 UBC           0 :                 .await?;
    1883                 :         }
    1884 CBC          40 :         m.commit(&ctx).await?;
    1885                 : 
    1886                 :         // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
    1887               1 :         assert_eq!(
    1888               1 :             tline
    1889               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
    1890 UBC           0 :                 .await?,
    1891                 :             false
    1892                 :         );
    1893 CBC           1 :         assert!(tline
    1894               1 :             .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
    1895 UBC           0 :             .await
    1896 CBC           1 :             .is_err());
    1897                 : 
    1898               1 :         assert_eq!(
    1899               1 :             tline
    1900               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
    1901 UBC           0 :                 .await?,
    1902                 :             true
    1903                 :         );
    1904 CBC           1 :         assert_eq!(
    1905               1 :             tline
    1906               1 :                 .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
    1907 UBC           0 :                 .await?,
    1908                 :             relsize
    1909                 :         );
    1910                 : 
    1911                 :         // Check relation content
    1912 CBC        2560 :         for blkno in 0..relsize {
    1913            2560 :             let lsn = Lsn(0x20);
    1914            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1915            2560 :             assert_eq!(
    1916            2560 :                 tline
    1917            2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false, &ctx)
    1918              92 :                     .await?,
    1919            2560 :                 TEST_IMG(&data)
    1920                 :             );
    1921                 :         }
    1922                 : 
    1923                 :         // Truncate relation so that second segment was dropped
    1924                 :         // - only leave one page
    1925               1 :         let mut m = tline.begin_modification(Lsn(0x60));
    1926               1 :         walingest
    1927               1 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    1928 UBC           0 :             .await?;
    1929 CBC           1 :         m.commit(&ctx).await?;
    1930                 : 
    1931                 :         // Check reported size and contents after truncation
    1932               1 :         assert_eq!(
    1933               1 :             tline
    1934               1 :                 .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
    1935 UBC           0 :                 .await?,
    1936                 :             1
    1937                 :         );
    1938                 : 
    1939 CBC           2 :         for blkno in 0..1 {
    1940               1 :             let lsn = Lsn(0x20);
    1941               1 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1942               1 :             assert_eq!(
    1943               1 :                 tline
    1944               1 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false, &ctx)
    1945 UBC           0 :                     .await?,
    1946 CBC           1 :                 TEST_IMG(&data)
    1947                 :             );
    1948                 :         }
    1949                 : 
    1950                 :         // should still see all blocks with older LSN
    1951               1 :         assert_eq!(
    1952               1 :             tline
    1953               1 :                 .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
    1954 UBC           0 :                 .await?,
    1955                 :             relsize
    1956                 :         );
    1957 CBC        2560 :         for blkno in 0..relsize {
    1958            2560 :             let lsn = Lsn(0x20);
    1959            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1960            2560 :             assert_eq!(
    1961            2560 :                 tline
    1962            2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false, &ctx)
    1963             195 :                     .await?,
    1964            2560 :                 TEST_IMG(&data)
    1965                 :             );
    1966                 :         }
    1967                 : 
    1968                 :         // Extend relation again.
    1969                 :         // Add enough blocks to create second segment
    1970               1 :         let lsn = Lsn(0x80);
    1971               1 :         let mut m = tline.begin_modification(lsn);
    1972            2560 :         for blkno in 0..relsize {
    1973            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1974            2560 :             walingest
    1975            2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    1976 UBC           0 :                 .await?;
    1977                 :         }
    1978 CBC          40 :         m.commit(&ctx).await?;
    1979                 : 
    1980               1 :         assert_eq!(
    1981               1 :             tline
    1982               1 :                 .get_rel_exists(TESTREL_A, Lsn(0x80), false, &ctx)
    1983 UBC           0 :                 .await?,
    1984                 :             true
    1985                 :         );
    1986 CBC           1 :         assert_eq!(
    1987               1 :             tline
    1988               1 :                 .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
    1989 UBC           0 :                 .await?,
    1990                 :             relsize
    1991                 :         );
    1992                 :         // Check relation content
    1993 CBC        2560 :         for blkno in 0..relsize {
    1994            2560 :             let lsn = Lsn(0x80);
    1995            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1996            2560 :             assert_eq!(
    1997            2560 :                 tline
    1998            2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false, &ctx)
    1999              98 :                     .await?,
    2000            2560 :                 TEST_IMG(&data)
    2001                 :             );
    2002                 :         }
    2003                 : 
    2004               1 :         Ok(())
    2005                 :     }
    2006                 : 
    2007                 :     /// Test get_rel_size() and truncation with a file larger than 1 GB, so that it's
    2008                 :     /// split into multiple 1 GB segments in Postgres.
    2009               1 :     #[tokio::test]
    2010               1 :     async fn test_large_rel() -> Result<()> {
    2011               1 :         let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
    2012               1 :         let tline = tenant
    2013               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2014               2 :             .await?;
    2015               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2016                 : 
    2017               1 :         let mut lsn = 0x10;
    2018          131073 :         for blknum in 0..RELSEG_SIZE + 1 {
    2019          131073 :             lsn += 0x10;
    2020          131073 :             let mut m = tline.begin_modification(Lsn(lsn));
    2021          131073 :             let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2022          131073 :             walingest
    2023          131073 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2024            2958 :                 .await?;
    2025          131073 :             m.commit(&ctx).await?;
    2026                 :         }
    2027                 : 
    2028               1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2029                 : 
    2030               1 :         assert_eq!(
    2031               1 :             tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    2032               1 :             RELSEG_SIZE + 1
    2033                 :         );
    2034                 : 
    2035                 :         // Truncate one block
    2036               1 :         lsn += 0x10;
    2037               1 :         let mut m = tline.begin_modification(Lsn(lsn));
    2038               1 :         walingest
    2039               1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2040 UBC           0 :             .await?;
    2041 CBC           1 :         m.commit(&ctx).await?;
    2042               1 :         assert_eq!(
    2043               1 :             tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    2044                 :             RELSEG_SIZE
    2045                 :         );
    2046               1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2047               1 : 
    2048               1 :         // Truncate another block
    2049               1 :         lsn += 0x10;
    2050               1 :         let mut m = tline.begin_modification(Lsn(lsn));
    2051               1 :         walingest
    2052               1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2053 UBC           0 :             .await?;
    2054 CBC           1 :         m.commit(&ctx).await?;
    2055               1 :         assert_eq!(
    2056               1 :             tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    2057               1 :             RELSEG_SIZE - 1
    2058                 :         );
    2059               1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2060               1 : 
    2061               1 :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time
    2062               1 :         // This tests the behavior at segment boundaries
    2063               1 :         let mut size: i32 = 3000;
    2064            3002 :         while size >= 0 {
    2065            3001 :             lsn += 0x10;
    2066            3001 :             let mut m = tline.begin_modification(Lsn(lsn));
    2067            3001 :             walingest
    2068            3001 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2069              72 :                 .await?;
    2070            3001 :             m.commit(&ctx).await?;
    2071            3001 :             assert_eq!(
    2072            3001 :                 tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    2073            3001 :                 size as BlockNumber
    2074                 :             );
    2075                 : 
    2076            3001 :             size -= 1;
    2077                 :         }
    2078               1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2079               1 : 
    2080               1 :         Ok(())
    2081                 :     }
    2082                 : }
        

Generated by: LCOV version 2.1-beta