LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - walrecord.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 39.1 % 606 237
Test Date: 2025-03-12 16:10:49 Functions: 10.2 % 88 9

            Line data    Source code
       1              : //! This module houses types used in decoding of PG WAL
       2              : //! records.
       3              : //!
       4              : //! TODO: Generate separate types for each supported PG version
       5              : 
       6              : use bytes::{Buf, Bytes};
       7              : use serde::{Deserialize, Serialize};
       8              : use utils::bin_ser::DeserializeError;
       9              : use utils::lsn::Lsn;
      10              : 
      11              : use crate::{
      12              :     BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId,
      13              :     TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
      14              : };
      15              : 
      16              : #[repr(C)]
      17            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      18              : pub struct XlMultiXactCreate {
      19              :     pub mid: MultiXactId,
      20              :     /* new MultiXact's ID */
      21              :     pub moff: MultiXactOffset,
      22              :     /* its starting offset in members file */
      23              :     pub nmembers: u32,
      24              :     /* number of member XIDs */
      25              :     pub members: Vec<MultiXactMember>,
      26              : }
      27              : 
      28              : impl XlMultiXactCreate {
      29            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
      30            0 :         let mid = buf.get_u32_le();
      31            0 :         let moff = buf.get_u32_le();
      32            0 :         let nmembers = buf.get_u32_le();
      33            0 :         let mut members = Vec::new();
      34            0 :         for _ in 0..nmembers {
      35            0 :             members.push(MultiXactMember::decode(buf));
      36            0 :         }
      37            0 :         XlMultiXactCreate {
      38            0 :             mid,
      39            0 :             moff,
      40            0 :             nmembers,
      41            0 :             members,
      42            0 :         }
      43            0 :     }
      44              : }
      45              : 
      46              : #[repr(C)]
      47            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      48              : pub struct XlMultiXactTruncate {
      49              :     pub oldest_multi_db: Oid,
      50              :     /* to-be-truncated range of multixact offsets */
      51              :     pub start_trunc_off: MultiXactId,
      52              :     /* just for completeness' sake */
      53              :     pub end_trunc_off: MultiXactId,
      54              : 
      55              :     /* to-be-truncated range of multixact members */
      56              :     pub start_trunc_memb: MultiXactOffset,
      57              :     pub end_trunc_memb: MultiXactOffset,
      58              : }
      59              : 
      60              : impl XlMultiXactTruncate {
      61            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
      62            0 :         XlMultiXactTruncate {
      63            0 :             oldest_multi_db: buf.get_u32_le(),
      64            0 :             start_trunc_off: buf.get_u32_le(),
      65            0 :             end_trunc_off: buf.get_u32_le(),
      66            0 :             start_trunc_memb: buf.get_u32_le(),
      67            0 :             end_trunc_memb: buf.get_u32_le(),
      68            0 :         }
      69            0 :     }
      70              : }
      71              : 
      72              : #[repr(C)]
      73            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      74              : pub struct XlRelmapUpdate {
      75              :     pub dbid: Oid,   /* database ID, or 0 for shared map */
      76              :     pub tsid: Oid,   /* database's tablespace, or pg_global */
      77              :     pub nbytes: i32, /* size of relmap data */
      78              : }
      79              : 
      80              : impl XlRelmapUpdate {
      81            0 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
      82            0 :         XlRelmapUpdate {
      83            0 :             dbid: buf.get_u32_le(),
      84            0 :             tsid: buf.get_u32_le(),
      85            0 :             nbytes: buf.get_i32_le(),
      86            0 :         }
      87            0 :     }
      88              : }
      89              : 
      90              : #[repr(C)]
      91            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      92              : pub struct XlReploriginDrop {
      93              :     pub node_id: RepOriginId,
      94              : }
      95              : 
      96              : impl XlReploriginDrop {
      97            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginDrop {
      98            0 :         XlReploriginDrop {
      99            0 :             node_id: buf.get_u16_le(),
     100            0 :         }
     101            0 :     }
     102              : }
     103              : 
     104              : #[repr(C)]
     105            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
     106              : pub struct XlReploriginSet {
     107              :     pub remote_lsn: Lsn,
     108              :     pub node_id: RepOriginId,
     109              : }
     110              : 
     111              : impl XlReploriginSet {
     112            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginSet {
     113            0 :         XlReploriginSet {
     114            0 :             remote_lsn: Lsn(buf.get_u64_le()),
     115            0 :             node_id: buf.get_u16_le(),
     116            0 :         }
     117            0 :     }
     118              : }
     119              : 
     120              : #[repr(C)]
     121            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     122              : pub struct RelFileNode {
     123              :     pub spcnode: Oid, /* tablespace */
     124              :     pub dbnode: Oid,  /* database */
     125              :     pub relnode: Oid, /* relation */
     126              : }
     127              : 
     128              : #[repr(C)]
     129            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     130              : pub struct MultiXactMember {
     131              :     pub xid: TransactionId,
     132              :     pub status: MultiXactStatus,
     133              : }
     134              : 
     135              : impl MultiXactMember {
     136            0 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     137            0 :         MultiXactMember {
     138            0 :             xid: buf.get_u32_le(),
     139            0 :             status: buf.get_u32_le(),
     140            0 :         }
     141            0 :     }
     142              : }
     143              : 
     144              : /// DecodedBkpBlock represents per-page data contained in a WAL record.
     145              : #[derive(Default)]
     146              : pub struct DecodedBkpBlock {
     147              :     /* Is this block ref in use? */
     148              :     //in_use: bool,
     149              : 
     150              :     /* Identify the block this refers to */
     151              :     pub rnode_spcnode: u32,
     152              :     pub rnode_dbnode: u32,
     153              :     pub rnode_relnode: u32,
     154              :     // Note that we have a few special forknum values for non-rel files.
     155              :     pub forknum: u8,
     156              :     pub blkno: u32,
     157              : 
     158              :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
     159              :     pub flags: u8,
     160              : 
     161              :     /* Information on full-page image, if any */
     162              :     pub has_image: bool,
     163              :     /* has image, even for consistency checking */
     164              :     pub apply_image: bool,
     165              :     /* has image that should be restored */
     166              :     pub will_init: bool,
     167              :     /* record doesn't need previous page version to apply */
     168              :     //char         *bkp_image;
     169              :     pub hole_offset: u16,
     170              :     pub hole_length: u16,
     171              :     pub bimg_offset: u32,
     172              :     pub bimg_len: u16,
     173              :     pub bimg_info: u8,
     174              : 
     175              :     /* Buffer holding the rmgr-specific data associated with this block */
     176              :     has_data: bool,
     177              :     data_len: u16,
     178              : }
     179              : 
     180              : impl DecodedBkpBlock {
     181       291284 :     pub fn new() -> DecodedBkpBlock {
     182       291284 :         Default::default()
     183       291284 :     }
     184              : }
     185              : 
     186              : #[derive(Default)]
     187              : pub struct DecodedWALRecord {
     188              :     pub xl_xid: TransactionId,
     189              :     pub xl_info: u8,
     190              :     pub xl_rmid: u8,
     191              :     pub record: Bytes, // raw XLogRecord
     192              : 
     193              :     pub blocks: Vec<DecodedBkpBlock>,
     194              :     pub main_data_offset: usize,
     195              :     pub origin_id: u16,
     196              : }
     197              : 
     198              : impl DecodedWALRecord {
     199              :     /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
     200              :     /// by reading other existing relations' data blocks.  This is more complex to apply than new-style database
     201              :     /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
     202       292310 :     pub fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
     203       292310 :         if self.xl_rmid == pg_constants::RM_DBASE_ID {
     204            0 :             let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     205            0 :             match pg_version {
     206              :                 14 => {
     207              :                     // Postgres 14 database creations are always the legacy kind
     208            0 :                     info == crate::v14::bindings::XLOG_DBASE_CREATE
     209              :                 }
     210            0 :                 15 => info == crate::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     211            0 :                 16 => info == crate::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     212            0 :                 17 => info == crate::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     213              :                 _ => {
     214            0 :                     panic!("Unsupported postgres version {pg_version}")
     215              :                 }
     216              :             }
     217              :         } else {
     218       292310 :             false
     219              :         }
     220       292310 :     }
     221              : }
     222              : 
     223              : /// Main routine to decode a WAL record and figure out which blocks are modified
     224              : //
     225              : // See xlogrecord.h for details
     226              : // The overall layout of an XLOG record is:
     227              : //              Fixed-size header (XLogRecord struct)
     228              : //      XLogRecordBlockHeader struct
     229              : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     230              : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     231              : //                 XLogRecordBlockCompressHeader struct follows.
     232              : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     233              : //          BlockNumber follows
     234              : //      XLogRecordBlockHeader struct
     235              : //      ...
     236              : //      XLogRecordDataHeader[Short|Long] struct
     237              : //      block data
     238              : //      block data
     239              : //      ...
     240              : //      main data
     241              : //
     242              : //
     243              : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
     244              : // It would be more natural for this function to return a DecodedWALRecord as return value,
     245              : // but reusing the caller-supplied struct avoids an allocation.
     246              : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
     247              : //
     248       292310 : pub fn decode_wal_record(
     249       292310 :     record: Bytes,
     250       292310 :     decoded: &mut DecodedWALRecord,
     251       292310 :     pg_version: u32,
     252       292310 : ) -> anyhow::Result<()> {
     253       292310 :     let mut rnode_spcnode: u32 = 0;
     254       292310 :     let mut rnode_dbnode: u32 = 0;
     255       292310 :     let mut rnode_relnode: u32 = 0;
     256       292310 :     let mut got_rnode = false;
     257       292310 :     let mut origin_id: u16 = 0;
     258       292310 : 
     259       292310 :     let mut buf = record.clone();
     260              : 
     261              :     // 1. Parse XLogRecord struct
     262              : 
     263              :     // FIXME: assume little-endian here
     264       292310 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     265              : 
     266       292310 :     tracing::trace!(
     267            0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
     268              :         xlogrec.xl_rmid,
     269              :         xlogrec.xl_info
     270              :     );
     271              : 
     272       292310 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
     273       292310 : 
     274       292310 :     if buf.remaining() != remaining {
     275            0 :         //TODO error
     276       292310 :     }
     277              : 
     278       292310 :     let mut max_block_id = 0;
     279       292310 :     let mut blocks_total_len: u32 = 0;
     280       292310 :     let mut main_data_len = 0;
     281       292310 :     let mut datatotal: u32 = 0;
     282       292310 :     decoded.blocks.clear();
     283              : 
     284              :     // 2. Decode the headers.
     285              :     // XLogRecordBlockHeaders if any,
     286              :     // XLogRecordDataHeader[Short|Long]
     287       875856 :     while buf.remaining() > datatotal as usize {
     288       583546 :         let block_id = buf.get_u8();
     289       583546 : 
     290       583546 :         match block_id {
     291       291624 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
     292       291624 :                 /* XLogRecordDataHeaderShort */
     293       291624 :                 main_data_len = buf.get_u8() as u32;
     294       291624 :                 datatotal += main_data_len;
     295       291624 :             }
     296              : 
     297          638 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
     298          638 :                 /* XLogRecordDataHeaderLong */
     299          638 :                 main_data_len = buf.get_u32_le();
     300          638 :                 datatotal += main_data_len;
     301          638 :             }
     302              : 
     303            0 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
     304            0 :                 // RepOriginId is uint16
     305            0 :                 origin_id = buf.get_u16_le();
     306            0 :             }
     307              : 
     308            0 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
     309            0 :                 // TransactionId is uint32
     310            0 :                 buf.advance(4);
     311            0 :             }
     312              : 
     313       291284 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
     314              :                 /* XLogRecordBlockHeader */
     315       291284 :                 let mut blk = DecodedBkpBlock::new();
     316       291284 : 
     317       291284 :                 if block_id <= max_block_id {
     318       291284 :                     // TODO
     319       291284 :                     //report_invalid_record(state,
     320       291284 :                     //                    "out-of-order block_id %u at %X/%X",
     321       291284 :                     //                    block_id,
     322       291284 :                     //                    (uint32) (state->ReadRecPtr >> 32),
     323       291284 :                     //                    (uint32) state->ReadRecPtr);
     324       291284 :                     //    goto err;
     325       291284 :                 }
     326       291284 :                 max_block_id = block_id;
     327       291284 : 
     328       291284 :                 let fork_flags: u8 = buf.get_u8();
     329       291284 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
     330       291284 :                 blk.flags = fork_flags;
     331       291284 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
     332       291284 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
     333       291284 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
     334       291284 :                 blk.data_len = buf.get_u16_le();
     335       291284 : 
     336       291284 :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
     337       291284 : 
     338       291284 :                 datatotal += blk.data_len as u32;
     339       291284 :                 blocks_total_len += blk.data_len as u32;
     340       291284 : 
     341       291284 :                 if blk.has_image {
     342          120 :                     blk.bimg_len = buf.get_u16_le();
     343          120 :                     blk.hole_offset = buf.get_u16_le();
     344          120 :                     blk.bimg_info = buf.get_u8();
     345              : 
     346            0 :                     blk.apply_image = dispatch_pgversion!(
     347          120 :                         pg_version,
     348            0 :                         (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
     349              :                     );
     350              : 
     351          120 :                     let blk_img_is_compressed =
     352          120 :                         crate::bkpimage_is_compressed(blk.bimg_info, pg_version);
     353          120 : 
     354          120 :                     if blk_img_is_compressed {
     355            0 :                         tracing::debug!("compressed block image , pg_version = {}", pg_version);
     356          120 :                     }
     357              : 
     358          120 :                     if blk_img_is_compressed {
     359            0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
     360            0 :                             blk.hole_length = buf.get_u16_le();
     361            0 :                         } else {
     362            0 :                             blk.hole_length = 0;
     363            0 :                         }
     364          120 :                     } else {
     365          120 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
     366          120 :                     }
     367          120 :                     datatotal += blk.bimg_len as u32;
     368          120 :                     blocks_total_len += blk.bimg_len as u32;
     369          120 : 
     370          120 :                     /*
     371          120 :                      * cross-check that hole_offset > 0, hole_length > 0 and
     372          120 :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
     373          120 :                      */
     374          120 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
     375           72 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
     376            0 :                     {
     377            0 :                         // TODO
     378            0 :                         /*
     379            0 :                         report_invalid_record(state,
     380            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
     381            0 :                                       (unsigned int) blk->hole_offset,
     382            0 :                                       (unsigned int) blk->hole_length,
     383            0 :                                       (unsigned int) blk->bimg_len,
     384            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     385            0 :                         goto err;
     386            0 :                                      */
     387          120 :                     }
     388              : 
     389              :                     /*
     390              :                      * cross-check that hole_offset == 0 and hole_length == 0 if
     391              :                      * the HAS_HOLE flag is not set.
     392              :                      */
     393          120 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     394           48 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
     395            0 :                     {
     396            0 :                         // TODO
     397            0 :                         /*
     398            0 :                         report_invalid_record(state,
     399            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
     400            0 :                                       (unsigned int) blk->hole_offset,
     401            0 :                                       (unsigned int) blk->hole_length,
     402            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     403            0 :                         goto err;
     404            0 :                                      */
     405          120 :                     }
     406              : 
     407              :                     /*
     408              :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
     409              :                      * flag is set.
     410              :                      */
     411          120 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
     412           48 :                         // TODO
     413           48 :                         /*
     414           48 :                         report_invalid_record(state,
     415           48 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
     416           48 :                                       (unsigned int) blk->bimg_len,
     417           48 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     418           48 :                         goto err;
     419           48 :                                      */
     420           72 :                     }
     421              : 
     422              :                     /*
     423              :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
     424              :                      * IS_COMPRESSED flag is set.
     425              :                      */
     426          120 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     427           48 :                         && !blk_img_is_compressed
     428           48 :                         && blk.bimg_len != BLCKSZ
     429            0 :                     {
     430            0 :                         // TODO
     431            0 :                         /*
     432            0 :                         report_invalid_record(state,
     433            0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
     434            0 :                                       (unsigned int) blk->data_len,
     435            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     436            0 :                         goto err;
     437            0 :                                      */
     438          120 :                     }
     439       291164 :                 }
     440       291284 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
     441       291284 :                     rnode_spcnode = buf.get_u32_le();
     442       291284 :                     rnode_dbnode = buf.get_u32_le();
     443       291284 :                     rnode_relnode = buf.get_u32_le();
     444       291284 :                     got_rnode = true;
     445       291284 :                 } else if !got_rnode {
     446            0 :                     // TODO
     447            0 :                     /*
     448            0 :                     report_invalid_record(state,
     449            0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
     450            0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     451            0 :                     goto err;           */
     452            0 :                 }
     453              : 
     454       291284 :                 blk.rnode_spcnode = rnode_spcnode;
     455       291284 :                 blk.rnode_dbnode = rnode_dbnode;
     456       291284 :                 blk.rnode_relnode = rnode_relnode;
     457       291284 : 
     458       291284 :                 blk.blkno = buf.get_u32_le();
     459       291284 :                 tracing::trace!(
     460            0 :                     "this record affects {}/{}/{} blk {}",
     461              :                     rnode_spcnode,
     462              :                     rnode_dbnode,
     463              :                     rnode_relnode,
     464              :                     blk.blkno
     465              :                 );
     466              : 
     467       291284 :                 decoded.blocks.push(blk);
     468              :             }
     469              : 
     470            0 :             _ => {
     471            0 :                 // TODO: invalid block_id
     472            0 :             }
     473              :         }
     474              :     }
     475              : 
     476              :     // 3. Decode blocks.
     477       292310 :     let mut ptr = record.len() - buf.remaining();
     478       292310 :     for blk in decoded.blocks.iter_mut() {
     479       291284 :         if blk.has_image {
     480          120 :             blk.bimg_offset = ptr as u32;
     481          120 :             ptr += blk.bimg_len as usize;
     482       291164 :         }
     483       291284 :         if blk.has_data {
     484       291164 :             ptr += blk.data_len as usize;
     485       291164 :         }
     486              :     }
     487              :     // We don't need them, so just skip blocks_total_len bytes
     488       292310 :     buf.advance(blocks_total_len as usize);
     489       292310 :     assert_eq!(ptr, record.len() - buf.remaining());
     490              : 
     491       292310 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
     492       292310 : 
     493       292310 :     // 4. Decode main_data
     494       292310 :     if main_data_len > 0 {
     495       292262 :         assert_eq!(buf.remaining(), main_data_len as usize);
     496           48 :     }
     497              : 
     498       292310 :     decoded.xl_xid = xlogrec.xl_xid;
     499       292310 :     decoded.xl_info = xlogrec.xl_info;
     500       292310 :     decoded.xl_rmid = xlogrec.xl_rmid;
     501       292310 :     decoded.record = record;
     502       292310 :     decoded.origin_id = origin_id;
     503       292310 :     decoded.main_data_offset = main_data_offset;
     504       292310 : 
     505       292310 :     Ok(())
     506       292310 : }
     507              : 
     508              : pub mod v14 {
     509              :     use bytes::{Buf, Bytes};
     510              : 
     511              :     use crate::{OffsetNumber, TransactionId};
     512              : 
     513              :     #[repr(C)]
     514              :     #[derive(Debug)]
     515              :     pub struct XlHeapInsert {
     516              :         pub offnum: OffsetNumber,
     517              :         pub flags: u8,
     518              :     }
     519              : 
     520              :     impl XlHeapInsert {
     521       290552 :         pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
     522       290552 :             XlHeapInsert {
     523       290552 :                 offnum: buf.get_u16_le(),
     524       290552 :                 flags: buf.get_u8(),
     525       290552 :             }
     526       290552 :         }
     527              :     }
     528              : 
     529              :     #[repr(C)]
     530              :     #[derive(Debug)]
     531              :     pub struct XlHeapMultiInsert {
     532              :         pub flags: u8,
     533              :         pub _padding: u8,
     534              :         pub ntuples: u16,
     535              :     }
     536              : 
     537              :     impl XlHeapMultiInsert {
     538           84 :         pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
     539           84 :             XlHeapMultiInsert {
     540           84 :                 flags: buf.get_u8(),
     541           84 :                 _padding: buf.get_u8(),
     542           84 :                 ntuples: buf.get_u16_le(),
     543           84 :             }
     544           84 :         }
     545              :     }
     546              : 
     547              :     #[repr(C)]
     548              :     #[derive(Debug)]
                            /// Decoded fixed-size part of a heap-delete WAL record in the layout
                            /// used by the `v14` module (and re-exported by `v15`). This layout
                            /// carries a `t_cid` field and two explicit padding bytes after `offnum`.
     549              :     pub struct XlHeapDelete {
     550              :         pub xmax: TransactionId,
     551              :         pub offnum: OffsetNumber,
     552              :         pub _padding: u16,
     553              :         pub t_cid: u32,
     554              :         pub infobits_set: u8,
     555              :         pub flags: u8,
     556              :     }
     557              : 
     558              :     impl XlHeapDelete {
                                /// Reads all fields little-endian from the front of `buf`, in
                                /// declaration order (4 + 2 + 2 + 4 + 1 + 1 bytes).
                                ///
                                /// Consumes 14 bytes; panics if `buf` is shorter than that.
     559            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     560            0 :             XlHeapDelete {
     561            0 :                 xmax: buf.get_u32_le(),
     562            0 :                 offnum: buf.get_u16_le(),
     563            0 :                 _padding: buf.get_u16_le(),
     564            0 :                 t_cid: buf.get_u32_le(),
     565            0 :                 infobits_set: buf.get_u8(),
     566            0 :                 flags: buf.get_u8(),
     567            0 :             }
     568            0 :         }
     569              :     }
     570              : 
     571              :     #[repr(C)]
     572              :     #[derive(Debug)]
                            /// Decoded fixed-size part of a heap-update WAL record in the layout
                            /// used by the `v14` module: old-tuple fields, then `t_cid`, then the
                            /// new-tuple fields.
     573              :     pub struct XlHeapUpdate {
     574              :         pub old_xmax: TransactionId,
     575              :         pub old_offnum: OffsetNumber,
     576              :         pub old_infobits_set: u8,
     577              :         pub flags: u8,
     578              :         pub t_cid: u32,
     579              :         pub new_xmax: TransactionId,
     580              :         pub new_offnum: OffsetNumber,
     581              :     }
     582              : 
     583              :     impl XlHeapUpdate {
                                /// Reads all fields little-endian from the front of `buf`, in
                                /// declaration order (4 + 2 + 1 + 1 + 4 + 4 + 2 bytes).
                                ///
                                /// Consumes 18 bytes; panics if `buf` is shorter than that.
     584           16 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     585           16 :             XlHeapUpdate {
     586           16 :                 old_xmax: buf.get_u32_le(),
     587           16 :                 old_offnum: buf.get_u16_le(),
     588           16 :                 old_infobits_set: buf.get_u8(),
     589           16 :                 flags: buf.get_u8(),
     590           16 :                 t_cid: buf.get_u32_le(),
     591           16 :                 new_xmax: buf.get_u32_le(),
     592           16 :                 new_offnum: buf.get_u16_le(),
     593           16 :             }
     594           16 :         }
     595              :     }
     596              : 
     597              :     #[repr(C)]
     598              :     #[derive(Debug)]
                            /// Decoded fixed-size part of a heap-lock WAL record in the layout used
                            /// by the `v14` module. Field sizes and order match `v14::XlHeapDelete`,
                            /// with `locking_xid` in place of `xmax`.
     599              :     pub struct XlHeapLock {
     600              :         pub locking_xid: TransactionId,
     601              :         pub offnum: OffsetNumber,
     602              :         pub _padding: u16,
     603              :         pub t_cid: u32,
     604              :         pub infobits_set: u8,
     605              :         pub flags: u8,
     606              :     }
     607              : 
     608              :     impl XlHeapLock {
                                /// Reads all fields little-endian from the front of `buf`, in
                                /// declaration order (4 + 2 + 2 + 4 + 1 + 1 bytes).
                                ///
                                /// Consumes 14 bytes; panics if `buf` is shorter than that.
     609            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     610            0 :             XlHeapLock {
     611            0 :                 locking_xid: buf.get_u32_le(),
     612            0 :                 offnum: buf.get_u16_le(),
     613            0 :                 _padding: buf.get_u16_le(),
     614            0 :                 t_cid: buf.get_u32_le(),
     615            0 :                 infobits_set: buf.get_u8(),
     616            0 :                 flags: buf.get_u8(),
     617            0 :             }
     618            0 :         }
     619              :     }
     620              : 
     621              :     #[repr(C)]
     622              :     #[derive(Debug)]
                            /// Decoded fixed-size part of a heap "lock updated" WAL record.
                            /// This layout is shared by every version module in this file: `v15`,
                            /// `v16` and `v17` all re-export it from `v14`.
     623              :     pub struct XlHeapLockUpdated {
     624              :         pub xmax: TransactionId,
     625              :         pub offnum: OffsetNumber,
     626              :         pub infobits_set: u8,
     627              :         pub flags: u8,
     628              :     }
     629              : 
     630              :     impl XlHeapLockUpdated {
                                /// Reads a little-endian `u32`, a little-endian `u16` and two single
                                /// bytes from the front of `buf`, in declaration order.
                                ///
                                /// Consumes 8 bytes; panics if `buf` is shorter than that.
     631            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
     632            0 :             XlHeapLockUpdated {
     633            0 :                 xmax: buf.get_u32_le(),
     634            0 :                 offnum: buf.get_u16_le(),
     635            0 :                 infobits_set: buf.get_u8(),
     636            0 :                 flags: buf.get_u8(),
     637            0 :             }
     638            0 :         }
     639              :     }
     640              : 
     641              :     #[repr(C)]
     642              :     #[derive(Debug)]
                            /// Decoded parameter-change WAL record: six 32-bit settings followed
                            /// by two boolean settings and two trailing padding bytes.
     643              :     pub struct XlParameterChange {
     644              :         pub max_connections: i32,
     645              :         pub max_worker_processes: i32,
     646              :         pub max_wal_senders: i32,
     647              :         pub max_prepared_xacts: i32,
     648              :         pub max_locks_per_xact: i32,
     649              :         pub wal_level: i32,
     650              :         pub wal_log_hints: bool,
     651              :         pub track_commit_timestamp: bool,
     652              :         pub _padding: [u8; 2],
     653              :     }
     654              : 
     655              :     impl XlParameterChange {
                                /// Reads six little-endian `i32` values, then two single bytes that
                                /// are converted to `bool` (any non-zero byte is `true`), then two
                                /// padding bytes.
                                ///
                                /// Consumes 28 bytes; panics if `buf` is shorter than that.
     656            0 :         pub fn decode(buf: &mut Bytes) -> XlParameterChange {
     657            0 :             XlParameterChange {
     658            0 :                 max_connections: buf.get_i32_le(),
     659            0 :                 max_worker_processes: buf.get_i32_le(),
     660            0 :                 max_wal_senders: buf.get_i32_le(),
     661            0 :                 max_prepared_xacts: buf.get_i32_le(),
     662            0 :                 max_locks_per_xact: buf.get_i32_le(),
     663            0 :                 wal_level: buf.get_i32_le(),
     664            0 :                 wal_log_hints: buf.get_u8() != 0,
     665            0 :                 track_commit_timestamp: buf.get_u8() != 0,
     666            0 :                 _padding: [buf.get_u8(), buf.get_u8()],
     667            0 :             }
     668            0 :         }
     669              :     }
     670              : }
     671              : 
     672              : pub mod v15 {
                            //! Record types for PG15. This module defines nothing of its own:
                            //! every type is re-exported unchanged from `v14`.
     673              :     pub use super::v14::{
     674              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
     675              :         XlParameterChange,
     676              :     };
     677              : }
     678              : 
     679              : pub mod v16 {
     680              :     use bytes::{Buf, Bytes};
     681              : 
     682              :     pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
     683              :     use crate::{OffsetNumber, TransactionId};
     684              : 
     685              :     pub struct XlHeapDelete {
     686              :         pub xmax: TransactionId,
     687              :         pub offnum: OffsetNumber,
     688              :         pub infobits_set: u8,
     689              :         pub flags: u8,
     690              :     }
     691              : 
     692              :     impl XlHeapDelete {
     693            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     694            0 :             XlHeapDelete {
     695            0 :                 xmax: buf.get_u32_le(),
     696            0 :                 offnum: buf.get_u16_le(),
     697            0 :                 infobits_set: buf.get_u8(),
     698            0 :                 flags: buf.get_u8(),
     699            0 :             }
     700            0 :         }
     701              :     }
     702              : 
     703              :     #[repr(C)]
     704              :     #[derive(Debug)]
                            /// Decoded fixed-size part of a heap-update WAL record in the layout
                            /// used by the `v16` module. Unlike `v14::XlHeapUpdate` it has no
                            /// `t_cid` field between the old-tuple and new-tuple fields.
     705              :     pub struct XlHeapUpdate {
     706              :         pub old_xmax: TransactionId,
     707              :         pub old_offnum: OffsetNumber,
     708              :         pub old_infobits_set: u8,
     709              :         pub flags: u8,
     710              :         pub new_xmax: TransactionId,
     711              :         pub new_offnum: OffsetNumber,
     712              :     }
     713              : 
     714              :     impl XlHeapUpdate {
                                /// Reads all fields little-endian from the front of `buf`, in
                                /// declaration order (4 + 2 + 1 + 1 + 4 + 2 bytes).
                                ///
                                /// Consumes 14 bytes; panics if `buf` is shorter than that.
     715            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     716            0 :             XlHeapUpdate {
     717            0 :                 old_xmax: buf.get_u32_le(),
     718            0 :                 old_offnum: buf.get_u16_le(),
     719            0 :                 old_infobits_set: buf.get_u8(),
     720            0 :                 flags: buf.get_u8(),
     721            0 :                 new_xmax: buf.get_u32_le(),
     722            0 :                 new_offnum: buf.get_u16_le(),
     723            0 :             }
     724            0 :         }
     725              :     }
     726              : 
     727              :     #[repr(C)]
     728              :     #[derive(Debug)]
                            /// Decoded fixed-size part of a heap-lock WAL record in the layout used
                            /// by the `v16` module. Unlike `v14::XlHeapLock` it has no `t_cid`
                            /// field and no padding after `offnum`.
     729              :     pub struct XlHeapLock {
     730              :         pub locking_xid: TransactionId,
     731              :         pub offnum: OffsetNumber,
     732              :         pub infobits_set: u8,
     733              :         pub flags: u8,
     734              :     }
     735              : 
     736              :     impl XlHeapLock {
                                /// Reads a little-endian `u32`, a little-endian `u16` and two single
                                /// bytes from the front of `buf`, in declaration order.
                                ///
                                /// Consumes 8 bytes; panics if `buf` is shorter than that.
     737            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     738            0 :             XlHeapLock {
     739            0 :                 locking_xid: buf.get_u32_le(),
     740            0 :                 offnum: buf.get_u16_le(),
     741            0 :                 infobits_set: buf.get_u8(),
     742            0 :                 flags: buf.get_u8(),
     743            0 :             }
     744            0 :         }
     745              :     }
     746              : 
     747              :     /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
     748              :     pub mod rm_neon {
     749              :         use bytes::{Buf, Bytes};
     750              : 
     751              :         use crate::{OffsetNumber, TransactionId};
     752              : 
     753              :         #[repr(C)]
     754              :         #[derive(Debug)]
                                /// Decoded fixed-size part of a Neon-RMGR heap-insert WAL record.
                                /// Has the same fields, in the same order, as `v14::XlHeapInsert`.
     755              :         pub struct XlNeonHeapInsert {
     756              :             pub offnum: OffsetNumber,
     757              :             pub flags: u8,
     758              :         }
     759              : 
     760              :         impl XlNeonHeapInsert {
                                    /// Reads a little-endian `u16` offset number followed by one
                                    /// flags byte from the front of `buf`.
                                    ///
                                    /// Consumes 3 bytes; panics if `buf` is shorter than that.
     761            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
     762            0 :                 XlNeonHeapInsert {
     763            0 :                     offnum: buf.get_u16_le(),
     764            0 :                     flags: buf.get_u8(),
     765            0 :                 }
     766            0 :             }
     767              :         }
     768              : 
     769              :         #[repr(C)]
     770              :         #[derive(Debug)]
                                /// Decoded fixed-size part of a Neon-RMGR heap multi-insert WAL
                                /// record: the fields of `v14::XlHeapMultiInsert` followed by an
                                /// additional 32-bit `t_cid`.
     771              :         pub struct XlNeonHeapMultiInsert {
     772              :             pub flags: u8,
     773              :             pub _padding: u8,
     774              :             pub ntuples: u16,
     775              :             pub t_cid: u32,
     776              :         }
     777              : 
     778              :         impl XlNeonHeapMultiInsert {
                                    /// Reads one flags byte, one padding byte, a little-endian `u16`
                                    /// tuple count and a little-endian `u32` `t_cid` from `buf`.
                                    ///
                                    /// Consumes 8 bytes; panics if `buf` is shorter than that.
     779            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
     780            0 :                 XlNeonHeapMultiInsert {
     781            0 :                     flags: buf.get_u8(),
     782            0 :                     _padding: buf.get_u8(),
     783            0 :                     ntuples: buf.get_u16_le(),
     784            0 :                     t_cid: buf.get_u32_le(),
     785            0 :                 }
     786            0 :             }
     787              :         }
     788              : 
     789              :         #[repr(C)]
     790              :         #[derive(Debug)]
                                /// Decoded fixed-size part of a Neon-RMGR heap-delete WAL record:
                                /// the fields of `v16::XlHeapDelete` followed by a 32-bit `t_cid`.
     791              :         pub struct XlNeonHeapDelete {
     792              :             pub xmax: TransactionId,
     793              :             pub offnum: OffsetNumber,
     794              :             pub infobits_set: u8,
     795              :             pub flags: u8,
     796              :             pub t_cid: u32,
     797              :         }
     798              : 
     799              :         impl XlNeonHeapDelete {
                                    /// Reads all fields little-endian from the front of `buf`, in
                                    /// declaration order (4 + 2 + 1 + 1 + 4 bytes).
                                    ///
                                    /// Consumes 12 bytes; panics if `buf` is shorter than that.
     800            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
     801            0 :                 XlNeonHeapDelete {
     802            0 :                     xmax: buf.get_u32_le(),
     803            0 :                     offnum: buf.get_u16_le(),
     804            0 :                     infobits_set: buf.get_u8(),
     805            0 :                     flags: buf.get_u8(),
     806            0 :                     t_cid: buf.get_u32_le(),
     807            0 :                 }
     808            0 :             }
     809              :         }
     810              : 
     811              :         #[repr(C)]
     812              :         #[derive(Debug)]
                                /// Decoded fixed-size part of a Neon-RMGR heap-update WAL record.
                                /// Has the same fields, in the same order, as `v14::XlHeapUpdate`:
                                /// old-tuple fields, then `t_cid`, then the new-tuple fields.
     813              :         pub struct XlNeonHeapUpdate {
     814              :             pub old_xmax: TransactionId,
     815              :             pub old_offnum: OffsetNumber,
     816              :             pub old_infobits_set: u8,
     817              :             pub flags: u8,
     818              :             pub t_cid: u32,
     819              :             pub new_xmax: TransactionId,
     820              :             pub new_offnum: OffsetNumber,
     821              :         }
     822              : 
     823              :         impl XlNeonHeapUpdate {
                                    /// Reads all fields little-endian from the front of `buf`, in
                                    /// declaration order (4 + 2 + 1 + 1 + 4 + 4 + 2 bytes).
                                    ///
                                    /// Consumes 18 bytes; panics if `buf` is shorter than that.
     824            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
     825            0 :                 XlNeonHeapUpdate {
     826            0 :                     old_xmax: buf.get_u32_le(),
     827            0 :                     old_offnum: buf.get_u16_le(),
     828            0 :                     old_infobits_set: buf.get_u8(),
     829            0 :                     flags: buf.get_u8(),
                                            // Must be the little-endian getter like every other field:
                                            // plain `get_u32()` is big-endian and byte-swaps the value.
     830            0 :                     t_cid: buf.get_u32_le(),
     831            0 :                     new_xmax: buf.get_u32_le(),
     832            0 :                     new_offnum: buf.get_u16_le(),
     833            0 :                 }
     834            0 :             }
     835              :         }
     836              : 
     837              :         #[repr(C)]
     838              :         #[derive(Debug)]
                                /// Decoded fixed-size part of a Neon-RMGR heap-lock WAL record.
                                /// Note the field order: `t_cid` comes directly after `locking_xid`,
                                /// before `offnum`, unlike the other Neon records in this module
                                /// where `t_cid` follows `flags`.
     839              :         pub struct XlNeonHeapLock {
     840              :             pub locking_xid: TransactionId,
     841              :             pub t_cid: u32,
     842              :             pub offnum: OffsetNumber,
     843              :             pub infobits_set: u8,
     844              :             pub flags: u8,
     845              :         }
     846              : 
     847              :         impl XlNeonHeapLock {
                                    /// Reads all fields little-endian from the front of `buf`, in
                                    /// declaration order (4 + 4 + 2 + 1 + 1 bytes).
                                    ///
                                    /// Consumes 12 bytes; panics if `buf` is shorter than that.
     848            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
     849            0 :                 XlNeonHeapLock {
     850            0 :                     locking_xid: buf.get_u32_le(),
     851            0 :                     t_cid: buf.get_u32_le(),
     852            0 :                     offnum: buf.get_u16_le(),
     853            0 :                     infobits_set: buf.get_u8(),
     854            0 :                     flags: buf.get_u8(),
     855            0 :                 }
     856            0 :             }
     857              :         }
     858              :     }
     859              : }
     860              : 
     861              : pub mod v17 {
                            //! Record types for PG17. Everything except `XlEndOfRecovery` is
                            //! re-exported: `XlHeapLockUpdated` directly from `v14`, the rest via
                            //! `v16` (which itself re-exports some of them from `v14`).
     862              :     use bytes::{Buf, Bytes};
     863              : 
     864              :     pub use super::v14::XlHeapLockUpdated;
     865              :     pub use super::v16::{
     866              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
     867              :         rm_neon,
     868              :     };
     869              :     pub use crate::{TimeLineID, TimestampTz};
     870              : 
     871              :     #[repr(C)]
     872              :     #[derive(Debug)]
                            /// Decoded end-of-recovery WAL record: an end timestamp, the current
                            /// and previous timeline IDs, and a 32-bit `wal_level`.
     873              :     pub struct XlEndOfRecovery {
     874              :         pub end_time: TimestampTz,
     875              :         pub this_time_line_id: TimeLineID,
     876              :         pub prev_time_line_id: TimeLineID,
     877              :         pub wal_level: i32,
     878              :     }
     879              : 
     880              :     impl XlEndOfRecovery {
                                /// Reads a little-endian `i64` timestamp, two little-endian `u32`
                                /// timeline IDs and a little-endian `i32` from the front of `buf`.
                                ///
                                /// Consumes 20 bytes; panics if `buf` is shorter than that.
     881            0 :         pub fn decode(buf: &mut Bytes) -> XlEndOfRecovery {
     882            0 :             XlEndOfRecovery {
     883            0 :                 end_time: buf.get_i64_le(),
     884            0 :                 this_time_line_id: buf.get_u32_le(),
     885            0 :                 prev_time_line_id: buf.get_u32_le(),
     886            0 :                 wal_level: buf.get_i32_le(),
     887            0 :             }
     888            0 :         }
     889              :     }
     890              : }
     891              : 
     892              : #[repr(C)]
     893              : #[derive(Debug)]
                        /// Decoded smgr-create WAL record: the relation being created and the
                        /// fork number, narrowed to a single byte.
     894              : pub struct XlSmgrCreate {
     895              :     pub rnode: RelFileNode,
     896              :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
     897              :     // well-defined size?
     898              :     pub forknum: u8,
     899              : }
     900              : 
     901              : impl XlSmgrCreate {
                            /// Reads three little-endian `u32` values (tablespace, database,
                            /// relation) into `rnode`, then a fourth little-endian `u32` for the
                            /// fork number, which is truncated to `u8` with `as`.
                            ///
                            /// Consumes 16 bytes; panics if `buf` is shorter than that.
     902           32 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
     903           32 :         XlSmgrCreate {
     904           32 :             rnode: RelFileNode {
     905           32 :                 spcnode: buf.get_u32_le(), /* tablespace */
     906           32 :                 dbnode: buf.get_u32_le(),  /* database */
     907           32 :                 relnode: buf.get_u32_le(), /* relation */
     908           32 :             },
     909           32 :             forknum: buf.get_u32_le() as u8,
     910           32 :         }
     911           32 :     }
     912              : }
     913              : 
     914              : #[repr(C)]
     915            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
                        /// Decoded smgr-truncate WAL record: a block number, the affected
                        /// relation and a 32-bit flags word.
     916              : pub struct XlSmgrTruncate {
     917              :     pub blkno: BlockNumber,
     918              :     pub rnode: RelFileNode,
     919              :     pub flags: u32,
     920              : }
     921              : 
     922              : impl XlSmgrTruncate {
                            /// Reads five little-endian `u32` values from the front of `buf`:
                            /// the block number, the three `rnode` components (tablespace,
                            /// database, relation) and the flags.
                            ///
                            /// Consumes 20 bytes; panics if `buf` is shorter than that.
     923            0 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
     924            0 :         XlSmgrTruncate {
     925            0 :             blkno: buf.get_u32_le(),
     926            0 :             rnode: RelFileNode {
     927            0 :                 spcnode: buf.get_u32_le(), /* tablespace */
     928            0 :                 dbnode: buf.get_u32_le(),  /* database */
     929            0 :                 relnode: buf.get_u32_le(), /* relation */
     930            0 :             },
     931            0 :             flags: buf.get_u32_le(),
     932            0 :         }
     933            0 :     }
     934              : }
     935              : 
     936              : #[repr(C)]
     937              : #[derive(Debug)]
                        /// Decoded create-database WAL record: the new database and its
                        /// tablespace, followed by the source database and its tablespace.
     938              : pub struct XlCreateDatabase {
     939              :     pub db_id: Oid,
     940              :     pub tablespace_id: Oid,
     941              :     pub src_db_id: Oid,
     942              :     pub src_tablespace_id: Oid,
     943              : }
     944              : 
     945              : impl XlCreateDatabase {
                            /// Reads four little-endian `u32` values from the front of `buf`, in
                            /// declaration order.
                            ///
                            /// Consumes 16 bytes; panics if `buf` is shorter than that.
     946            0 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
     947            0 :         XlCreateDatabase {
     948            0 :             db_id: buf.get_u32_le(),
     949            0 :             tablespace_id: buf.get_u32_le(),
     950            0 :             src_db_id: buf.get_u32_le(),
     951            0 :             src_tablespace_id: buf.get_u32_le(),
     952            0 :         }
     953            0 :     }
     954              : }
     955              : 
     956              : #[repr(C)]
     957              : #[derive(Debug)]
                        /// Decoded drop-database WAL record: the database ID plus a
                        /// variable-length list of tablespace IDs. `n_tablespaces` is the count
                        /// read from the record; `tablespace_ids` holds that many entries.
     958              : pub struct XlDropDatabase {
     959              :     pub db_id: Oid,
     960              :     pub n_tablespaces: Oid, /* number of tablespace IDs */
     961              :     pub tablespace_ids: Vec<Oid>,
     962              : }
     963              : 
     964              : impl XlDropDatabase {
                            /// Reads the database ID and the tablespace count (both little-endian
                            /// `u32`), then reads `n_tablespaces` further little-endian `u32`
                            /// values into `tablespace_ids`.
                            ///
                            /// Consumes `8 + 4 * n_tablespaces` bytes. The count comes straight
                            /// from the record, so a count larger than the remaining data makes
                            /// the `bytes::Buf` getter panic part-way through the loop.
     965            0 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
     966            0 :         let mut rec = XlDropDatabase {
     967            0 :             db_id: buf.get_u32_le(),
     968            0 :             n_tablespaces: buf.get_u32_le(),
     969            0 :             tablespace_ids: Vec::<Oid>::new(),
     970            0 :         };
     971              : 
     972            0 :         for _i in 0..rec.n_tablespaces {
     973            0 :             let id = buf.get_u32_le();
     974            0 :             rec.tablespace_ids.push(id);
     975            0 :         }
     976              : 
     977            0 :         rec
     978            0 :     }
     979              : }
     980              : 
     981              : ///
     982              : /// Note: Parsing some fields is missing, because they're not needed.
     983              : ///
     984              : /// This is similar to the xl_xact_parsed_commit and
     985              : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     986              : /// struct for commits and aborts.
     987              : ///
     988            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
     989              : pub struct XlXactParsedRecord {
                            // The xid passed to `decode`, or the xid stored in the record when
                            // the record has the two-phase flag set.
     990              :     pub xid: TransactionId,
                            // The record's `xl_info` masked with `XLOG_XACT_OPMASK`.
     991              :     pub info: u8,
     992              :     pub xact_time: TimestampTz,
                            // The xinfo flag word, or 0 when the record carries none.
     993              :     pub xinfo: u32,
     994              : 
                            // `db_id` and `ts_id` are both 0 when the record has no dbinfo section.
     995              :     pub db_id: Oid,
     996              :     /* MyDatabaseId */
     997              :     pub ts_id: Oid,
     998              :     /* MyDatabaseTableSpace */
                            // Empty when the record has no subtransaction section.
     999              :     pub subxacts: Vec<TransactionId>,
    1000              : 
                            // Empty when the record has no relfilenode section.
    1001              :     pub xnodes: Vec<RelFileNode>,
                            // `Lsn::INVALID` when the record has no origin section.
    1002              :     pub origin_lsn: Lsn,
    1003              : }
    1004              : 
    1005              : impl XlXactParsedRecord {
    1006              :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
    1007              :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
    1008              :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
                            ///
                            /// `buf` must be positioned at the start of the record's main data.
                            /// `xid` is the transaction ID to report; it is replaced by the xid
                            /// stored in the record when the two-phase flag is set. `xl_info` is
                            /// the record's info byte.
                            ///
                            /// Sections are consumed in a fixed order, each only if its xinfo flag
                            /// is set: dbinfo, subxacts, relfilenodes, dropped stats (skipped),
                            /// invalidation messages (skipped), two-phase xid, origin.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `buf` runs out of data, or if a dropped-stats or
                            /// invalidation-message count is negative (the `try_into().unwrap()`
                            /// to `usize` fails).
    1009           16 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
    1010           16 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
    1011           16 :         // The record starts with time of commit/abort
    1012           16 :         let xact_time = buf.get_i64_le();
                                // The xinfo flag word is only present when XLOG_XACT_HAS_INFO is set;
                                // with xinfo == 0 every optional section below is skipped.
    1013           16 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
    1014           16 :             buf.get_u32_le()
    1015              :         } else {
    1016            0 :             0
    1017              :         };
    1018              :         let db_id;
    1019              :         let ts_id;
    1020           16 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
    1021           16 :             db_id = buf.get_u32_le();
    1022           16 :             ts_id = buf.get_u32_le();
    1023           16 :         } else {
    1024            0 :             db_id = 0;
    1025            0 :             ts_id = 0;
    1026            0 :         }
    1027           16 :         let mut subxacts = Vec::<TransactionId>::new();
    1028           16 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
                                    // The count is signed; a negative value yields an empty range, so
                                    // no subxact is read.
    1029            0 :             let nsubxacts = buf.get_i32_le();
    1030            0 :             for _i in 0..nsubxacts {
    1031            0 :                 let subxact = buf.get_u32_le();
    1032            0 :                 subxacts.push(subxact);
    1033            0 :             }
    1034           16 :         }
    1035           16 :         let mut xnodes = Vec::<RelFileNode>::new();
    1036           16 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
    1037            0 :             let nrels = buf.get_i32_le();
    1038            0 :             for _i in 0..nrels {
    1039            0 :                 let spcnode = buf.get_u32_le();
    1040            0 :                 let dbnode = buf.get_u32_le();
    1041            0 :                 let relnode = buf.get_u32_le();
    1042            0 :                 tracing::trace!(
    1043            0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
    1044              :                     spcnode,
    1045              :                     dbnode,
    1046              :                     relnode
    1047              :                 );
    1048            0 :                 xnodes.push(RelFileNode {
    1049            0 :                     spcnode,
    1050            0 :                     dbnode,
    1051            0 :                     relnode,
    1052            0 :                 });
    1053              :             }
    1054           16 :         }
    1055              : 
                                // Dropped-stats items are skipped rather than decoded, assuming 12
                                // bytes per item. The flag constant comes from the v15 bindings
                                // whatever PG version produced the record.
                                // NOTE(review): `nitems * 12` is an i32 multiplication and can
                                // overflow for a corrupt count -- confirm whether that matters here.
    1056           16 :         if xinfo & crate::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
    1057            0 :             let nitems = buf.get_i32_le();
    1058            0 :             tracing::debug!(
    1059            0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
    1060              :                 nitems
    1061              :             );
    1062            0 :             let sizeof_xl_xact_stats_item = 12;
    1063            0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
    1064           16 :         }
    1065              : 
                                // Invalidation messages are likewise skipped, assuming 16 bytes each.
    1066           16 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
    1067           16 :             let nmsgs = buf.get_i32_le();
    1068           16 :             let sizeof_shared_invalidation_message = 16;
    1069           16 :             buf.advance(
    1070           16 :                 (nmsgs * sizeof_shared_invalidation_message)
    1071           16 :                     .try_into()
    1072           16 :                     .unwrap(),
    1073           16 :             );
    1074           16 :         }
    1075              : 
                                // The xid stored in a two-phase record overrides the `xid` argument.
                                // NOTE(review): PostgreSQL's ParseCommitRecord also steps over a
                                // NUL-terminated GID string here when XACT_XINFO_HAS_GID is set. This
                                // code does not, so `origin_lsn` below may be read from the wrong
                                // offset for such records -- confirm.
    1076           16 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
    1077            0 :             xid = buf.get_u32_le();
    1078            0 :             tracing::debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
    1079           16 :         }
    1080              : 
                                // Only the origin LSN is read; anything after it in the record is
                                // left unconsumed in `buf`.
    1081           16 :         let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 {
    1082            0 :             Lsn(buf.get_u64_le())
    1083              :         } else {
    1084           16 :             Lsn::INVALID
    1085              :         };
    1086           16 :         XlXactParsedRecord {
    1087           16 :             xid,
    1088           16 :             info,
    1089           16 :             xact_time,
    1090           16 :             xinfo,
    1091           16 :             db_id,
    1092           16 :             ts_id,
    1093           16 :             subxacts,
    1094           16 :             xnodes,
    1095           16 :             origin_lsn,
    1096           16 :         }
    1097           16 :     }
    1098              : }
    1099              : 
    1100              : #[repr(C)]
    1101              : #[derive(Debug)]
                        /// Decoded CLOG-truncate WAL record. `pageno` is always stored as `u32`
                        /// here, whatever width it has in the encoded record.
    1102              : pub struct XlClogTruncate {
    1103              :     pub pageno: u32,
    1104              :     pub oldest_xid: TransactionId,
    1105              :     pub oldest_xid_db: Oid,
    1106              : }
    1107              : 
    1108              : impl XlClogTruncate {
                            /// Decodes the record according to `pg_version`.
                            ///
                            /// For `pg_version < 17` the page number is read as a little-endian
                            /// `u32`; otherwise it is read as a little-endian `u64` and truncated
                            /// to `u32` with `as` (upper 32 bits are discarded). Two little-endian
                            /// `u32` values follow in both cases.
                            ///
                            /// Consumes 12 or 16 bytes; panics if `buf` is shorter than that.
    1109            0 :     pub fn decode(buf: &mut Bytes, pg_version: u32) -> XlClogTruncate {
    1110            0 :         XlClogTruncate {
    1111            0 :             pageno: if pg_version < 17 {
    1112            0 :                 buf.get_u32_le()
    1113              :             } else {
    1114            0 :                 buf.get_u64_le() as u32
    1115              :             },
    1116            0 :             oldest_xid: buf.get_u32_le(),
    1117            0 :             oldest_xid_db: buf.get_u32_le(),
    1118            0 :         }
    1119            0 :     }
    1120              : }
    1121              : 
    1122              : #[repr(C)]
    1123              : #[derive(Debug)]
                        /// Decoded header of a logical-message WAL record: the database ID, the
                        /// transactional flag and the sizes of the prefix and message.
                        /// Only the header is decoded; the prefix and message bytes themselves
                        /// are not read by `decode`.
    1124              : pub struct XlLogicalMessage {
    1125              :     pub db_id: Oid,
    1126              :     pub transactional: bool,
    1127              :     pub prefix_size: usize,
    1128              :     pub message_size: usize,
    1129              : }
    1130              : 
    1131              : impl XlLogicalMessage {
                            /// Reads a little-endian `u32` database ID, a 4-byte slot for the
                            /// transactional flag (any non-zero value is `true`), and two
                            /// little-endian `u64` sizes that are cast to `usize`.
                            ///
                            /// Consumes 24 bytes; panics if `buf` is shorter than that.
    1132          606 :     pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
    1133          606 :         XlLogicalMessage {
    1134          606 :             db_id: buf.get_u32_le(),
    1135          606 :             transactional: buf.get_u32_le() != 0, // 4-bytes alignment
    1136          606 :             prefix_size: buf.get_u64_le() as usize,
    1137          606 :             message_size: buf.get_u64_le() as usize,
    1138          606 :         }
    1139          606 :     }
    1140              : }
    1141              : 
    1142              : #[repr(C)]
    1143              : #[derive(Debug)]
                        /// Decoded running-xacts WAL record: a fixed header followed by a list
                        /// of transaction IDs. `xids` holds `xcnt + subxcnt` entries in the
                        /// order they appear in the record.
    1144              : pub struct XlRunningXacts {
    1145              :     pub xcnt: u32,
    1146              :     pub subxcnt: u32,
    1147              :     pub subxid_overflow: bool,
    1148              :     pub next_xid: TransactionId,
    1149              :     pub oldest_running_xid: TransactionId,
    1150              :     pub latest_completed_xid: TransactionId,
    1151              :     pub xids: Vec<TransactionId>,
    1152              : }
    1153              : 
    1154              : impl XlRunningXacts {
                            /// Reads six little-endian `u32` header values (the third is turned
                            /// into a `bool`, non-zero meaning `true`), then `xcnt + subxcnt`
                            /// little-endian `u32` transaction IDs.
                            ///
                            /// Consumes `24 + 4 * (xcnt + subxcnt)` bytes; panics if `buf` runs
                            /// out of data.
                            /// NOTE(review): `xcnt + subxcnt` is a plain `u32` addition and could
                            /// overflow on a corrupt record -- confirm whether callers validate.
    1155            0 :     pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
    1156            0 :         let xcnt = buf.get_u32_le();
    1157            0 :         let subxcnt = buf.get_u32_le();
    1158            0 :         let subxid_overflow = buf.get_u32_le() != 0;
    1159            0 :         let next_xid = buf.get_u32_le();
    1160            0 :         let oldest_running_xid = buf.get_u32_le();
    1161            0 :         let latest_completed_xid = buf.get_u32_le();
    1162            0 :         let mut xids = Vec::new();
    1163            0 :         for _ in 0..(xcnt + subxcnt) {
    1164            0 :             xids.push(buf.get_u32_le());
    1165            0 :         }
    1166            0 :         XlRunningXacts {
    1167            0 :             xcnt,
    1168            0 :             subxcnt,
    1169            0 :             subxid_overflow,
    1170            0 :             next_xid,
    1171            0 :             oldest_running_xid,
    1172            0 :             latest_completed_xid,
    1173            0 :             xids,
    1174            0 :         }
    1175            0 :     }
    1176              : }
    1177              : 
    1178            0 : pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
    1179            0 :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
    1180            0 :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
    1181            0 :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
    1182            0 :     // without worrying about security?
    1183            0 :     //
    1184            0 :     // But for now, we have hand-written code for a few common WAL record types here.
    1185            0 :     // Returns a short label such as "HEAP INSERT"; unrecognized rmgr ids or info codes yield an "UNKNOWN" label carrying the raw values. Errors only if the XLogRecord header fails to parse.
    1186            0 :     let mut buf = record.clone(); // work on a clone so the caller's `Bytes` is left untouched by header parsing
    1187              : 
    1188              :     // 1. Parse XLogRecord struct
    1189              : 
    1190              :     // FIXME: assume little-endian here
    1191            0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1192              : 
    1193              :     let unknown_str: String; // owns the formatted fallback label so `result` can borrow it as &str
    1194              : 
    1195            0 :     let result: &str = match xlogrec.xl_rmid {
    1196              :         pg_constants::RM_HEAP2_ID => {
    1197            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1198            0 :             match info {
    1199            0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
    1200            0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
    1201              :                 _ => {
    1202            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1203            0 :                     &unknown_str
    1204              :                 }
    1205              :             }
    1206              :         }
    1207              :         pg_constants::RM_HEAP_ID => {
    1208            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1209            0 :             match info {
    1210            0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
    1211            0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
    1212            0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
    1213            0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
    1214              :                 _ => {
    1215            0 :                     unknown_str = format!("HEAP UNKNOWN_0x{:02x}", info); // label must say HEAP: this arm handles RM_HEAP_ID, not RM_HEAP2_ID
    1216            0 :                     &unknown_str
    1217              :                 }
    1218              :             }
    1219              :         }
    1220              :         pg_constants::RM_XLOG_ID => {
    1221            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1222            0 :             match info {
    1223            0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
    1224            0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
    1225              :                 _ => {
    1226            0 :                     unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
    1227            0 :                     &unknown_str
    1228              :                 }
    1229              :             }
    1230              :         }
    1231            0 :         rmid => {
    1232            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1233            0 : 
    1234            0 :             unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
    1235            0 :             &unknown_str
    1236              :         }
    1237              :     };
    1238              : 
    1239            0 :     Ok(String::from(result))
    1240            0 : }
        

Generated by: LCOV version 2.1-beta