LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - decoder.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 31.7 % 717 227
Test Date: 2025-07-16 12:29:03 Functions: 58.8 % 17 10

            Line data    Source code
       1              : //! This module contains logic for decoding and interpreting
       2              : //! raw bytes which represent a raw Postgres WAL record.
       3              : 
       4              : use std::collections::HashMap;
       5              : 
       6              : use bytes::{Buf, Bytes};
       7              : use pageserver_api::key::rel_block_to_key;
       8              : use pageserver_api::reltag::{RelTag, SlruKind};
       9              : use pageserver_api::shard::ShardIdentity;
      10              : use postgres_ffi::walrecord::*;
      11              : use postgres_ffi::{PgMajorVersion, pg_constants};
      12              : use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
      13              : use utils::lsn::Lsn;
      14              : 
      15              : use crate::models::*;
      16              : use crate::serialized_batch::SerializedValueBatch;
      17              : 
      18              : impl InterpretedWalRecord {
      19              :     /// Decode and interpret raw bytes which represent one Postgres WAL record.
      20              :     /// Data blocks which do not match any of the provided shard identities are filtered out.
      21              :     /// Shard 0 is a special case since it tracks all relation sizes. We only give it
      22              :     /// the keys that are being written as that is enough for updating relation sizes.
      23        73532 :     pub fn from_bytes_filtered(
      24        73532 :         buf: Bytes,
      25        73532 :         shards: &[ShardIdentity],
      26        73532 :         next_record_lsn: Lsn,
      27        73532 :         pg_version: PgMajorVersion,
      28        73532 :     ) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
      29        73532 :         let mut decoded = DecodedWALRecord::default();
      30        73532 :         decode_wal_record(buf, &mut decoded, pg_version)?;
      31        73532 :         let xid = decoded.xl_xid;
      32              : 
      33        73532 :         let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
      34            0 :             FlushUncommittedRecords::Yes
      35              :         } else {
      36        73532 :             FlushUncommittedRecords::No
      37              :         };
      38              : 
      39        73532 :         let mut shard_records: HashMap<ShardIdentity, InterpretedWalRecord> =
      40        73532 :             HashMap::with_capacity(shards.len());
      41       147264 :         for shard in shards {
      42        73732 :             shard_records.insert(
      43        73732 :                 *shard,
      44        73732 :                 InterpretedWalRecord {
      45        73732 :                     metadata_record: None,
      46        73732 :                     batch: SerializedValueBatch::default(),
      47        73732 :                     next_record_lsn,
      48        73732 :                     flush_uncommitted,
      49        73732 :                     xid,
      50        73732 :                 },
      51        73732 :             );
      52        73732 :         }
      53              : 
      54        73532 :         MetadataRecord::from_decoded_filtered(
      55        73532 :             &decoded,
      56        73532 :             &mut shard_records,
      57        73532 :             next_record_lsn,
      58        73532 :             pg_version,
      59            0 :         )?;
      60        73532 :         SerializedValueBatch::from_decoded_filtered(
      61        73532 :             decoded,
      62        73532 :             &mut shard_records,
      63        73532 :             next_record_lsn,
      64        73532 :             pg_version,
      65            0 :         )?;
      66              : 
      67        73532 :         Ok(shard_records)
      68        73532 :     }
      69              : }
      70              : 
      71              : impl MetadataRecord {
      72              :     /// Populates the given `shard_records` with metadata records from this WAL record, if any,
      73              :     /// discarding those belonging to other shards.
      74              :     ///
      75              :     /// Only metadata records relevant for the given shards are emitted. Currently, most metadata
      76              :     /// records are broadcast to all shards for simplicity, but this should be improved.
      77        73532 :     fn from_decoded_filtered(
      78        73532 :         decoded: &DecodedWALRecord,
      79        73532 :         shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
      80        73532 :         next_record_lsn: Lsn,
      81        73532 :         pg_version: PgMajorVersion,
      82        73532 :     ) -> anyhow::Result<()> {
      83              :         // Note: this doesn't actually copy the bytes, since
      84              :         // the [`Bytes`] type implements `clone` via a level of indirection.
      85        73532 :         let mut buf = decoded.record.clone();
      86        73532 :         buf.advance(decoded.main_data_offset);
      87              : 
      88              :         // First, generate metadata records from the decoded WAL record.
      89        73532 :         let metadata_record = match decoded.xl_rmid {
      90              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
      91        72737 :                 Self::decode_heapam_record(&mut buf, decoded, pg_version)?
      92              :             }
      93            0 :             pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?,
      94              :             // Handle other special record types
      95            8 :             pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?,
      96            0 :             pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?,
      97              :             pg_constants::RM_TBLSPC_ID => {
      98            0 :                 tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
      99            0 :                 None
     100              :             }
     101            0 :             pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?,
     102              :             pg_constants::RM_XACT_ID => {
     103           12 :                 Self::decode_xact_record(&mut buf, decoded, next_record_lsn)?
     104              :             }
     105              :             pg_constants::RM_MULTIXACT_ID => {
     106            0 :                 Self::decode_multixact_record(&mut buf, decoded, pg_version)?
     107              :             }
     108            0 :             pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?,
     109              :             // This is an odd duck. It needs to go to all shards.
     110              :             // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
     111              :             // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
     112              :             // the pageserver and decode it there.
     113              :             //
     114              :             // Alternatively, one can make the checkpoint part of the subscription protocol
     115              :             // to the pageserver. This should work fine, but can be done at a later point.
     116              :             pg_constants::RM_XLOG_ID => {
     117           15 :                 Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
     118              :             }
     119              :             pg_constants::RM_LOGICALMSG_ID => {
     120          606 :                 Self::decode_logical_message_record(&mut buf, decoded)?
     121              :             }
     122            8 :             pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
     123            0 :             pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
     124          146 :             _unexpected => {
     125              :                 // TODO: consider failing here instead of blindly doing something without
     126              :                 // understanding the protocol
     127          146 :                 None
     128              :             }
     129              :         };
     130              : 
     131              :         // Next, filter the metadata record by shard.
     132        73655 :         for (shard, record) in shard_records.iter_mut() {
     133          720 :             match metadata_record {
     134              :                 Some(
     135            6 :                     MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref clear_vm_bits))
     136            0 :                     | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref clear_vm_bits)),
     137              :                 ) => {
     138              :                     // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
     139              :                     // of the main relation. These are sharded and managed just like regular relation pages.
     140              :                     // See: https://github.com/neondatabase/neon/issues/9855
     141            6 :                     let is_local_vm_page = |heap_blk| {
     142            6 :                         let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
     143            6 :                         shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
     144            6 :                     };
     145              :                     // Send the old and new VM page updates to their respective shards.
     146            6 :                     let updated_old_heap_blkno = clear_vm_bits
     147            6 :                         .old_heap_blkno
     148            6 :                         .filter(|&blkno| is_local_vm_page(blkno));
     149            6 :                     let updated_new_heap_blkno = clear_vm_bits
     150            6 :                         .new_heap_blkno
     151            6 :                         .filter(|&blkno| is_local_vm_page(blkno));
     152              :                     // If neither VM page belongs to this shard, discard the record.
     153            6 :                     if updated_old_heap_blkno.is_some() || updated_new_heap_blkno.is_some() {
     154              :                         // Clone the record and update it for the current shard.
     155            6 :                         let mut for_shard = metadata_record.clone();
     156            6 :                         match for_shard {
     157              :                             Some(
     158              :                                 MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
     159            6 :                                     ref mut clear_vm_bits,
     160              :                                 ))
     161              :                                 | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
     162            0 :                                     ref mut clear_vm_bits,
     163              :                                 )),
     164            6 :                             ) => {
     165            6 :                                 clear_vm_bits.old_heap_blkno = updated_old_heap_blkno;
     166            6 :                                 clear_vm_bits.new_heap_blkno = updated_new_heap_blkno;
     167            6 :                                 record.metadata_record = for_shard;
     168            6 :                             }
     169              :                             _ => {
     170            0 :                                 unreachable!("for_shard is a clone of what we checked above")
     171              :                             }
     172              :                         }
     173            0 :                     }
     174              :                 }
     175              :                 Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
     176              :                     // Filter LogicalMessage records (AUX files) to only be stored on shard zero
     177          720 :                     if shard.is_shard_zero() {
     178          597 :                         record.metadata_record = metadata_record;
     179              :                         // No other shards should receive this record, so we stop traversing shards early.
     180          597 :                         break;
     181          123 :                     }
     182              :                 }
     183        72929 :                 _ => {
     184        72929 :                     // All other metadata records are sent to all shards.
     185        72929 :                     record.metadata_record = metadata_record.clone();
     186        72929 :                 }
     187              :             }
     188              :         }
     189              : 
     190        73532 :         Ok(())
     191        73532 :     }
     192              : 
     193        72737 :     fn decode_heapam_record(
     194        72737 :         buf: &mut Bytes,
     195        72737 :         decoded: &DecodedWALRecord,
     196        72737 :         pg_version: PgMajorVersion,
     197        72737 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     198              :         // Handle VM bit updates that are implicitly part of heap records.
     199              : 
     200              :         // First, look at the record to determine which VM bits need
     201              :         // to be cleared. If either of these variables is set, we
     202              :         // need to clear the corresponding bits in the visibility map.
     203        72737 :         let mut new_heap_blkno: Option<u32> = None;
     204        72737 :         let mut old_heap_blkno: Option<u32> = None;
     205        72737 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     206              : 
     207        72737 :         match pg_version {
     208              :             PgMajorVersion::PG14 => {
     209            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     210            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     211              : 
     212            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     213            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     214            0 :                         assert_eq!(0, buf.remaining());
     215            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     216            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     217            0 :                         }
     218            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     219            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     220            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     221            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     222            0 :                         }
     223            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     224            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     225              :                     {
     226            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     227              :                         // the size of tuple data is inferred from the size of the record.
     228              :                         // we can't validate the remaining number of bytes without parsing
     229              :                         // the tuple data.
     230            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     231            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     232            0 :                         }
     233            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     234            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     235            0 :                             // non-HOT update where the new tuple goes to a different page than
     236            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     237            0 :                             // set.
     238            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     239            0 :                         }
     240            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     241            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     242            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     243            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     244            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     245            0 :                         }
     246            0 :                     }
     247            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     248            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     249            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     250            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     251              : 
     252            0 :                         let offset_array_len =
     253            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     254              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     255            0 :                                 0
     256              :                             } else {
     257            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     258              :                             };
     259            0 :                         assert_eq!(offset_array_len, buf.remaining());
     260              : 
     261            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     262            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     263            0 :                         }
     264            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     265            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     266            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     267            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     268            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     269            0 :                         }
     270            0 :                     }
     271              :                 } else {
     272            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     273              :                 }
     274              :             }
     275              :             PgMajorVersion::PG15 => {
     276        72737 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     277        72643 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     278              : 
     279        72643 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     280        72638 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     281        72638 :                         assert_eq!(0, buf.remaining());
     282        72638 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     283            2 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     284        72636 :                         }
     285            5 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     286            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     287            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     288            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     289            0 :                         }
     290            5 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     291            1 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     292              :                     {
     293            4 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     294              :                         // the size of tuple data is inferred from the size of the record.
     295              :                         // we can't validate the remaining number of bytes without parsing
     296              :                         // the tuple data.
     297            4 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     298            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     299            4 :                         }
     300            4 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     301            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     302            0 :                             // non-HOT update where the new tuple goes to a different page than
     303            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     304            0 :                             // set.
     305            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     306            4 :                         }
     307            1 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     308            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     309            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     310            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     311            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     312            0 :                         }
     313            1 :                     }
     314           94 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     315           94 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     316           94 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     317           21 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     318              : 
     319           21 :                         let offset_array_len =
     320           21 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     321              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     322            1 :                                 0
     323              :                             } else {
     324           20 :                                 size_of::<u16>() * xlrec.ntuples as usize
     325              :                             };
     326           21 :                         assert_eq!(offset_array_len, buf.remaining());
     327              : 
     328           21 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     329            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     330           17 :                         }
     331           73 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     332            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     333            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     334            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     335            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     336            0 :                         }
     337           73 :                     }
     338              :                 } else {
     339            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     340              :                 }
     341              :             }
     342              :             PgMajorVersion::PG16 => {
     343            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     344            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     345              : 
     346            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     347            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     348            0 :                         assert_eq!(0, buf.remaining());
     349            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     350            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     351            0 :                         }
     352            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     353            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     354            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     355            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     356            0 :                         }
     357            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     358            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     359              :                     {
     360            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     361              :                         // the size of tuple data is inferred from the size of the record.
     362              :                         // we can't validate the remaining number of bytes without parsing
     363              :                         // the tuple data.
     364            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     365            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     366            0 :                         }
     367            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     368            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     369            0 :                             // non-HOT update where the new tuple goes to a different page than
     370            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     371            0 :                             // set.
     372            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     373            0 :                         }
     374            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     375            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     376            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     377            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     378            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     379            0 :                         }
     380            0 :                     }
     381            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     382            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     383            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     384            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     385              : 
     386            0 :                         let offset_array_len =
     387            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     388              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     389            0 :                                 0
     390              :                             } else {
     391            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     392              :                             };
     393            0 :                         assert_eq!(offset_array_len, buf.remaining());
     394              : 
     395            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     396            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     397            0 :                         }
     398            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     399            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     400            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     401            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     402            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     403            0 :                         }
     404            0 :                     }
     405              :                 } else {
     406            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     407              :                 }
     408              :             }
     409              :             PgMajorVersion::PG17 => {
     410            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     411            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     412              : 
     413            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     414            0 :                         let xlrec = v17::XlHeapInsert::decode(buf);
     415            0 :                         assert_eq!(0, buf.remaining());
     416            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     417            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     418            0 :                         }
     419            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     420            0 :                         let xlrec = v17::XlHeapDelete::decode(buf);
     421            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     422            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     423            0 :                         }
     424            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     425            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     426              :                     {
     427            0 :                         let xlrec = v17::XlHeapUpdate::decode(buf);
     428              :                         // the size of tuple data is inferred from the size of the record.
     429              :                         // we can't validate the remaining number of bytes without parsing
     430              :                         // the tuple data.
     431            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     432            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     433            0 :                         }
     434            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     435            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     436            0 :                             // non-HOT update where the new tuple goes to different page than
     437            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     438            0 :                             // set.
     439            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     440            0 :                         }
     441            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     442            0 :                         let xlrec = v17::XlHeapLock::decode(buf);
     443            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     444            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     445            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     446            0 :                         }
     447            0 :                     }
     448            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     449            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     450            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     451            0 :                         let xlrec = v17::XlHeapMultiInsert::decode(buf);
     452              : 
     453            0 :                         let offset_array_len =
     454            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     455              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     456            0 :                                 0
     457              :                             } else {
     458            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     459              :                             };
     460            0 :                         assert_eq!(offset_array_len, buf.remaining());
     461              : 
     462            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     463            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     464            0 :                         }
     465            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     466            0 :                         let xlrec = v17::XlHeapLockUpdated::decode(buf);
     467            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     468            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     469            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     470            0 :                         }
     471            0 :                     }
     472              :                 } else {
     473            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     474              :                 }
     475              :             }
     476              :         }
     477              : 
     478        72737 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     479            6 :             let vm_rel = RelTag {
     480            6 :                 forknum: VISIBILITYMAP_FORKNUM,
     481            6 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     482            6 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     483            6 :                 relnode: decoded.blocks[0].rnode_relnode,
     484            6 :             };
     485              : 
     486            6 :             Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
     487            6 :                 ClearVmBits {
     488            6 :                     new_heap_blkno,
     489            6 :                     old_heap_blkno,
     490            6 :                     vm_rel,
     491            6 :                     flags,
     492            6 :                 },
     493            6 :             ))))
     494              :         } else {
     495        72731 :             Ok(None)
     496              :         }
     497        72737 :     }
     498              : 
     499            0 :     fn decode_neonmgr_record(
     500            0 :         buf: &mut Bytes,
     501            0 :         decoded: &DecodedWALRecord,
     502            0 :         pg_version: PgMajorVersion,
     503            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     504              :         // Handle VM bit updates that are implicitly part of heap records.
     505              : 
     506              :         // First, look at the record to determine which VM bits need
     507              :         // to be cleared. If either of these variables is set, we
     508              :         // need to clear the corresponding bits in the visibility map.
     509            0 :         let mut new_heap_blkno: Option<u32> = None;
     510            0 :         let mut old_heap_blkno: Option<u32> = None;
     511            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     512              : 
     513            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     514              : 
     515            0 :         match pg_version {
     516              :             PgMajorVersion::PG16 | PgMajorVersion::PG17 => {
     517            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     518              : 
     519            0 :                 match info {
     520              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     521            0 :                         let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
     522            0 :                         assert_eq!(0, buf.remaining());
     523            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     524            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     525            0 :                         }
     526              :                     }
     527              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     528            0 :                         let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
     529            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     530            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     531            0 :                         }
     532              :                     }
     533              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     534              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     535            0 :                         let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
     536              :                         // the size of tuple data is inferred from the size of the record.
     537              :                         // we can't validate the remaining number of bytes without parsing
     538              :                         // the tuple data.
     539            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     540            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     541            0 :                         }
     542            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     543            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     544            0 :                             // non-HOT update where the new tuple goes to different page than
     545            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     546            0 :                             // set.
     547            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     548            0 :                         }
     549              :                     }
     550              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     551            0 :                         let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     552              : 
     553            0 :                         let offset_array_len =
     554            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     555              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     556            0 :                                 0
     557              :                             } else {
     558            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     559              :                             };
     560            0 :                         assert_eq!(offset_array_len, buf.remaining());
     561              : 
     562            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     563            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     564            0 :                         }
     565              :                     }
     566              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     567            0 :                         let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
     568            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     569            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     570            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     571            0 :                         }
     572              :                     }
     573            0 :                     info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
     574              :                 }
     575              :             }
     576            0 :             PgMajorVersion::PG15 | PgMajorVersion::PG14 => anyhow::bail!(
     577            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     578              :                 pg_version
     579              :             ),
     580              :         }
     581              : 
     582            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     583            0 :             let vm_rel = RelTag {
     584            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     585            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     586            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     587            0 :                 relnode: decoded.blocks[0].rnode_relnode,
     588            0 :             };
     589              : 
     590            0 :             Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
     591            0 :                 ClearVmBits {
     592            0 :                     new_heap_blkno,
     593            0 :                     old_heap_blkno,
     594            0 :                     vm_rel,
     595            0 :                     flags,
     596            0 :                 },
     597            0 :             ))))
     598              :         } else {
     599            0 :             Ok(None)
     600              :         }
     601            0 :     }
     602              : 
     603            8 :     fn decode_smgr_record(
     604            8 :         buf: &mut Bytes,
     605            8 :         decoded: &DecodedWALRecord,
     606            8 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     607            8 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     608            8 :         if info == pg_constants::XLOG_SMGR_CREATE {
     609            8 :             let create = XlSmgrCreate::decode(buf);
     610            8 :             let rel = RelTag {
     611            8 :                 spcnode: create.rnode.spcnode,
     612            8 :                 dbnode: create.rnode.dbnode,
     613            8 :                 relnode: create.rnode.relnode,
     614            8 :                 forknum: create.forknum,
     615            8 :             };
     616              : 
     617            8 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
     618            8 :                 rel,
     619            8 :             }))));
     620            0 :         } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     621            0 :             let truncate = XlSmgrTruncate::decode(buf);
     622            0 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
     623            0 :         }
     624              : 
     625            0 :         Ok(None)
     626            8 :     }
     627              : 
     628            0 :     fn decode_dbase_record(
     629            0 :         buf: &mut Bytes,
     630            0 :         decoded: &DecodedWALRecord,
     631            0 :         pg_version: PgMajorVersion,
     632            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     633              :         // TODO: Refactor this to avoid the duplication between postgres versions.
     634              : 
     635            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     636            0 :         tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
     637              : 
     638            0 :         match pg_version {
     639              :             PgMajorVersion::PG14 => {
     640            0 :                 if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     641            0 :                     let createdb = XlCreateDatabase::decode(buf);
     642            0 :                     tracing::debug!("XLOG_DBASE_CREATE v14");
     643              : 
     644            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     645            0 :                         db_id: createdb.db_id,
     646            0 :                         tablespace_id: createdb.tablespace_id,
     647            0 :                         src_db_id: createdb.src_db_id,
     648            0 :                         src_tablespace_id: createdb.src_tablespace_id,
     649            0 :                     }));
     650              : 
     651            0 :                     return Ok(Some(record));
     652            0 :                 } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     653            0 :                     let dropdb = XlDropDatabase::decode(buf);
     654              : 
     655            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     656            0 :                         db_id: dropdb.db_id,
     657            0 :                         tablespace_ids: dropdb.tablespace_ids,
     658            0 :                     }));
     659              : 
     660            0 :                     return Ok(Some(record));
     661            0 :                 }
     662              :             }
     663              :             PgMajorVersion::PG15 => {
     664            0 :                 if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     665            0 :                     tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     666            0 :                 } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     667              :                     // The XLOG record was renamed between v14 and v15,
     668              :                     // but the record format is the same.
     669              :                     // So we can reuse XlCreateDatabase here.
     670            0 :                     tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     671              : 
     672            0 :                     let createdb = XlCreateDatabase::decode(buf);
     673            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     674            0 :                         db_id: createdb.db_id,
     675            0 :                         tablespace_id: createdb.tablespace_id,
     676            0 :                         src_db_id: createdb.src_db_id,
     677            0 :                         src_tablespace_id: createdb.src_tablespace_id,
     678            0 :                     }));
     679              : 
     680            0 :                     return Ok(Some(record));
     681            0 :                 } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     682            0 :                     let dropdb = XlDropDatabase::decode(buf);
     683            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     684            0 :                         db_id: dropdb.db_id,
     685            0 :                         tablespace_ids: dropdb.tablespace_ids,
     686            0 :                     }));
     687              : 
     688            0 :                     return Ok(Some(record));
     689            0 :                 }
     690              :             }
     691              :             PgMajorVersion::PG16 => {
     692            0 :                 if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     693            0 :                     tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     694            0 :                 } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     695              :                     // The XLOG record was renamed between v14 and v15,
     696              :                     // but the record format is the same.
     697              :                     // So we can reuse XlCreateDatabase here.
     698            0 :                     tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     699              : 
     700            0 :                     let createdb = XlCreateDatabase::decode(buf);
     701            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     702            0 :                         db_id: createdb.db_id,
     703            0 :                         tablespace_id: createdb.tablespace_id,
     704            0 :                         src_db_id: createdb.src_db_id,
     705            0 :                         src_tablespace_id: createdb.src_tablespace_id,
     706            0 :                     }));
     707              : 
     708            0 :                     return Ok(Some(record));
     709            0 :                 } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     710            0 :                     let dropdb = XlDropDatabase::decode(buf);
     711            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     712            0 :                         db_id: dropdb.db_id,
     713            0 :                         tablespace_ids: dropdb.tablespace_ids,
     714            0 :                     }));
     715              : 
     716            0 :                     return Ok(Some(record));
     717            0 :                 }
     718              :             }
     719              :             PgMajorVersion::PG17 => {
     720            0 :                 if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     721            0 :                     tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     722            0 :                 } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     723              :                     // The XLOG record was renamed between v14 and v15,
     724              :                     // but the record format is the same.
     725              :                     // So we can reuse XlCreateDatabase here.
     726            0 :                     tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     727              : 
     728            0 :                     let createdb = XlCreateDatabase::decode(buf);
     729            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     730            0 :                         db_id: createdb.db_id,
     731            0 :                         tablespace_id: createdb.tablespace_id,
     732            0 :                         src_db_id: createdb.src_db_id,
     733            0 :                         src_tablespace_id: createdb.src_tablespace_id,
     734            0 :                     }));
     735              : 
     736            0 :                     return Ok(Some(record));
     737            0 :                 } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
     738            0 :                     let dropdb = XlDropDatabase::decode(buf);
     739            0 :                     let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     740            0 :                         db_id: dropdb.db_id,
     741            0 :                         tablespace_ids: dropdb.tablespace_ids,
     742            0 :                     }));
     743              : 
     744            0 :                     return Ok(Some(record));
     745            0 :                 }
     746              :             }
     747              :         }
     748              : 
     749            0 :         Ok(None)
     750            0 :     }
     751              : 
     752            0 :     fn decode_clog_record(
     753            0 :         buf: &mut Bytes,
     754            0 :         decoded: &DecodedWALRecord,
     755            0 :         pg_version: PgMajorVersion,
     756            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     757            0 :         let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     758              : 
     759            0 :         if info == pg_constants::CLOG_ZEROPAGE {
     760            0 :             let pageno = if pg_version < PgMajorVersion::PG17 {
     761            0 :                 buf.get_u32_le()
     762              :             } else {
     763            0 :                 buf.get_u64_le() as u32
     764              :             };
     765            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     766            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     767              : 
     768            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
     769            0 :                 ClogZeroPage { segno, rpageno },
     770            0 :             ))))
     771              :         } else {
     772            0 :             assert_eq!(info, pg_constants::CLOG_TRUNCATE);
     773            0 :             let xlrec = XlClogTruncate::decode(buf, pg_version);
     774              : 
     775            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
     776            0 :                 ClogTruncate {
     777            0 :                     pageno: xlrec.pageno,
     778            0 :                     oldest_xid: xlrec.oldest_xid,
     779            0 :                     oldest_xid_db: xlrec.oldest_xid_db,
     780            0 :                 },
     781            0 :             ))))
     782              :         }
     783            0 :     }
     784              : 
     785           12 :     fn decode_xact_record(
     786           12 :         buf: &mut Bytes,
     787           12 :         decoded: &DecodedWALRecord,
     788           12 :         lsn: Lsn,
     789           12 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     790           12 :         let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     791           12 :         let origin_id = decoded.origin_id;
     792           12 :         let xl_xid = decoded.xl_xid;
     793              : 
     794           12 :         if info == pg_constants::XLOG_XACT_COMMIT {
     795            4 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     796            4 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
     797            4 :                 parsed,
     798            4 :                 origin_id,
     799            4 :                 xl_xid,
     800            4 :                 lsn,
     801            4 :             }))));
     802            8 :         } else if info == pg_constants::XLOG_XACT_ABORT {
     803            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     804            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
     805            0 :                 parsed,
     806            0 :                 origin_id,
     807            0 :                 xl_xid,
     808            0 :                 lsn,
     809            0 :             }))));
     810            8 :         } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
     811            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     812            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
     813            0 :                 XactCommon {
     814            0 :                     parsed,
     815            0 :                     origin_id,
     816            0 :                     xl_xid,
     817            0 :                     lsn,
     818            0 :                 },
     819            0 :             ))));
     820            8 :         } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
     821            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     822            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
     823            0 :                 XactCommon {
     824            0 :                     parsed,
     825            0 :                     origin_id,
     826            0 :                     xl_xid,
     827            0 :                     lsn,
     828            0 :                 },
     829            0 :             ))));
     830            8 :         } else if info == pg_constants::XLOG_XACT_PREPARE {
     831            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
     832            0 :                 XactPrepare {
     833            0 :                     xl_xid: decoded.xl_xid,
     834            0 :                     data: Bytes::copy_from_slice(&buf[..]),
     835            0 :                 },
     836            0 :             ))));
     837            8 :         }
     838              : 
     839            8 :         Ok(None)
     840           12 :     }
     841              : 
     842            0 :     fn decode_multixact_record(
     843            0 :         buf: &mut Bytes,
     844            0 :         decoded: &DecodedWALRecord,
     845            0 :         pg_version: PgMajorVersion,
     846            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     847            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     848              : 
     849            0 :         if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
     850            0 :             || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
     851              :         {
     852            0 :             let pageno = if pg_version < PgMajorVersion::PG17 {
     853            0 :                 buf.get_u32_le()
     854              :             } else {
     855            0 :                 buf.get_u64_le() as u32
     856              :             };
     857            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     858            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     859              : 
     860            0 :             let slru_kind = match info {
     861            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
     862            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
     863            0 :                 _ => unreachable!(),
     864              :             };
     865              : 
     866            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
     867            0 :                 MultiXactZeroPage {
     868            0 :                     slru_kind,
     869            0 :                     segno,
     870            0 :                     rpageno,
     871            0 :                 },
     872            0 :             ))));
     873            0 :         } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     874            0 :             let xlrec = XlMultiXactCreate::decode(buf);
     875            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
     876            0 :                 xlrec,
     877            0 :             ))));
     878            0 :         } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     879            0 :             let xlrec = XlMultiXactTruncate::decode(buf);
     880            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
     881            0 :                 xlrec,
     882            0 :             ))));
     883            0 :         }
     884              : 
     885            0 :         Ok(None)
     886            0 :     }
     887              : 
     888            0 :     fn decode_relmap_record(
     889            0 :         buf: &mut Bytes,
     890            0 :         decoded: &DecodedWALRecord,
     891            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     892            0 :         let update = XlRelmapUpdate::decode(buf);
     893              : 
     894            0 :         let mut buf = decoded.record.clone();
     895            0 :         buf.advance(decoded.main_data_offset);
     896              :         // skip xl_relmap_update
     897            0 :         buf.advance(12);
     898              : 
     899            0 :         Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
     900            0 :             RelmapUpdate {
     901            0 :                 update,
     902            0 :                 buf: Bytes::copy_from_slice(&buf[..]),
     903            0 :             },
     904            0 :         ))))
     905            0 :     }
     906              : 
     907           15 :     fn decode_xlog_record(
     908           15 :         buf: &mut Bytes,
     909           15 :         decoded: &DecodedWALRecord,
     910           15 :         lsn: Lsn,
     911           15 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     912           15 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     913           15 :         Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
     914           15 :             info,
     915           15 :             lsn,
     916           15 :             buf: buf.clone(),
     917           15 :         }))))
     918           15 :     }
     919              : 
     920          606 :     fn decode_logical_message_record(
     921          606 :         buf: &mut Bytes,
     922          606 :         decoded: &DecodedWALRecord,
     923          606 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     924          606 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     925          606 :         if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     926          606 :             let xlrec = XlLogicalMessage::decode(buf);
     927          606 :             let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     928              : 
     929              :             #[cfg(feature = "testing")]
     930          606 :             if prefix == "neon-test" {
     931            0 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     932            0 :                     LogicalMessageRecord::Failpoint,
     933            0 :                 )));
     934          606 :             }
     935              : 
     936          606 :             if let Some(path) = prefix.strip_prefix("neon-file:") {
     937          597 :                 let buf_size = xlrec.prefix_size + xlrec.message_size;
     938          597 :                 let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
     939          597 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     940          597 :                     LogicalMessageRecord::Put(PutLogicalMessage {
     941          597 :                         path: path.to_string(),
     942          597 :                         buf,
     943          597 :                     }),
     944          597 :                 )));
     945            9 :             }
     946            0 :         }
     947              : 
     948            9 :         Ok(None)
     949          606 :     }
     950              : 
     951            8 :     fn decode_standby_record(
     952            8 :         buf: &mut Bytes,
     953            8 :         decoded: &DecodedWALRecord,
     954            8 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     955            8 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     956            8 :         if info == pg_constants::XLOG_RUNNING_XACTS {
     957            0 :             let xlrec = XlRunningXacts::decode(buf);
     958            0 :             return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
     959            0 :                 StandbyRunningXacts {
     960            0 :                     oldest_running_xid: xlrec.oldest_running_xid,
     961            0 :                 },
     962            0 :             ))));
     963            8 :         }
     964              : 
     965            8 :         Ok(None)
     966            8 :     }
     967              : 
     968            0 :     fn decode_replorigin_record(
     969            0 :         buf: &mut Bytes,
     970            0 :         decoded: &DecodedWALRecord,
     971            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     972            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     973            0 :         if info == pg_constants::XLOG_REPLORIGIN_SET {
     974            0 :             let xlrec = XlReploriginSet::decode(buf);
     975            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
     976            0 :                 xlrec,
     977            0 :             ))));
     978            0 :         } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
     979            0 :             let xlrec = XlReploriginDrop::decode(buf);
     980            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
     981            0 :                 xlrec,
     982            0 :             ))));
     983            0 :         }
     984              : 
     985            0 :         Ok(None)
     986            0 :     }
     987              : }
        

Generated by: LCOV version 2.1-beta