LCOV - differential code coverage report
Current view: top level - pageserver/src - walingest.rs (source / functions) Coverage Total Hit LBC UBC GBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 74.6 % 1691 1261 2 428 2 1259
Current Date: 2024-01-09 02:06:09 Functions: 68.3 % 82 56 26 56
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Parse PostgreSQL WAL records and store them in a neon Timeline.
       3                 : //!
       4                 : //! The pipeline for ingesting WAL looks like this:
       5                 : //!
       6                 : //! WAL receiver  ->   WalIngest  ->   Repository
       7                 : //!
       8                 : //! The WAL receiver receives a stream of WAL from the WAL safekeepers,
       9                 : //! and decodes it to individual WAL records. It feeds the WAL records
      10                 : //! to WalIngest, which parses them and stores them in the Repository.
      11                 : //!
      12                 : //! The neon Repository can store page versions in two formats: as
      13                 : //! page images, or as WAL records. WalIngest::ingest_record() extracts
      14                 : //! page images out of some WAL records, but most it stores as WAL
      15                 : //! records. If a WAL record modifies multiple pages, WalIngest
      16                 : //! will call Repository::put_wal_record or put_page_image functions
      17                 : //! separately for each modified page.
      18                 : //!
      19                 : //! To reconstruct a page using a WAL record, the Repository calls the
      20                 : //! code in walredo.rs. walredo.rs passes most WAL records to the WAL
      21                 : //! redo Postgres process, but some records it can handle directly with
      22                 : //! bespoke Rust code.
      23                 : 
      24                 : use pageserver_api::shard::ShardIdentity;
      25                 : use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
      26                 : use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
      27                 : use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
      28                 : 
      29                 : use anyhow::{bail, Context, Result};
      30                 : use bytes::{Buf, Bytes, BytesMut};
      31                 : use tracing::*;
      32                 : use utils::failpoint_support;
      33                 : 
      34                 : use crate::context::RequestContext;
      35                 : use crate::metrics::WAL_INGEST;
      36                 : use crate::pgdatadir_mapping::*;
      37                 : use crate::tenant::PageReconstructError;
      38                 : use crate::tenant::Timeline;
      39                 : use crate::walrecord::*;
      40                 : use crate::ZERO_PAGE;
      41                 : use pageserver_api::reltag::{RelTag, SlruKind};
      42                 : use postgres_ffi::pg_constants;
      43                 : use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
      44                 : use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
      45                 : use postgres_ffi::v14::xlog_utils::*;
      46                 : use postgres_ffi::v14::CheckPoint;
      47                 : use postgres_ffi::TransactionId;
      48                 : use postgres_ffi::BLCKSZ;
      49                 : use utils::lsn::Lsn;
      50                 : 
      51                 : pub struct WalIngest {
      52                 :     shard: ShardIdentity,
      53                 :     checkpoint: CheckPoint,
      54                 :     checkpoint_modified: bool,
      55                 : }
      56                 : 
      57                 : impl WalIngest {
      58 CBC        1241 :     pub async fn new(
      59            1241 :         timeline: &Timeline,
      60            1241 :         startpoint: Lsn,
      61            1241 :         ctx: &RequestContext,
      62            1241 :     ) -> anyhow::Result<WalIngest> {
      63                 :         // Fetch the latest checkpoint into memory, so that we can compare with it
      64                 :         // quickly in `ingest_record` and update it when it changes.
      65            1241 :         let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
      66            1232 :         let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
      67 UBC           0 :         trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
      68                 : 
      69 CBC        1232 :         Ok(WalIngest {
      70            1232 :             shard: *timeline.get_shard_identity(),
      71            1232 :             checkpoint,
      72            1232 :             checkpoint_modified: false,
      73            1232 :         })
      74            1241 :     }
      75                 : 
      76                 :     ///
      77                 :     /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
      78                 :     ///
      79                 :     /// This function updates the `lsn` field of `DatadirModification`.
      80                 :     ///
      81                 :     /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
      82                 :     /// relations/pages that the record affects.
      83                 :     ///
      84                 :     /// This function returns `true` if the record was ingested, and `false` if it was filtered out
      85                 :     ///
      86        47422616 :     pub async fn ingest_record(
      87        47422616 :         &mut self,
      88        47422616 :         recdata: Bytes,
      89        47422616 :         lsn: Lsn,
      90        47422616 :         modification: &mut DatadirModification<'_>,
      91        47422616 :         decoded: &mut DecodedWALRecord,
      92        47422616 :         ctx: &RequestContext,
      93        47422616 :     ) -> anyhow::Result<bool> {
      94        47422616 :         WAL_INGEST.records_received.inc();
      95        47422616 :         let pg_version = modification.tline.pg_version;
      96        47422616 :         let prev_len = modification.len();
      97        47422616 : 
      98        47422616 :         modification.set_lsn(lsn)?;
      99        47422616 :         decode_wal_record(recdata, decoded, pg_version)?;
     100                 : 
     101        47422616 :         let mut buf = decoded.record.clone();
     102        47422616 :         buf.advance(decoded.main_data_offset);
     103        47422616 : 
     104        47422616 :         assert!(!self.checkpoint_modified);
     105        47422616 :         if self.checkpoint.update_next_xid(decoded.xl_xid) {
     106            3403 :             self.checkpoint_modified = true;
     107        47419213 :         }
     108                 : 
     109        47422616 :         match decoded.xl_rmid {
     110                 :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
     111                 :                 // Heap AM records need some special handling, because they modify VM pages
     112                 :                 // without registering them with the standard mechanism.
     113        37638804 :                 self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
     114               7 :                     .await?;
     115                 :             }
     116                 :             pg_constants::RM_NEON_ID => {
     117 UBC           0 :                 self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
     118               0 :                     .await?;
     119                 :             }
     120                 :             // Handle other special record types
     121                 :             pg_constants::RM_SMGR_ID => {
     122 CBC       21472 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     123           21472 : 
     124           21472 :                 if info == pg_constants::XLOG_SMGR_CREATE {
     125           21428 :                     let create = XlSmgrCreate::decode(&mut buf);
     126           21428 :                     self.ingest_xlog_smgr_create(modification, &create, ctx)
     127             656 :                         .await?;
     128              44 :                 } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     129              44 :                     let truncate = XlSmgrTruncate::decode(&mut buf);
     130              44 :                     self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
     131 GBC           1 :                         .await?;
     132 UBC           0 :                 }
     133                 :             }
     134                 :             pg_constants::RM_DBASE_ID => {
     135 CBC          12 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     136 UBC           0 :                 debug!(%info, %pg_version, "handle RM_DBASE_ID");
     137                 : 
     138 CBC          12 :                 if pg_version == 14 {
     139              12 :                     if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     140               9 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     141 UBC           0 :                         debug!("XLOG_DBASE_CREATE v14");
     142                 : 
     143 CBC           9 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     144             926 :                             .await?;
     145               3 :                     } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     146               3 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     147               6 :                         for tablespace_id in dropdb.tablespace_ids {
     148 UBC           0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     149 CBC           3 :                             modification
     150               3 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     151 UBC           0 :                                 .await?;
     152                 :                         }
     153               0 :                     }
     154               0 :                 } else if pg_version == 15 {
     155               0 :                     if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     156               0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     157               0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     158                 :                         // The XLOG record was renamed between v14 and v15,
     159                 :                         // but the record format is the same.
     160                 :                         // So we can reuse XlCreateDatabase here.
     161               0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     162               0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     163               0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     164               0 :                             .await?;
     165               0 :                     } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     166               0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     167               0 :                         for tablespace_id in dropdb.tablespace_ids {
     168               0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     169               0 :                             modification
     170               0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     171               0 :                                 .await?;
     172                 :                         }
     173               0 :                     }
     174               0 :                 } else if pg_version == 16 {
     175               0 :                     if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     176               0 :                         debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     177               0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     178                 :                         // The XLOG record was renamed between v14 and v15,
     179                 :                         // but the record format is the same.
     180                 :                         // So we can reuse XlCreateDatabase here.
     181               0 :                         debug!("XLOG_DBASE_CREATE_FILE_COPY");
     182               0 :                         let createdb = XlCreateDatabase::decode(&mut buf);
     183               0 :                         self.ingest_xlog_dbase_create(modification, &createdb, ctx)
     184               0 :                             .await?;
     185               0 :                     } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     186               0 :                         let dropdb = XlDropDatabase::decode(&mut buf);
     187               0 :                         for tablespace_id in dropdb.tablespace_ids {
     188               0 :                             trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
     189               0 :                             modification
     190               0 :                                 .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
     191               0 :                                 .await?;
     192                 :                         }
     193               0 :                     }
     194               0 :                 }
     195                 :             }
     196                 :             pg_constants::RM_TBLSPC_ID => {
     197               0 :                 trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
     198                 :             }
     199                 :             pg_constants::RM_CLOG_ID => {
     200 CBC         360 :                 let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     201             360 : 
     202             360 :                 if info == pg_constants::CLOG_ZEROPAGE {
     203             359 :                     let pageno = buf.get_u32_le();
     204             359 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     205             359 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     206             359 :                     self.put_slru_page_image(
     207             359 :                         modification,
     208             359 :                         SlruKind::Clog,
     209             359 :                         segno,
     210             359 :                         rpageno,
     211             359 :                         ZERO_PAGE.clone(),
     212             359 :                         ctx,
     213             359 :                     )
     214               7 :                     .await?;
     215                 :                 } else {
     216               1 :                     assert!(info == pg_constants::CLOG_TRUNCATE);
     217               1 :                     let xlrec = XlClogTruncate::decode(&mut buf);
     218               1 :                     self.ingest_clog_truncate_record(modification, &xlrec, ctx)
     219 UBC           0 :                         .await?;
     220                 :                 }
     221                 :             }
     222                 :             pg_constants::RM_XACT_ID => {
     223 CBC     1910750 :                 let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     224         1910750 : 
     225         1910750 :                 if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
     226         1801602 :                     let parsed_xact =
     227         1801602 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     228         1801602 :                     self.ingest_xact_record(
     229         1801602 :                         modification,
     230         1801602 :                         &parsed_xact,
     231         1801602 :                         info == pg_constants::XLOG_XACT_COMMIT,
     232         1801602 :                         ctx,
     233         1801602 :                     )
     234             785 :                     .await?;
     235          109148 :                 } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
     236          109147 :                     || info == pg_constants::XLOG_XACT_ABORT_PREPARED
     237                 :                 {
     238               2 :                     let parsed_xact =
     239               2 :                         XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
     240               2 :                     self.ingest_xact_record(
     241               2 :                         modification,
     242               2 :                         &parsed_xact,
     243               2 :                         info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
     244               2 :                         ctx,
     245               2 :                     )
     246 UBC           0 :                     .await?;
     247                 :                     // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
     248               0 :                     trace!(
     249               0 :                         "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
     250               0 :                         decoded.xl_xid,
     251               0 :                         parsed_xact.xid,
     252               0 :                         lsn,
     253               0 :                     );
     254 CBC           2 :                     modification
     255               2 :                         .drop_twophase_file(parsed_xact.xid, ctx)
     256 UBC           0 :                         .await?;
     257 CBC      109146 :                 } else if info == pg_constants::XLOG_XACT_PREPARE {
     258               4 :                     modification
     259               4 :                         .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
     260 UBC           0 :                         .await?;
     261 CBC      109142 :                 }
     262                 :             }
     263                 :             pg_constants::RM_MULTIXACT_ID => {
     264           24332 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     265           24332 : 
     266           24332 :                 if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
     267              14 :                     let pageno = buf.get_u32_le();
     268              14 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     269              14 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     270              14 :                     self.put_slru_page_image(
     271              14 :                         modification,
     272              14 :                         SlruKind::MultiXactOffsets,
     273              14 :                         segno,
     274              14 :                         rpageno,
     275              14 :                         ZERO_PAGE.clone(),
     276              14 :                         ctx,
     277              14 :                     )
     278 UBC           0 :                     .await?;
     279 CBC       24318 :                 } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
     280             291 :                     let pageno = buf.get_u32_le();
     281             291 :                     let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     282             291 :                     let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     283             291 :                     self.put_slru_page_image(
     284             291 :                         modification,
     285             291 :                         SlruKind::MultiXactMembers,
     286             291 :                         segno,
     287             291 :                         rpageno,
     288             291 :                         ZERO_PAGE.clone(),
     289             291 :                         ctx,
     290             291 :                     )
     291 UBC           0 :                     .await?;
     292 CBC       24027 :                 } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     293           24027 :                     let xlrec = XlMultiXactCreate::decode(&mut buf);
     294           24027 :                     self.ingest_multixact_create_record(modification, &xlrec)?;
     295 UBC           0 :                 } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     296               0 :                     let xlrec = XlMultiXactTruncate::decode(&mut buf);
     297               0 :                     self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
     298               0 :                         .await?;
     299               0 :                 }
     300                 :             }
     301                 :             pg_constants::RM_RELMAP_ID => {
     302 CBC          54 :                 let xlrec = XlRelmapUpdate::decode(&mut buf);
     303              54 :                 self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
     304               2 :                     .await?;
     305                 :             }
     306                 :             pg_constants::RM_XLOG_ID => {
     307          129668 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     308          129668 : 
     309          129668 :                 if info == pg_constants::XLOG_NEXTOID {
     310             383 :                     let next_oid = buf.get_u32_le();
     311             383 :                     if self.checkpoint.nextOid != next_oid {
     312             383 :                         self.checkpoint.nextOid = next_oid;
     313             383 :                         self.checkpoint_modified = true;
     314             383 :                     }
     315          129285 :                 } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
     316          129230 :                     || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
     317                 :                 {
     318             579 :                     let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
     319             579 :                     buf.copy_to_slice(&mut checkpoint_bytes);
     320             579 :                     let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     321 UBC           0 :                     trace!(
     322               0 :                         "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
     323               0 :                         xlog_checkpoint.oldestXid,
     324               0 :                         self.checkpoint.oldestXid
     325               0 :                     );
     326 CBC         579 :                     if (self
     327             579 :                         .checkpoint
     328             579 :                         .oldestXid
     329             579 :                         .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
     330             579 :                         < 0
     331 UBC           0 :                     {
     332               0 :                         self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
     333               0 :                         self.checkpoint_modified = true;
     334 CBC         579 :                     }
     335          128706 :                 }
     336                 :             }
     337                 :             pg_constants::RM_LOGICALMSG_ID => {
     338             116 :                 let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     339             116 : 
     340             116 :                 if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     341             116 :                     let xlrec = XlLogicalMessage::decode(&mut buf);
     342             116 :                     let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     343             116 :                     let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
     344             116 :                     if prefix == "neon-test" {
     345                 :                         // This is a convenient way to make the WAL ingestion pause at
     346                 :                         // a particular point in the WAL. For more fine-grained control,
     347                 :                         // we could peek into the message and only pause if it contains
     348                 :                         // a particular string, for example, but this is enough for now.
     349               6 :                         failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
     350             112 :                     } else if let Some(path) = prefix.strip_prefix("neon-file:") {
     351             105 :                         modification.put_file(path, message, ctx).await?;
     352               7 :                     }
     353 UBC           0 :                 }
     354                 :             }
     355 CBC     7697044 :             _x => {
     356         7697044 :                 // TODO: should probably log & fail here instead of blindly
     357         7697044 :                 // doing something without understanding the protocol
     358         7697044 :             }
     359                 :         }
     360                 : 
     361                 :         // Iterate through all the blocks that the record modifies, and
     362                 :         // "put" a separate copy of the record for each block.
     363        47422615 :         for blk in decoded.blocks.iter() {
     364        47001894 :             let rel = RelTag {
     365        47001894 :                 spcnode: blk.rnode_spcnode,
     366        47001894 :                 dbnode: blk.rnode_dbnode,
     367        47001894 :                 relnode: blk.rnode_relnode,
     368        47001894 :                 forknum: blk.forknum,
     369        47001894 :             };
     370        47001894 : 
     371        47001894 :             let key = rel_block_to_key(rel, blk.blkno);
     372        47001894 :             let key_is_local = self.shard.is_key_local(&key);
     373                 : 
     374 UBC           0 :             tracing::debug!(
     375               0 :                 lsn=%lsn,
     376               0 :                 key=%key,
     377               0 :                 "ingest: shard decision {} (checkpoint={})",
     378               0 :                 if !key_is_local { "drop" } else { "keep" },
     379               0 :                 self.checkpoint_modified
     380               0 :             );
     381                 : 
     382 CBC    47001894 :             if !key_is_local {
     383 UBC           0 :                 if self.shard.is_zero() {
     384                 :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     385                 :                     // its blkno in case it implicitly extends a relation.
     386               0 :                     self.observe_decoded_block(modification, blk, ctx).await?;
     387               0 :                 }
     388                 : 
     389               0 :                 continue;
     390 CBC    47001894 :             }
     391        47001894 :             self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
     392            5776 :                 .await?;
     393                 :         }
     394                 : 
     395                 :         // If checkpoint data was updated, store the new version in the repository
     396        47422611 :         if self.checkpoint_modified {
     397           27795 :             let new_checkpoint_bytes = self.checkpoint.encode()?;
     398                 : 
     399           27795 :             modification.put_checkpoint(new_checkpoint_bytes)?;
     400           27795 :             self.checkpoint_modified = false;
     401        47394816 :         }
     402                 : 
     403                 :         // Note that at this point this record is only cached in the modification
     404                 :         // until commit() is called to flush the data into the repository and update
     405                 :         // the latest LSN.
     406                 : 
     407        47422611 :         Ok(modification.len() > prev_len)
     408        47422616 :     }
     409                 : 
     410                 :     /// Do not store this block, but observe it for the purposes of updating our relation size state.
     411 UBC           0 :     async fn observe_decoded_block(
     412               0 :         &mut self,
     413               0 :         modification: &mut DatadirModification<'_>,
     414               0 :         blk: &DecodedBkpBlock,
     415               0 :         ctx: &RequestContext,
     416               0 :     ) -> Result<(), PageReconstructError> {
     417               0 :         let rel = RelTag {
     418               0 :             spcnode: blk.rnode_spcnode,
     419               0 :             dbnode: blk.rnode_dbnode,
     420               0 :             relnode: blk.rnode_relnode,
     421               0 :             forknum: blk.forknum,
     422               0 :         };
     423               0 :         self.handle_rel_extend(modification, rel, blk.blkno, ctx)
     424               0 :             .await
     425               0 :     }
     426                 : 
     427 CBC    47001894 :     async fn ingest_decoded_block(
     428        47001894 :         &mut self,
     429        47001894 :         modification: &mut DatadirModification<'_>,
     430        47001894 :         lsn: Lsn,
     431        47001894 :         decoded: &DecodedWALRecord,
     432        47001894 :         blk: &DecodedBkpBlock,
     433        47001894 :         ctx: &RequestContext,
     434        47001894 :     ) -> Result<(), PageReconstructError> {
     435        47001894 :         let rel = RelTag {
     436        47001894 :             spcnode: blk.rnode_spcnode,
     437        47001894 :             dbnode: blk.rnode_dbnode,
     438        47001894 :             relnode: blk.rnode_relnode,
     439        47001894 :             forknum: blk.forknum,
     440        47001894 :         };
     441        47001894 : 
     442        47001894 :         //
     443        47001894 :         // Instead of storing full-page-image WAL record,
     444        47001894 :         // it is better to store extracted image: we can skip wal-redo
     445        47001894 :         // in this case. Also some FPI records may contain multiple (up to 32) pages,
     446        47001894 :         // so them have to be copied multiple times.
     447        47001894 :         //
     448        47001894 :         if blk.apply_image
     449          148064 :             && blk.has_image
     450          148064 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     451          129995 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     452 UBC           0 :                 || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     453                 :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     454 CBC      129995 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
     455                 :             // do not materialize null pages because them most likely be soon replaced with real data
     456          129995 :             && blk.bimg_len != 0
     457                 :         {
     458                 :             // Extract page image from FPI record
     459          129995 :             let img_len = blk.bimg_len as usize;
     460          129995 :             let img_offs = blk.bimg_offset as usize;
     461          129995 :             let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     462          129995 :             image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     463          129995 : 
     464          129995 :             if blk.hole_length != 0 {
     465          107511 :                 let tail = image.split_off(blk.hole_offset as usize);
     466          107511 :                 image.resize(image.len() + blk.hole_length as usize, 0u8);
     467          107511 :                 image.unsplit(tail);
     468          107511 :             }
     469                 :             //
     470                 :             // Match the logic of XLogReadBufferForRedoExtended:
     471                 :             // The page may be uninitialized. If so, we can't set the LSN because
     472                 :             // that would corrupt the page.
     473                 :             //
     474          129995 :             if !page_is_new(&image) {
     475          125364 :                 page_set_lsn(&mut image, lsn)
     476            4631 :             }
     477          129995 :             assert_eq!(image.len(), BLCKSZ as usize);
     478          129995 :             self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
     479             340 :                 .await?;
     480                 :         } else {
     481        46871899 :             let rec = NeonWalRecord::Postgres {
     482        46871899 :                 will_init: blk.will_init || blk.apply_image,
     483        46871899 :                 rec: decoded.record.clone(),
     484        46871899 :             };
     485        46871899 :             self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
     486            5436 :                 .await?;
     487                 :         }
     488        47001890 :         Ok(())
     489        47001894 :     }
     490                 : 
     491        37638804 :     async fn ingest_heapam_record(
     492        37638804 :         &mut self,
     493        37638804 :         buf: &mut Bytes,
     494        37638804 :         modification: &mut DatadirModification<'_>,
     495        37638804 :         decoded: &DecodedWALRecord,
     496        37638804 :         ctx: &RequestContext,
     497        37638804 :     ) -> anyhow::Result<()> {
     498        37638804 :         // Handle VM bit updates that are implicitly part of heap records.
     499        37638804 : 
     500        37638804 :         // First, look at the record to determine which VM bits need
     501        37638804 :         // to be cleared. If either of these variables is set, we
     502        37638804 :         // need to clear the corresponding bits in the visibility map.
     503        37638804 :         let mut new_heap_blkno: Option<u32> = None;
     504        37638804 :         let mut old_heap_blkno: Option<u32> = None;
     505        37638804 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     506        37638804 : 
     507        37638804 :         match modification.tline.pg_version {
     508                 :             14 => {
     509        37566067 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     510        35337961 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     511        35337961 : 
     512        35337961 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     513        29256546 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     514        29256546 :                         assert_eq!(0, buf.remaining());
     515        29256546 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     516            2562 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     517        29253984 :                         }
     518         6081415 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     519          535744 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     520          535744 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     521             730 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     522          535014 :                         }
     523         5545671 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     524         4075052 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     525                 :                     {
     526         2801135 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     527         2801135 :                         // the size of tuple data is inferred from the size of the record.
     528         2801135 :                         // we can't validate the remaining number of bytes without parsing
     529         2801135 :                         // the tuple data.
     530         2801135 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     531           83421 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     532         2717714 :                         }
     533         2801135 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     534            1701 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     535            1701 :                             // non-HOT update where the new tuple goes to different page than
     536            1701 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     537            1701 :                             // set.
     538            1701 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     539         2799434 :                         }
     540         2744536 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     541         2688546 :                         let xlrec = v14::XlHeapLock::decode(buf);
     542         2688546 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     543              53 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     544              53 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     545         2688493 :                         }
     546           55990 :                     }
     547         2228106 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     548         2228106 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     549         2228106 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     550          780174 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     551                 : 
     552          780174 :                         let offset_array_len =
     553          780174 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     554                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     555          629382 :                                 0
     556                 :                             } else {
     557          150792 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     558                 :                             };
     559          780174 :                         assert_eq!(offset_array_len, buf.remaining());
     560                 : 
     561          780174 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     562            1852 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     563          778322 :                         }
     564         1447932 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     565            3898 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     566            3898 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     567 UBC           0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     568               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     569 CBC        3898 :                         }
     570         1444034 :                     }
     571                 :                 } else {
     572 UBC           0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     573                 :                 }
     574                 :             }
     575                 :             15 => {
     576 CBC       72737 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     577           72643 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     578           72643 : 
     579           72643 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     580           72638 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     581           72638 :                         assert_eq!(0, buf.remaining());
     582           72638 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     583               2 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     584           72636 :                         }
     585               5 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     586 UBC           0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     587               0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     588               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     589               0 :                         }
     590 CBC           5 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     591               1 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     592                 :                     {
     593               4 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     594               4 :                         // the size of tuple data is inferred from the size of the record.
     595               4 :                         // we can't validate the remaining number of bytes without parsing
     596               4 :                         // the tuple data.
     597               4 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     598 UBC           0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     599 CBC           4 :                         }
     600               4 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     601 UBC           0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     602               0 :                             // non-HOT update where the new tuple goes to different page than
     603               0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     604               0 :                             // set.
     605               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     606 CBC           4 :                         }
     607               1 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     608 UBC           0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     609               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     610               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     611               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     612               0 :                         }
     613 CBC           1 :                     }
     614              94 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     615              94 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     616              94 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     617              21 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     618                 : 
     619              21 :                         let offset_array_len =
     620              21 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     621                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     622               1 :                                 0
     623                 :                             } else {
     624              20 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     625                 :                             };
     626              21 :                         assert_eq!(offset_array_len, buf.remaining());
     627                 : 
     628              21 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     629               4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     630              17 :                         }
     631              73 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     632 UBC           0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     633               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     634               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     635               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     636               0 :                         }
     637 CBC          73 :                     }
     638                 :                 } else {
     639 UBC           0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     640                 :                 }
     641                 :             }
     642                 :             16 => {
     643               0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     644               0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     645               0 : 
     646               0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     647               0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     648               0 :                         assert_eq!(0, buf.remaining());
     649               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     650               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     651               0 :                         }
     652               0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     653               0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     654               0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     655               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     656               0 :                         }
     657               0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     658               0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     659                 :                     {
     660               0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     661               0 :                         // the size of tuple data is inferred from the size of the record.
     662               0 :                         // we can't validate the remaining number of bytes without parsing
     663               0 :                         // the tuple data.
     664               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     665               0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     666               0 :                         }
     667               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     668               0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     669               0 :                             // non-HOT update where the new tuple goes to different page than
     670               0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     671               0 :                             // set.
     672               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     673               0 :                         }
     674               0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     675               0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     676               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     677               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     678               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     679               0 :                         }
     680               0 :                     }
     681               0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     682               0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     683               0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     684               0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     685                 : 
     686               0 :                         let offset_array_len =
     687               0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     688                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     689               0 :                                 0
     690                 :                             } else {
     691               0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     692                 :                             };
     693               0 :                         assert_eq!(offset_array_len, buf.remaining());
     694                 : 
     695               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     696               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     697               0 :                         }
     698               0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     699               0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     700               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     701               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     702               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     703               0 :                         }
     704               0 :                     }
     705                 :                 } else {
     706               0 :                     bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     707                 :                 }
     708                 :             }
     709               0 :             _ => {}
     710                 :         }
     711                 : 
     712                 :         // Clear the VM bits if required.
     713 CBC    37638804 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     714           90274 :             let vm_rel = RelTag {
     715           90274 :                 forknum: VISIBILITYMAP_FORKNUM,
     716           90274 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     717           90274 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     718           90274 :                 relnode: decoded.blocks[0].rnode_relnode,
     719           90274 :             };
     720           90274 : 
     721           90274 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     722           90274 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     723                 : 
     724                 :             // Sometimes, Postgres seems to create heap WAL records with the
     725                 :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     726                 :             // not set. In fact, it's possible that the VM page does not exist at all.
     727                 :             // In that case, we don't want to store a record to clear the VM bit;
     728                 :             // replaying it would fail to find the previous image of the page, because
     729                 :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     730                 :             // record if it doesn't.
     731           90274 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     732           90274 :             if let Some(blknum) = new_vm_blk {
     733            6851 :                 if blknum >= vm_size {
     734 UBC           0 :                     new_vm_blk = None;
     735 CBC        6851 :                 }
     736           83423 :             }
     737           90274 :             if let Some(blknum) = old_vm_blk {
     738           83474 :                 if blknum >= vm_size {
     739 UBC           0 :                     old_vm_blk = None;
     740 CBC       83474 :                 }
     741            6800 :             }
     742                 : 
     743           90274 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     744           90274 :                 if new_vm_blk == old_vm_blk {
     745                 :                     // An UPDATE record that needs to clear the bits for both old and the
     746                 :                     // new page, both of which reside on the same VM page.
     747              51 :                     self.put_rel_wal_record(
     748              51 :                         modification,
     749              51 :                         vm_rel,
     750              51 :                         new_vm_blk.unwrap(),
     751              51 :                         NeonWalRecord::ClearVisibilityMapFlags {
     752              51 :                             new_heap_blkno,
     753              51 :                             old_heap_blkno,
     754              51 :                             flags,
     755              51 :                         },
     756              51 :                         ctx,
     757              51 :                     )
     758 UBC           0 :                     .await?;
     759                 :                 } else {
     760                 :                     // Clear VM bits for one heap page, or for two pages that reside on
     761                 :                     // different VM pages.
     762 CBC       90223 :                     if let Some(new_vm_blk) = new_vm_blk {
     763            6800 :                         self.put_rel_wal_record(
     764            6800 :                             modification,
     765            6800 :                             vm_rel,
     766            6800 :                             new_vm_blk,
     767            6800 :                             NeonWalRecord::ClearVisibilityMapFlags {
     768            6800 :                                 new_heap_blkno,
     769            6800 :                                 old_heap_blkno: None,
     770            6800 :                                 flags,
     771            6800 :                             },
     772            6800 :                             ctx,
     773            6800 :                         )
     774 UBC           0 :                         .await?;
     775 CBC       83423 :                     }
     776           90223 :                     if let Some(old_vm_blk) = old_vm_blk {
     777           83423 :                         self.put_rel_wal_record(
     778           83423 :                             modification,
     779           83423 :                             vm_rel,
     780           83423 :                             old_vm_blk,
     781           83423 :                             NeonWalRecord::ClearVisibilityMapFlags {
     782           83423 :                                 new_heap_blkno: None,
     783           83423 :                                 old_heap_blkno,
     784           83423 :                                 flags,
     785           83423 :                             },
     786           83423 :                             ctx,
     787           83423 :                         )
     788 UBC           0 :                         .await?;
     789 CBC        6800 :                     }
     790                 :                 }
     791 UBC           0 :             }
     792 CBC    37548530 :         }
     793                 : 
     794        37638804 :         Ok(())
     795        37638804 :     }
     796                 : 
     797 UBC           0 :     async fn ingest_neonrmgr_record(
     798               0 :         &mut self,
     799               0 :         buf: &mut Bytes,
     800               0 :         modification: &mut DatadirModification<'_>,
     801               0 :         decoded: &DecodedWALRecord,
     802               0 :         ctx: &RequestContext,
     803               0 :     ) -> anyhow::Result<()> {
     804               0 :         // Handle VM bit updates that are implicitly part of heap records.
     805               0 : 
     806               0 :         // First, look at the record to determine which VM bits need
     807               0 :         // to be cleared. If either of these variables is set, we
     808               0 :         // need to clear the corresponding bits in the visibility map.
     809               0 :         let mut new_heap_blkno: Option<u32> = None;
     810               0 :         let mut old_heap_blkno: Option<u32> = None;
     811               0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     812               0 :         let pg_version = modification.tline.pg_version;
     813               0 : 
     814               0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     815                 : 
     816               0 :         match pg_version {
     817                 :             16 => {
     818               0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     819               0 : 
     820               0 :                 match info {
     821                 :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     822               0 :                         let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
     823               0 :                         assert_eq!(0, buf.remaining());
     824               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     825               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     826               0 :                         }
     827                 :                     }
     828                 :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     829               0 :                         let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
     830               0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     831               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     832               0 :                         }
     833                 :                     }
     834                 :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     835                 :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     836               0 :                         let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
     837               0 :                         // the size of tuple data is inferred from the size of the record.
     838               0 :                         // we can't validate the remaining number of bytes without parsing
     839               0 :                         // the tuple data.
     840               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     841               0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     842               0 :                         }
     843               0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     844               0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     845               0 :                             // non-HOT update where the new tuple goes to different page than
     846               0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     847               0 :                             // set.
     848               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     849               0 :                         }
     850                 :                     }
     851                 :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     852               0 :                         let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     853                 : 
     854               0 :                         let offset_array_len =
     855               0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     856                 :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     857               0 :                                 0
     858                 :                             } else {
     859               0 :                                 std::mem::size_of::<u16>() * xlrec.ntuples as usize
     860                 :                             };
     861               0 :                         assert_eq!(offset_array_len, buf.remaining());
     862                 : 
     863               0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     864               0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     865               0 :                         }
     866                 :                     }
     867                 :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     868               0 :                         let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
     869               0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     870               0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     871               0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     872               0 :                         }
     873                 :                     }
     874               0 :                     info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
     875                 :                 }
     876                 :             }
     877               0 :             _ => bail!(
     878               0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     879               0 :                 pg_version
     880               0 :             ),
     881                 :         }
     882                 : 
     883                 :         // Clear the VM bits if required.
     884               0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     885               0 :             let vm_rel = RelTag {
     886               0 :                 forknum: VISIBILITYMAP_FORKNUM,
     887               0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     888               0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     889               0 :                 relnode: decoded.blocks[0].rnode_relnode,
     890               0 :             };
     891               0 : 
     892               0 :             let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     893               0 :             let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
     894                 : 
     895                 :             // Sometimes, Postgres seems to create heap WAL records with the
     896                 :             // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
     897                 :             // not set. In fact, it's possible that the VM page does not exist at all.
     898                 :             // In that case, we don't want to store a record to clear the VM bit;
     899                 :             // replaying it would fail to find the previous image of the page, because
     900                 :             // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
     901                 :             // record if it doesn't.
     902               0 :             let vm_size = get_relsize(modification, vm_rel, ctx).await?;
     903               0 :             if let Some(blknum) = new_vm_blk {
     904               0 :                 if blknum >= vm_size {
     905               0 :                     new_vm_blk = None;
     906               0 :                 }
     907               0 :             }
     908               0 :             if let Some(blknum) = old_vm_blk {
     909               0 :                 if blknum >= vm_size {
     910               0 :                     old_vm_blk = None;
     911               0 :                 }
     912               0 :             }
     913                 : 
     914               0 :             if new_vm_blk.is_some() || old_vm_blk.is_some() {
     915               0 :                 if new_vm_blk == old_vm_blk {
     916                 :                     // An UPDATE record that needs to clear the bits for both old and the
     917                 :                     // new page, both of which reside on the same VM page.
     918               0 :                     self.put_rel_wal_record(
     919               0 :                         modification,
     920               0 :                         vm_rel,
     921               0 :                         new_vm_blk.unwrap(),
     922               0 :                         NeonWalRecord::ClearVisibilityMapFlags {
     923               0 :                             new_heap_blkno,
     924               0 :                             old_heap_blkno,
     925               0 :                             flags,
     926               0 :                         },
     927               0 :                         ctx,
     928               0 :                     )
     929               0 :                     .await?;
     930                 :                 } else {
     931                 :                     // Clear VM bits for one heap page, or for two pages that reside on
     932                 :                     // different VM pages.
     933               0 :                     if let Some(new_vm_blk) = new_vm_blk {
     934               0 :                         self.put_rel_wal_record(
     935               0 :                             modification,
     936               0 :                             vm_rel,
     937               0 :                             new_vm_blk,
     938               0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     939               0 :                                 new_heap_blkno,
     940               0 :                                 old_heap_blkno: None,
     941               0 :                                 flags,
     942               0 :                             },
     943               0 :                             ctx,
     944               0 :                         )
     945               0 :                         .await?;
     946               0 :                     }
     947               0 :                     if let Some(old_vm_blk) = old_vm_blk {
     948               0 :                         self.put_rel_wal_record(
     949               0 :                             modification,
     950               0 :                             vm_rel,
     951               0 :                             old_vm_blk,
     952               0 :                             NeonWalRecord::ClearVisibilityMapFlags {
     953               0 :                                 new_heap_blkno: None,
     954               0 :                                 old_heap_blkno,
     955               0 :                                 flags,
     956               0 :                             },
     957               0 :                             ctx,
     958               0 :                         )
     959               0 :                         .await?;
     960               0 :                     }
     961                 :                 }
     962               0 :             }
     963               0 :         }
     964                 : 
     965               0 :         Ok(())
     966               0 :     }
     967                 : 
     968                 :     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     969 CBC           9 :     async fn ingest_xlog_dbase_create(
     970               9 :         &mut self,
     971               9 :         modification: &mut DatadirModification<'_>,
     972               9 :         rec: &XlCreateDatabase,
     973               9 :         ctx: &RequestContext,
     974               9 :     ) -> anyhow::Result<()> {
     975               9 :         let db_id = rec.db_id;
     976               9 :         let tablespace_id = rec.tablespace_id;
     977               9 :         let src_db_id = rec.src_db_id;
     978               9 :         let src_tablespace_id = rec.src_tablespace_id;
     979                 : 
     980               9 :         let rels = modification
     981               9 :             .tline
     982               9 :             .list_rels(
     983               9 :                 src_tablespace_id,
     984               9 :                 src_db_id,
     985               9 :                 Version::Modified(modification),
     986               9 :                 ctx,
     987               9 :             )
     988 UBC           0 :             .await?;
     989                 : 
     990               0 :         debug!("ingest_xlog_dbase_create: {} rels", rels.len());
     991                 : 
     992                 :         // Copy relfilemap
     993 CBC           9 :         let filemap = modification
     994               9 :             .tline
     995               9 :             .get_relmap_file(
     996               9 :                 src_tablespace_id,
     997               9 :                 src_db_id,
     998               9 :                 Version::Modified(modification),
     999               9 :                 ctx,
    1000               9 :             )
    1001 UBC           0 :             .await?;
    1002 CBC           9 :         modification
    1003               9 :             .put_relmap_file(tablespace_id, db_id, filemap, ctx)
    1004 UBC           0 :             .await?;
    1005                 : 
    1006 CBC           9 :         let mut num_rels_copied = 0;
    1007               9 :         let mut num_blocks_copied = 0;
    1008            2637 :         for src_rel in rels {
    1009            2628 :             assert_eq!(src_rel.spcnode, src_tablespace_id);
    1010            2628 :             assert_eq!(src_rel.dbnode, src_db_id);
    1011                 : 
    1012            2628 :             let nblocks = modification
    1013            2628 :                 .tline
    1014            2628 :                 .get_rel_size(src_rel, Version::Modified(modification), true, ctx)
    1015             211 :                 .await?;
    1016            2628 :             let dst_rel = RelTag {
    1017            2628 :                 spcnode: tablespace_id,
    1018            2628 :                 dbnode: db_id,
    1019            2628 :                 relnode: src_rel.relnode,
    1020            2628 :                 forknum: src_rel.forknum,
    1021            2628 :             };
    1022            2628 : 
    1023            2628 :             modification.put_rel_creation(dst_rel, nblocks, ctx).await?;
    1024                 : 
    1025                 :             // Copy content
    1026 UBC           0 :             debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
    1027 CBC        9099 :             for blknum in 0..nblocks {
    1028 UBC           0 :                 debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
    1029                 : 
    1030 CBC        9099 :                 let content = modification
    1031            9099 :                     .tline
    1032            9099 :                     .get_rel_page_at_lsn(
    1033            9099 :                         src_rel,
    1034            9099 :                         blknum,
    1035            9099 :                         Version::Modified(modification),
    1036            9099 :                         true,
    1037            9099 :                         ctx,
    1038            9099 :                     )
    1039             715 :                     .await?;
    1040            9099 :                 modification.put_rel_page_image(dst_rel, blknum, content)?;
    1041            9099 :                 num_blocks_copied += 1;
    1042                 :             }
    1043                 : 
    1044            2628 :             num_rels_copied += 1;
    1045                 :         }
    1046                 : 
    1047               9 :         info!(
    1048               9 :             "Created database {}/{}, copied {} blocks in {} rels",
    1049               9 :             tablespace_id, db_id, num_blocks_copied, num_rels_copied
    1050               9 :         );
    1051               9 :         Ok(())
    1052               9 :     }
    1053                 : 
    1054           21428 :     async fn ingest_xlog_smgr_create(
    1055           21428 :         &mut self,
    1056           21428 :         modification: &mut DatadirModification<'_>,
    1057           21428 :         rec: &XlSmgrCreate,
    1058           21428 :         ctx: &RequestContext,
    1059           21428 :     ) -> anyhow::Result<()> {
    1060           21428 :         let rel = RelTag {
    1061           21428 :             spcnode: rec.rnode.spcnode,
    1062           21428 :             dbnode: rec.rnode.dbnode,
    1063           21428 :             relnode: rec.rnode.relnode,
    1064           21428 :             forknum: rec.forknum,
    1065           21428 :         };
    1066           21428 :         self.put_rel_creation(modification, rel, ctx).await?;
    1067           21427 :         Ok(())
    1068           21428 :     }
    1069                 : 
    1070                 :     /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    1071                 :     ///
    1072                 :     /// This is the same logic as in PostgreSQL's smgr_redo() function.
    1073              44 :     async fn ingest_xlog_smgr_truncate(
    1074              44 :         &mut self,
    1075              44 :         modification: &mut DatadirModification<'_>,
    1076              44 :         rec: &XlSmgrTruncate,
    1077              44 :         ctx: &RequestContext,
    1078              44 :     ) -> anyhow::Result<()> {
    1079              44 :         let spcnode = rec.rnode.spcnode;
    1080              44 :         let dbnode = rec.rnode.dbnode;
    1081              44 :         let relnode = rec.rnode.relnode;
    1082              44 : 
    1083              44 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
    1084              44 :             let rel = RelTag {
    1085              44 :                 spcnode,
    1086              44 :                 dbnode,
    1087              44 :                 relnode,
    1088              44 :                 forknum: MAIN_FORKNUM,
    1089              44 :             };
    1090              44 :             self.put_rel_truncation(modification, rel, rec.blkno, ctx)
    1091 GBC           1 :                 .await?;
    1092 UBC           0 :         }
    1093 CBC          44 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
    1094              44 :             let rel = RelTag {
    1095              44 :                 spcnode,
    1096              44 :                 dbnode,
    1097              44 :                 relnode,
    1098              44 :                 forknum: FSM_FORKNUM,
    1099              44 :             };
    1100              44 : 
    1101              44 :             let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
    1102              44 :             let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
    1103              44 :             if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
    1104                 :                 // Tail of last remaining FSM page has to be zeroed.
    1105                 :                 // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
    1106              21 :                 modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
    1107              21 :                 fsm_physical_page_no += 1;
    1108              23 :             }
    1109              44 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1110              44 :             if nblocks > fsm_physical_page_no {
    1111                 :                 // check if something to do: FSM is larger than truncate position
    1112              21 :                 self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
    1113 UBC           0 :                     .await?;
    1114 CBC          23 :             }
    1115 UBC           0 :         }
    1116 CBC          44 :         if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
    1117              44 :             let rel = RelTag {
    1118              44 :                 spcnode,
    1119              44 :                 dbnode,
    1120              44 :                 relnode,
    1121              44 :                 forknum: VISIBILITYMAP_FORKNUM,
    1122              44 :             };
    1123              44 : 
    1124              44 :             let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
    1125              44 :             if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
    1126                 :                 // Tail of last remaining vm page has to be zeroed.
    1127                 :                 // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
    1128              21 :                 modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
    1129              21 :                 vm_page_no += 1;
    1130              23 :             }
    1131              44 :             let nblocks = get_relsize(modification, rel, ctx).await?;
    1132              44 :             if nblocks > vm_page_no {
    1133                 :                 // check if something to do: VM is larger than truncate position
    1134              21 :                 self.put_rel_truncation(modification, rel, vm_page_no, ctx)
    1135 UBC           0 :                     .await?;
    1136 CBC          23 :             }
    1137 UBC           0 :         }
    1138 CBC          44 :         Ok(())
    1139              44 :     }
    1140                 : 
    1141                 :     /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records.
    1142                 :     ///
    1143         1801604 :     async fn ingest_xact_record(
    1144         1801604 :         &mut self,
    1145         1801604 :         modification: &mut DatadirModification<'_>,
    1146         1801604 :         parsed: &XlXactParsedRecord,
    1147         1801604 :         is_commit: bool,
    1148         1801604 :         ctx: &RequestContext,
    1149         1801604 :     ) -> anyhow::Result<()> {
    1150         1801604 :         // Record update of CLOG pages
    1151         1801604 :         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
    1152         1801604 :         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1153         1801604 :         let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1154         1801604 :         let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
    1155                 : 
    1156         2001653 :         for subxact in &parsed.subxacts {
    1157          200049 :             let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
    1158          200049 :             if subxact_pageno != pageno {
    1159                 :                 // This subxact goes to different page. Write the record
    1160                 :                 // for all the XIDs on the previous page, and continue
    1161                 :                 // accumulating XIDs on this new page.
    1162               6 :                 modification.put_slru_wal_record(
    1163               6 :                     SlruKind::Clog,
    1164               6 :                     segno,
    1165               6 :                     rpageno,
    1166               6 :                     if is_commit {
    1167               6 :                         NeonWalRecord::ClogSetCommitted {
    1168               6 :                             xids: page_xids,
    1169               6 :                             timestamp: parsed.xact_time,
    1170               6 :                         }
    1171                 :                     } else {
    1172 UBC           0 :                         NeonWalRecord::ClogSetAborted { xids: page_xids }
    1173                 :                     },
    1174               0 :                 )?;
    1175 CBC           6 :                 page_xids = Vec::new();
    1176          200043 :             }
    1177          200049 :             pageno = subxact_pageno;
    1178          200049 :             segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1179          200049 :             rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1180          200049 :             page_xids.push(*subxact);
    1181                 :         }
    1182         1801604 :         modification.put_slru_wal_record(
    1183         1801604 :             SlruKind::Clog,
    1184         1801604 :             segno,
    1185         1801604 :             rpageno,
    1186         1801604 :             if is_commit {
    1187         1799812 :                 NeonWalRecord::ClogSetCommitted {
    1188         1799812 :                     xids: page_xids,
    1189         1799812 :                     timestamp: parsed.xact_time,
    1190         1799812 :                 }
    1191                 :             } else {
    1192            1792 :                 NeonWalRecord::ClogSetAborted { xids: page_xids }
    1193                 :             },
    1194 UBC           0 :         )?;
    1195                 : 
    1196 CBC     1818296 :         for xnode in &parsed.xnodes {
    1197           83460 :             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
    1198           66768 :                 let rel = RelTag {
    1199           66768 :                     forknum,
    1200           66768 :                     spcnode: xnode.spcnode,
    1201           66768 :                     dbnode: xnode.dbnode,
    1202           66768 :                     relnode: xnode.relnode,
    1203           66768 :                 };
    1204           66768 :                 if modification
    1205           66768 :                     .tline
    1206           66768 :                     .get_rel_exists(rel, Version::Modified(modification), true, ctx)
    1207 LBC         (1) :                     .await?
    1208                 :                 {
    1209 CBC       17593 :                     self.put_rel_drop(modification, rel, ctx).await?;
    1210           49175 :                 }
    1211                 :             }
    1212                 :         }
    1213         1801604 :         Ok(())
    1214         1801604 :     }
    1215                 : 
    1216               1 :     async fn ingest_clog_truncate_record(
    1217               1 :         &mut self,
    1218               1 :         modification: &mut DatadirModification<'_>,
    1219               1 :         xlrec: &XlClogTruncate,
    1220               1 :         ctx: &RequestContext,
    1221               1 :     ) -> anyhow::Result<()> {
    1222               1 :         info!(
    1223               1 :             "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
    1224               1 :             xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
    1225               1 :         );
    1226                 : 
    1227                 :         // Here we treat oldestXid and oldestXidDB
    1228                 :         // differently from postgres redo routines.
    1229                 :         // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
    1230                 :         // until checkpoint happens and updates the value.
    1231                 :         // Here we can use the most recent value.
    1232                 :         // It's just an optimization, though and can be deleted.
    1233                 :         // TODO Figure out if there will be any issues with replica.
    1234               1 :         self.checkpoint.oldestXid = xlrec.oldest_xid;
    1235               1 :         self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
    1236               1 :         self.checkpoint_modified = true;
    1237               1 : 
    1238               1 :         // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
    1239               1 : 
    1240               1 :         let latest_page_number =
    1241               1 :             self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
    1242               1 : 
    1243               1 :         // Now delete all segments containing pages between xlrec.pageno
    1244               1 :         // and latest_page_number.
    1245               1 : 
    1246               1 :         // First, make an important safety check:
    1247               1 :         // the current endpoint page must not be eligible for removal.
    1248               1 :         // See SimpleLruTruncate() in slru.c
    1249               1 :         if clogpage_precedes(latest_page_number, xlrec.pageno) {
    1250 UBC           0 :             info!("could not truncate directory pg_xact apparent wraparound");
    1251               0 :             return Ok(());
    1252 CBC           1 :         }
    1253                 : 
    1254                 :         // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate
    1255                 :         //
    1256                 :         // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
    1257                 :         // will block waiting for the last valid LSN to advance up to
    1258                 :         // it. So we use the previous record's LSN in the get calls
    1259                 :         // instead.
    1260              10 :         for segno in modification
    1261               1 :             .tline
    1262               1 :             .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
    1263 UBC           0 :             .await?
    1264                 :         {
    1265 CBC          10 :             let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
    1266              10 :             if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
    1267               9 :                 modification
    1268               9 :                     .drop_slru_segment(SlruKind::Clog, segno, ctx)
    1269 UBC           0 :                     .await?;
    1270               0 :                 trace!("Drop CLOG segment {:>04X}", segno);
    1271 CBC           1 :             }
    1272                 :         }
    1273                 : 
    1274               1 :         Ok(())
    1275               1 :     }
    1276                 : 
    1277           24027 :     fn ingest_multixact_create_record(
    1278           24027 :         &mut self,
    1279           24027 :         modification: &mut DatadirModification,
    1280           24027 :         xlrec: &XlMultiXactCreate,
    1281           24027 :     ) -> Result<()> {
    1282           24027 :         // Create WAL record for updating the multixact-offsets page
    1283           24027 :         let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
    1284           24027 :         let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    1285           24027 :         let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    1286           24027 : 
    1287           24027 :         modification.put_slru_wal_record(
    1288           24027 :             SlruKind::MultiXactOffsets,
    1289           24027 :             segno,
    1290           24027 :             rpageno,
    1291           24027 :             NeonWalRecord::MultixactOffsetCreate {
    1292           24027 :                 mid: xlrec.mid,
    1293           24027 :                 moff: xlrec.moff,
    1294           24027 :             },
    1295           24027 :         )?;
    1296                 : 
    1297                 :         // Create WAL records for the update of each affected multixact-members page
    1298           24027 :         let mut members = xlrec.members.iter();
    1299           24027 :         let mut offset = xlrec.moff;
    1300                 :         loop {
    1301           48328 :             let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1302           48328 : 
    1303           48328 :             // How many members fit on this page?
    1304           48328 :             let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
    1305           48328 :                 - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
    1306           48328 : 
    1307           48328 :             let mut this_page_members: Vec<MultiXactMember> = Vec::new();
    1308           48328 :             for _ in 0..page_remain {
    1309          520988 :                 if let Some(m) = members.next() {
    1310          472948 :                     this_page_members.push(m.clone());
    1311          472948 :                 } else {
    1312           48040 :                     break;
    1313                 :                 }
    1314                 :             }
    1315           48328 :             if this_page_members.is_empty() {
    1316                 :                 // all done
    1317           24027 :                 break;
    1318           24301 :             }
    1319           24301 :             let n_this_page = this_page_members.len();
    1320           24301 : 
    1321           24301 :             modification.put_slru_wal_record(
    1322           24301 :                 SlruKind::MultiXactMembers,
    1323           24301 :                 pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
    1324           24301 :                 pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
    1325           24301 :                 NeonWalRecord::MultixactMembersCreate {
    1326           24301 :                     moff: offset,
    1327           24301 :                     members: this_page_members,
    1328           24301 :                 },
    1329           24301 :             )?;
    1330                 : 
    1331                 :             // Note: The multixact members can wrap around, even within one WAL record.
    1332           24301 :             offset = offset.wrapping_add(n_this_page as u32);
    1333                 :         }
    1334           24027 :         if xlrec.mid >= self.checkpoint.nextMulti {
    1335           24027 :             self.checkpoint.nextMulti = xlrec.mid + 1;
    1336           24027 :             self.checkpoint_modified = true;
    1337           24027 :         }
    1338           24027 :         if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
    1339           24027 :             self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
    1340           24027 :             self.checkpoint_modified = true;
    1341           24027 :         }
    1342          472948 :         let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
    1343          472948 :             if mbr.xid.wrapping_sub(acc) as i32 > 0 {
    1344          472903 :                 mbr.xid
    1345                 :             } else {
    1346              45 :                 acc
    1347                 :             }
    1348          472948 :         });
    1349           24027 : 
    1350           24027 :         if self.checkpoint.update_next_xid(max_mbr_xid) {
    1351 UBC           0 :             self.checkpoint_modified = true;
    1352 CBC       24027 :         }
    1353           24027 :         Ok(())
    1354           24027 :     }
    1355                 : 
    1356 UBC           0 :     async fn ingest_multixact_truncate_record(
    1357               0 :         &mut self,
    1358               0 :         modification: &mut DatadirModification<'_>,
    1359               0 :         xlrec: &XlMultiXactTruncate,
    1360               0 :         ctx: &RequestContext,
    1361               0 :     ) -> Result<()> {
    1362               0 :         self.checkpoint.oldestMulti = xlrec.end_trunc_off;
    1363               0 :         self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
    1364               0 :         self.checkpoint_modified = true;
    1365               0 : 
    1366               0 :         // PerformMembersTruncation
    1367               0 :         let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
    1368               0 :         let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
    1369               0 :         let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
    1370               0 :         let mut segment: i32 = startsegment;
    1371                 : 
    1372                 :         // Delete all the segments except the last one. The last segment can still
    1373                 :         // contain, possibly partially, valid data.
    1374               0 :         while segment != endsegment {
    1375               0 :             modification
    1376               0 :                 .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
    1377               0 :                 .await?;
    1378                 : 
    1379                 :             /* move to next segment, handling wraparound correctly */
    1380               0 :             if segment == maxsegment {
    1381               0 :                 segment = 0;
    1382               0 :             } else {
    1383               0 :                 segment += 1;
    1384               0 :             }
    1385                 :         }
    1386                 : 
    1387                 :         // Truncate offsets
    1388                 :         // FIXME: this did not handle wraparound correctly
    1389                 : 
    1390               0 :         Ok(())
    1391               0 :     }
    1392                 : 
    1393 CBC          54 :     async fn ingest_relmap_page(
    1394              54 :         &mut self,
    1395              54 :         modification: &mut DatadirModification<'_>,
    1396              54 :         xlrec: &XlRelmapUpdate,
    1397              54 :         decoded: &DecodedWALRecord,
    1398              54 :         ctx: &RequestContext,
    1399              54 :     ) -> Result<()> {
    1400              54 :         let mut buf = decoded.record.clone();
    1401              54 :         buf.advance(decoded.main_data_offset);
    1402              54 :         // skip xl_relmap_update
    1403              54 :         buf.advance(12);
    1404              54 : 
    1405              54 :         modification
    1406              54 :             .put_relmap_file(
    1407              54 :                 xlrec.tsid,
    1408              54 :                 xlrec.dbid,
    1409              54 :                 Bytes::copy_from_slice(&buf[..]),
    1410              54 :                 ctx,
    1411              54 :             )
    1412               2 :             .await
    1413              54 :     }
    1414                 : 
    1415           21429 :     async fn put_rel_creation(
    1416           21429 :         &mut self,
    1417           21429 :         modification: &mut DatadirModification<'_>,
    1418           21429 :         rel: RelTag,
    1419           21429 :         ctx: &RequestContext,
    1420           21429 :     ) -> Result<()> {
    1421           21429 :         modification.put_rel_creation(rel, 0, ctx).await?;
    1422           21428 :         Ok(())
    1423           21429 :     }
    1424                 : 
    1425          266196 :     async fn put_rel_page_image(
    1426          266196 :         &mut self,
    1427          266196 :         modification: &mut DatadirModification<'_>,
    1428          266196 :         rel: RelTag,
    1429          266196 :         blknum: BlockNumber,
    1430          266196 :         img: Bytes,
    1431          266196 :         ctx: &RequestContext,
    1432          266196 :     ) -> Result<(), PageReconstructError> {
    1433          266196 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1434            3439 :             .await?;
    1435          266196 :         modification.put_rel_page_image(rel, blknum, img)?;
    1436          266196 :         Ok(())
    1437          266196 :     }
    1438                 : 
    1439        46962173 :     async fn put_rel_wal_record(
    1440        46962173 :         &mut self,
    1441        46962173 :         modification: &mut DatadirModification<'_>,
    1442        46962173 :         rel: RelTag,
    1443        46962173 :         blknum: BlockNumber,
    1444        46962173 :         rec: NeonWalRecord,
    1445        46962173 :         ctx: &RequestContext,
    1446        46962173 :     ) -> Result<()> {
    1447        46962173 :         self.handle_rel_extend(modification, rel, blknum, ctx)
    1448            5436 :             .await?;
    1449        46962169 :         modification.put_rel_wal_record(rel, blknum, rec)?;
    1450        46962169 :         Ok(())
    1451        46962173 :     }
    1452                 : 
    1453            3092 :     async fn put_rel_truncation(
    1454            3092 :         &mut self,
    1455            3092 :         modification: &mut DatadirModification<'_>,
    1456            3092 :         rel: RelTag,
    1457            3092 :         nblocks: BlockNumber,
    1458            3092 :         ctx: &RequestContext,
    1459            3092 :     ) -> anyhow::Result<()> {
    1460            3092 :         modification.put_rel_truncation(rel, nblocks, ctx).await?;
    1461            3092 :         Ok(())
    1462            3092 :     }
    1463                 : 
    1464           17594 :     async fn put_rel_drop(
    1465           17594 :         &mut self,
    1466           17594 :         modification: &mut DatadirModification<'_>,
    1467           17594 :         rel: RelTag,
    1468           17594 :         ctx: &RequestContext,
    1469           17594 :     ) -> Result<()> {
    1470           17594 :         modification.put_rel_drop(rel, ctx).await?;
    1471           17594 :         Ok(())
    1472           17594 :     }
    1473                 : 
    1474        47228369 :     async fn handle_rel_extend(
    1475        47228369 :         &mut self,
    1476        47228369 :         modification: &mut DatadirModification<'_>,
    1477        47228369 :         rel: RelTag,
    1478        47228369 :         blknum: BlockNumber,
    1479        47228369 :         ctx: &RequestContext,
    1480        47228369 :     ) -> Result<(), PageReconstructError> {
    1481        47228369 :         let new_nblocks = blknum + 1;
    1482                 :         // Check if the relation exists. We implicitly create relations on first
    1483                 :         // record.
    1484                 :         // TODO: would be nice if to be more explicit about it
    1485                 : 
    1486                 :         // Get current size and put rel creation if rel doesn't exist
    1487                 :         //
    1488                 :         // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
    1489                 :         //       check the cache too. This is because eagerly checking the cache results in
    1490                 :         //       less work overall and 10% better performance. It's more work on cache miss
    1491                 :         //       but cache miss is rare.
    1492        47228369 :         let old_nblocks = if let Some(nblocks) = modification
    1493        47228369 :             .tline
    1494        47228369 :             .get_cached_rel_size(&rel, modification.get_lsn())
    1495                 :         {
    1496        47225596 :             nblocks
    1497            2773 :         } else if !modification
    1498            2773 :             .tline
    1499            2773 :             .get_rel_exists(rel, Version::Modified(modification), true, ctx)
    1500              71 :             .await?
    1501                 :         {
    1502                 :             // create it with 0 size initially, the logic below will extend it
    1503            1959 :             modification
    1504            1959 :                 .put_rel_creation(rel, 0, ctx)
    1505             200 :                 .await
    1506            1959 :                 .context("Relation Error")?;
    1507            1959 :             0
    1508                 :         } else {
    1509             814 :             modification
    1510             814 :                 .tline
    1511             814 :                 .get_rel_size(rel, Version::Modified(modification), true, ctx)
    1512             250 :                 .await?
    1513                 :         };
    1514                 : 
    1515        47228369 :         if new_nblocks > old_nblocks {
    1516                 :             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
    1517          963140 :             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
    1518                 : 
    1519          963136 :             let mut key = rel_block_to_key(rel, blknum);
    1520                 :             // fill the gap with zeros
    1521          963136 :             for gap_blknum in old_nblocks..blknum {
    1522          228405 :                 key.field6 = gap_blknum;
    1523          228405 : 
    1524          228405 :                 if self.shard.get_shard_number(&key) != self.shard.number {
    1525 UBC           0 :                     continue;
    1526 CBC      228405 :                 }
    1527          228405 : 
    1528          228405 :                 modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
    1529                 :             }
    1530        46265229 :         }
    1531        47228365 :         Ok(())
    1532        47228369 :     }
    1533                 : 
    1534             664 :     async fn put_slru_page_image(
    1535             664 :         &mut self,
    1536             664 :         modification: &mut DatadirModification<'_>,
    1537             664 :         kind: SlruKind,
    1538             664 :         segno: u32,
    1539             664 :         blknum: BlockNumber,
    1540             664 :         img: Bytes,
    1541             664 :         ctx: &RequestContext,
    1542             664 :     ) -> Result<()> {
    1543             664 :         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
    1544               7 :             .await?;
    1545             664 :         modification.put_slru_page_image(kind, segno, blknum, img)?;
    1546             664 :         Ok(())
    1547             664 :     }
    1548                 : 
    1549             664 :     async fn handle_slru_extend(
    1550             664 :         &mut self,
    1551             664 :         modification: &mut DatadirModification<'_>,
    1552             664 :         kind: SlruKind,
    1553             664 :         segno: u32,
    1554             664 :         blknum: BlockNumber,
    1555             664 :         ctx: &RequestContext,
    1556             664 :     ) -> anyhow::Result<()> {
    1557             664 :         // we don't use a cache for this like we do for relations. SLRUS are explcitly
    1558             664 :         // extended with ZEROPAGE records, not with commit records, so it happens
    1559             664 :         // a lot less frequently.
    1560             664 : 
    1561             664 :         let new_nblocks = blknum + 1;
    1562                 :         // Check if the relation exists. We implicitly create relations on first
    1563                 :         // record.
    1564                 :         // TODO: would be nice if to be more explicit about it
    1565             664 :         let old_nblocks = if !modification
    1566             664 :             .tline
    1567             664 :             .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
    1568               7 :             .await?
    1569                 :         {
    1570                 :             // create it with 0 size initially, the logic below will extend it
    1571              18 :             modification
    1572              18 :                 .put_slru_segment_creation(kind, segno, 0, ctx)
    1573 LBC         (1) :                 .await?;
    1574 CBC          18 :             0
    1575                 :         } else {
    1576             646 :             modification
    1577             646 :                 .tline
    1578             646 :                 .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
    1579 UBC           0 :                 .await?
    1580                 :         };
    1581                 : 
    1582 CBC         664 :         if new_nblocks > old_nblocks {
    1583 UBC           0 :             trace!(
    1584               0 :                 "extending SLRU {:?} seg {} from {} to {} blocks",
    1585               0 :                 kind,
    1586               0 :                 segno,
    1587               0 :                 old_nblocks,
    1588               0 :                 new_nblocks
    1589               0 :             );
    1590 CBC         658 :             modification.put_slru_extend(kind, segno, new_nblocks)?;
    1591                 : 
    1592                 :             // fill the gap with zeros
    1593             658 :             for gap_blknum in old_nblocks..blknum {
    1594 UBC           0 :                 modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
    1595                 :             }
    1596 CBC           6 :         }
    1597             664 :         Ok(())
    1598             664 :     }
    1599                 : }
    1600                 : 
    1601           90362 : async fn get_relsize(
    1602           90362 :     modification: &DatadirModification<'_>,
    1603           90362 :     rel: RelTag,
    1604           90362 :     ctx: &RequestContext,
    1605           90362 : ) -> anyhow::Result<BlockNumber> {
    1606           90362 :     let nblocks = if !modification
    1607           90362 :         .tline
    1608           90362 :         .get_rel_exists(rel, Version::Modified(modification), true, ctx)
    1609 UBC           0 :         .await?
    1610                 :     {
    1611 CBC           4 :         0
    1612                 :     } else {
    1613           90358 :         modification
    1614           90358 :             .tline
    1615           90358 :             .get_rel_size(rel, Version::Modified(modification), true, ctx)
    1616               7 :             .await?
    1617                 :     };
    1618           90362 :     Ok(nblocks)
    1619           90362 : }
    1620                 : 
    1621                 : #[allow(clippy::bool_assert_comparison)]
    1622                 : #[cfg(test)]
    1623                 : mod tests {
    1624                 :     use super::*;
    1625                 :     use crate::tenant::harness::*;
    1626                 :     use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
    1627                 :     use crate::tenant::Timeline;
    1628                 :     use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
    1629                 :     use postgres_ffi::RELSEG_SIZE;
    1630                 : 
    1631                 :     use crate::DEFAULT_PG_VERSION;
    1632                 : 
    1633                 :     /// Arbitrary relation tag, for testing.
    1634                 :     const TESTREL_A: RelTag = RelTag {
    1635                 :         spcnode: 0,
    1636                 :         dbnode: 111,
    1637                 :         relnode: 1000,
    1638                 :         forknum: 0,
    1639                 :     };
    1640                 : 
    1641               6 :     fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
    1642               6 :         // TODO
    1643               6 :     }
    1644                 : 
    1645                 :     static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
    1646                 : 
    1647               4 :     async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
    1648               4 :         let mut m = tline.begin_modification(Lsn(0x10));
    1649               4 :         m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
    1650               4 :         m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
    1651               4 :         m.commit(ctx).await?;
    1652               4 :         let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
    1653                 : 
    1654               4 :         Ok(walingest)
    1655               4 :     }
    1656                 : 
    1657               1 :     #[tokio::test]
    1658               1 :     async fn test_relsize() -> Result<()> {
    1659               1 :         let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
    1660               1 :         let tline = tenant
    1661               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1662               1 :             .await?;
    1663               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1664                 : 
    1665               1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1666               1 :         walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
    1667               1 :         walingest
    1668               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
    1669 UBC           0 :             .await?;
    1670 CBC           1 :         m.commit(&ctx).await?;
    1671               1 :         let mut m = tline.begin_modification(Lsn(0x30));
    1672               1 :         walingest
    1673               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
    1674 UBC           0 :             .await?;
    1675 CBC           1 :         m.commit(&ctx).await?;
    1676               1 :         let mut m = tline.begin_modification(Lsn(0x40));
    1677               1 :         walingest
    1678               1 :             .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
    1679 UBC           0 :             .await?;
    1680 CBC           1 :         m.commit(&ctx).await?;
    1681               1 :         let mut m = tline.begin_modification(Lsn(0x50));
    1682               1 :         walingest
    1683               1 :             .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
    1684 UBC           0 :             .await?;
    1685 CBC           1 :         m.commit(&ctx).await?;
    1686                 : 
    1687               1 :         assert_current_logical_size(&tline, Lsn(0x50));
    1688                 : 
    1689                 :         // The relation was created at LSN 2, not visible at LSN 1 yet.
    1690               1 :         assert_eq!(
    1691               1 :             tline
    1692               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    1693 UBC           0 :                 .await?,
    1694                 :             false
    1695                 :         );
    1696 CBC           1 :         assert!(tline
    1697               1 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    1698 UBC           0 :             .await
    1699 CBC           1 :             .is_err());
    1700               1 :         assert_eq!(
    1701               1 :             tline
    1702               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1703 UBC           0 :                 .await?,
    1704                 :             true
    1705                 :         );
    1706 CBC           1 :         assert_eq!(
    1707               1 :             tline
    1708               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1709 UBC           0 :                 .await?,
    1710                 :             1
    1711                 :         );
    1712 CBC           1 :         assert_eq!(
    1713               1 :             tline
    1714               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
    1715 UBC           0 :                 .await?,
    1716                 :             3
    1717                 :         );
    1718                 : 
    1719                 :         // Check page contents at each LSN
    1720 CBC           1 :         assert_eq!(
    1721               1 :             tline
    1722               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
    1723 UBC           0 :                 .await?,
    1724 CBC           1 :             TEST_IMG("foo blk 0 at 2")
    1725                 :         );
    1726                 : 
    1727               1 :         assert_eq!(
    1728               1 :             tline
    1729               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
    1730 UBC           0 :                 .await?,
    1731 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1732                 :         );
    1733                 : 
    1734               1 :         assert_eq!(
    1735               1 :             tline
    1736               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
    1737 UBC           0 :                 .await?,
    1738 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1739                 :         );
    1740               1 :         assert_eq!(
    1741               1 :             tline
    1742               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
    1743 UBC           0 :                 .await?,
    1744 CBC           1 :             TEST_IMG("foo blk 1 at 4")
    1745                 :         );
    1746                 : 
    1747               1 :         assert_eq!(
    1748               1 :             tline
    1749               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
    1750 UBC           0 :                 .await?,
    1751 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1752                 :         );
    1753               1 :         assert_eq!(
    1754               1 :             tline
    1755               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
    1756 UBC           0 :                 .await?,
    1757 CBC           1 :             TEST_IMG("foo blk 1 at 4")
    1758                 :         );
    1759               1 :         assert_eq!(
    1760               1 :             tline
    1761               1 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
    1762 UBC           0 :                 .await?,
    1763 CBC           1 :             TEST_IMG("foo blk 2 at 5")
    1764                 :         );
    1765                 : 
    1766                 :         // Truncate last block
    1767               1 :         let mut m = tline.begin_modification(Lsn(0x60));
    1768               1 :         walingest
    1769               1 :             .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
    1770 UBC           0 :             .await?;
    1771 CBC           1 :         m.commit(&ctx).await?;
    1772               1 :         assert_current_logical_size(&tline, Lsn(0x60));
    1773                 : 
    1774                 :         // Check reported size and contents after truncation
    1775               1 :         assert_eq!(
    1776               1 :             tline
    1777               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
    1778 UBC           0 :                 .await?,
    1779                 :             2
    1780                 :         );
    1781 CBC           1 :         assert_eq!(
    1782               1 :             tline
    1783               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
    1784 UBC           0 :                 .await?,
    1785 CBC           1 :             TEST_IMG("foo blk 0 at 3")
    1786                 :         );
    1787               1 :         assert_eq!(
    1788               1 :             tline
    1789               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
    1790 UBC           0 :                 .await?,
    1791 CBC           1 :             TEST_IMG("foo blk 1 at 4")
    1792                 :         );
    1793                 : 
    1794                 :         // should still see the truncated block with older LSN
    1795               1 :         assert_eq!(
    1796               1 :             tline
    1797               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
    1798 UBC           0 :                 .await?,
    1799                 :             3
    1800                 :         );
    1801 CBC           1 :         assert_eq!(
    1802               1 :             tline
    1803               1 :                 .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
    1804 UBC           0 :                 .await?,
    1805 CBC           1 :             TEST_IMG("foo blk 2 at 5")
    1806                 :         );
    1807                 : 
    1808                 :         // Truncate to zero length
    1809               1 :         let mut m = tline.begin_modification(Lsn(0x68));
    1810               1 :         walingest
    1811               1 :             .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
    1812 UBC           0 :             .await?;
    1813 CBC           1 :         m.commit(&ctx).await?;
    1814               1 :         assert_eq!(
    1815               1 :             tline
    1816               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
    1817 UBC           0 :                 .await?,
    1818                 :             0
    1819                 :         );
    1820                 : 
    1821                 :         // Extend from 0 to 2 blocks, leaving a gap
    1822 CBC           1 :         let mut m = tline.begin_modification(Lsn(0x70));
    1823               1 :         walingest
    1824               1 :             .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
    1825 UBC           0 :             .await?;
    1826 CBC           1 :         m.commit(&ctx).await?;
    1827               1 :         assert_eq!(
    1828               1 :             tline
    1829               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
    1830 UBC           0 :                 .await?,
    1831                 :             2
    1832                 :         );
    1833 CBC           1 :         assert_eq!(
    1834               1 :             tline
    1835               1 :                 .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
    1836 UBC           0 :                 .await?,
    1837 CBC           1 :             ZERO_PAGE
    1838                 :         );
    1839               1 :         assert_eq!(
    1840               1 :             tline
    1841               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
    1842 UBC           0 :                 .await?,
    1843 CBC           1 :             TEST_IMG("foo blk 1")
    1844                 :         );
    1845                 : 
    1846                 :         // Extend a lot more, leaving a big gap that spans across segments
    1847               1 :         let mut m = tline.begin_modification(Lsn(0x80));
    1848               1 :         walingest
    1849               1 :             .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
    1850               1 :             .await?;
    1851              11 :         m.commit(&ctx).await?;
    1852               1 :         assert_eq!(
    1853               1 :             tline
    1854               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
    1855 UBC           0 :                 .await?,
    1856                 :             1501
    1857                 :         );
    1858 CBC        1499 :         for blk in 2..1500 {
    1859            1498 :             assert_eq!(
    1860            1498 :                 tline
    1861            1498 :                     .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
    1862              59 :                     .await?,
    1863            1498 :                 ZERO_PAGE
    1864                 :             );
    1865                 :         }
    1866               1 :         assert_eq!(
    1867               1 :             tline
    1868               1 :                 .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
    1869 UBC           0 :                 .await?,
    1870 CBC           1 :             TEST_IMG("foo blk 1500")
    1871                 :         );
    1872                 : 
    1873               1 :         Ok(())
    1874                 :     }
    1875                 : 
    1876                 :     // Test what happens if we dropped a relation
    1877                 :     // and then created it again within the same layer.
    1878               1 :     #[tokio::test]
    1879               1 :     async fn test_drop_extend() -> Result<()> {
    1880               1 :         let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
    1881               1 :         let tline = tenant
    1882               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1883               2 :             .await?;
    1884               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1885                 : 
    1886               1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1887               1 :         walingest
    1888               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
    1889 UBC           0 :             .await?;
    1890 CBC           1 :         m.commit(&ctx).await?;
    1891                 : 
    1892                 :         // Check that rel exists and size is correct
    1893               1 :         assert_eq!(
    1894               1 :             tline
    1895               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1896 UBC           0 :                 .await?,
    1897                 :             true
    1898                 :         );
    1899 CBC           1 :         assert_eq!(
    1900               1 :             tline
    1901               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1902 UBC           0 :                 .await?,
    1903                 :             1
    1904                 :         );
    1905                 : 
    1906                 :         // Drop rel
    1907 CBC           1 :         let mut m = tline.begin_modification(Lsn(0x30));
    1908               1 :         walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
    1909               1 :         m.commit(&ctx).await?;
    1910                 : 
    1911                 :         // Check that rel is not visible anymore
    1912               1 :         assert_eq!(
    1913               1 :             tline
    1914               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
    1915 UBC           0 :                 .await?,
    1916                 :             false
    1917                 :         );
    1918                 : 
    1919                 :         // FIXME: should fail
    1920                 :         //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
    1921                 : 
    1922                 :         // Re-create it
    1923 CBC           1 :         let mut m = tline.begin_modification(Lsn(0x40));
    1924               1 :         walingest
    1925               1 :             .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
    1926 UBC           0 :             .await?;
    1927 CBC           1 :         m.commit(&ctx).await?;
    1928                 : 
    1929                 :         // Check that rel exists and size is correct
    1930               1 :         assert_eq!(
    1931               1 :             tline
    1932               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
    1933 UBC           0 :                 .await?,
    1934                 :             true
    1935                 :         );
    1936 CBC           1 :         assert_eq!(
    1937               1 :             tline
    1938               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
    1939 UBC           0 :                 .await?,
    1940                 :             1
    1941                 :         );
    1942                 : 
    1943 CBC           1 :         Ok(())
    1944                 :     }
    1945                 : 
    1946                 :     // Test what happens if we truncated a relation
    1947                 :     // so that one of its segments was dropped
    1948                 :     // and then extended it again within the same layer.
    1949               1 :     #[tokio::test]
    1950               1 :     async fn test_truncate_extend() -> Result<()> {
    1951               1 :         let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
    1952               1 :         let tline = tenant
    1953               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1954               1 :             .await?;
    1955               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    1956                 : 
    1957                 :         // Create a 20 MB relation (the size is arbitrary)
    1958               1 :         let relsize = 20 * 1024 * 1024 / 8192;
    1959               1 :         let mut m = tline.begin_modification(Lsn(0x20));
    1960            2560 :         for blkno in 0..relsize {
    1961            2560 :             let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
    1962            2560 :             walingest
    1963            2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    1964 UBC           0 :                 .await?;
    1965                 :         }
    1966 CBC           1 :         m.commit(&ctx).await?;
    1967                 : 
    1968                 :         // The relation was created at LSN 0x20, so it is not yet visible at LSN 0x10.
    1969               1 :         assert_eq!(
    1970               1 :             tline
    1971               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    1972 UBC           0 :                 .await?,
    1973                 :             false
    1974                 :         );
    1975 CBC           1 :         assert!(tline
    1976               1 :             .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
    1977 UBC           0 :             .await
    1978 CBC           1 :             .is_err());
    1979                 : 
    1980               1 :         assert_eq!(
    1981               1 :             tline
    1982               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1983 UBC           0 :                 .await?,
    1984                 :             true
    1985                 :         );
    1986 CBC           1 :         assert_eq!(
    1987               1 :             tline
    1988               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
    1989 UBC           0 :                 .await?,
    1990                 :             relsize
    1991                 :         );
    1992                 : 
    1993                 :         // Check relation content
    1994 CBC        2560 :         for blkno in 0..relsize {
    1995            2560 :             let lsn = Lsn(0x20);
    1996            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    1997            2560 :             assert_eq!(
    1998            2560 :                 tline
    1999            2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
    2000             100 :                     .await?,
    2001            2560 :                 TEST_IMG(&data)
    2002                 :             );
    2003                 :         }
    2004                 : 
    2005                 :         // Truncate the relation so that the second segment is dropped
    2006                 :         // - only leave one page
    2007               1 :         let mut m = tline.begin_modification(Lsn(0x60));
    2008               1 :         walingest
    2009               1 :             .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
    2010 UBC           0 :             .await?;
    2011 CBC           1 :         m.commit(&ctx).await?;
    2012                 : 
    2013                 :         // Check reported size and contents after truncation
    2014               1 :         assert_eq!(
    2015               1 :             tline
    2016               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
    2017 UBC           0 :                 .await?,
    2018                 :             1
    2019                 :         );
    2020                 : 
    2021 CBC           2 :         for blkno in 0..1 {
    2022               1 :             let lsn = Lsn(0x20);
    2023               1 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2024               1 :             assert_eq!(
    2025               1 :                 tline
    2026               1 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
    2027 UBC           0 :                     .await?,
    2028 CBC           1 :                 TEST_IMG(&data)
    2029                 :             );
    2030                 :         }
    2031                 : 
    2032                 :         // should still see all blocks with older LSN
    2033               1 :         assert_eq!(
    2034               1 :             tline
    2035               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
    2036 UBC           0 :                 .await?,
    2037                 :             relsize
    2038                 :         );
    2039 CBC        2560 :         for blkno in 0..relsize {
    2040            2560 :             let lsn = Lsn(0x20);
    2041            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2042            2560 :             assert_eq!(
    2043            2560 :                 tline
    2044            2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
    2045             201 :                     .await?,
    2046            2560 :                 TEST_IMG(&data)
    2047                 :             );
    2048                 :         }
    2049                 : 
    2050                 :         // Extend relation again.
    2051                 :         // Add enough blocks to create second segment
    2052               1 :         let lsn = Lsn(0x80);
    2053               1 :         let mut m = tline.begin_modification(lsn);
    2054            2560 :         for blkno in 0..relsize {
    2055            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2056            2560 :             walingest
    2057            2560 :                 .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
    2058 UBC           0 :                 .await?;
    2059                 :         }
    2060 CBC           1 :         m.commit(&ctx).await?;
    2061                 : 
    2062               1 :         assert_eq!(
    2063               1 :             tline
    2064               1 :                 .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
    2065 UBC           0 :                 .await?,
    2066                 :             true
    2067                 :         );
    2068 CBC           1 :         assert_eq!(
    2069               1 :             tline
    2070               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
    2071 UBC           0 :                 .await?,
    2072                 :             relsize
    2073                 :         );
    2074                 :         // Check relation content
    2075 CBC        2560 :         for blkno in 0..relsize {
    2076            2560 :             let lsn = Lsn(0x80);
    2077            2560 :             let data = format!("foo blk {} at {}", blkno, lsn);
    2078            2560 :             assert_eq!(
    2079            2560 :                 tline
    2080            2560 :                     .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
    2081             100 :                     .await?,
    2082            2560 :                 TEST_IMG(&data)
    2083                 :             );
    2084                 :         }
    2085                 : 
    2086               1 :         Ok(())
    2087                 :     }
    2088                 : 
    2089                 :     /// Test get_rel_size() and truncation with a file larger than 1 GB, so that it's
    2090                 :     /// split into multiple 1 GB segments in Postgres.
    2091               1 :     #[tokio::test]
    2092               1 :     async fn test_large_rel() -> Result<()> {
    2093               1 :         let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
    2094               1 :         let tline = tenant
    2095               1 :             .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    2096               2 :             .await?;
    2097               1 :         let mut walingest = init_walingest_test(&tline, &ctx).await?;
    2098                 : 
    2099               1 :         let mut lsn = 0x10;
    2100          131073 :         for blknum in 0..RELSEG_SIZE + 1 {
    2101          131073 :             lsn += 0x10;
    2102          131073 :             let mut m = tline.begin_modification(Lsn(lsn));
    2103          131073 :             let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
    2104          131073 :             walingest
    2105          131073 :                 .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
    2106            3098 :                 .await?;
    2107          131073 :             m.commit(&ctx).await?;
    2108                 :         }
    2109                 : 
    2110               1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2111                 : 
    2112               1 :         assert_eq!(
    2113               1 :             tline
    2114               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2115 UBC           0 :                 .await?,
    2116 CBC           1 :             RELSEG_SIZE + 1
    2117                 :         );
    2118                 : 
    2119                 :         // Truncate one block
    2120               1 :         lsn += 0x10;
    2121               1 :         let mut m = tline.begin_modification(Lsn(lsn));
    2122               1 :         walingest
    2123               1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
    2124 UBC           0 :             .await?;
    2125 CBC           1 :         m.commit(&ctx).await?;
    2126               1 :         assert_eq!(
    2127               1 :             tline
    2128               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2129 UBC           0 :                 .await?,
    2130                 :             RELSEG_SIZE
    2131                 :         );
    2132 CBC           1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2133               1 : 
    2134               1 :         // Truncate another block
    2135               1 :         lsn += 0x10;
    2136               1 :         let mut m = tline.begin_modification(Lsn(lsn));
    2137               1 :         walingest
    2138               1 :             .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
    2139 UBC           0 :             .await?;
    2140 CBC           1 :         m.commit(&ctx).await?;
    2141               1 :         assert_eq!(
    2142               1 :             tline
    2143               1 :                 .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2144 UBC           0 :                 .await?,
    2145 CBC           1 :             RELSEG_SIZE - 1
    2146                 :         );
    2147               1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2148               1 : 
    2149               1 :         // Truncate to 3000, and then truncate all the way down to 0, one block at a time
    2150               1 :         // This tests the behavior at segment boundaries
    2151               1 :         let mut size: i32 = 3000;
    2152            3002 :         while size >= 0 {
    2153            3001 :             lsn += 0x10;
    2154            3001 :             let mut m = tline.begin_modification(Lsn(lsn));
    2155            3001 :             walingest
    2156            3001 :                 .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
    2157              72 :                 .await?;
    2158            3001 :             m.commit(&ctx).await?;
    2159            3001 :             assert_eq!(
    2160            3001 :                 tline
    2161            3001 :                     .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
    2162 UBC           0 :                     .await?,
    2163 CBC        3001 :                 size as BlockNumber
    2164                 :             );
    2165                 : 
    2166            3001 :             size -= 1;
    2167                 :         }
    2168               1 :         assert_current_logical_size(&tline, Lsn(lsn));
    2169               1 : 
    2170               1 :         Ok(())
    2171                 :     }
    2172                 : 
    2173                 :     /// Replay a wal segment file taken directly from safekeepers.
    2174                 :     ///
    2175                 :     /// This test is useful for benchmarking since it allows us to profile only
    2176                 :     /// the walingest code in a single-threaded executor, and iterate more quickly
    2177                 :     /// without waiting for unrelated steps.
    2178               1 :     #[tokio::test]
    2179               1 :     async fn test_ingest_real_wal() {
    2180               1 :         use crate::tenant::harness::*;
    2181               1 :         use postgres_ffi::waldecoder::WalStreamDecoder;
    2182               1 :         use postgres_ffi::WAL_SEGMENT_SIZE;
    2183               1 : 
    2184               1 :         // Define test data path and constants.
    2185               1 :         //
    2186               1 :         // Steps to reconstruct the data, if needed:
    2187               1 :         // 1. Run the pgbench python test
    2188               1 :         // 2. Take the first wal segment file from safekeeper
    2189               1 :         // 3. Compress it using `zstd --long input_file`
    2190               1 :         // 4. Copy initdb.tar.zst from local_fs_remote_storage
    2191               1 :         // 5. Grep sk logs for "restart decoder" to get startpoint
    2192               1 :         // 6. Run just the decoder from this test to get the endpoint.
    2193               1 :         //    It's the last LSN the decoder will output.
    2194               1 :         let pg_version = 15; // The test data was generated by pg15
    2195               1 :         let path = "test_data/sk_wal_segment_from_pgbench";
    2196               1 :         let wal_segment_path = format!("{path}/000000010000000000000001.zst");
    2197               1 :         let source_initdb_path = format!("{path}/{INITDB_PATH}");
    2198               1 :         let startpoint = Lsn::from_hex("14AEC08").unwrap();
    2199               1 :         let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
    2200               1 : 
    2201               1 :         let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
    2202               1 :         let (tenant, ctx) = harness.load().await;
    2203                 : 
    2204               1 :         let remote_initdb_path = remote_initdb_archive_path(&tenant.tenant_id(), &TIMELINE_ID);
    2205               1 :         let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());
    2206               1 : 
    2207               1 :         std::fs::create_dir_all(initdb_path.parent().unwrap())
    2208               1 :             .expect("creating test dir should work");
    2209               1 :         std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");
    2210                 : 
    2211                 :         // Bootstrap a real timeline. We can't use create_test_timeline because
    2212                 :         // it doesn't create a real checkpoint, and WalIngest::new tries to parse
    2213                 :         // the garbage data.
    2214               1 :         let tline = tenant
    2215               1 :             .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
    2216           10237 :             .await
    2217               1 :             .unwrap();
    2218                 : 
    2219                 :         // We fully read and decompress this into memory before decoding
    2220                 :         // to get a more accurate perf profile of the decoder.
    2221               1 :         let bytes = {
    2222                 :             use async_compression::tokio::bufread::ZstdDecoder;
    2223               1 :             let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
    2224               1 :             let reader = tokio::io::BufReader::new(file);
    2225               1 :             let decoder = ZstdDecoder::new(reader);
    2226               1 :             let mut reader = tokio::io::BufReader::new(decoder);
    2227               1 :             let mut buffer = Vec::new();
    2228             112 :             tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
    2229               1 :             buffer
    2230               1 :         };
    2231               1 : 
    2232               1 :         // TODO start a profiler too
    2233               1 :         let started_at = std::time::Instant::now();
    2234               1 : 
    2235               1 :         // Initialize walingest
    2236               1 :         let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    2237               1 :         let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    2238               1 :         let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
    2239 UBC           0 :             .await
    2240 CBC           1 :             .unwrap();
    2241               1 :         let mut modification = tline.begin_modification(startpoint);
    2242               1 :         let mut decoded = DecodedWALRecord::default();
    2243               1 :         println!("decoding {} bytes", bytes.len() - xlogoff);
    2244                 : 
    2245                 :         // Decode and ingest wal. We process the wal in chunks because
    2246                 :         // that's what happens when we get bytes from safekeepers.
    2247          237343 :         for chunk in bytes[xlogoff..].chunks(50) {
    2248          237343 :             decoder.feed_bytes(chunk);
    2249          310268 :             while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
    2250           72925 :                 walingest
    2251           72925 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
    2252              56 :                     .await
    2253           72925 :                     .unwrap();
    2254                 :             }
    2255          237343 :             modification.commit(&ctx).await.unwrap();
    2256                 :         }
    2257                 : 
    2258               1 :         let duration = started_at.elapsed();
    2259               1 :         println!("done in {:?}", duration);
    2260                 :     }
    2261                 : }
        

Generated by: LCOV version 2.1-beta