LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - decoder.rs (source / functions) Coverage Total Hit
Test: 2e3a7638747e564a4f6d1af1cc0c3b3438fbb740.info Lines: 24.5 % 739 181
Test Date: 2024-11-20 01:36:58 Functions: 50.0 % 14 7

            Line data    Source code
       1              : //! This module contains logic for decoding and interpreting
       2              : //! raw bytes which represent a raw Postgres WAL record.
       3              : 
       4              : use crate::models::*;
       5              : use crate::serialized_batch::SerializedValueBatch;
       6              : use bytes::{Buf, Bytes};
       7              : use pageserver_api::reltag::{RelTag, SlruKind};
       8              : use pageserver_api::shard::ShardIdentity;
       9              : use postgres_ffi::pg_constants;
      10              : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      11              : use postgres_ffi::walrecord::*;
      12              : use utils::lsn::Lsn;
      13              : 
      14              : impl InterpretedWalRecord {
      15              :     /// Decode and interpreted raw bytes which represent one Postgres WAL record.
      16              :     /// Data blocks which do not match the provided shard identity are filtered out.
      17              :     /// Shard 0 is a special case since it tracks all relation sizes. We only give it
      18              :     /// the keys that are being written as that is enough for updating relation sizes.
      19       145852 :     pub fn from_bytes_filtered(
      20       145852 :         buf: Bytes,
      21       145852 :         shard: &ShardIdentity,
      22       145852 :         next_record_lsn: Lsn,
      23       145852 :         pg_version: u32,
      24       145852 :     ) -> anyhow::Result<InterpretedWalRecord> {
      25       145852 :         let mut decoded = DecodedWALRecord::default();
      26       145852 :         decode_wal_record(buf, &mut decoded, pg_version)?;
      27       145852 :         let xid = decoded.xl_xid;
      28              : 
      29       145852 :         let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
      30            0 :             FlushUncommittedRecords::Yes
      31              :         } else {
      32       145852 :             FlushUncommittedRecords::No
      33              :         };
      34              : 
      35       145852 :         let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?;
      36       145852 :         let batch = SerializedValueBatch::from_decoded_filtered(
      37       145852 :             decoded,
      38       145852 :             shard,
      39       145852 :             next_record_lsn,
      40       145852 :             pg_version,
      41       145852 :         )?;
      42              : 
      43       145852 :         Ok(InterpretedWalRecord {
      44       145852 :             metadata_record,
      45       145852 :             batch,
      46       145852 :             next_record_lsn,
      47       145852 :             flush_uncommitted,
      48       145852 :             xid,
      49       145852 :         })
      50       145852 :     }
      51              : }
      52              : 
      53              : impl MetadataRecord {
      54       145852 :     fn from_decoded(
      55       145852 :         decoded: &DecodedWALRecord,
      56       145852 :         next_record_lsn: Lsn,
      57       145852 :         pg_version: u32,
      58       145852 :     ) -> anyhow::Result<Option<MetadataRecord>> {
      59       145852 :         // Note: this doesn't actually copy the bytes since
      60       145852 :         // the [`Bytes`] type implements it via a level of indirection.
      61       145852 :         let mut buf = decoded.record.clone();
      62       145852 :         buf.advance(decoded.main_data_offset);
      63       145852 : 
      64       145852 :         match decoded.xl_rmid {
      65              :             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
      66       145474 :                 Self::decode_heapam_record(&mut buf, decoded, pg_version)
      67              :             }
      68            0 :             pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version),
      69              :             // Handle other special record types
      70           16 :             pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded),
      71            0 :             pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version),
      72              :             pg_constants::RM_TBLSPC_ID => {
      73            0 :                 tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
      74            0 :                 Ok(None)
      75              :             }
      76            0 :             pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
      77              :             pg_constants::RM_XACT_ID => {
      78           24 :                 Self::decode_xact_record(&mut buf, decoded, next_record_lsn)
      79              :             }
      80              :             pg_constants::RM_MULTIXACT_ID => {
      81            0 :                 Self::decode_multixact_record(&mut buf, decoded, pg_version)
      82              :             }
      83            0 :             pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded),
      84              :             // This is an odd duck. It needs to go to all shards.
      85              :             // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
      86              :             // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
      87              :             // the pageserver and decode it there.
      88              :             //
      89              :             // Alternatively, one can make the checkpoint part of the subscription protocol
      90              :             // to the pageserver. This should work fine, but can be done at a later point.
      91              :             pg_constants::RM_XLOG_ID => {
      92           30 :                 Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)
      93              :             }
      94              :             pg_constants::RM_LOGICALMSG_ID => {
      95            0 :                 Self::decode_logical_message_record(&mut buf, decoded)
      96              :             }
      97           16 :             pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded),
      98            0 :             pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded),
      99          292 :             _unexpected => {
     100          292 :                 // TODO: consider failing here instead of blindly doing something without
     101          292 :                 // understanding the protocol
     102          292 :                 Ok(None)
     103              :             }
     104              :         }
     105       145852 :     }
     106              : 
     107       145474 :     fn decode_heapam_record(
     108       145474 :         buf: &mut Bytes,
     109       145474 :         decoded: &DecodedWALRecord,
     110       145474 :         pg_version: u32,
     111       145474 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     112       145474 :         // Handle VM bit updates that are implicitly part of heap records.
     113       145474 : 
     114       145474 :         // First, look at the record to determine which VM bits need
     115       145474 :         // to be cleared. If either of these variables is set, we
     116       145474 :         // need to clear the corresponding bits in the visibility map.
     117       145474 :         let mut new_heap_blkno: Option<u32> = None;
     118       145474 :         let mut old_heap_blkno: Option<u32> = None;
     119       145474 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     120       145474 : 
     121       145474 :         match pg_version {
     122              :             14 => {
     123            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     124            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     125            0 : 
     126            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     127            0 :                         let xlrec = v14::XlHeapInsert::decode(buf);
     128            0 :                         assert_eq!(0, buf.remaining());
     129            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     130            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     131            0 :                         }
     132            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     133            0 :                         let xlrec = v14::XlHeapDelete::decode(buf);
     134            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     135            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     136            0 :                         }
     137            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     138            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     139              :                     {
     140            0 :                         let xlrec = v14::XlHeapUpdate::decode(buf);
     141            0 :                         // the size of tuple data is inferred from the size of the record.
     142            0 :                         // we can't validate the remaining number of bytes without parsing
     143            0 :                         // the tuple data.
     144            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     145            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     146            0 :                         }
     147            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     148            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     149            0 :                             // non-HOT update where the new tuple goes to different page than
     150            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     151            0 :                             // set.
     152            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     153            0 :                         }
     154            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     155            0 :                         let xlrec = v14::XlHeapLock::decode(buf);
     156            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     157            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     158            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     159            0 :                         }
     160            0 :                     }
     161            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     162            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     163            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     164            0 :                         let xlrec = v14::XlHeapMultiInsert::decode(buf);
     165              : 
     166            0 :                         let offset_array_len =
     167            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     168              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     169            0 :                                 0
     170              :                             } else {
     171            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     172              :                             };
     173            0 :                         assert_eq!(offset_array_len, buf.remaining());
     174              : 
     175            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     176            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     177            0 :                         }
     178            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     179            0 :                         let xlrec = v14::XlHeapLockUpdated::decode(buf);
     180            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     181            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     182            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     183            0 :                         }
     184            0 :                     }
     185              :                 } else {
     186            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     187              :                 }
     188              :             }
     189              :             15 => {
     190       145474 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     191       145286 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     192       145286 : 
     193       145286 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     194       145276 :                         let xlrec = v15::XlHeapInsert::decode(buf);
     195       145276 :                         assert_eq!(0, buf.remaining());
     196       145276 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     197            4 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     198       145272 :                         }
     199           10 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     200            0 :                         let xlrec = v15::XlHeapDelete::decode(buf);
     201            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     202            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     203            0 :                         }
     204           10 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     205            2 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     206              :                     {
     207            8 :                         let xlrec = v15::XlHeapUpdate::decode(buf);
     208            8 :                         // the size of tuple data is inferred from the size of the record.
     209            8 :                         // we can't validate the remaining number of bytes without parsing
     210            8 :                         // the tuple data.
     211            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     212            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     213            8 :                         }
     214            8 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     215            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     216            0 :                             // non-HOT update where the new tuple goes to different page than
     217            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     218            0 :                             // set.
     219            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     220            8 :                         }
     221            2 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     222            0 :                         let xlrec = v15::XlHeapLock::decode(buf);
     223            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     224            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     225            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     226            0 :                         }
     227            2 :                     }
     228          188 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     229          188 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     230          188 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     231           42 :                         let xlrec = v15::XlHeapMultiInsert::decode(buf);
     232              : 
     233           42 :                         let offset_array_len =
     234           42 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     235              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     236            2 :                                 0
     237              :                             } else {
     238           40 :                                 size_of::<u16>() * xlrec.ntuples as usize
     239              :                             };
     240           42 :                         assert_eq!(offset_array_len, buf.remaining());
     241              : 
     242           42 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     243            8 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     244           34 :                         }
     245          146 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     246            0 :                         let xlrec = v15::XlHeapLockUpdated::decode(buf);
     247            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     248            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     249            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     250            0 :                         }
     251          146 :                     }
     252              :                 } else {
     253            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     254              :                 }
     255              :             }
     256              :             16 => {
     257            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     258            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     259            0 : 
     260            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     261            0 :                         let xlrec = v16::XlHeapInsert::decode(buf);
     262            0 :                         assert_eq!(0, buf.remaining());
     263            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     264            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     265            0 :                         }
     266            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     267            0 :                         let xlrec = v16::XlHeapDelete::decode(buf);
     268            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     269            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     270            0 :                         }
     271            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     272            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     273              :                     {
     274            0 :                         let xlrec = v16::XlHeapUpdate::decode(buf);
     275            0 :                         // the size of tuple data is inferred from the size of the record.
     276            0 :                         // we can't validate the remaining number of bytes without parsing
     277            0 :                         // the tuple data.
     278            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     279            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     280            0 :                         }
     281            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     282            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     283            0 :                             // non-HOT update where the new tuple goes to different page than
     284            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     285            0 :                             // set.
     286            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     287            0 :                         }
     288            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     289            0 :                         let xlrec = v16::XlHeapLock::decode(buf);
     290            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     291            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     292            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     293            0 :                         }
     294            0 :                     }
     295            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     296            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     297            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     298            0 :                         let xlrec = v16::XlHeapMultiInsert::decode(buf);
     299              : 
     300            0 :                         let offset_array_len =
     301            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     302              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     303            0 :                                 0
     304              :                             } else {
     305            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     306              :                             };
     307            0 :                         assert_eq!(offset_array_len, buf.remaining());
     308              : 
     309            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     310            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     311            0 :                         }
     312            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     313            0 :                         let xlrec = v16::XlHeapLockUpdated::decode(buf);
     314            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     315            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     316            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     317            0 :                         }
     318            0 :                     }
     319              :                 } else {
     320            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     321              :                 }
     322              :             }
     323              :             17 => {
     324            0 :                 if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
     325            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     326            0 : 
     327            0 :                     if info == pg_constants::XLOG_HEAP_INSERT {
     328            0 :                         let xlrec = v17::XlHeapInsert::decode(buf);
     329            0 :                         assert_eq!(0, buf.remaining());
     330            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     331            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     332            0 :                         }
     333            0 :                     } else if info == pg_constants::XLOG_HEAP_DELETE {
     334            0 :                         let xlrec = v17::XlHeapDelete::decode(buf);
     335            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     336            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     337            0 :                         }
     338            0 :                     } else if info == pg_constants::XLOG_HEAP_UPDATE
     339            0 :                         || info == pg_constants::XLOG_HEAP_HOT_UPDATE
     340              :                     {
     341            0 :                         let xlrec = v17::XlHeapUpdate::decode(buf);
     342            0 :                         // the size of tuple data is inferred from the size of the record.
     343            0 :                         // we can't validate the remaining number of bytes without parsing
     344            0 :                         // the tuple data.
     345            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     346            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     347            0 :                         }
     348            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     349            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     350            0 :                             // non-HOT update where the new tuple goes to different page than
     351            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     352            0 :                             // set.
     353            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     354            0 :                         }
     355            0 :                     } else if info == pg_constants::XLOG_HEAP_LOCK {
     356            0 :                         let xlrec = v17::XlHeapLock::decode(buf);
     357            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     358            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     359            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     360            0 :                         }
     361            0 :                     }
     362            0 :                 } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
     363            0 :                     let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     364            0 :                     if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
     365            0 :                         let xlrec = v17::XlHeapMultiInsert::decode(buf);
     366              : 
     367            0 :                         let offset_array_len =
     368            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     369              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     370            0 :                                 0
     371              :                             } else {
     372            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     373              :                             };
     374            0 :                         assert_eq!(offset_array_len, buf.remaining());
     375              : 
     376            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     377            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     378            0 :                         }
     379            0 :                     } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
     380            0 :                         let xlrec = v17::XlHeapLockUpdated::decode(buf);
     381            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     382            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     383            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     384            0 :                         }
     385            0 :                     }
     386              :                 } else {
     387            0 :                     anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
     388              :                 }
     389              :             }
     390            0 :             _ => {}
     391              :         }
     392              : 
     393       145474 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     394           12 :             let vm_rel = RelTag {
     395           12 :                 forknum: VISIBILITYMAP_FORKNUM,
     396           12 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     397           12 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     398           12 :                 relnode: decoded.blocks[0].rnode_relnode,
     399           12 :             };
     400           12 : 
     401           12 :             Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
     402           12 :                 ClearVmBits {
     403           12 :                     new_heap_blkno,
     404           12 :                     old_heap_blkno,
     405           12 :                     vm_rel,
     406           12 :                     flags,
     407           12 :                 },
     408           12 :             ))))
     409              :         } else {
     410       145462 :             Ok(None)
     411              :         }
     412       145474 :     }
     413              : 
     414            0 :     fn decode_neonmgr_record(
     415            0 :         buf: &mut Bytes,
     416            0 :         decoded: &DecodedWALRecord,
     417            0 :         pg_version: u32,
     418            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     419            0 :         // Handle VM bit updates that are implicitly part of heap records.
     420            0 : 
     421            0 :         // First, look at the record to determine which VM bits need
     422            0 :         // to be cleared. If either of these variables is set, we
     423            0 :         // need to clear the corresponding bits in the visibility map.
     424            0 :         let mut new_heap_blkno: Option<u32> = None;
     425            0 :         let mut old_heap_blkno: Option<u32> = None;
     426            0 :         let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
     427            0 : 
     428            0 :         assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
     429              : 
     430            0 :         match pg_version {
     431              :             16 | 17 => {
     432            0 :                 let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     433            0 : 
     434            0 :                 match info {
     435              :                     pg_constants::XLOG_NEON_HEAP_INSERT => {
     436            0 :                         let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
     437            0 :                         assert_eq!(0, buf.remaining());
     438            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     439            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     440            0 :                         }
     441              :                     }
     442              :                     pg_constants::XLOG_NEON_HEAP_DELETE => {
     443            0 :                         let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
     444            0 :                         if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
     445            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     446            0 :                         }
     447              :                     }
     448              :                     pg_constants::XLOG_NEON_HEAP_UPDATE
     449              :                     | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
     450            0 :                         let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
     451            0 :                         // the size of tuple data is inferred from the size of the record.
     452            0 :                         // we can't validate the remaining number of bytes without parsing
     453            0 :                         // the tuple data.
     454            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
     455            0 :                             old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
     456            0 :                         }
     457            0 :                         if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
     458            0 :                             // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
     459            0 :                             // non-HOT update where the new tuple goes to different page than
     460            0 :                             // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
     461            0 :                             // set.
     462            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     463            0 :                         }
     464              :                     }
     465              :                     pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
     466            0 :                         let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
     467              : 
     468            0 :                         let offset_array_len =
     469            0 :                             if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
     470              :                                 // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
     471            0 :                                 0
     472              :                             } else {
     473            0 :                                 size_of::<u16>() * xlrec.ntuples as usize
     474              :                             };
     475            0 :                         assert_eq!(offset_array_len, buf.remaining());
     476              : 
     477            0 :                         if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
     478            0 :                             new_heap_blkno = Some(decoded.blocks[0].blkno);
     479            0 :                         }
     480              :                     }
     481              :                     pg_constants::XLOG_NEON_HEAP_LOCK => {
     482            0 :                         let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
     483            0 :                         if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
     484            0 :                             old_heap_blkno = Some(decoded.blocks[0].blkno);
     485            0 :                             flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
     486            0 :                         }
     487              :                     }
     488            0 :                     info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
     489              :                 }
     490              :             }
     491            0 :             _ => anyhow::bail!(
     492            0 :                 "Neon RMGR has no known compatibility with PostgreSQL version {}",
     493            0 :                 pg_version
     494            0 :             ),
     495              :         }
     496              : 
     497            0 :         if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
     498            0 :             let vm_rel = RelTag {
     499            0 :                 forknum: VISIBILITYMAP_FORKNUM,
     500            0 :                 spcnode: decoded.blocks[0].rnode_spcnode,
     501            0 :                 dbnode: decoded.blocks[0].rnode_dbnode,
     502            0 :                 relnode: decoded.blocks[0].rnode_relnode,
     503            0 :             };
     504            0 : 
     505            0 :             Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
     506            0 :                 ClearVmBits {
     507            0 :                     new_heap_blkno,
     508            0 :                     old_heap_blkno,
     509            0 :                     vm_rel,
     510            0 :                     flags,
     511            0 :                 },
     512            0 :             ))))
     513              :         } else {
     514            0 :             Ok(None)
     515              :         }
     516            0 :     }
     517              : 
     518           16 :     fn decode_smgr_record(
     519           16 :         buf: &mut Bytes,
     520           16 :         decoded: &DecodedWALRecord,
     521           16 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     522           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     523           16 :         if info == pg_constants::XLOG_SMGR_CREATE {
     524           16 :             let create = XlSmgrCreate::decode(buf);
     525           16 :             let rel = RelTag {
     526           16 :                 spcnode: create.rnode.spcnode,
     527           16 :                 dbnode: create.rnode.dbnode,
     528           16 :                 relnode: create.rnode.relnode,
     529           16 :                 forknum: create.forknum,
     530           16 :             };
     531           16 : 
     532           16 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
     533           16 :                 rel,
     534           16 :             }))));
     535            0 :         } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
     536            0 :             let truncate = XlSmgrTruncate::decode(buf);
     537            0 :             return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
     538            0 :         }
     539            0 : 
     540            0 :         Ok(None)
     541           16 :     }
     542              : 
     543            0 :     fn decode_dbase_record(
     544            0 :         buf: &mut Bytes,
     545            0 :         decoded: &DecodedWALRecord,
     546            0 :         pg_version: u32,
     547            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     548            0 :         // TODO: Refactor this to avoid the duplication between postgres versions.
     549            0 : 
     550            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     551            0 :         tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
     552              : 
     553            0 :         if pg_version == 14 {
     554            0 :             if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
     555            0 :                 let createdb = XlCreateDatabase::decode(buf);
     556            0 :                 tracing::debug!("XLOG_DBASE_CREATE v14");
     557              : 
     558            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     559            0 :                     db_id: createdb.db_id,
     560            0 :                     tablespace_id: createdb.tablespace_id,
     561            0 :                     src_db_id: createdb.src_db_id,
     562            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     563            0 :                 }));
     564            0 : 
     565            0 :                 return Ok(Some(record));
     566            0 :             } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
     567            0 :                 let dropdb = XlDropDatabase::decode(buf);
     568            0 : 
     569            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     570            0 :                     db_id: dropdb.db_id,
     571            0 :                     tablespace_ids: dropdb.tablespace_ids,
     572            0 :                 }));
     573            0 : 
     574            0 :                 return Ok(Some(record));
     575            0 :             }
     576            0 :         } else if pg_version == 15 {
     577            0 :             if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     578            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     579            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     580              :                 // The XLOG record was renamed between v14 and v15,
     581              :                 // but the record format is the same.
     582              :                 // So we can reuse XlCreateDatabase here.
     583            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     584              : 
     585            0 :                 let createdb = XlCreateDatabase::decode(buf);
     586            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     587            0 :                     db_id: createdb.db_id,
     588            0 :                     tablespace_id: createdb.tablespace_id,
     589            0 :                     src_db_id: createdb.src_db_id,
     590            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     591            0 :                 }));
     592            0 : 
     593            0 :                 return Ok(Some(record));
     594            0 :             } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
     595            0 :                 let dropdb = XlDropDatabase::decode(buf);
     596            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     597            0 :                     db_id: dropdb.db_id,
     598            0 :                     tablespace_ids: dropdb.tablespace_ids,
     599            0 :                 }));
     600            0 : 
     601            0 :                 return Ok(Some(record));
     602            0 :             }
     603            0 :         } else if pg_version == 16 {
     604            0 :             if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     605            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     606            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     607              :                 // The XLOG record was renamed between v14 and v15,
     608              :                 // but the record format is the same.
     609              :                 // So we can reuse XlCreateDatabase here.
     610            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     611              : 
     612            0 :                 let createdb = XlCreateDatabase::decode(buf);
     613            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     614            0 :                     db_id: createdb.db_id,
     615            0 :                     tablespace_id: createdb.tablespace_id,
     616            0 :                     src_db_id: createdb.src_db_id,
     617            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     618            0 :                 }));
     619            0 : 
     620            0 :                 return Ok(Some(record));
     621            0 :             } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
     622            0 :                 let dropdb = XlDropDatabase::decode(buf);
     623            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     624            0 :                     db_id: dropdb.db_id,
     625            0 :                     tablespace_ids: dropdb.tablespace_ids,
     626            0 :                 }));
     627            0 : 
     628            0 :                 return Ok(Some(record));
     629            0 :             }
     630            0 :         } else if pg_version == 17 {
     631            0 :             if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
     632            0 :                 tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
     633            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
     634              :                 // The XLOG record was renamed between v14 and v15,
     635              :                 // but the record format is the same.
     636              :                 // So we can reuse XlCreateDatabase here.
     637            0 :                 tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
     638              : 
     639            0 :                 let createdb = XlCreateDatabase::decode(buf);
     640            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
     641            0 :                     db_id: createdb.db_id,
     642            0 :                     tablespace_id: createdb.tablespace_id,
     643            0 :                     src_db_id: createdb.src_db_id,
     644            0 :                     src_tablespace_id: createdb.src_tablespace_id,
     645            0 :                 }));
     646            0 : 
     647            0 :                 return Ok(Some(record));
     648            0 :             } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
     649            0 :                 let dropdb = XlDropDatabase::decode(buf);
     650            0 :                 let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
     651            0 :                     db_id: dropdb.db_id,
     652            0 :                     tablespace_ids: dropdb.tablespace_ids,
     653            0 :                 }));
     654            0 : 
     655            0 :                 return Ok(Some(record));
     656            0 :             }
     657            0 :         }
     658              : 
     659            0 :         Ok(None)
     660            0 :     }
     661              : 
     662            0 :     fn decode_clog_record(
     663            0 :         buf: &mut Bytes,
     664            0 :         decoded: &DecodedWALRecord,
     665            0 :         pg_version: u32,
     666            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     667            0 :         let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
     668            0 : 
     669            0 :         if info == pg_constants::CLOG_ZEROPAGE {
     670            0 :             let pageno = if pg_version < 17 {
     671            0 :                 buf.get_u32_le()
     672              :             } else {
     673            0 :                 buf.get_u64_le() as u32
     674              :             };
     675            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     676            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     677            0 : 
     678            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
     679            0 :                 ClogZeroPage { segno, rpageno },
     680            0 :             ))))
     681              :         } else {
     682            0 :             assert!(info == pg_constants::CLOG_TRUNCATE);
     683            0 :             let xlrec = XlClogTruncate::decode(buf, pg_version);
     684            0 : 
     685            0 :             Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
     686            0 :                 ClogTruncate {
     687            0 :                     pageno: xlrec.pageno,
     688            0 :                     oldest_xid: xlrec.oldest_xid,
     689            0 :                     oldest_xid_db: xlrec.oldest_xid_db,
     690            0 :                 },
     691            0 :             ))))
     692              :         }
     693            0 :     }
     694              : 
     695           24 :     fn decode_xact_record(
     696           24 :         buf: &mut Bytes,
     697           24 :         decoded: &DecodedWALRecord,
     698           24 :         lsn: Lsn,
     699           24 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     700           24 :         let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
     701           24 :         let origin_id = decoded.origin_id;
     702           24 :         let xl_xid = decoded.xl_xid;
     703           24 : 
     704           24 :         if info == pg_constants::XLOG_XACT_COMMIT {
     705            8 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     706            8 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
     707            8 :                 parsed,
     708            8 :                 origin_id,
     709            8 :                 xl_xid,
     710            8 :                 lsn,
     711            8 :             }))));
     712           16 :         } else if info == pg_constants::XLOG_XACT_ABORT {
     713            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     714            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
     715            0 :                 parsed,
     716            0 :                 origin_id,
     717            0 :                 xl_xid,
     718            0 :                 lsn,
     719            0 :             }))));
     720           16 :         } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
     721            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     722            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
     723            0 :                 XactCommon {
     724            0 :                     parsed,
     725            0 :                     origin_id,
     726            0 :                     xl_xid,
     727            0 :                     lsn,
     728            0 :                 },
     729            0 :             ))));
     730           16 :         } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
     731            0 :             let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
     732            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
     733            0 :                 XactCommon {
     734            0 :                     parsed,
     735            0 :                     origin_id,
     736            0 :                     xl_xid,
     737            0 :                     lsn,
     738            0 :                 },
     739            0 :             ))));
     740           16 :         } else if info == pg_constants::XLOG_XACT_PREPARE {
     741            0 :             return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
     742            0 :                 XactPrepare {
     743            0 :                     xl_xid: decoded.xl_xid,
     744            0 :                     data: Bytes::copy_from_slice(&buf[..]),
     745            0 :                 },
     746            0 :             ))));
     747           16 :         }
     748           16 : 
     749           16 :         Ok(None)
     750           24 :     }
     751              : 
     752            0 :     fn decode_multixact_record(
     753            0 :         buf: &mut Bytes,
     754            0 :         decoded: &DecodedWALRecord,
     755            0 :         pg_version: u32,
     756            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     757            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     758            0 : 
     759            0 :         if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
     760            0 :             || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
     761              :         {
     762            0 :             let pageno = if pg_version < 17 {
     763            0 :                 buf.get_u32_le()
     764              :             } else {
     765            0 :                 buf.get_u64_le() as u32
     766              :             };
     767            0 :             let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     768            0 :             let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     769              : 
     770            0 :             let slru_kind = match info {
     771            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
     772            0 :                 pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
     773            0 :                 _ => unreachable!(),
     774              :             };
     775              : 
     776            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
     777            0 :                 MultiXactZeroPage {
     778            0 :                     slru_kind,
     779            0 :                     segno,
     780            0 :                     rpageno,
     781            0 :                 },
     782            0 :             ))));
     783            0 :         } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
     784            0 :             let xlrec = XlMultiXactCreate::decode(buf);
     785            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
     786            0 :                 xlrec,
     787            0 :             ))));
     788            0 :         } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
     789            0 :             let xlrec = XlMultiXactTruncate::decode(buf);
     790            0 :             return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
     791            0 :                 xlrec,
     792            0 :             ))));
     793            0 :         }
     794            0 : 
     795            0 :         Ok(None)
     796            0 :     }
     797              : 
     798            0 :     fn decode_relmap_record(
     799            0 :         buf: &mut Bytes,
     800            0 :         decoded: &DecodedWALRecord,
     801            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     802            0 :         let update = XlRelmapUpdate::decode(buf);
     803            0 : 
     804            0 :         let mut buf = decoded.record.clone();
     805            0 :         buf.advance(decoded.main_data_offset);
     806            0 :         // skip xl_relmap_update
     807            0 :         buf.advance(12);
     808            0 : 
     809            0 :         Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
     810            0 :             RelmapUpdate {
     811            0 :                 update,
     812            0 :                 buf: Bytes::copy_from_slice(&buf[..]),
     813            0 :             },
     814            0 :         ))))
     815            0 :     }
     816              : 
     817           30 :     fn decode_xlog_record(
     818           30 :         buf: &mut Bytes,
     819           30 :         decoded: &DecodedWALRecord,
     820           30 :         lsn: Lsn,
     821           30 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     822           30 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     823           30 :         Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
     824           30 :             info,
     825           30 :             lsn,
     826           30 :             buf: buf.clone(),
     827           30 :         }))))
     828           30 :     }
     829              : 
     830            0 :     fn decode_logical_message_record(
     831            0 :         buf: &mut Bytes,
     832            0 :         decoded: &DecodedWALRecord,
     833            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     834            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     835            0 :         if info == pg_constants::XLOG_LOGICAL_MESSAGE {
     836            0 :             let xlrec = XlLogicalMessage::decode(buf);
     837            0 :             let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
     838              : 
     839              :             #[cfg(feature = "testing")]
     840            0 :             if prefix == "neon-test" {
     841            0 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     842            0 :                     LogicalMessageRecord::Failpoint,
     843            0 :                 )));
     844            0 :             }
     845              : 
     846            0 :             if let Some(path) = prefix.strip_prefix("neon-file:") {
     847            0 :                 let buf_size = xlrec.prefix_size + xlrec.message_size;
     848            0 :                 let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
     849            0 :                 return Ok(Some(MetadataRecord::LogicalMessage(
     850            0 :                     LogicalMessageRecord::Put(PutLogicalMessage {
     851            0 :                         path: path.to_string(),
     852            0 :                         buf,
     853            0 :                     }),
     854            0 :                 )));
     855            0 :             }
     856            0 :         }
     857              : 
     858            0 :         Ok(None)
     859            0 :     }
     860              : 
     861           16 :     fn decode_standby_record(
     862           16 :         buf: &mut Bytes,
     863           16 :         decoded: &DecodedWALRecord,
     864           16 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     865           16 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     866           16 :         if info == pg_constants::XLOG_RUNNING_XACTS {
     867            0 :             let xlrec = XlRunningXacts::decode(buf);
     868            0 :             return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
     869            0 :                 StandbyRunningXacts {
     870            0 :                     oldest_running_xid: xlrec.oldest_running_xid,
     871            0 :                 },
     872            0 :             ))));
     873           16 :         }
     874           16 : 
     875           16 :         Ok(None)
     876           16 :     }
     877              : 
     878            0 :     fn decode_replorigin_record(
     879            0 :         buf: &mut Bytes,
     880            0 :         decoded: &DecodedWALRecord,
     881            0 :     ) -> anyhow::Result<Option<MetadataRecord>> {
     882            0 :         let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     883            0 :         if info == pg_constants::XLOG_REPLORIGIN_SET {
     884            0 :             let xlrec = XlReploriginSet::decode(buf);
     885            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
     886            0 :                 xlrec,
     887            0 :             ))));
     888            0 :         } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
     889            0 :             let xlrec = XlReploriginDrop::decode(buf);
     890            0 :             return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
     891            0 :                 xlrec,
     892            0 :             ))));
     893            0 :         }
     894            0 : 
     895            0 :         Ok(None)
     896            0 :     }
     897              : }
        

Generated by: LCOV version 2.1-beta