LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - decoder.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 24.5 % 739 181
Test Date: 2024-11-13 18:23:39 Functions: 50.0 % 14 7

            Line data    Source code
       1              : //! This module contains logic for decoding and interpreting
       2              : //! raw bytes which represent a raw Postgres WAL record.
       3              : 
       4              : use crate::models::*;
       5              : use crate::serialized_batch::SerializedValueBatch;
       6              : use bytes::{Buf, Bytes};
       7              : use pageserver_api::reltag::{RelTag, SlruKind};
       8              : use pageserver_api::shard::ShardIdentity;
       9              : use postgres_ffi::pg_constants;
      10              : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      11              : use postgres_ffi::walrecord::*;
      12              : use utils::lsn::Lsn;
      13              : 
      14              : impl InterpretedWalRecord {
      15              :     /// Decode and interpreted raw bytes which represent one Postgres WAL record.
      16              :     /// Data blocks which do not match the provided shard identity are filtered out.
      17              :     /// Shard 0 is a special case since it tracks all relation sizes. We only give it
      18              :     /// the keys that are being written as that is enough for updating relation sizes.
      19       145852 :     pub fn from_bytes_filtered(
      20       145852 :         buf: Bytes,
      21       145852 :         shard: &ShardIdentity,
      22       145852 :         record_end_lsn: Lsn,
      23       145852 :         pg_version: u32,
      24       145852 :     ) -> anyhow::Result<InterpretedWalRecord> {
      25       145852 :         let mut decoded = DecodedWALRecord::default();
      26       145852 :         decode_wal_record(buf, &mut decoded, pg_version)?;
      27       145852 :         let xid = decoded.xl_xid;
      28              : 
      29       145852 :         let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
      30            0 :             FlushUncommittedRecords::Yes
      31              :         } else {
      32       145852 :             FlushUncommittedRecords::No
      33              :         };
      34              : 
      35       145852 :         let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?;
      36       145852 :         let batch = SerializedValueBatch::from_decoded_filtered(
      37       145852 :             decoded,
      38       145852 :             shard,
      39       145852 :             record_end_lsn,
      40       145852 :             pg_version,
      41       145852 :         )?;
      42              : 
      43       145852 :         Ok(InterpretedWalRecord {
      44       145852 :             metadata_record,
      45       145852 :             batch,
      46       145852 :             end_lsn: record_end_lsn,
      47       145852 :             flush_uncommitted,
      48       145852 :             xid,
      49       145852 :         })
      50       145852 :     }
      51              : }
      52              : 
      53              : impl MetadataRecord {
      54       145852 :     fn from_decoded(
      55       145852 :         decoded: &DecodedWALRecord,
      56       145852 :         record_end_lsn: Lsn,
      57       145852 :         pg_version: u32,
      58       145852 :     ) -> anyhow::Result<Option<MetadataRecord>> {
      59       145852 :         // Note: this doesn't actually copy the bytes since
      60       145852 :         // the [`Bytes`] type implements it via a level of indirection.
      61       145852 :         let mut buf = decoded.record.clone();
      62       145852 :         buf.advance(decoded.main_data_offset);
      63       145852 : 
      64       145852 :         match decoded.xl_rmid {
      65              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
      66       145474 :                 Self::decode_heapam_record(&mut buf, decoded, pg_version)
      67              :             }
      68            0 :             pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version),
      69              :             // Handle other special record types
      70           16 :             pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded),
      71            0 :             pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version),
      72              :             pg_constants::RM_TBLSPC_ID => {
      73            0 :                 tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
      74            0 :                 Ok(None)
      75              :             }
      76            0 :             pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
      77           24 :             pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn),
      78              :             pg_constants::RM_MULTIXACT_ID => {
      79            0 :                 Self::decode_multixact_record(&mut buf, decoded, pg_version)
      80              :             }
      81            0 :             pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded),
      82              :             // This is an odd duck. It needs to go to all shards.
      83              :             // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
      84              :             // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
      85              :             // the pageserver and decode it there.
      86              :             //
      87              :             // Alternatively, one can make the checkpoint part of the subscription protocol
      88              :             // to the pageserver. This should work fine, but can be done at a later point.
      89           30 :             pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn),
      90              :             pg_constants::RM_LOGICALMSG_ID => {
      91            0 :                 Self::decode_logical_message_record(&mut buf, decoded)
      92              :             }
      93           16 :             pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded),
      94            0 :             pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded),
      95          292 :             _unexpected => {
      96          292 :                 // TODO: consider failing here instead of blindly doing something without
      97          292 :                 // understanding the protocol
      98          292 :                 Ok(None)
      99              :             }
     100              :         }
     101       145852 :     }
     102              : 
     103       145474 :     fn decode_heapam_record(
     104       145474 :         buf: &mut Bytes,
     105       145474 :         decoded: &DecodedWALRecord,
     106       145474 :         pg_version: u32,
     107       145474 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     108       145474 :         // Handle VM bit updates that are implicitly part of heap records.
     109       145474 : 
     110       145474 :         // First, look at the record to determine which VM bits need
     111       145474 :         // to be cleared. If either of these variables is set, we
     112       145474 :         // need to clear the corresponding bits in the visibility map.
     113       145474 :         let mut new_heap_blkno: Option<u32> = None;
     114       145474 :         let mut old_heap_blkno: Option<u32> = None;
     115       145474 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     116       145474 : 
     117       145474 :         match pg_version {
     118              :             14 => {
     119            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     120            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     121            0 : 
     122            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     123            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     124            0 :                         assert_eq!(0, buf.remaining());
     125            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     126            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     127            0 :                         }
     128            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     129            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     130            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     131            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     132            0 :                         }
     133            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     134            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     135              :                     {
     136            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     137            0 :                         // the size of tuple data is inferred from the size of the record.
     138            0 :                         // we can't validate the remaining number of bytes without parsing
     139            0 :                         // the tuple data.
     140            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     141            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     142            0 :                         }
     143            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     144            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     145            0 :                             // non-HOT update where the new tuple goes to different page than
     146            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     147            0 :                             // set.
     148            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     149            0 :                         }
     150            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     151            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     152            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     153            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     154            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     155            0 :                         }
     156            0 :                     }
     157            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     158            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     159            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     160            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     161              : 
     162            0 :                         let offset_array_len =
     163            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     164              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     165            0 :                                 0
     166              :                             } else {
     167            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     168              :                             };
     169            0 :                         assert_eq!(offset_array_len, buf.remaining());
     170              : 
     171            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     172            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     173            0 :                         }
     174            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     175            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     176            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     177            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     178            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     179            0 :                         }
     180            0 :                     }
     181              :                 } else {
     182            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     183              :                 }
     184              :             }
     185              :             15 => {
     186       145474 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     187       145286 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     188       145286 : 
     189       145286 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     190       145276 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     191       145276 :                         assert_eq!(0, buf.remaining());
     192       145276 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     193            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     194       145272 :                         }
     195           10 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     196            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     197            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     198            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     199            0 :                         }
     200           10 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     201            2 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     202              :                     {
     203            8 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     204            8 :                         // the size of tuple data is inferred from the size of the record.
     205            8 :                         // we can't validate the remaining number of bytes without parsing
     206            8 :                         // the tuple data.
     207            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     208            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     209            8 :                         }
     210            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     211            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     212            0 :                             // non-HOT update where the new tuple goes to different page than
     213            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     214            0 :                             // set.
     215            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     216            8 :                         }
     217            2 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     218            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     219            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     220            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     221            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     222            0 :                         }
     223            2 :                     }
     224          188 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     225          188 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     226          188 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     227           42 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     228              : 
     229           42 :                         let offset_array_len =
     230           42 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     231              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     232            2 :                                 0
     233              :                             } else {
     234           40 :                                 size_of::<u16>() * xlrec.ntuples as usize
     235              :                             };
     236           42 :                         assert_eq!(offset_array_len, buf.remaining());
     237              : 
     238           42 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     239            8 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     240           34 :                         }
     241          146 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     242            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     243            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     244            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     245            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     246            0 :                         }
     247          146 :                     }
     248              :                 } else {
     249            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     250              :                 }
     251              :             }
     252              :             16 => {
     253            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     254            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     255            0 : 
     256            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     257            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     258            0 :                         assert_eq!(0, buf.remaining());
     259            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     260            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     261            0 :                         }
     262            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     263            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     264            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     265            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     266            0 :                         }
     267            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     268            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     269              :                     {
     270            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     271            0 :                         // the size of tuple data is inferred from the size of the record.
     272            0 :                         // we can't validate the remaining number of bytes without parsing
     273            0 :                         // the tuple data.
     274            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     275            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     276            0 :                         }
     277            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     278            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     279            0 :                             // non-HOT update where the new tuple goes to different page than
     280            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     281            0 :                             // set.
     282            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     283            0 :                         }
     284            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     285            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     286            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     287            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     288            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     289            0 :                         }
     290            0 :                     }
     291            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     292            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     293            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     294            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     295              : 
     296            0 :                         let offset_array_len =
     297            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     298              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     299            0 :                                 0
     300              :                             } else {
     301            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     302              :                             };
     303            0 :                         assert_eq!(offset_array_len, buf.remaining());
     304              : 
     305            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     306            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     307            0 :                         }
     308            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     309            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     310            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     311            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     312            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     313            0 :                         }
     314            0 :                     }
     315              :                 } else {
     316            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     317              :                 }
     318              :             }
     319              :             17 => {
     320            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     321            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     322            0 : 
     323            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     324            0 :                         let xlrec = v17::XlHeapInsert::decode(buf);
     325            0 :                         assert_eq!(0, buf.remaining());
     326            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     327            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     328            0 :                         }
     329            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     330            0 :                         let xlrec = v17::XlHeapDelete::decode(buf);
     331            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     332            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     333            0 :                         }
     334            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     335            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     336              :                     {
     337            0 :                         let xlrec = v17::XlHeapUpdate::decode(buf);
     338            0 :                         // the size of tuple data is inferred from the size of the record.
     339            0 :                         // we can't validate the remaining number of bytes without parsing
     340            0 :                         // the tuple data.
     341            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     342            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     343            0 :                         }
     344            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     345            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     346            0 :                             // non-HOT update where the new tuple goes to different page than
     347            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     348            0 :                             // set.
     349            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     350            0 :                         }
     351            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     352            0 :                         let xlrec = v17::XlHeapLock::decode(buf);
     353            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     354            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     355            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     356            0 :                         }
     357            0 :                     }
     358            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     359            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     360            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     361            0 :                         let xlrec = v17::XlHeapMultiInsert::decode(buf);
     362              : 
     363            0 :                         let offset_array_len =
     364            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     365              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     366            0 :                                 0
     367              :                             } else {
     368            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     369              :                             };
     370            0 :                         assert_eq!(offset_array_len, buf.remaining());
     371              : 
     372            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     373            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     374            0 :                         }
     375            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     376            0 :                         let xlrec = v17::XlHeapLockUpdated::decode(buf);
     377            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     378            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     379            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     380            0 :                         }
     381            0 :                     }
     382              :                 } else {
     383            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     384              :                 }
     385              :             }
     386            0 :             _ => {}
     387              :         }
     388              : 
     389       145474 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     390           12 :             let vm_rel = RelTag {
     391           12 :                 forknum: VISIBILITYMAP_FORKNUM,
     392           12 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     393           12 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     394           12 :                 relnode: decoded.blocks[0].rnode_relnode,
     395           12 :             };
     396           12 : 
     397           12 :             Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
     398           12 :                 ClearVmBits {
     399           12 :                     new_heap_blkno,
     400           12 :                     old_heap_blkno,
     401           12 :                     vm_rel,
     402           12 :                     flags,
     403           12 :                 },
     404           12 :             ))))
     405              :         } else {
     406       145462 :             Ok(None)
     407              :         }
     408       145474 :     }
     409              : 
     410            0 :     fn decode_neonmgr_record(
     411            0 :         buf: &mut Bytes,
     412            0 :         decoded: &DecodedWALRecord,
     413            0 :         pg_version: u32,
     414            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     415            0 :         // Handle VM bit updates that are implicitly part of heap records.
     416            0 : 
     417            0 :         // First, look at the record to determine which VM bits need
     418            0 :         // to be cleared. If either of these variables is set, we
     419            0 :         // need to clear the corresponding bits in the visibility map.
     420            0 :         let mut new_heap_blkno: Option<u32> = None;
     421            0 :         let mut old_heap_blkno: Option<u32> = None;
     422            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     423            0 : 
     424            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     425              : 
     426            0 :         match pg_version {
     427              :             16 | 17 => {
     428            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     429            0 : 
     430            0 :                 match info {
     431              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     432            0 :                         let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
     433            0 :                         assert_eq!(0, buf.remaining());
     434            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     435            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     436            0 :                         }
     437              :                     }
     438              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     439            0 :                         let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
     440            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     441            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     442            0 :                         }
     443              :                     }
     444              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     445              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     446            0 :                         let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
     447            0 :                         // the size of tuple data is inferred from the size of the record.
     448            0 :                         // we can't validate the remaining number of bytes without parsing
     449            0 :                         // the tuple data.
     450            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     451            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     452            0 :                         }
     453            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     454            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     455            0 :                             // non-HOT update where the new tuple goes to different page than
     456            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     457            0 :                             // set.
     458            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     459            0 :                         }
     460              :                     }
     461              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     462            0 :                         let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     463              : 
     464            0 :                         let offset_array_len =
     465            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     466              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     467            0 :                                 0
     468              :                             } else {
     469            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     470              :                             };
     471            0 :                         assert_eq!(offset_array_len, buf.remaining());
     472              : 
     473            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     474            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     475            0 :                         }
     476              :                     }
     477              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     478            0 :                         let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
     479            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     480            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     481            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     482            0 :                         }
     483              :                     }
     484            0 :                     info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
     485              :                 }
     486              :             }
     487            0 :             _ => anyhow::bail!(
     488            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     489            0 :                 pg_version
     490            0 :             ),
     491              :         }
     492              : 
     493            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     494            0 :             let vm_rel = RelTag {
     495            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     496            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     497            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     498            0 :                 relnode: decoded.blocks[0].rnode_relnode,
     499            0 :             };
     500            0 : 
     501            0 :             Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
     502            0 :                 ClearVmBits {
     503            0 :                     new_heap_blkno,
     504            0 :                     old_heap_blkno,
     505            0 :                     vm_rel,
     506            0 :                     flags,
     507            0 :                 },
     508            0 :             ))))
     509              :         } else {
     510            0 :             Ok(None)
     511              :         }
     512            0 :     }
     513              : 
     514           16 :     fn decode_smgr_record(
     515           16 :         buf: &mut Bytes,
     516           16 :         decoded: &DecodedWALRecord,
     517           16 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     518           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     519           16 :         if info == pg_constants::XLOG_SMGR_CREATE {
     520           16 :             let create = XlSmgrCreate::decode(buf);
     521           16 :             let rel = RelTag {
     522           16 :                 spcnode: create.rnode.spcnode,
     523           16 :                 dbnode: create.rnode.dbnode,
     524           16 :                 relnode: create.rnode.relnode,
     525           16 :                 forknum: create.forknum,
     526           16 :             };
     527           16 : 
     528           16 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
     529           16 :                 rel,
     530           16 :             }))));
     531            0 :         } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     532            0 :             let truncate = XlSmgrTruncate::decode(buf);
     533            0 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
     534            0 :         }
     535            0 : 
     536            0 :         Ok(None)
     537           16 :     }
     538              : 
     539            0 :     fn decode_dbase_record(
     540            0 :         buf: &mut Bytes,
     541            0 :         decoded: &DecodedWALRecord,
     542            0 :         pg_version: u32,
     543            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     544            0 :         // TODO: Refactor this to avoid the duplication between postgres versions.
     545            0 : 
     546            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     547            0 :         tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
     548              : 
     549            0 :         if pg_version == 14 {
     550            0 :             if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     551            0 :                 let createdb = XlCreateDatabase::decode(buf);
     552            0 :                 tracing::debug!("XLOG_DBASE_CREATE v14");
     553              : 
     554            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     555            0 :                     db_id: createdb.db_id,
     556            0 :                     tablespace_id: createdb.tablespace_id,
     557            0 :                     src_db_id: createdb.src_db_id,
     558            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     559            0 :                 }));
     560            0 : 
     561            0 :                 return Ok(Some(record));
     562            0 :             } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     563            0 :                 let dropdb = XlDropDatabase::decode(buf);
     564            0 : 
     565            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     566            0 :                     db_id: dropdb.db_id,
     567            0 :                     tablespace_ids: dropdb.tablespace_ids,
     568            0 :                 }));
     569            0 : 
     570            0 :                 return Ok(Some(record));
     571            0 :             }
     572            0 :         } else if pg_version == 15 {
     573            0 :             if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     574            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     575            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     576              :                 // The XLOG record was renamed between v14 and v15,
     577              :                 // but the record format is the same.
     578              :                 // So we can reuse XlCreateDatabase here.
     579            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     580              : 
     581            0 :                 let createdb = XlCreateDatabase::decode(buf);
     582            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     583            0 :                     db_id: createdb.db_id,
     584            0 :                     tablespace_id: createdb.tablespace_id,
     585            0 :                     src_db_id: createdb.src_db_id,
     586            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     587            0 :                 }));
     588            0 : 
     589            0 :                 return Ok(Some(record));
     590            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     591            0 :                 let dropdb = XlDropDatabase::decode(buf);
     592            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     593            0 :                     db_id: dropdb.db_id,
     594            0 :                     tablespace_ids: dropdb.tablespace_ids,
     595            0 :                 }));
     596            0 : 
     597            0 :                 return Ok(Some(record));
     598            0 :             }
     599            0 :         } else if pg_version == 16 {
     600            0 :             if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     601            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     602            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     603              :                 // The XLOG record was renamed between v14 and v15,
     604              :                 // but the record format is the same.
     605              :                 // So we can reuse XlCreateDatabase here.
     606            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     607              : 
     608            0 :                 let createdb = XlCreateDatabase::decode(buf);
     609            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     610            0 :                     db_id: createdb.db_id,
     611            0 :                     tablespace_id: createdb.tablespace_id,
     612            0 :                     src_db_id: createdb.src_db_id,
     613            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     614            0 :                 }));
     615            0 : 
     616            0 :                 return Ok(Some(record));
     617            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     618            0 :                 let dropdb = XlDropDatabase::decode(buf);
     619            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     620            0 :                     db_id: dropdb.db_id,
     621            0 :                     tablespace_ids: dropdb.tablespace_ids,
     622            0 :                 }));
     623            0 : 
     624            0 :                 return Ok(Some(record));
     625            0 :             }
     626            0 :         } else if pg_version == 17 {
     627            0 :             if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     628            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     629            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     630              :                 // The XLOG record was renamed between v14 and v15,
     631              :                 // but the record format is the same.
     632              :                 // So we can reuse XlCreateDatabase here.
     633            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     634              : 
     635            0 :                 let createdb = XlCreateDatabase::decode(buf);
     636            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     637            0 :                     db_id: createdb.db_id,
     638            0 :                     tablespace_id: createdb.tablespace_id,
     639            0 :                     src_db_id: createdb.src_db_id,
     640            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     641            0 :                 }));
     642            0 : 
     643            0 :                 return Ok(Some(record));
     644            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
     645            0 :                 let dropdb = XlDropDatabase::decode(buf);
     646            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     647            0 :                     db_id: dropdb.db_id,
     648            0 :                     tablespace_ids: dropdb.tablespace_ids,
     649            0 :                 }));
     650            0 : 
     651            0 :                 return Ok(Some(record));
     652            0 :             }
     653            0 :         }
     654              : 
     655            0 :         Ok(None)
     656            0 :     }
     657              : 
     658            0 :     fn decode_clog_record(
     659            0 :         buf: &mut Bytes,
     660            0 :         decoded: &DecodedWALRecord,
     661            0 :         pg_version: u32,
     662            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     663            0 :         let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     664            0 : 
     665            0 :         if info == pg_constants::CLOG_ZEROPAGE {
     666            0 :             let pageno = if pg_version < 17 {
     667            0 :                 buf.get_u32_le()
     668              :             } else {
     669            0 :                 buf.get_u64_le() as u32
     670              :             };
     671            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     672            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     673            0 : 
     674            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
     675            0 :                 ClogZeroPage { segno, rpageno },
     676            0 :             ))))
     677              :         } else {
     678            0 :             assert!(info == pg_constants::CLOG_TRUNCATE);
     679            0 :             let xlrec = XlClogTruncate::decode(buf, pg_version);
     680            0 : 
     681            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
     682            0 :                 ClogTruncate {
     683            0 :                     pageno: xlrec.pageno,
     684            0 :                     oldest_xid: xlrec.oldest_xid,
     685            0 :                     oldest_xid_db: xlrec.oldest_xid_db,
     686            0 :                 },
     687            0 :             ))))
     688              :         }
     689            0 :     }
     690              : 
     691           24 :     fn decode_xact_record(
     692           24 :         buf: &mut Bytes,
     693           24 :         decoded: &DecodedWALRecord,
     694           24 :         lsn: Lsn,
     695           24 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     696           24 :         let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     697           24 :         let origin_id = decoded.origin_id;
     698           24 :         let xl_xid = decoded.xl_xid;
     699           24 : 
     700           24 :         if info == pg_constants::XLOG_XACT_COMMIT {
     701            8 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     702            8 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
     703            8 :                 parsed,
     704            8 :                 origin_id,
     705            8 :                 xl_xid,
     706            8 :                 lsn,
     707            8 :             }))));
     708           16 :         } else if info == pg_constants::XLOG_XACT_ABORT {
     709            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     710            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
     711            0 :                 parsed,
     712            0 :                 origin_id,
     713            0 :                 xl_xid,
     714            0 :                 lsn,
     715            0 :             }))));
     716           16 :         } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
     717            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     718            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
     719            0 :                 XactCommon {
     720            0 :                     parsed,
     721            0 :                     origin_id,
     722            0 :                     xl_xid,
     723            0 :                     lsn,
     724            0 :                 },
     725            0 :             ))));
     726           16 :         } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
     727            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     728            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
     729            0 :                 XactCommon {
     730            0 :                     parsed,
     731            0 :                     origin_id,
     732            0 :                     xl_xid,
     733            0 :                     lsn,
     734            0 :                 },
     735            0 :             ))));
     736           16 :         } else if info == pg_constants::XLOG_XACT_PREPARE {
     737            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
     738            0 :                 XactPrepare {
     739            0 :                     xl_xid: decoded.xl_xid,
     740            0 :                     data: Bytes::copy_from_slice(&buf[..]),
     741            0 :                 },
     742            0 :             ))));
     743           16 :         }
     744           16 : 
     745           16 :         Ok(None)
     746           24 :     }
     747              : 
     748            0 :     fn decode_multixact_record(
     749            0 :         buf: &mut Bytes,
     750            0 :         decoded: &DecodedWALRecord,
     751            0 :         pg_version: u32,
     752            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     753            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     754            0 : 
     755            0 :         if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
     756            0 :             || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
     757              :         {
     758            0 :             let pageno = if pg_version < 17 {
     759            0 :                 buf.get_u32_le()
     760              :             } else {
     761            0 :                 buf.get_u64_le() as u32
     762              :             };
     763            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     764            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     765              : 
     766            0 :             let slru_kind = match info {
     767            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
     768            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
     769            0 :                 _ => unreachable!(),
     770              :             };
     771              : 
     772            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
     773            0 :                 MultiXactZeroPage {
     774            0 :                     slru_kind,
     775            0 :                     segno,
     776            0 :                     rpageno,
     777            0 :                 },
     778            0 :             ))));
     779            0 :         } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     780            0 :             let xlrec = XlMultiXactCreate::decode(buf);
     781            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
     782            0 :                 xlrec,
     783            0 :             ))));
     784            0 :         } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     785            0 :             let xlrec = XlMultiXactTruncate::decode(buf);
     786            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
     787            0 :                 xlrec,
     788            0 :             ))));
     789            0 :         }
     790            0 : 
     791            0 :         Ok(None)
     792            0 :     }
     793              : 
     794            0 :     fn decode_relmap_record(
     795            0 :         buf: &mut Bytes,
     796            0 :         decoded: &DecodedWALRecord,
     797            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     798            0 :         let update = XlRelmapUpdate::decode(buf);
     799            0 : 
     800            0 :         let mut buf = decoded.record.clone();
     801            0 :         buf.advance(decoded.main_data_offset);
     802            0 :         // skip xl_relmap_update
     803            0 :         buf.advance(12);
     804            0 : 
     805            0 :         Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
     806            0 :             RelmapUpdate {
     807            0 :                 update,
     808            0 :                 buf: Bytes::copy_from_slice(&buf[..]),
     809            0 :             },
     810            0 :         ))))
     811            0 :     }
     812              : 
     813           30 :     fn decode_xlog_record(
     814           30 :         buf: &mut Bytes,
     815           30 :         decoded: &DecodedWALRecord,
     816           30 :         lsn: Lsn,
     817           30 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     818           30 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     819           30 :         Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
     820           30 :             info,
     821           30 :             lsn,
     822           30 :             buf: buf.clone(),
     823           30 :         }))))
     824           30 :     }
     825              : 
     826            0 :     fn decode_logical_message_record(
     827            0 :         buf: &mut Bytes,
     828            0 :         decoded: &DecodedWALRecord,
     829            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     830            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     831            0 :         if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     832            0 :             let xlrec = XlLogicalMessage::decode(buf);
     833            0 :             let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     834              : 
     835              :             #[cfg(feature = "testing")]
     836            0 :             if prefix == "neon-test" {
     837            0 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     838            0 :                     LogicalMessageRecord::Failpoint,
     839            0 :                 )));
     840            0 :             }
     841              : 
     842            0 :             if let Some(path) = prefix.strip_prefix("neon-file:") {
     843            0 :                 let buf_size = xlrec.prefix_size + xlrec.message_size;
     844            0 :                 let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
     845            0 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     846            0 :                     LogicalMessageRecord::Put(PutLogicalMessage {
     847            0 :                         path: path.to_string(),
     848            0 :                         buf,
     849            0 :                     }),
     850            0 :                 )));
     851            0 :             }
     852            0 :         }
     853              : 
     854            0 :         Ok(None)
     855            0 :     }
     856              : 
     857           16 :     fn decode_standby_record(
     858           16 :         buf: &mut Bytes,
     859           16 :         decoded: &DecodedWALRecord,
     860           16 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     861           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     862           16 :         if info == pg_constants::XLOG_RUNNING_XACTS {
     863            0 :             let xlrec = XlRunningXacts::decode(buf);
     864            0 :             return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
     865            0 :                 StandbyRunningXacts {
     866            0 :                     oldest_running_xid: xlrec.oldest_running_xid,
     867            0 :                 },
     868            0 :             ))));
     869           16 :         }
     870           16 : 
     871           16 :         Ok(None)
     872           16 :     }
     873              : 
     874            0 :     fn decode_replorigin_record(
     875            0 :         buf: &mut Bytes,
     876            0 :         decoded: &DecodedWALRecord,
     877            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     878            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     879            0 :         if info == pg_constants::XLOG_REPLORIGIN_SET {
     880            0 :             let xlrec = XlReploriginSet::decode(buf);
     881            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
     882            0 :                 xlrec,
     883            0 :             ))));
     884            0 :         } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
     885            0 :             let xlrec = XlReploriginDrop::decode(buf);
     886            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
     887            0 :                 xlrec,
     888            0 :             ))));
     889            0 :         }
     890            0 : 
     891            0 :         Ok(None)
     892            0 :     }
     893              : }
        

Generated by: LCOV version 2.1-beta