LCOV - code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 86.6 % 1288 1116
Test Date: 2023-09-06 10:18:01 Functions: 75.0 % 72 54

            Line data    Source code
       1              : //!
       2              : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3              : //!
       4              : //! The pipeline for ingesting WAL looks like this:
       5              : //!
       6              : //! WAL receiver  ->   WalIngest  ->   Repository
       7              : //!
       8              : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9              : //! and decodes it to individual WAL records. It feeds the WAL records
      10              : //! to WalIngest, which parses them and stores them in the Repository.
      11              : //!
      12              : //! The neon Repository can store page versions in two formats: as
      13              : //! page images, or as WAL records. WalIngest::ingest_record() extracts
      14              : //! page images out of some WAL records, but stores most of them as WAL
      15              : //! records. If a WAL record modifies multiple pages, WalIngest
      16              : //! will call Repository::put_wal_record or put_page_image functions
      17              : //! separately for each modified page.
      18              : //!
      19              : //! To reconstruct a page using a WAL record, the Repository calls the
      20              : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21              : //! redo Postgres process, but some records it can handle directly with
      22              : //! bespoke Rust code.
      23              : 
      24              : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
      25              : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
      26              : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      27              : 
      28              : use anyhow::{Context, Result};
      29              : use bytes::{Buf, Bytes, BytesMut};
      30              : use tracing::*;
      31              : 
      32              : use crate::context::RequestContext;
      33              : use crate::pgdatadir_mapping::*;
      34              : use crate::tenant::PageReconstructError;
      35              : use crate::tenant::Timeline;
      36              : use crate::walrecord::*;
      37              : use crate::ZERO_PAGE;
      38              : use pageserver_api::reltag::{RelTag, SlruKind};
      39              : use postgres_ffi::pg_constants;
      40              : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      41              : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
      42              : use postgres_ffi::v14::xlog_utils::*;
      43              : use postgres_ffi::v14::CheckPoint;
      44              : use postgres_ffi::TransactionId;
      45              : use postgres_ffi::BLCKSZ;
      46              : use utils::lsn::Lsn;
      47              : 
      48              : pub struct WalIngest<'a> {
      49              :     timeline: &'a Timeline,
      50              : 
      51              :     checkpoint: CheckPoint,
      52              :     checkpoint_modified: bool,
      53              : }
      54              : 
      55              : impl<'a> WalIngest<'a> {
      56         1399 :     pub async fn new(
      57         1399 :         timeline: &'a Timeline,
      58         1399 :         startpoint: Lsn,
      59         1399 :         ctx: &'_ RequestContext,
      60         1399 :     ) -> anyhow::Result<WalIngest<'a>> {
      61              :         // Fetch the latest checkpoint into memory, so that we can compare with it
      62              :         // quickly in `ingest_record` and update it when it changes.
      63         1399 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
      64         1389 :         let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
      65            0 :         trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
      66              : 
      67         1389 :         Ok(WalIngest {
      68         1389 :             timeline,
      69         1389 :             checkpoint,
      70         1389 :             checkpoint_modified: false,
      71         1389 :         })
      72         1399 :     }
      73              : 
      74              :     ///
      75              :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
      76              :     ///
      77              :     /// This function updates the `lsn` field of `DatadirModification`.
      78              :     ///
      79              :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
      80              :     /// relations/pages that the record affects.
      81              :     ///
      82     73526651 :     pub async fn ingest_record(
      83     73526651 :         &mut self,
      84     73526651 :         recdata: Bytes,
      85     73526651 :         lsn: Lsn,
      86     73526651 :         modification: &mut DatadirModification<'_>,
      87     73526651 :         decoded: &mut DecodedWALRecord,
      88     73526651 :         ctx: &RequestContext,
      89     73526768 :     ) -> anyhow::Result<()> {
      90     73526768 :         modification.lsn = lsn;
      91     73526768 :         decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
      92              : 
      93     73526768 :         let mut buf = decoded.record.clone();
      94     73526768 :         buf.advance(decoded.main_data_offset);
      95     73526768 : 
      96     73526768 :         assert!(!self.checkpoint_modified);
      97     73526768 :         if self.checkpoint.update_next_xid(decoded.xl_xid) {
      98         3936 :             self.checkpoint_modified = true;
      99     73522832 :         }
     100              : 
     101              :         // Heap AM records need some special handling, because they modify VM pages
     102              :         // without registering them with the standard mechanism.
     103     73526768 :         if decoded.xl_rmid == pg_constants::RM_HEAP_ID
     104     15667578 :             || decoded.xl_rmid == pg_constants::RM_HEAP2_ID
     105              :         {
     106     59861282 :             self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
     107           21 :                 .await?;
     108     13665486 :         }
     109              :         // Handle other special record types
     110     73526768 :         if decoded.xl_rmid == pg_constants::RM_SMGR_ID
     111        21995 :             && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     112        21995 :                 == pg_constants::XLOG_SMGR_CREATE
     113              :         {
     114        21950 :             let create = XlSmgrCreate::decode(&mut buf);
     115        21950 :             self.ingest_xlog_smgr_create(modification, &create, ctx)
     116         1011 :                 .await?;
     117     73504818 :         } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
     118           45 :             && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     119           45 :                 == pg_constants::XLOG_SMGR_TRUNCATE
     120              :         {
     121           45 :             let truncate = XlSmgrTruncate::decode(&mut buf);
     122           45 :             self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
     123            0 :                 .await?;
     124     73504773 :         } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
     125            0 :             debug!(
     126            0 :                 "handle RM_DBASE_ID for Postgres version {:?}",
     127            0 :                 self.timeline.pg_version
     128            0 :             );
     129           14 :             if self.timeline.pg_version == 14 {
     130           14 :                 if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     131           14 :                     == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
     132              :                 {
     133           12 :                     let createdb = XlCreateDatabase::decode(&mut buf);
     134            0 :                     debug!("XLOG_DBASE_CREATE v14");
     135              : 
     136           12 :                     self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     137          816 :                         .await?;
     138            2 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     139            2 :                     == postgres_ffi::v14::bindings::XLOG_DBASE_DROP
     140              :                 {
     141            2 :                     let dropdb = XlDropDatabase::decode(&mut buf);
     142            4 :                     for tablespace_id in dropdb.tablespace_ids {
     143            0 :                         trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     144            2 :                         modification
     145            2 :                             .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     146            0 :                             .await?;
     147              :                     }
     148            0 :                 }
     149            0 :             } else if self.timeline.pg_version == 15 {
     150            0 :                 if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     151            0 :                     == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
     152              :                 {
     153            0 :                     debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     154            0 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     155            0 :                     == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
     156              :                 {
     157              :                     // The XLOG record was renamed between v14 and v15,
     158              :                     // but the record format is the same.
     159              :                     // So we can reuse XlCreateDatabase here.
     160            0 :                     debug!("XLOG_DBASE_CREATE_FILE_COPY");
     161            0 :                     let createdb = XlCreateDatabase::decode(&mut buf);
     162            0 :                     self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     163            0 :                         .await?;
     164            0 :                 } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
     165            0 :                     == postgres_ffi::v15::bindings::XLOG_DBASE_DROP
     166              :                 {
     167            0 :                     let dropdb = XlDropDatabase::decode(&mut buf);
     168            0 :                     for tablespace_id in dropdb.tablespace_ids {
     169            0 :                         trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     170            0 :                         modification
     171            0 :                             .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     172            0 :                             .await?;
     173              :                     }
     174            0 :                 }
     175            0 :             }
     176     73504759 :         } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
     177            0 :             trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     178     73504755 :         } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID {
     179          370 :             let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     180          370 :             if info == pg_constants::CLOG_ZEROPAGE {
     181          369 :                 let pageno = buf.get_u32_le();
     182          369 :                 let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     183          369 :                 let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     184          369 :                 self.put_slru_page_image(
     185          369 :                     modification,
     186          369 :                     SlruKind::Clog,
     187          369 :                     segno,
     188          369 :                     rpageno,
     189          369 :                     ZERO_PAGE.clone(),
     190          369 :                     ctx,
     191          369 :                 )
     192           24 :                 .await?;
     193              :             } else {
     194            1 :                 assert!(info == pg_constants::CLOG_TRUNCATE);
     195            1 :                 let xlrec = XlClogTruncate::decode(&mut buf);
     196            1 :                 self.ingest_clog_truncate_record(modification, &xlrec, ctx)
     197            0 :                     .await?;
     198              :             }
     199     73504385 :         } else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
     200      2266604 :             let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     201      2266604 :             if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
     202      2265097 :                 let parsed_xact =
     203      2265097 :                     XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     204      2265097 :                 self.ingest_xact_record(
     205      2265097 :                     modification,
     206      2265097 :                     &parsed_xact,
     207      2265097 :                     info == pg_constants::XLOG_XACT_COMMIT,
     208      2265097 :                     ctx,
     209      2265097 :                 )
     210         2436 :                 .await?;
     211         1507 :             } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
     212         1506 :                 || info == pg_constants::XLOG_XACT_ABORT_PREPARED
     213              :             {
     214            2 :                 let parsed_xact =
     215            2 :                     XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     216            2 :                 self.ingest_xact_record(
     217            2 :                     modification,
     218            2 :                     &parsed_xact,
     219            2 :                     info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
     220            2 :                     ctx,
     221            2 :                 )
     222            0 :                 .await?;
     223              :                 // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     224            0 :                 trace!(
     225            0 :                     "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     226            0 :                     decoded.xl_xid,
     227            0 :                     parsed_xact.xid,
     228            0 :                     lsn,
     229            0 :                 );
     230            2 :                 modification
     231            2 :                     .drop_twophase_file(parsed_xact.xid, ctx)
     232            0 :                     .await?;
     233         1505 :             } else if info == pg_constants::XLOG_XACT_PREPARE {
     234            4 :                 modification
     235            4 :                     .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
     236            0 :                     .await?;
     237         1501 :             }
     238     71237781 :         } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
     239        24332 :             let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     240        24332 : 
     241        24332 :             if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
     242           14 :                 let pageno = buf.get_u32_le();
     243           14 :                 let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     244           14 :                 let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     245           14 :                 self.put_slru_page_image(
     246           14 :                     modification,
     247           14 :                     SlruKind::MultiXactOffsets,
     248           14 :                     segno,
     249           14 :                     rpageno,
     250           14 :                     ZERO_PAGE.clone(),
     251           14 :                     ctx,
     252           14 :                 )
     253            0 :                 .await?;
     254        24318 :             } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
     255          291 :                 let pageno = buf.get_u32_le();
     256          291 :                 let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     257          291 :                 let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     258          291 :                 self.put_slru_page_image(
     259          291 :                     modification,
     260          291 :                     SlruKind::MultiXactMembers,
     261          291 :                     segno,
     262          291 :                     rpageno,
     263          291 :                     ZERO_PAGE.clone(),
     264          291 :                     ctx,
     265          291 :                 )
     266            1 :                 .await?;
     267        24027 :             } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     268        24027 :                 let xlrec = XlMultiXactCreate::decode(&mut buf);
     269        24027 :                 self.ingest_multixact_create_record(modification, &xlrec)?;
     270            0 :             } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     271            0 :                 let xlrec = XlMultiXactTruncate::decode(&mut buf);
     272            0 :                 self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
     273            0 :                     .await?;
     274            0 :             }
     275     71213449 :         } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
     276           45 :             let xlrec = XlRelmapUpdate::decode(&mut buf);
     277           45 :             self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
     278            1 :                 .await?;
     279     71213404 :         } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
     280       142663 :             let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     281       142663 :             if info == pg_constants::XLOG_NEXTOID {
     282          449 :                 let next_oid = buf.get_u32_le();
     283          449 :                 if self.checkpoint.nextOid != next_oid {
     284          449 :                     self.checkpoint.nextOid = next_oid;
     285          449 :                     self.checkpoint_modified = true;
     286          449 :                 }
     287       142214 :             } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
     288       142161 :                 || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     289              :             {
     290          696 :                 let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
     291          696 :                 buf.copy_to_slice(&mut checkpoint_bytes);
     292          696 :                 let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     293            0 :                 trace!(
     294            0 :                     "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
     295            0 :                     xlog_checkpoint.oldestXid,
     296            0 :                     self.checkpoint.oldestXid
     297            0 :                 );
     298          696 :                 if (self
     299          696 :                     .checkpoint
     300          696 :                     .oldestXid
     301          696 :                     .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
     302          696 :                     < 0
     303            0 :                 {
     304            0 :                     self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
     305            0 :                     self.checkpoint_modified = true;
     306          696 :                 }
     307       141518 :             }
     308     71070741 :         } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
     309           15 :             let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     310           15 :             if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     311              :                 // This is a convenient way to make the WAL ingestion pause at
     312              :                 // particular point in the WAL. For more fine-grained control,
     313              :                 // we could peek into the message and only pause if it contains
     314              :                 // a particular string, for example, but this is enough for now.
     315           18 :                 crate::failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
     316            0 :             }
     317     71070726 :         }
     318              : 
     319              :         // Iterate through all the blocks that the record modifies, and
     320              :         // "put" a separate copy of the record for each block.
     321     75336655 :         for blk in decoded.blocks.iter() {
     322     75336655 :             self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
     323        14044 :                 .await?;
     324              :         }
     325              : 
     326              :         // If checkpoint data was updated, store the new version in the repository
     327     73526767 :         if self.checkpoint_modified {
     328        28394 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     329              : 
     330        28394 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     331        28394 :             self.checkpoint_modified = false;
     332     73498373 :         }
     333              : 
     334              :         // Now that this record has been fully handled, including updating the
     335              :         // checkpoint data, let the repository know that it is up-to-date to this LSN
     336     73526767 :         modification.commit().await?;
     337              : 
     338     73526755 :         Ok(())
     339     73526755 :     }
     340              : 
     341     75336498 :     async fn ingest_decoded_block(
     342     75336498 :         &mut self,
     343     75336498 :         modification: &mut DatadirModification<'_>,
     344     75336498 :         lsn: Lsn,
     345     75336498 :         decoded: &DecodedWALRecord,
     346     75336498 :         blk: &DecodedBkpBlock,
     347     75336498 :         ctx: &RequestContext,
     348     75336655 :     ) -> Result<(), PageReconstructError> {
     349     75336655 :         let rel = RelTag {
     350     75336655 :             spcnode: blk.rnode_spcnode,
     351     75336655 :             dbnode: blk.rnode_dbnode,
     352     75336655 :             relnode: blk.rnode_relnode,
     353     75336655 :             forknum: blk.forknum,
     354     75336655 :         };
     355     75336655 : 
     356     75336655 :         //
     357     75336655 :         // Instead of storing a full-page-image WAL record,
     358     75336655 :         // it is better to store the extracted image: we can skip wal-redo
     359     75336655 :         // in this case. Also, some FPI records may contain multiple (up to 32) pages,
     360     75336655 :         // so they have to be copied multiple times.
     361     75336655 :         //
     362     75336655 :         if blk.apply_image
     363       170593 :             && blk.has_image
     364       170593 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     365       143829 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     366            0 :                 || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     367              :         // compression of WAL is not yet supported: fall back to storing the original WAL record
     368       143829 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
     369              :         {
     370              :             // Extract page image from FPI record
     371       143829 :             let img_len = blk.bimg_len as usize;
     372       143829 :             let img_offs = blk.bimg_offset as usize;
     373       143829 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     374       143829 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     375       143829 : 
     376       143829 :             if blk.hole_length != 0 {
     377       117283 :                 let tail = image.split_off(blk.hole_offset as usize);
     378       117283 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     379       117283 :                 image.unsplit(tail);
     380       117283 :             }
     381              :             //
     382              :             // Match the logic of XLogReadBufferForRedoExtended:
     383              :             // The page may be uninitialized. If so, we can't set the LSN because
     384              :             // that would corrupt the page.
     385              :             //
     386       143829 :             if !page_is_new(&image) {
     387       138540 :                 page_set_lsn(&mut image, lsn)
     388         5289 :             }
     389       143829 :             assert_eq!(image.len(), BLCKSZ as usize);
     390       143829 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     391         1958 :                 .await?;
     392              :         } else {
     393     75192826 :             let rec = NeonWalRecord::Postgres {
     394     75192826 :                 will_init: blk.will_init || blk.apply_image,
     395     75192826 :                 rec: decoded.record.clone(),
     396     75192826 :             };
     397     75192826 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     398        12086 :                 .await?;
     399              :         }
     400     75336654 :         Ok(())
     401     75336654 :     }
     402              : 
     403     59861204 :     async fn ingest_heapam_record(
     404     59861204 :         &mut self,
     405     59861204 :         buf: &mut Bytes,
     406     59861204 :         modification: &mut DatadirModification<'_>,
     407     59861204 :         decoded: &mut DecodedWALRecord,
     408     59861204 :         ctx: &RequestContext,
     409     59861282 :     ) -> anyhow::Result<()> {
     410     59861282 :         // Handle VM bit updates that are implicitly part of heap records.
     411     59861282 : 
     412     59861282 :         // First, look at the record to determine which VM bits need
     413     59861282 :         // to be cleared. If either of these variables is set, we
     414     59861282 :         // need to clear the corresponding bits in the visibility map.
     415     59861282 :         let mut new_heap_blkno: Option<u32> = None;
     416     59861282 :         let mut old_heap_blkno: Option<u32> = None;
     417     59861282 :         if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     418     57859190 :             let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     419     57859190 :             if info == pg_constants::XLOG_HEAP_INSERT {
     420     47303205 :                 let xlrec = XlHeapInsert::decode(buf);
     421     47303205 :                 assert_eq!(0, buf.remaining());
     422     47303205 :                 if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     423         2943 :                     new_heap_blkno = Some(decoded.blocks[0].blkno);
     424     47300262 :                 }
     425     10555985 :             } else if info == pg_constants::XLOG_HEAP_DELETE {
     426       535296 :                 let xlrec = XlHeapDelete::decode(buf);
     427       535296 :                 assert_eq!(0, buf.remaining());
     428       535296 :                 if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     429          732 :                     new_heap_blkno = Some(decoded.blocks[0].blkno);
     430       534564 :                 }
     431     10020689 :             } else if info == pg_constants::XLOG_HEAP_UPDATE
     432      6590864 :                 || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     433              :             {
     434      5355809 :                 let xlrec = XlHeapUpdate::decode(buf);
     435      5355809 :                 // the size of tuple data is inferred from the size of the record.
     436      5355809 :                 // we can't validate the remaining number of bytes without parsing
     437      5355809 :                 // the tuple data.
     438      5355809 :                 if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     439       104268 :                     old_heap_blkno = Some(decoded.blocks[0].blkno);
     440      5251541 :                 }
     441      5355809 :                 if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     442         9268 :                     // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     443         9268 :                     // non-HOT update where the new tuple goes to different page than
     444         9268 :                     // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     445         9268 :                     // set.
     446         9268 :                     new_heap_blkno = Some(decoded.blocks[1].blkno);
     447      5346541 :                 }
     448      4664880 :             }
     449      2002092 :         } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     450      2002092 :             let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     451      2002092 :             if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     452       692488 :                 let xlrec = XlHeapMultiInsert::decode(buf);
     453              : 
     454       692488 :                 let offset_array_len = if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     455              :                     // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     456       545769 :                     0
     457              :                 } else {
     458       146719 :                     std::mem::size_of::<u16>() * xlrec.ntuples as usize
     459              :                 };
     460       692488 :                 assert_eq!(offset_array_len, buf.remaining());
     461              : 
     462       692488 :                 if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     463         2045 :                     new_heap_blkno = Some(decoded.blocks[0].blkno);
     464       690443 :                 }
     465      1309604 :             }
     466            0 :         }
     467              :         // FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED?
     468              : 
     469              :         // Clear the VM bits if required.
     470     59861282 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     471       119084 :             let vm_rel = RelTag {
     472       119084 :                 forknum: VISIBILITYMAP_FORKNUM,
     473       119084 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     474       119084 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     475       119084 :                 relnode: decoded.blocks[0].rnode_relnode,
     476       119084 :             };
     477       119084 : 
     478       119084 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     479       119084 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     480              : 
     481              :             // Sometimes, Postgres seems to create heap WAL records with the
     482              :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     483              :             // not set. In fact, it's possible that the VM page does not exist at all.
     484              :             // In that case, we don't want to store a record to clear the VM bit;
     485              :             // replaying it would fail to find the previous image of the page, because
     486              :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     487              :             // record if it doesn't.
     488       119084 :             let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
     489       119084 :             if let Some(blknum) = new_vm_blk {
     490        14988 :                 if blknum >= vm_size {
     491            0 :                     new_vm_blk = None;
     492        14988 :                 }
     493       104096 :             }
     494       119084 :             if let Some(blknum) = old_vm_blk {
     495       104268 :                 if blknum >= vm_size {
     496            0 :                     old_vm_blk = None;
     497       104268 :                 }
     498        14816 :             }
     499              : 
     500       119084 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     501       119084 :                 if new_vm_blk == old_vm_blk {
     502              :                     // An UPDATE record that needs to clear the bits for both old and the
     503              :                     // new page, both of which reside on the same VM page.
     504          172 :                     self.put_rel_wal_record(
     505          172 :                         modification,
     506          172 :                         vm_rel,
     507          172 :                         new_vm_blk.unwrap(),
     508          172 :                         NeonWalRecord::ClearVisibilityMapFlags {
     509          172 :                             new_heap_blkno,
     510          172 :                             old_heap_blkno,
     511          172 :                             flags: pg_constants::VISIBILITYMAP_VALID_BITS,
     512          172 :                         },
     513          172 :                         ctx,
     514          172 :                     )
     515            0 :                     .await?;
     516              :                 } else {
     517              :                     // Clear VM bits for one heap page, or for two pages that reside on
     518              :                     // different VM pages.
     519       118912 :                     if let Some(new_vm_blk) = new_vm_blk {
     520        14816 :                         self.put_rel_wal_record(
     521        14816 :                             modification,
     522        14816 :                             vm_rel,
     523        14816 :                             new_vm_blk,
     524        14816 :                             NeonWalRecord::ClearVisibilityMapFlags {
     525        14816 :                                 new_heap_blkno,
     526        14816 :                                 old_heap_blkno: None,
     527        14816 :                                 flags: pg_constants::VISIBILITYMAP_VALID_BITS,
     528        14816 :                             },
     529        14816 :                             ctx,
     530        14816 :                         )
     531            0 :                         .await?;
     532       104096 :                     }
     533       118912 :                     if let Some(old_vm_blk) = old_vm_blk {
     534       104096 :                         self.put_rel_wal_record(
     535       104096 :                             modification,
     536       104096 :                             vm_rel,
     537       104096 :                             old_vm_blk,
     538       104096 :                             NeonWalRecord::ClearVisibilityMapFlags {
     539       104096 :                                 new_heap_blkno: None,
     540       104096 :                                 old_heap_blkno,
     541       104096 :                                 flags: pg_constants::VISIBILITYMAP_VALID_BITS,
     542       104096 :                             },
     543       104096 :                             ctx,
     544       104096 :                         )
     545            0 :                         .await?;
     546        14816 :                     }
     547              :                 }
     548            0 :             }
     549     59742198 :         }
     550              : 
     551     59861282 :         Ok(())
     552     59861282 :     }
     553              : 
     554              :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     555           12 :     async fn ingest_xlog_dbase_create(
     556           12 :         &mut self,
     557           12 :         modification: &mut DatadirModification<'_>,
     558           12 :         rec: &XlCreateDatabase,
     559           12 :         ctx: &RequestContext,
     560           12 :     ) -> anyhow::Result<()> {
     561           12 :         let db_id = rec.db_id;
     562           12 :         let tablespace_id = rec.tablespace_id;
     563           12 :         let src_db_id = rec.src_db_id;
     564           12 :         let src_tablespace_id = rec.src_tablespace_id;
     565           12 : 
     566           12 :         // Creating a database is implemented by copying the template (aka. source) database.
     567           12 :         // To copy all the relations, we need to ask for the state as of the same LSN, but we
     568           12 :         // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for
     569           12 :         // the last valid LSN to advance up to it. So we use the previous record's LSN in the
     570           12 :         // get calls instead.
     571           12 :         let req_lsn = modification.tline.get_last_record_lsn();
     572              : 
     573           12 :         let rels = modification
     574           12 :             .tline
     575           12 :             .list_rels(src_tablespace_id, src_db_id, req_lsn, ctx)
     576            0 :             .await?;
     577              : 
     578            0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
     579              : 
     580              :         // Copy relfilemap
     581           12 :         let filemap = modification
     582           12 :             .tline
     583           12 :             .get_relmap_file(src_tablespace_id, src_db_id, req_lsn, ctx)
     584            0 :             .await?;
     585           12 :         modification
     586           12 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
     587            0 :             .await?;
     588              : 
     589           12 :         let mut num_rels_copied = 0;
     590           12 :         let mut num_blocks_copied = 0;
     591         3516 :         for src_rel in rels {
     592         3504 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
     593         3504 :             assert_eq!(src_rel.dbnode, src_db_id);
     594              : 
     595         3504 :             let nblocks = modification
     596         3504 :                 .tline
     597         3504 :                 .get_rel_size(src_rel, req_lsn, true, ctx)
     598          148 :                 .await?;
     599         3504 :             let dst_rel = RelTag {
     600         3504 :                 spcnode: tablespace_id,
     601         3504 :                 dbnode: db_id,
     602         3504 :                 relnode: src_rel.relnode,
     603         3504 :                 forknum: src_rel.forknum,
     604         3504 :             };
     605         3504 : 
     606         3504 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
     607              : 
     608              :             // Copy content
     609            0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
     610        12120 :             for blknum in 0..nblocks {
     611            0 :                 debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
     612              : 
     613        12120 :                 let content = modification
     614        12120 :                     .tline
     615        12120 :                     .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true, ctx)
     616          668 :                     .await?;
     617        12120 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
     618        12120 :                 num_blocks_copied += 1;
     619              :             }
     620              : 
     621         3504 :             num_rels_copied += 1;
     622              :         }
     623              : 
     624           12 :         info!(
     625           12 :             "Created database {}/{}, copied {} blocks in {} rels",
     626           12 :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
     627           12 :         );
     628           12 :         Ok(())
     629           12 :     }
     630              : 
     631        21950 :     async fn ingest_xlog_smgr_create(
     632        21950 :         &mut self,
     633        21950 :         modification: &mut DatadirModification<'_>,
     634        21950 :         rec: &XlSmgrCreate,
     635        21950 :         ctx: &RequestContext,
     636        21950 :     ) -> anyhow::Result<()> {
     637        21950 :         let rel = RelTag {
     638        21950 :             spcnode: rec.rnode.spcnode,
     639        21950 :             dbnode: rec.rnode.dbnode,
     640        21950 :             relnode: rec.rnode.relnode,
     641        21950 :             forknum: rec.forknum,
     642        21950 :         };
     643        21950 :         self.put_rel_creation(modification, rel, ctx).await?;
     644        21950 :         Ok(())
     645        21950 :     }
     646              : 
     647              :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
     648              :     ///
     649              :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
     650           45 :     async fn ingest_xlog_smgr_truncate(
     651           45 :         &mut self,
     652           45 :         modification: &mut DatadirModification<'_>,
     653           45 :         rec: &XlSmgrTruncate,
     654           45 :         ctx: &RequestContext,
     655           45 :     ) -> anyhow::Result<()> {
     656           45 :         let spcnode = rec.rnode.spcnode;
     657           45 :         let dbnode = rec.rnode.dbnode;
     658           45 :         let relnode = rec.rnode.relnode;
     659           45 : 
     660           45 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
     661           45 :             let rel = RelTag {
     662           45 :                 spcnode,
     663           45 :                 dbnode,
     664           45 :                 relnode,
     665           45 :                 forknum: MAIN_FORKNUM,
     666           45 :             };
     667           45 :             self.put_rel_truncation(modification, rel, rec.blkno, ctx)
     668            0 :                 .await?;
     669            0 :         }
     670           45 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
     671           45 :             let rel = RelTag {
     672           45 :                 spcnode,
     673           45 :                 dbnode,
     674           45 :                 relnode,
     675           45 :                 forknum: FSM_FORKNUM,
     676           45 :             };
     677           45 : 
     678           45 :             let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
     679           45 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
     680           45 :             if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
     681              :                 // Tail of last remaining FSM page has to be zeroed.
     682              :                 // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
     683           21 :                 modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
     684           21 :                 fsm_physical_page_no += 1;
     685           24 :             }
     686           45 :             let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
     687           45 :             if nblocks > fsm_physical_page_no {
     688              :                 // check if something to do: FSM is larger than truncate position
     689           22 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
     690            0 :                     .await?;
     691           23 :             }
     692            0 :         }
     693           45 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
     694           45 :             let rel = RelTag {
     695           45 :                 spcnode,
     696           45 :                 dbnode,
     697           45 :                 relnode,
     698           45 :                 forknum: VISIBILITYMAP_FORKNUM,
     699           45 :             };
     700           45 : 
     701           45 :             let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
     702           45 :             if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
     703              :                 // Tail of last remaining vm page has to be zeroed.
     704              :                 // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
     705           21 :                 modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
     706           21 :                 vm_page_no += 1;
     707           24 :             }
     708           45 :             let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
     709           45 :             if nblocks > vm_page_no {
     710              :                 // check if something to do: VM is larger than truncate position
     711           22 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
     712            0 :                     .await?;
     713           23 :             }
     714            0 :         }
     715           45 :         Ok(())
     716           45 :     }
     717              : 
     718              :     /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
     719              :     ///
     720      2265099 :     async fn ingest_xact_record(
     721      2265099 :         &mut self,
     722      2265099 :         modification: &mut DatadirModification<'_>,
     723      2265099 :         parsed: &XlXactParsedRecord,
     724      2265099 :         is_commit: bool,
     725      2265099 :         ctx: &RequestContext,
     726      2265099 :     ) -> anyhow::Result<()> {
     727      2265099 :         // Record update of CLOG pages
     728      2265099 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
     729      2265099 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     730      2265099 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     731      2265099 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
     732              : 
     733      2365148 :         for subxact in &parsed.subxacts {
     734       100049 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
     735       100049 :             if subxact_pageno != pageno {
     736              :                 // This subxact goes to different page. Write the record
     737              :                 // for all the XIDs on the previous page, and continue
     738              :                 // accumulating XIDs on this new page.
     739              :                 modification.put_slru_wal_record(
     740            3 :                     SlruKind::Clog,
     741            3 :                     segno,
     742            3 :                     rpageno,
     743            3 :                     if is_commit {
     744            3 :                         NeonWalRecord::ClogSetCommitted {
     745            3 :                             xids: page_xids,
     746            3 :                             timestamp: parsed.xact_time,
     747            3 :                         }
     748              :                     } else {
     749            0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
     750              :                     },
     751            0 :                 )?;
     752            3 :                 page_xids = Vec::new();
     753       100046 :             }
     754       100049 :             pageno = subxact_pageno;
     755       100049 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     756       100049 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     757       100049 :             page_xids.push(*subxact);
     758              :         }
     759              :         modification.put_slru_wal_record(
     760      2265099 :             SlruKind::Clog,
     761      2265099 :             segno,
     762      2265099 :             rpageno,
     763      2265099 :             if is_commit {
     764      2263310 :                 NeonWalRecord::ClogSetCommitted {
     765      2263310 :                     xids: page_xids,
     766      2263310 :                     timestamp: parsed.xact_time,
     767      2263310 :                 }
     768              :             } else {
     769         1789 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
     770              :             },
     771            0 :         )?;
     772              : 
     773      2281971 :         for xnode in &parsed.xnodes {
     774        84360 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
     775        67488 :                 let rel = RelTag {
     776        67488 :                     forknum,
     777        67488 :                     spcnode: xnode.spcnode,
     778        67488 :                     dbnode: xnode.dbnode,
     779        67488 :                     relnode: xnode.relnode,
     780        67488 :                 };
     781        67488 :                 let last_lsn = self.timeline.get_last_record_lsn();
     782        67488 :                 if modification
     783        67488 :                     .tline
     784        67488 :                     .get_rel_exists(rel, last_lsn, true, ctx)
     785         1491 :                     .await?
     786              :                 {
     787        17780 :                     self.put_rel_drop(modification, rel, ctx).await?;
     788        49708 :                 }
     789              :             }
     790              :         }
     791      2265099 :         Ok(())
     792      2265099 :     }
     793              : 
     794            1 :     async fn ingest_clog_truncate_record(
     795            1 :         &mut self,
     796            1 :         modification: &mut DatadirModification<'_>,
     797            1 :         xlrec: &XlClogTruncate,
     798            1 :         ctx: &RequestContext,
     799            1 :     ) -> anyhow::Result<()> {
     800            1 :         info!(
     801            1 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
     802            1 :             xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
     803            1 :         );
     804              : 
     805              :         // Here we treat oldestXid and oldestXidDB
     806              :         // differently from postgres redo routines.
     807              :         // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
     808              :         // until checkpoint happens and updates the value.
     809              :         // Here we can use the most recent value.
     810              :         // It's just an optimization, though and can be deleted.
     811              :         // TODO Figure out if there will be any issues with replica.
     812            1 :         self.checkpoint.oldestXid = xlrec.oldest_xid;
     813            1 :         self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
     814            1 :         self.checkpoint_modified = true;
     815            1 : 
     816            1 :         // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
     817            1 : 
     818            1 :         let latest_page_number =
     819            1 :             self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
     820            1 : 
     821            1 :         // Now delete all segments containing pages between xlrec.pageno
     822            1 :         // and latest_page_number.
     823            1 : 
     824            1 :         // First, make an important safety check:
     825            1 :         // the current endpoint page must not be eligible for removal.
     826            1 :         // See SimpleLruTruncate() in slru.c
     827            1 :         if clogpage_precedes(latest_page_number, xlrec.pageno) {
     828            0 :             info!("could not truncate directory pg_xact apparent wraparound");
     829            0 :             return Ok(());
     830            1 :         }
     831            1 : 
     832            1 :         // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
     833            1 :         //
     834            1 :         // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
     835            1 :         // will block waiting for the last valid LSN to advance up to
     836            1 :         // it. So we use the previous record's LSN in the get calls
     837            1 :         // instead.
     838            1 :         let req_lsn = modification.tline.get_last_record_lsn();
     839           10 :         for segno in modification
     840            1 :             .tline
     841            1 :             .list_slru_segments(SlruKind::Clog, req_lsn, ctx)
     842            0 :             .await?
     843              :         {
     844           10 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
     845           10 :             if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
     846            9 :                 modification
     847            9 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
     848            0 :                     .await?;
     849            0 :                 trace!("Drop CLOG segment {:>04X}", segno);
     850            1 :             }
     851              :         }
     852              : 
     853            1 :         Ok(())
     854            1 :     }
     855              : 
     856        24027 :     fn ingest_multixact_create_record(
     857        24027 :         &mut self,
     858        24027 :         modification: &mut DatadirModification,
     859        24027 :         xlrec: &XlMultiXactCreate,
     860        24027 :     ) -> Result<()> {
     861        24027 :         // Create WAL record for updating the multixact-offsets page
     862        24027 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     863        24027 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     864        24027 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     865        24027 : 
     866        24027 :         modification.put_slru_wal_record(
     867        24027 :             SlruKind::MultiXactOffsets,
     868        24027 :             segno,
     869        24027 :             rpageno,
     870        24027 :             NeonWalRecord::MultixactOffsetCreate {
     871        24027 :                 mid: xlrec.mid,
     872        24027 :                 moff: xlrec.moff,
     873        24027 :             },
     874        24027 :         )?;
     875              : 
     876              :         // Create WAL records for the update of each affected multixact-members page
     877        24027 :         let mut members = xlrec.members.iter();
     878        24027 :         let mut offset = xlrec.moff;
     879              :         loop {
     880        48328 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     881        48328 : 
     882        48328 :             // How many members fit on this page?
     883        48328 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
     884        48328 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     885        48328 : 
     886        48328 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
     887        48328 :             for _ in 0..page_remain {
     888       520988 :                 if let Some(m) = members.next() {
     889       472948 :                     this_page_members.push(m.clone());
     890       472948 :                 } else {
     891        48040 :                     break;
     892              :                 }
     893              :             }
     894        48328 :             if this_page_members.is_empty() {
     895              :                 // all done
     896        24027 :                 break;
     897        24301 :             }
     898        24301 :             let n_this_page = this_page_members.len();
     899        24301 : 
     900        24301 :             modification.put_slru_wal_record(
     901        24301 :                 SlruKind::MultiXactMembers,
     902        24301 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
     903        24301 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
     904        24301 :                 NeonWalRecord::MultixactMembersCreate {
     905        24301 :                     moff: offset,
     906        24301 :                     members: this_page_members,
     907        24301 :                 },
     908        24301 :             )?;
     909              : 
     910              :             // Note: The multixact members can wrap around, even within one WAL record.
     911        24301 :             offset = offset.wrapping_add(n_this_page as u32);
     912              :         }
     913        24027 :         if xlrec.mid >= self.checkpoint.nextMulti {
     914        24027 :             self.checkpoint.nextMulti = xlrec.mid + 1;
     915        24027 :             self.checkpoint_modified = true;
     916        24027 :         }
     917        24027 :         if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
     918        24027 :             self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
     919        24027 :             self.checkpoint_modified = true;
     920        24027 :         }
     921       472948 :         let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
     922       472948 :             if mbr.xid.wrapping_sub(acc) as i32 > 0 {
     923       472903 :                 mbr.xid
     924              :             } else {
     925           45 :                 acc
     926              :             }
     927       472948 :         });
     928        24027 : 
     929        24027 :         if self.checkpoint.update_next_xid(max_mbr_xid) {
     930            0 :             self.checkpoint_modified = true;
     931        24027 :         }
     932        24027 :         Ok(())
     933        24027 :     }
     934              : 
     935            0 :     async fn ingest_multixact_truncate_record(
     936            0 :         &mut self,
     937            0 :         modification: &mut DatadirModification<'_>,
     938            0 :         xlrec: &XlMultiXactTruncate,
     939            0 :         ctx: &RequestContext,
     940            0 :     ) -> Result<()> {
     941            0 :         self.checkpoint.oldestMulti = xlrec.end_trunc_off;
     942            0 :         self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
     943            0 :         self.checkpoint_modified = true;
     944            0 : 
     945            0 :         // PerformMembersTruncation
     946            0 :         let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
     947            0 :         let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
     948            0 :         let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
     949            0 :         let mut segment: i32 = startsegment;
     950              : 
     951              :         // Delete all the segments except the last one. The last segment can still
     952              :         // contain, possibly partially, valid data.
     953            0 :         while segment != endsegment {
     954            0 :             modification
     955            0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
     956            0 :                 .await?;
     957              : 
     958              :             /* move to next segment, handling wraparound correctly */
     959            0 :             if segment == maxsegment {
     960            0 :                 segment = 0;
     961            0 :             } else {
     962            0 :                 segment += 1;
     963            0 :             }
     964              :         }
     965              : 
     966              :         // Truncate offsets
     967              :         // FIXME: this did not handle wraparound correctly
     968              : 
     969            0 :         Ok(())
     970            0 :     }
     971              : 
     972           45 :     async fn ingest_relmap_page(
     973           45 :         &mut self,
     974           45 :         modification: &mut DatadirModification<'_>,
     975           45 :         xlrec: &XlRelmapUpdate,
     976           45 :         decoded: &DecodedWALRecord,
     977           45 :         ctx: &RequestContext,
     978           45 :     ) -> Result<()> {
     979           45 :         let mut buf = decoded.record.clone();
     980           45 :         buf.advance(decoded.main_data_offset);
     981           45 :         // skip xl_relmap_update
     982           45 :         buf.advance(12);
     983           45 : 
     984           45 :         modification
     985           45 :             .put_relmap_file(
     986           45 :                 xlrec.tsid,
     987           45 :                 xlrec.dbid,
     988           45 :                 Bytes::copy_from_slice(&buf[..]),
     989           45 :                 ctx,
     990           45 :             )
     991            1 :             .await
     992           45 :     }
     993              : 
    /// Record the creation of relation `rel` in `modification`.
    ///
    /// The relation is created with a size of 0; it grows when pages are
    /// later written to it.
    async fn put_rel_creation(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        ctx: &RequestContext,
    ) -> Result<()> {
        modification.put_rel_creation(rel, 0, ctx).await?;
        Ok(())
    }
    1003              : 
    /// Store the page image `img` for block `blknum` of relation `rel`.
    ///
    /// Before storing the image, the relation is created and/or extended as
    /// needed so that `blknum` falls within it (see `handle_rel_extend`).
    async fn put_rel_page_image(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        img: Bytes,
        ctx: &RequestContext,
    ) -> Result<(), PageReconstructError> {
        // Make sure the relation exists and covers `blknum` first.
        self.handle_rel_extend(modification, rel, blknum, ctx)
            .await?;
        modification.put_rel_page_image(rel, blknum, img)?;
        Ok(())
    }
    1017              : 
    /// Store the WAL record `rec` for block `blknum` of relation `rel`.
    ///
    /// Before storing the record, the relation is created and/or extended as
    /// needed so that `blknum` falls within it (see `handle_rel_extend`).
    async fn put_rel_wal_record(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        rec: NeonWalRecord,
        ctx: &RequestContext,
    ) -> Result<()> {
        // Make sure the relation exists and covers `blknum` first.
        self.handle_rel_extend(modification, rel, blknum, ctx)
            .await?;
        modification.put_rel_wal_record(rel, blknum, rec)?;
        Ok(())
    }
    1031              : 
    /// Record the truncation of relation `rel` to `nblocks` blocks in
    /// `modification`.
    async fn put_rel_truncation(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        nblocks: BlockNumber,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        modification.put_rel_truncation(rel, nblocks, ctx).await?;
        Ok(())
    }
    1042              : 
    /// Record the drop of relation `rel` in `modification`.
    async fn put_rel_drop(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        ctx: &RequestContext,
    ) -> Result<()> {
        modification.put_rel_drop(rel, ctx).await?;
        Ok(())
    }
    1052              : 
    1053       119174 :     async fn get_relsize(
    1054       119174 :         &mut self,
    1055       119174 :         rel: RelTag,
    1056       119174 :         lsn: Lsn,
    1057       119174 :         ctx: &RequestContext,
    1058       119174 :     ) -> anyhow::Result<BlockNumber> {
    1059       119174 :         let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true, ctx).await? {
    1060            4 :             0
    1061              :         } else {
    1062       119170 :             self.timeline.get_rel_size(rel, lsn, true, ctx).await?
    1063              :         };
    1064       119174 :         Ok(nblocks)
    1065       119174 :     }
    1066              : 
    1067     75591782 :     async fn handle_rel_extend(
    1068     75591782 :         &mut self,
    1069     75591782 :         modification: &mut DatadirModification<'_>,
    1070     75591782 :         rel: RelTag,
    1071     75591782 :         blknum: BlockNumber,
    1072     75591782 :         ctx: &RequestContext,
    1073     75591939 :     ) -> Result<(), PageReconstructError> {
    1074     75591939 :         let new_nblocks = blknum + 1;
    1075     75591939 :         // Check if the relation exists. We implicitly create relations on first
    1076     75591939 :         // record.
    1077     75591939 :         // TODO: would be nice if to be more explicit about it
    1078     75591939 :         let last_lsn = modification.lsn;
    1079     75591939 :         let old_nblocks = if !self
    1080     75591939 :             .timeline
    1081     75591939 :             .get_rel_exists(rel, last_lsn, true, ctx)
    1082           24 :             .await?
    1083              :         {
    1084              :             // create it with 0 size initially, the logic below will extend it
    1085         2270 :             modification
    1086         2270 :                 .put_rel_creation(rel, 0, ctx)
    1087          205 :                 .await
    1088         2270 :                 .context("Relation Error")?;
    1089         2270 :             0
    1090              :         } else {
    1091     75589669 :             self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?
    1092              :         };
    1093              : 
    1094     75591939 :         if new_nblocks > old_nblocks {
    1095              :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1096      1092069 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1097              : 
    1098              :             // fill the gap with zeros
    1099      1092069 :             for gap_blknum in old_nblocks..blknum {
    1100       227386 :                 modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
    1101              :             }
    1102     74499870 :         }
    1103     75591939 :         Ok(())
    1104     75591939 :     }
    1105              : 
    /// Store the page image `img` for block `blknum` of SLRU segment `segno`
    /// of the given `kind`.
    ///
    /// Before storing the image, the segment is created and/or extended as
    /// needed so that `blknum` falls within it (see `handle_slru_extend`).
    async fn put_slru_page_image(
        &mut self,
        modification: &mut DatadirModification<'_>,
        kind: SlruKind,
        segno: u32,
        blknum: BlockNumber,
        img: Bytes,
        ctx: &RequestContext,
    ) -> Result<()> {
        // Make sure the segment exists and covers `blknum` first.
        self.handle_slru_extend(modification, kind, segno, blknum, ctx)
            .await?;
        modification.put_slru_page_image(kind, segno, blknum, img)?;
        Ok(())
    }
    1120              : 
    1121          674 :     async fn handle_slru_extend(
    1122          674 :         &mut self,
    1123          674 :         modification: &mut DatadirModification<'_>,
    1124          674 :         kind: SlruKind,
    1125          674 :         segno: u32,
    1126          674 :         blknum: BlockNumber,
    1127          674 :         ctx: &RequestContext,
    1128          674 :     ) -> anyhow::Result<()> {
    1129          674 :         // we don't use a cache for this like we do for relations. SLRUS are explcitly
    1130          674 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1131          674 :         // a lot less frequently.
    1132          674 : 
    1133          674 :         let new_nblocks = blknum + 1;
    1134          674 :         // Check if the relation exists. We implicitly create relations on first
    1135          674 :         // record.
    1136          674 :         // TODO: would be nice if to be more explicit about it
    1137          674 :         let last_lsn = self.timeline.get_last_record_lsn();
    1138          674 :         let old_nblocks = if !self
    1139          674 :             .timeline
    1140          674 :             .get_slru_segment_exists(kind, segno, last_lsn, ctx)
    1141           16 :             .await?
    1142              :         {
    1143              :             // create it with 0 size initially, the logic below will extend it
    1144           18 :             modification
    1145           18 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1146            0 :                 .await?;
    1147           18 :             0
    1148              :         } else {
    1149          656 :             self.timeline
    1150          656 :                 .get_slru_segment_size(kind, segno, last_lsn, ctx)
    1151            9 :                 .await?
    1152              :         };
    1153              : 
    1154          674 :         if new_nblocks > old_nblocks {
    1155            0 :             trace!(
    1156            0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1157            0 :                 kind,
    1158            0 :                 segno,
    1159            0 :                 old_nblocks,
    1160            0 :                 new_nblocks
    1161            0 :             );
    1162          668 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1163              : 
    1164              :             // fill the gap with zeros
    1165          668 :             for gap_blknum in old_nblocks..blknum {
    1166            0 :                 modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
    1167              :             }
    1168            6 :         }
    1169          674 :         Ok(())
    1170          674 :     }
    1171              : }
    1172              : 
    1173              : #[allow(clippy::bool_assert_comparison)]
    1174              : #[cfg(test)]
    1175              : mod tests {
    1176              :     use super::*;
    1177              :     use crate::tenant::harness::*;
    1178              :     use crate::tenant::Timeline;
    1179              :     use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
    1180              :     use postgres_ffi::RELSEG_SIZE;
    1181              : 
    1182              :     use crate::DEFAULT_PG_VERSION;
    1183              : 
    /// Arbitrary relation tag, for testing. The field values carry no
    /// meaning beyond identifying one relation consistently across the tests.
    const TESTREL_A: RelTag = RelTag {
        spcnode: 0,
        dbnode: 111,
        relnode: 1000,
        forknum: 0,
    };
    1191              : 
    /// Placeholder for a logical-size check at `_lsn`.
    ///
    /// Currently a no-op: both arguments are ignored and nothing is asserted.
    fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
        // TODO
    }
    1195              : 
    /// `SIZEOF_CHECKPOINT` zero bytes, used by the tests as a dummy
    /// checkpoint value.
    static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
    1197              : 
    1198            4 :     async fn init_walingest_test<'a>(
    1199            4 :         tline: &'a Timeline,
    1200            4 :         ctx: &RequestContext,
    1201            4 :     ) -> Result<WalIngest<'a>> {
    1202            4 :         let mut m = tline.begin_modification(Lsn(0x10));
    1203            4 :         m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1204            4 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1205            4 :         m.commit().await?;
    1206            4 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1207              : 
    1208            4 :         Ok(walingest)
    1209            4 :     }
    1210              : 
    /// Exercise relation size tracking: implicit extension when pages are
    /// written, truncation, visibility of older versions at older LSNs, and
    /// zero-filling of gaps left by writes far past the end of the relation.
    #[tokio::test]
    async fn test_relsize() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        // Create the relation at LSN 0x20 and write blocks 0..=2 over the
        // following LSNs, one modification per LSN.
        let mut m = tline.begin_modification(Lsn(0x20));
        walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit().await?;
        let mut m = tline.begin_modification(Lsn(0x30));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
            .await?;
        m.commit().await?;
        let mut m = tline.begin_modification(Lsn(0x40));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
            .await?;
        m.commit().await?;
        let mut m = tline.begin_modification(Lsn(0x50));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
            .await?;
        m.commit().await?;

        assert_current_logical_size(&tline, Lsn(0x50));

        // The relation was created at LSN 0x20, so it is not visible at LSN 0x10 yet.
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
                .await?,
            false
        );
        assert!(tline
            .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
            .await
            .is_err());
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
                .await?,
            1
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
                .await?,
            3
        );

        // Check page contents at each LSN
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false, &ctx)
                .await?,
            TEST_IMG("foo blk 0 at 2")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false, &ctx)
                .await?,
            TEST_IMG("foo blk 0 at 3")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false, &ctx)
                .await?,
            TEST_IMG("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false, &ctx)
                .await?,
            TEST_IMG("foo blk 1 at 4")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false, &ctx)
                .await?,
            TEST_IMG("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false, &ctx)
                .await?,
            TEST_IMG("foo blk 1 at 4")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
                .await?,
            TEST_IMG("foo blk 2 at 5")
        );

        // Truncate last block
        let mut m = tline.begin_modification(Lsn(0x60));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
            .await?;
        m.commit().await?;
        assert_current_logical_size(&tline, Lsn(0x60));

        // Check reported size and contents after truncation
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
                .await?,
            2
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false, &ctx)
                .await?,
            TEST_IMG("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false, &ctx)
                .await?,
            TEST_IMG("foo blk 1 at 4")
        );

        // should still see the truncated block with older LSN
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
                .await?,
            3
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx)
                .await?,
            TEST_IMG("foo blk 2 at 5")
        );

        // Truncate to zero length
        let mut m = tline.begin_modification(Lsn(0x68));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
            .await?;
        m.commit().await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
                .await?,
            0
        );

        // Extend from 0 to 2 blocks, leaving a gap: block 0 is never written
        // explicitly and must read back as a zero page.
        let mut m = tline.begin_modification(Lsn(0x70));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
            .await?;
        m.commit().await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
                .await?,
            2
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false, &ctx)
                .await?,
            ZERO_PAGE
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false, &ctx)
                .await?,
            TEST_IMG("foo blk 1")
        );

        // Extend a lot more, leaving a big gap that spans across segments
        let mut m = tline.begin_modification(Lsn(0x80));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
            .await?;
        m.commit().await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
                .await?,
            1501
        );
        // Every block in the gap (2..1500) must read back as a zero page.
        for blk in 2..1500 {
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false, &ctx)
                    .await?,
                ZERO_PAGE
            );
        }
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false, &ctx)
                .await?,
            TEST_IMG("foo blk 1500")
        );

        Ok(())
    }
    1429              : 
    // Test what happens if we dropped a relation
    // and then created it again within the same layer.
    //
    // The relation is created implicitly by writing block 0, dropped, and
    // then re-created the same way; existence and size are checked after
    // each step.
    #[tokio::test]
    async fn test_drop_extend() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        // Writing block 0 implicitly creates the relation.
        let mut m = tline.begin_modification(Lsn(0x20));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit().await?;

        // Check that rel exists and size is correct
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
                .await?,
            1
        );

        // Drop rel
        let mut m = tline.begin_modification(Lsn(0x30));
        walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
        m.commit().await?;

        // Check that rel is not visible anymore
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Lsn(0x30), false, &ctx)
                .await?,
            false
        );

        // FIXME: should fail
        //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());

        // Re-create it
        let mut m = tline.begin_modification(Lsn(0x40));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
            .await?;
        m.commit().await?;

        // Check that rel exists and size is correct
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Lsn(0x40), false, &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Lsn(0x40), false, &ctx)
                .await?,
            1
        );

        Ok(())
    }
    1499              : 
    1500              :     // Test what happens if we truncated a relation
    1501              :     // so that one of its segments was dropped
    1502              :     // and then extended it again within the same layer.
    1503            1 :     #[tokio::test]
    1504            1 :     async fn test_truncate_extend() -> Result<()> {
    1505            1 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
    1506            1 :         let tline = tenant
    1507            1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1508            2 :             .await?;
    1509            1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1510              : 
    1511              :         // Create a 20 MB relation (the size is arbitrary)
    1512            1 :         let relsize = 20 * 1024 * 1024 / 8192;
    1513            1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1514         2560 :         for blkno in 0..relsize {
    1515         2560 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    1516         2560 :             walingest
    1517         2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    1518            0 :                 .await?;
    1519              :         }
    1520           40 :         m.commit().await?;
    1521              : 
    1522              :         // The relation was created at LSN 20, not visible at LSN 1 yet.
    1523            1 :         assert_eq!(
    1524            1 :             tline
    1525            1 :                 .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx)
    1526            0 :                 .await?,
    1527              :             false
    1528              :         );
    1529            1 :         assert!(tline
    1530            1 :             .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx)
    1531            0 :             .await
    1532            1 :             .is_err());
    1533              : 
    1534            1 :         assert_eq!(
    1535            1 :             tline
    1536            1 :                 .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx)
    1537            0 :                 .await?,
    1538              :             true
    1539              :         );
    1540            1 :         assert_eq!(
    1541            1 :             tline
    1542            1 :                 .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx)
    1543            0 :                 .await?,
    1544              :             relsize
    1545              :         );
    1546              : 
    1547              :         // Check relation content
    1548         2560 :         for blkno in 0..relsize {
    1549         2560 :             let lsn = Lsn(0x20);
    1550         2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1551         2560 :             assert_eq!(
    1552         2560 :                 tline
    1553         2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false, &ctx)
    1554           60 :                     .await?,
    1555         2560 :                 TEST_IMG(&data)
    1556              :             );
    1557              :         }
    1558              : 
    1559              :         // Truncate relation so that second segment was dropped
    1560              :         // - only leave one page
    1561            1 :         let mut m = tline.begin_modification(Lsn(0x60));
    1562            1 :         walingest
    1563            1 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    1564            0 :             .await?;
    1565            1 :         m.commit().await?;
    1566              : 
    1567              :         // Check reported size and contents after truncation
    1568            1 :         assert_eq!(
    1569            1 :             tline
    1570            1 :                 .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx)
    1571            0 :                 .await?,
    1572              :             1
    1573              :         );
    1574              : 
    1575            2 :         for blkno in 0..1 {
    1576            1 :             let lsn = Lsn(0x20);
    1577            1 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1578            1 :             assert_eq!(
    1579            1 :                 tline
    1580            1 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false, &ctx)
    1581            0 :                     .await?,
    1582            1 :                 TEST_IMG(&data)
    1583              :             );
    1584              :         }
    1585              : 
    1586              :         // should still see all blocks with older LSN
    1587            1 :         assert_eq!(
    1588            1 :             tline
    1589            1 :                 .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx)
    1590            0 :                 .await?,
    1591              :             relsize
    1592              :         );
    1593         2560 :         for blkno in 0..relsize {
    1594         2560 :             let lsn = Lsn(0x20);
    1595         2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1596         2560 :             assert_eq!(
    1597         2560 :                 tline
    1598         2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false, &ctx)
    1599          122 :                     .await?,
    1600         2560 :                 TEST_IMG(&data)
    1601              :             );
    1602              :         }
    1603              : 
    1604              :         // Extend relation again.
    1605              :         // Add enough blocks to create second segment
    1606            1 :         let lsn = Lsn(0x80);
    1607            1 :         let mut m = tline.begin_modification(lsn);
    1608         2560 :         for blkno in 0..relsize {
    1609         2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1610         2560 :             walingest
    1611         2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    1612            0 :                 .await?;
    1613              :         }
    1614           40 :         m.commit().await?;
    1615              : 
    1616            1 :         assert_eq!(
    1617            1 :             tline
    1618            1 :                 .get_rel_exists(TESTREL_A, Lsn(0x80), false, &ctx)
    1619            0 :                 .await?,
    1620              :             true
    1621              :         );
    1622            1 :         assert_eq!(
    1623            1 :             tline
    1624            1 :                 .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
    1625            0 :                 .await?,
    1626              :             relsize
    1627              :         );
    1628              :         // Check relation content
    1629         2560 :         for blkno in 0..relsize {
    1630         2560 :             let lsn = Lsn(0x80);
    1631         2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1632         2560 :             assert_eq!(
    1633         2560 :                 tline
    1634         2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false, &ctx)
    1635           60 :                     .await?,
    1636         2560 :                 TEST_IMG(&data)
    1637              :             );
    1638              :         }
    1639              : 
    1640            1 :         Ok(())
    1641              :     }
    1642              : 
    1643              :     /// Test get_rel_size() and truncation with a file larger than 1 GB, so that it's
    1644              :     /// split into multiple 1 GB segments in Postgres.
    1645            1 :     #[tokio::test]
    1646            1 :     async fn test_large_rel() -> Result<()> {
    1647            1 :         let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
    1648            1 :         let tline = tenant
    1649            1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1650            2 :             .await?;
    1651            1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1652              : 
    1653            1 :         let mut lsn = 0x10;
    1654       131073 :         for blknum in 0..RELSEG_SIZE + 1 {
    1655       131073 :             lsn += 0x10;
    1656       131073 :             let mut m = tline.begin_modification(Lsn(lsn));
    1657       131073 :             let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    1658       131073 :             walingest
    1659       131073 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    1660         2047 :                 .await?;
    1661       131073 :             m.commit().await?;
    1662              :         }
    1663              : 
    1664            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    1665              : 
    1666            1 :         assert_eq!(
    1667            1 :             tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    1668            1 :             RELSEG_SIZE + 1
    1669              :         );
    1670              : 
    1671              :         // Truncate one block, leaving exactly RELSEG_SIZE blocks
    1672            1 :         lsn += 0x10;
    1673            1 :         let mut m = tline.begin_modification(Lsn(lsn));
    1674            1 :         walingest
    1675            1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    1676            0 :             .await?;
    1677            1 :         m.commit().await?;
    1678            1 :         assert_eq!(
    1679            1 :             tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    1680              :             RELSEG_SIZE
    1681              :         );
    1682            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    1683            1 : 
    1684            1 :         // Truncate another block, down to RELSEG_SIZE - 1 blocks
    1685            1 :         lsn += 0x10;
    1686            1 :         let mut m = tline.begin_modification(Lsn(lsn));
    1687            1 :         walingest
    1688            1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    1689            0 :             .await?;
    1690            1 :         m.commit().await?;
    1691            1 :         assert_eq!(
    1692            1 :             tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    1693            1 :             RELSEG_SIZE - 1
    1694              :         );
    1695            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    1696            1 : 
    1697            1 :         // Truncate to 3000 blocks, and then truncate all the way down to 0, one block at a time
    1698            1 :         // This tests the behavior at segment boundaries
    1699            1 :         let mut size: i32 = 3000;
    1700         3002 :         while size >= 0 {
    1701         3001 :             lsn += 0x10;
    1702         3001 :             let mut m = tline.begin_modification(Lsn(lsn));
    1703         3001 :             walingest
    1704         3001 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    1705           47 :                 .await?;
    1706         3001 :             m.commit().await?;
    1707         3001 :             assert_eq!(
    1708         3001 :                 tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
    1709         3001 :                 size as BlockNumber
    1710              :             );
    1711              : 
    1712         3001 :             size -= 1;
    1713              :         }
    1714            1 :         assert_current_logical_size(&tline, Lsn(lsn));
    1715            1 : 
    1716            1 :         Ok(())
    1717              :     }
    1718              : }
        

Generated by: LCOV version 2.1-beta