LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - decoder.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 26.0 % 762 198
Test Date: 2025-01-07 20:58:07 Functions: 52.9 % 17 9

            Line data    Source code
       1              : //! This module contains logic for decoding and interpreting
       2              : //! raw bytes which represent a raw Postgres WAL record.
       3              : 
       4              : use crate::models::*;
       5              : use crate::serialized_batch::SerializedValueBatch;
       6              : use bytes::{Buf, Bytes};
       7              : use pageserver_api::key::rel_block_to_key;
       8              : use pageserver_api::reltag::{RelTag, SlruKind};
       9              : use pageserver_api::shard::ShardIdentity;
      10              : use postgres_ffi::pg_constants;
      11              : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      12              : use postgres_ffi::walrecord::*;
      13              : use utils::lsn::Lsn;
      14              : 
      15              : impl InterpretedWalRecord {
      16              :     /// Decode and interpreted raw bytes which represent one Postgres WAL record.
      17              :     /// Data blocks which do not match the provided shard identity are filtered out.
      18              :     /// Shard 0 is a special case since it tracks all relation sizes. We only give it
      19              :     /// the keys that are being written as that is enough for updating relation sizes.
      20       145852 :     pub fn from_bytes_filtered(
      21       145852 :         buf: Bytes,
      22       145852 :         shard: &ShardIdentity,
      23       145852 :         next_record_lsn: Lsn,
      24       145852 :         pg_version: u32,
      25       145852 :     ) -> anyhow::Result<InterpretedWalRecord> {
      26       145852 :         let mut decoded = DecodedWALRecord::default();
      27       145852 :         decode_wal_record(buf, &mut decoded, pg_version)?;
      28       145852 :         let xid = decoded.xl_xid;
      29              : 
      30       145852 :         let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
      31            0 :             FlushUncommittedRecords::Yes
      32              :         } else {
      33       145852 :             FlushUncommittedRecords::No
      34              :         };
      35              : 
      36       145852 :         let metadata_record =
      37       145852 :             MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
      38       145852 :         let batch = SerializedValueBatch::from_decoded_filtered(
      39       145852 :             decoded,
      40       145852 :             shard,
      41       145852 :             next_record_lsn,
      42       145852 :             pg_version,
      43       145852 :         )?;
      44              : 
      45       145852 :         Ok(InterpretedWalRecord {
      46       145852 :             metadata_record,
      47       145852 :             batch,
      48       145852 :             next_record_lsn,
      49       145852 :             flush_uncommitted,
      50       145852 :             xid,
      51       145852 :         })
      52       145852 :     }
      53              : }
      54              : 
      55              : impl MetadataRecord {
      56              :     /// Builds a metadata record for this WAL record, if any.
      57              :     ///
      58              :     /// Only metadata records relevant for the given shard are emitted. Currently, most metadata
      59              :     /// records are broadcast to all shards for simplicity, but this should be improved.
      60       145852 :     fn from_decoded_filtered(
      61       145852 :         decoded: &DecodedWALRecord,
      62       145852 :         shard: &ShardIdentity,
      63       145852 :         next_record_lsn: Lsn,
      64       145852 :         pg_version: u32,
      65       145852 :     ) -> anyhow::Result<Option<MetadataRecord>> {
      66       145852 :         // Note: this doesn't actually copy the bytes since
      67       145852 :         // the [`Bytes`] type implements it via a level of indirection.
      68       145852 :         let mut buf = decoded.record.clone();
      69       145852 :         buf.advance(decoded.main_data_offset);
      70              : 
      71              :         // First, generate metadata records from the decoded WAL record.
      72       145852 :         let mut metadata_record = match decoded.xl_rmid {
      73              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
      74       145474 :                 Self::decode_heapam_record(&mut buf, decoded, pg_version)?
      75              :             }
      76            0 :             pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?,
      77              :             // Handle other special record types
      78           16 :             pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?,
      79            0 :             pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?,
      80              :             pg_constants::RM_TBLSPC_ID => {
      81            0 :                 tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
      82            0 :                 None
      83              :             }
      84            0 :             pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?,
      85              :             pg_constants::RM_XACT_ID => {
      86           24 :                 Self::decode_xact_record(&mut buf, decoded, next_record_lsn)?
      87              :             }
      88              :             pg_constants::RM_MULTIXACT_ID => {
      89            0 :                 Self::decode_multixact_record(&mut buf, decoded, pg_version)?
      90              :             }
      91            0 :             pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?,
      92              :             // This is an odd duck. It needs to go to all shards.
      93              :             // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
      94              :             // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
      95              :             // the pageserver and decode it there.
      96              :             //
      97              :             // Alternatively, one can make the checkpoint part of the subscription protocol
      98              :             // to the pageserver. This should work fine, but can be done at a later point.
      99              :             pg_constants::RM_XLOG_ID => {
     100           30 :                 Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
     101              :             }
     102              :             pg_constants::RM_LOGICALMSG_ID => {
     103            0 :                 Self::decode_logical_message_record(&mut buf, decoded)?
     104              :             }
     105           16 :             pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
     106            0 :             pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
     107          292 :             _unexpected => {
     108          292 :                 // TODO: consider failing here instead of blindly doing something without
     109          292 :                 // understanding the protocol
     110          292 :                 None
     111              :             }
     112              :         };
     113              : 
     114              :         // Next, filter the metadata record by shard.
     115            0 :         match metadata_record {
     116              :             Some(
     117           12 :                 MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
     118            0 :                 | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
     119              :             ) => {
     120              :                 // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
     121              :                 // of the main relation. These are sharded and managed just like regular relation pages.
     122              :                 // See: https://github.com/neondatabase/neon/issues/9855
     123           12 :                 let is_local_vm_page = |heap_blk| {
     124           12 :                     let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
     125           12 :                     shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
     126           12 :                 };
     127              :                 // Send the old and new VM page updates to their respective shards.
     128           12 :                 clear_vm_bits.old_heap_blkno = clear_vm_bits
     129           12 :                     .old_heap_blkno
     130           12 :                     .filter(|&blkno| is_local_vm_page(blkno));
     131           12 :                 clear_vm_bits.new_heap_blkno = clear_vm_bits
     132           12 :                     .new_heap_blkno
     133           12 :                     .filter(|&blkno| is_local_vm_page(blkno));
     134           12 :                 // If neither VM page belongs to this shard, discard the record.
     135           12 :                 if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
     136              :                 {
     137            0 :                     metadata_record = None
     138           12 :                 }
     139              :             }
     140              :             Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
     141              :                 // Filter LogicalMessage records (AUX files) to only be stored on shard zero
     142            0 :                 if !shard.is_shard_zero() {
     143            0 :                     metadata_record = None;
     144            0 :                 }
     145              :             }
     146       145840 :             _ => {}
     147              :         }
     148              : 
     149       145852 :         Ok(metadata_record)
     150       145852 :     }
     151              : 
     152       145474 :     fn decode_heapam_record(
     153       145474 :         buf: &mut Bytes,
     154       145474 :         decoded: &DecodedWALRecord,
     155       145474 :         pg_version: u32,
     156       145474 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     157       145474 :         // Handle VM bit updates that are implicitly part of heap records.
     158       145474 : 
     159       145474 :         // First, look at the record to determine which VM bits need
     160       145474 :         // to be cleared. If either of these variables is set, we
     161       145474 :         // need to clear the corresponding bits in the visibility map.
     162       145474 :         let mut new_heap_blkno: Option<u32> = None;
     163       145474 :         let mut old_heap_blkno: Option<u32> = None;
     164       145474 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     165       145474 : 
     166       145474 :         match pg_version {
     167              :             14 => {
     168            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     169            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     170            0 : 
     171            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     172            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     173            0 :                         assert_eq!(0, buf.remaining());
     174            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     175            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     176            0 :                         }
     177            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     178            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     179            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     180            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     181            0 :                         }
     182            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     183            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     184              :                     {
     185            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     186            0 :                         // the size of tuple data is inferred from the size of the record.
     187            0 :                         // we can't validate the remaining number of bytes without parsing
     188            0 :                         // the tuple data.
     189            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     190            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     191            0 :                         }
     192            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     193            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     194            0 :                             // non-HOT update where the new tuple goes to different page than
     195            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     196            0 :                             // set.
     197            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     198            0 :                         }
     199            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     200            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     201            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     202            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     203            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     204            0 :                         }
     205            0 :                     }
     206            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     207            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     208            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     209            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     210              : 
     211            0 :                         let offset_array_len =
     212            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     213              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     214            0 :                                 0
     215              :                             } else {
     216            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     217              :                             };
     218            0 :                         assert_eq!(offset_array_len, buf.remaining());
     219              : 
     220            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     221            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     222            0 :                         }
     223            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     224            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     225            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     226            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     227            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     228            0 :                         }
     229            0 :                     }
     230              :                 } else {
     231            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     232              :                 }
     233              :             }
     234              :             15 => {
     235       145474 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     236       145286 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     237       145286 : 
     238       145286 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     239       145276 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     240       145276 :                         assert_eq!(0, buf.remaining());
     241       145276 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     242            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     243       145272 :                         }
     244           10 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     245            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     246            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     247            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     248            0 :                         }
     249           10 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     250            2 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     251              :                     {
     252            8 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     253            8 :                         // the size of tuple data is inferred from the size of the record.
     254            8 :                         // we can't validate the remaining number of bytes without parsing
     255            8 :                         // the tuple data.
     256            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     257            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     258            8 :                         }
     259            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     260            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     261            0 :                             // non-HOT update where the new tuple goes to different page than
     262            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     263            0 :                             // set.
     264            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     265            8 :                         }
     266            2 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     267            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     268            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     269            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     270            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     271            0 :                         }
     272            2 :                     }
     273          188 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     274          188 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     275          188 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     276           42 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     277              : 
     278           42 :                         let offset_array_len =
     279           42 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     280              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     281            2 :                                 0
     282              :                             } else {
     283           40 :                                 size_of::<u16>() * xlrec.ntuples as usize
     284              :                             };
     285           42 :                         assert_eq!(offset_array_len, buf.remaining());
     286              : 
     287           42 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     288            8 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     289           34 :                         }
     290          146 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     291            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     292            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     293            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     294            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     295            0 :                         }
     296          146 :                     }
     297              :                 } else {
     298            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     299              :                 }
     300              :             }
     301              :             16 => {
     302            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     303            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     304            0 : 
     305            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     306            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     307            0 :                         assert_eq!(0, buf.remaining());
     308            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     309            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     310            0 :                         }
     311            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     312            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     313            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     314            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     315            0 :                         }
     316            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     317            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     318              :                     {
     319            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     320            0 :                         // the size of tuple data is inferred from the size of the record.
     321            0 :                         // we can't validate the remaining number of bytes without parsing
     322            0 :                         // the tuple data.
     323            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     324            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     325            0 :                         }
     326            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     327            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     328            0 :                             // non-HOT update where the new tuple goes to different page than
     329            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     330            0 :                             // set.
     331            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     332            0 :                         }
     333            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     334            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     335            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     336            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     337            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     338            0 :                         }
     339            0 :                     }
     340            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     341            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     342            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     343            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     344              : 
     345            0 :                         let offset_array_len =
     346            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     347              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     348            0 :                                 0
     349              :                             } else {
     350            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     351              :                             };
     352            0 :                         assert_eq!(offset_array_len, buf.remaining());
     353              : 
     354            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     355            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     356            0 :                         }
     357            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     358            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     359            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     360            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     361            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     362            0 :                         }
     363            0 :                     }
     364              :                 } else {
     365            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     366              :                 }
     367              :             }
     368              :             17 => {
     369            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     370            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     371            0 : 
     372            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     373            0 :                         let xlrec = v17::XlHeapInsert::decode(buf);
     374            0 :                         assert_eq!(0, buf.remaining());
     375            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     376            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     377            0 :                         }
     378            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     379            0 :                         let xlrec = v17::XlHeapDelete::decode(buf);
     380            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     381            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     382            0 :                         }
     383            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     384            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     385              :                     {
     386            0 :                         let xlrec = v17::XlHeapUpdate::decode(buf);
     387            0 :                         // the size of tuple data is inferred from the size of the record.
     388            0 :                         // we can't validate the remaining number of bytes without parsing
     389            0 :                         // the tuple data.
     390            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     391            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     392            0 :                         }
     393            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     394            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     395            0 :                             // non-HOT update where the new tuple goes to different page than
     396            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     397            0 :                             // set.
     398            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     399            0 :                         }
     400            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     401            0 :                         let xlrec = v17::XlHeapLock::decode(buf);
     402            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     403            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     404            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     405            0 :                         }
     406            0 :                     }
     407            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     408            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     409            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     410            0 :                         let xlrec = v17::XlHeapMultiInsert::decode(buf);
     411              : 
     412            0 :                         let offset_array_len =
     413            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     414              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     415            0 :                                 0
     416              :                             } else {
     417            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     418              :                             };
     419            0 :                         assert_eq!(offset_array_len, buf.remaining());
     420              : 
     421            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     422            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     423            0 :                         }
     424            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     425            0 :                         let xlrec = v17::XlHeapLockUpdated::decode(buf);
     426            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     427            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     428            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     429            0 :                         }
     430            0 :                     }
     431              :                 } else {
     432            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     433              :                 }
     434              :             }
     435            0 :             _ => {}
     436              :         }
     437              : 
     438       145474 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     439           12 :             let vm_rel = RelTag {
     440           12 :                 forknum: VISIBILITYMAP_FORKNUM,
     441           12 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     442           12 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     443           12 :                 relnode: decoded.blocks[0].rnode_relnode,
     444           12 :             };
     445           12 : 
     446           12 :             Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
     447           12 :                 ClearVmBits {
     448           12 :                     new_heap_blkno,
     449           12 :                     old_heap_blkno,
     450           12 :                     vm_rel,
     451           12 :                     flags,
     452           12 :                 },
     453           12 :             ))))
     454              :         } else {
     455       145462 :             Ok(None)
     456              :         }
     457       145474 :     }
     458              : 
     459            0 :     fn decode_neonmgr_record(
     460            0 :         buf: &mut Bytes,
     461            0 :         decoded: &DecodedWALRecord,
     462            0 :         pg_version: u32,
     463            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     464            0 :         // Handle VM bit updates that are implicitly part of heap records.
     465            0 : 
     466            0 :         // First, look at the record to determine which VM bits need
     467            0 :         // to be cleared. If either of these variables is set, we
     468            0 :         // need to clear the corresponding bits in the visibility map.
     469            0 :         let mut new_heap_blkno: Option<u32> = None;
     470            0 :         let mut old_heap_blkno: Option<u32> = None;
     471            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     472            0 : 
     473            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     474              : 
     475            0 :         match pg_version {
     476              :             16 | 17 => {
     477            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     478            0 : 
     479            0 :                 match info {
     480              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     481            0 :                         let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
     482            0 :                         assert_eq!(0, buf.remaining());
     483            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     484            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     485            0 :                         }
     486              :                     }
     487              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     488            0 :                         let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
     489            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     490            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     491            0 :                         }
     492              :                     }
     493              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     494              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     495            0 :                         let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
     496            0 :                         // the size of tuple data is inferred from the size of the record.
     497            0 :                         // we can't validate the remaining number of bytes without parsing
     498            0 :                         // the tuple data.
     499            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     500            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     501            0 :                         }
     502            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     503            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     504            0 :                             // non-HOT update where the new tuple goes to different page than
     505            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     506            0 :                             // set.
     507            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     508            0 :                         }
     509              :                     }
     510              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     511            0 :                         let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     512              : 
     513            0 :                         let offset_array_len =
     514            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     515              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     516            0 :                                 0
     517              :                             } else {
     518            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     519              :                             };
     520            0 :                         assert_eq!(offset_array_len, buf.remaining());
     521              : 
     522            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     523            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     524            0 :                         }
     525              :                     }
     526              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     527            0 :                         let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
     528            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     529            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     530            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     531            0 :                         }
     532              :                     }
     533            0 :                     info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
     534              :                 }
     535              :             }
     536            0 :             _ => anyhow::bail!(
     537            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     538            0 :                 pg_version
     539            0 :             ),
     540              :         }
     541              : 
     542            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     543            0 :             let vm_rel = RelTag {
     544            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     545            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     546            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     547            0 :                 relnode: decoded.blocks[0].rnode_relnode,
     548            0 :             };
     549            0 : 
     550            0 :             Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
     551            0 :                 ClearVmBits {
     552            0 :                     new_heap_blkno,
     553            0 :                     old_heap_blkno,
     554            0 :                     vm_rel,
     555            0 :                     flags,
     556            0 :                 },
     557            0 :             ))))
     558              :         } else {
     559            0 :             Ok(None)
     560              :         }
     561            0 :     }
     562              : 
     563           16 :     fn decode_smgr_record(
     564           16 :         buf: &mut Bytes,
     565           16 :         decoded: &DecodedWALRecord,
     566           16 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     567           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     568           16 :         if info == pg_constants::XLOG_SMGR_CREATE {
     569           16 :             let create = XlSmgrCreate::decode(buf);
     570           16 :             let rel = RelTag {
     571           16 :                 spcnode: create.rnode.spcnode,
     572           16 :                 dbnode: create.rnode.dbnode,
     573           16 :                 relnode: create.rnode.relnode,
     574           16 :                 forknum: create.forknum,
     575           16 :             };
     576           16 : 
     577           16 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
     578           16 :                 rel,
     579           16 :             }))));
     580            0 :         } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     581            0 :             let truncate = XlSmgrTruncate::decode(buf);
     582            0 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
     583            0 :         }
     584            0 : 
     585            0 :         Ok(None)
     586           16 :     }
     587              : 
     588            0 :     fn decode_dbase_record(
     589            0 :         buf: &mut Bytes,
     590            0 :         decoded: &DecodedWALRecord,
     591            0 :         pg_version: u32,
     592            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     593            0 :         // TODO: Refactor this to avoid the duplication between postgres versions.
     594            0 : 
     595            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     596            0 :         tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
     597              : 
     598            0 :         if pg_version == 14 {
     599            0 :             if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     600            0 :                 let createdb = XlCreateDatabase::decode(buf);
     601            0 :                 tracing::debug!("XLOG_DBASE_CREATE v14");
     602              : 
     603            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     604            0 :                     db_id: createdb.db_id,
     605            0 :                     tablespace_id: createdb.tablespace_id,
     606            0 :                     src_db_id: createdb.src_db_id,
     607            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     608            0 :                 }));
     609            0 : 
     610            0 :                 return Ok(Some(record));
     611            0 :             } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     612            0 :                 let dropdb = XlDropDatabase::decode(buf);
     613            0 : 
     614            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     615            0 :                     db_id: dropdb.db_id,
     616            0 :                     tablespace_ids: dropdb.tablespace_ids,
     617            0 :                 }));
     618            0 : 
     619            0 :                 return Ok(Some(record));
     620            0 :             }
     621            0 :         } else if pg_version == 15 {
     622            0 :             if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     623            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     624            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     625              :                 // The XLOG record was renamed between v14 and v15,
     626              :                 // but the record format is the same.
     627              :                 // So we can reuse XlCreateDatabase here.
     628            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     629              : 
     630            0 :                 let createdb = XlCreateDatabase::decode(buf);
     631            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     632            0 :                     db_id: createdb.db_id,
     633            0 :                     tablespace_id: createdb.tablespace_id,
     634            0 :                     src_db_id: createdb.src_db_id,
     635            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     636            0 :                 }));
     637            0 : 
     638            0 :                 return Ok(Some(record));
     639            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     640            0 :                 let dropdb = XlDropDatabase::decode(buf);
     641            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     642            0 :                     db_id: dropdb.db_id,
     643            0 :                     tablespace_ids: dropdb.tablespace_ids,
     644            0 :                 }));
     645            0 : 
     646            0 :                 return Ok(Some(record));
     647            0 :             }
     648            0 :         } else if pg_version == 16 {
     649            0 :             if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     650            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     651            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     652              :                 // The XLOG record was renamed between v14 and v15,
     653              :                 // but the record format is the same.
     654              :                 // So we can reuse XlCreateDatabase here.
     655            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     656              : 
     657            0 :                 let createdb = XlCreateDatabase::decode(buf);
     658            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     659            0 :                     db_id: createdb.db_id,
     660            0 :                     tablespace_id: createdb.tablespace_id,
     661            0 :                     src_db_id: createdb.src_db_id,
     662            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     663            0 :                 }));
     664            0 : 
     665            0 :                 return Ok(Some(record));
     666            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     667            0 :                 let dropdb = XlDropDatabase::decode(buf);
     668            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     669            0 :                     db_id: dropdb.db_id,
     670            0 :                     tablespace_ids: dropdb.tablespace_ids,
     671            0 :                 }));
     672            0 : 
     673            0 :                 return Ok(Some(record));
     674            0 :             }
     675            0 :         } else if pg_version == 17 {
     676            0 :             if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     677            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     678            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     679              :                 // The XLOG record was renamed between v14 and v15,
     680              :                 // but the record format is the same.
     681              :                 // So we can reuse XlCreateDatabase here.
     682            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     683              : 
     684            0 :                 let createdb = XlCreateDatabase::decode(buf);
     685            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     686            0 :                     db_id: createdb.db_id,
     687            0 :                     tablespace_id: createdb.tablespace_id,
     688            0 :                     src_db_id: createdb.src_db_id,
     689            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     690            0 :                 }));
     691            0 : 
     692            0 :                 return Ok(Some(record));
     693            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
     694            0 :                 let dropdb = XlDropDatabase::decode(buf);
     695            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     696            0 :                     db_id: dropdb.db_id,
     697            0 :                     tablespace_ids: dropdb.tablespace_ids,
     698            0 :                 }));
     699            0 : 
     700            0 :                 return Ok(Some(record));
     701            0 :             }
     702            0 :         }
     703              : 
     704            0 :         Ok(None)
     705            0 :     }
     706              : 
     707            0 :     fn decode_clog_record(
     708            0 :         buf: &mut Bytes,
     709            0 :         decoded: &DecodedWALRecord,
     710            0 :         pg_version: u32,
     711            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     712            0 :         let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     713            0 : 
     714            0 :         if info == pg_constants::CLOG_ZEROPAGE {
     715            0 :             let pageno = if pg_version < 17 {
     716            0 :                 buf.get_u32_le()
     717              :             } else {
     718            0 :                 buf.get_u64_le() as u32
     719              :             };
     720            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     721            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     722            0 : 
     723            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
     724            0 :                 ClogZeroPage { segno, rpageno },
     725            0 :             ))))
     726              :         } else {
     727            0 :             assert!(info == pg_constants::CLOG_TRUNCATE);
     728            0 :             let xlrec = XlClogTruncate::decode(buf, pg_version);
     729            0 : 
     730            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
     731            0 :                 ClogTruncate {
     732            0 :                     pageno: xlrec.pageno,
     733            0 :                     oldest_xid: xlrec.oldest_xid,
     734            0 :                     oldest_xid_db: xlrec.oldest_xid_db,
     735            0 :                 },
     736            0 :             ))))
     737              :         }
     738            0 :     }
     739              : 
     740           24 :     fn decode_xact_record(
     741           24 :         buf: &mut Bytes,
     742           24 :         decoded: &DecodedWALRecord,
     743           24 :         lsn: Lsn,
     744           24 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     745           24 :         let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     746           24 :         let origin_id = decoded.origin_id;
     747           24 :         let xl_xid = decoded.xl_xid;
     748           24 : 
     749           24 :         if info == pg_constants::XLOG_XACT_COMMIT {
     750            8 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     751            8 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
     752            8 :                 parsed,
     753            8 :                 origin_id,
     754            8 :                 xl_xid,
     755            8 :                 lsn,
     756            8 :             }))));
     757           16 :         } else if info == pg_constants::XLOG_XACT_ABORT {
     758            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     759            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
     760            0 :                 parsed,
     761            0 :                 origin_id,
     762            0 :                 xl_xid,
     763            0 :                 lsn,
     764            0 :             }))));
     765           16 :         } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
     766            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     767            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
     768            0 :                 XactCommon {
     769            0 :                     parsed,
     770            0 :                     origin_id,
     771            0 :                     xl_xid,
     772            0 :                     lsn,
     773            0 :                 },
     774            0 :             ))));
     775           16 :         } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
     776            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     777            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
     778            0 :                 XactCommon {
     779            0 :                     parsed,
     780            0 :                     origin_id,
     781            0 :                     xl_xid,
     782            0 :                     lsn,
     783            0 :                 },
     784            0 :             ))));
     785           16 :         } else if info == pg_constants::XLOG_XACT_PREPARE {
     786            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
     787            0 :                 XactPrepare {
     788            0 :                     xl_xid: decoded.xl_xid,
     789            0 :                     data: Bytes::copy_from_slice(&buf[..]),
     790            0 :                 },
     791            0 :             ))));
     792           16 :         }
     793           16 : 
     794           16 :         Ok(None)
     795           24 :     }
     796              : 
     797            0 :     fn decode_multixact_record(
     798            0 :         buf: &mut Bytes,
     799            0 :         decoded: &DecodedWALRecord,
     800            0 :         pg_version: u32,
     801            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     802            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     803            0 : 
     804            0 :         if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
     805            0 :             || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
     806              :         {
     807            0 :             let pageno = if pg_version < 17 {
     808            0 :                 buf.get_u32_le()
     809              :             } else {
     810            0 :                 buf.get_u64_le() as u32
     811              :             };
     812            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     813            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     814              : 
     815            0 :             let slru_kind = match info {
     816            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
     817            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
     818            0 :                 _ => unreachable!(),
     819              :             };
     820              : 
     821            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
     822            0 :                 MultiXactZeroPage {
     823            0 :                     slru_kind,
     824            0 :                     segno,
     825            0 :                     rpageno,
     826            0 :                 },
     827            0 :             ))));
     828            0 :         } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     829            0 :             let xlrec = XlMultiXactCreate::decode(buf);
     830            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
     831            0 :                 xlrec,
     832            0 :             ))));
     833            0 :         } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     834            0 :             let xlrec = XlMultiXactTruncate::decode(buf);
     835            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
     836            0 :                 xlrec,
     837            0 :             ))));
     838            0 :         }
     839            0 : 
     840            0 :         Ok(None)
     841            0 :     }
     842              : 
     843            0 :     fn decode_relmap_record(
     844            0 :         buf: &mut Bytes,
     845            0 :         decoded: &DecodedWALRecord,
     846            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     847            0 :         let update = XlRelmapUpdate::decode(buf);
     848            0 : 
     849            0 :         let mut buf = decoded.record.clone();
     850            0 :         buf.advance(decoded.main_data_offset);
     851            0 :         // skip xl_relmap_update
     852            0 :         buf.advance(12);
     853            0 : 
     854            0 :         Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
     855            0 :             RelmapUpdate {
     856            0 :                 update,
     857            0 :                 buf: Bytes::copy_from_slice(&buf[..]),
     858            0 :             },
     859            0 :         ))))
     860            0 :     }
     861              : 
     862           30 :     fn decode_xlog_record(
     863           30 :         buf: &mut Bytes,
     864           30 :         decoded: &DecodedWALRecord,
     865           30 :         lsn: Lsn,
     866           30 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     867           30 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     868           30 :         Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
     869           30 :             info,
     870           30 :             lsn,
     871           30 :             buf: buf.clone(),
     872           30 :         }))))
     873           30 :     }
     874              : 
     875            0 :     fn decode_logical_message_record(
     876            0 :         buf: &mut Bytes,
     877            0 :         decoded: &DecodedWALRecord,
     878            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     879            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     880            0 :         if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     881            0 :             let xlrec = XlLogicalMessage::decode(buf);
     882            0 :             let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     883              : 
     884              :             #[cfg(feature = "testing")]
     885            0 :             if prefix == "neon-test" {
     886            0 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     887            0 :                     LogicalMessageRecord::Failpoint,
     888            0 :                 )));
     889            0 :             }
     890              : 
     891            0 :             if let Some(path) = prefix.strip_prefix("neon-file:") {
     892            0 :                 let buf_size = xlrec.prefix_size + xlrec.message_size;
     893            0 :                 let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
     894            0 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     895            0 :                     LogicalMessageRecord::Put(PutLogicalMessage {
     896            0 :                         path: path.to_string(),
     897            0 :                         buf,
     898            0 :                     }),
     899            0 :                 )));
     900            0 :             }
     901            0 :         }
     902              : 
     903            0 :         Ok(None)
     904            0 :     }
     905              : 
     906           16 :     fn decode_standby_record(
     907           16 :         buf: &mut Bytes,
     908           16 :         decoded: &DecodedWALRecord,
     909           16 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     910           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     911           16 :         if info == pg_constants::XLOG_RUNNING_XACTS {
     912            0 :             let xlrec = XlRunningXacts::decode(buf);
     913            0 :             return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
     914            0 :                 StandbyRunningXacts {
     915            0 :                     oldest_running_xid: xlrec.oldest_running_xid,
     916            0 :                 },
     917            0 :             ))));
     918           16 :         }
     919           16 : 
     920           16 :         Ok(None)
     921           16 :     }
     922              : 
     923            0 :     fn decode_replorigin_record(
     924            0 :         buf: &mut Bytes,
     925            0 :         decoded: &DecodedWALRecord,
     926            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     927            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     928            0 :         if info == pg_constants::XLOG_REPLORIGIN_SET {
     929            0 :             let xlrec = XlReploriginSet::decode(buf);
     930            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
     931            0 :                 xlrec,
     932            0 :             ))));
     933            0 :         } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
     934            0 :             let xlrec = XlReploriginDrop::decode(buf);
     935            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
     936            0 :                 xlrec,
     937            0 :             ))));
     938            0 :         }
     939            0 : 
     940            0 :         Ok(None)
     941            0 :     }
     942              : }
        

Generated by: LCOV version 2.1-beta