LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - walrecord.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 37.9 % 575 218
Test Date: 2025-07-16 12:29:03 Functions: 14.8 % 61 9

            Line data    Source code
       1              : //! This module houses types used in decoding of PG WAL
       2              : //! records.
       3              : //!
       4              : //! TODO: Generate separate types for each supported PG version
       5              : 
       6              : use bytes::{Buf, Bytes};
       7              : use postgres_ffi_types::TimestampTz;
       8              : use serde::{Deserialize, Serialize};
       9              : use utils::bin_ser::DeserializeError;
      10              : use utils::lsn::Lsn;
      11              : 
      12              : use crate::{
      13              :     BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, PgMajorVersion,
      14              :     RepOriginId, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
      15              : };
      16              : 
      17              : #[repr(C)]
      18            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      19              : pub struct XlMultiXactCreate {
      20              :     pub mid: MultiXactId,
      21              :     /* new MultiXact's ID */
      22              :     pub moff: MultiXactOffset,
      23              :     /* its starting offset in members file */
      24              :     pub nmembers: u32,
      25              :     /* number of member XIDs */
      26              :     pub members: Vec<MultiXactMember>,
      27              : }
      28              : 
      29              : impl XlMultiXactCreate {
      30            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
      31            0 :         let mid = buf.get_u32_le();
      32            0 :         let moff = buf.get_u32_le();
      33            0 :         let nmembers = buf.get_u32_le();
      34            0 :         let mut members = Vec::new();
      35            0 :         for _ in 0..nmembers {
      36            0 :             members.push(MultiXactMember::decode(buf));
      37            0 :         }
      38            0 :         XlMultiXactCreate {
      39            0 :             mid,
      40            0 :             moff,
      41            0 :             nmembers,
      42            0 :             members,
      43            0 :         }
      44            0 :     }
      45              : }
      46              : 
      47              : #[repr(C)]
      48            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      49              : pub struct XlMultiXactTruncate {
      50              :     pub oldest_multi_db: Oid,
      51              :     /* to-be-truncated range of multixact offsets */
      52              :     pub start_trunc_off: MultiXactId,
      53              :     /* just for completeness' sake */
      54              :     pub end_trunc_off: MultiXactId,
      55              : 
      56              :     /* to-be-truncated range of multixact members */
      57              :     pub start_trunc_memb: MultiXactOffset,
      58              :     pub end_trunc_memb: MultiXactOffset,
      59              : }
      60              : 
      61              : impl XlMultiXactTruncate {
      62            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
      63            0 :         XlMultiXactTruncate {
      64            0 :             oldest_multi_db: buf.get_u32_le(),
      65            0 :             start_trunc_off: buf.get_u32_le(),
      66            0 :             end_trunc_off: buf.get_u32_le(),
      67            0 :             start_trunc_memb: buf.get_u32_le(),
      68            0 :             end_trunc_memb: buf.get_u32_le(),
      69            0 :         }
      70            0 :     }
      71              : }
      72              : 
      73              : #[repr(C)]
      74            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      75              : pub struct XlRelmapUpdate {
      76              :     pub dbid: Oid,   /* database ID, or 0 for shared map */
      77              :     pub tsid: Oid,   /* database's tablespace, or pg_global */
      78              :     pub nbytes: i32, /* size of relmap data */
      79              : }
      80              : 
      81              : impl XlRelmapUpdate {
      82            0 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
      83            0 :         XlRelmapUpdate {
      84            0 :             dbid: buf.get_u32_le(),
      85            0 :             tsid: buf.get_u32_le(),
      86            0 :             nbytes: buf.get_i32_le(),
      87            0 :         }
      88            0 :     }
      89              : }
      90              : 
      91              : #[repr(C)]
      92            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
      93              : pub struct XlReploriginDrop {
      94              :     pub node_id: RepOriginId,
      95              : }
      96              : 
      97              : impl XlReploriginDrop {
      98            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginDrop {
      99            0 :         XlReploriginDrop {
     100            0 :             node_id: buf.get_u16_le(),
     101            0 :         }
     102            0 :     }
     103              : }
     104              : 
     105              : #[repr(C)]
     106            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
     107              : pub struct XlReploriginSet {
     108              :     pub remote_lsn: Lsn,
     109              :     pub node_id: RepOriginId,
     110              : }
     111              : 
     112              : impl XlReploriginSet {
     113            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginSet {
     114            0 :         XlReploriginSet {
     115            0 :             remote_lsn: Lsn(buf.get_u64_le()),
     116            0 :             node_id: buf.get_u16_le(),
     117            0 :         }
     118            0 :     }
     119              : }
     120              : 
     121              : #[repr(C)]
     122            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     123              : pub struct RelFileNode {
     124              :     pub spcnode: Oid, /* tablespace */
     125              :     pub dbnode: Oid,  /* database */
     126              :     pub relnode: Oid, /* relation */
     127              : }
     128              : 
     129              : #[repr(C)]
     130            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     131              : pub struct MultiXactMember {
     132              :     pub xid: TransactionId,
     133              :     pub status: MultiXactStatus,
     134              : }
     135              : 
     136              : impl MultiXactMember {
     137            0 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     138            0 :         MultiXactMember {
     139            0 :             xid: buf.get_u32_le(),
     140            0 :             status: buf.get_u32_le(),
     141            0 :         }
     142            0 :     }
     143              : }
     144              : 
     145              : /// DecodedBkpBlock represents per-page data contained in a WAL record.
     146              : #[derive(Default)]
     147              : pub struct DecodedBkpBlock {
     148              :     /* Is this block ref in use? */
     149              :     //in_use: bool,
     150              : 
     151              :     /* Identify the block this refers to */
     152              :     pub rnode_spcnode: u32,
     153              :     pub rnode_dbnode: u32,
     154              :     pub rnode_relnode: u32,
     155              :     // Note that we have a few special forknum values for non-rel files.
     156              :     pub forknum: u8,
     157              :     pub blkno: u32,
     158              : 
     159              :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
     160              :     pub flags: u8,
     161              : 
     162              :     /* Information on full-page image, if any */
     163              :     pub has_image: bool,
     164              :     /* has image, even for consistency checking */
     165              :     pub apply_image: bool,
     166              :     /* has image that should be restored */
     167              :     pub will_init: bool,
     168              :     /* record doesn't need previous page version to apply */
     169              :     //char         *bkp_image;
     170              :     pub hole_offset: u16,
     171              :     pub hole_length: u16,
     172              :     pub bimg_offset: u32,
     173              :     pub bimg_len: u16,
     174              :     pub bimg_info: u8,
     175              : 
     176              :     /* Buffer holding the rmgr-specific data associated with this block */
     177              :     has_data: bool,
     178              :     data_len: u16,
     179              : }
     180              : 
     181              : impl DecodedBkpBlock {
     182        72821 :     pub fn new() -> DecodedBkpBlock {
     183        72821 :         Default::default()
     184        72821 :     }
     185              : }
     186              : 
     187              : #[derive(Default)]
     188              : pub struct DecodedWALRecord {
     189              :     pub xl_xid: TransactionId,
     190              :     pub xl_info: u8,
     191              :     pub xl_rmid: u8,
     192              :     pub record: Bytes, // raw XLogRecord
     193              : 
     194              :     pub blocks: Vec<DecodedBkpBlock>,
     195              :     pub main_data_offset: usize,
     196              :     pub origin_id: u16,
     197              : }
     198              : 
     199              : impl DecodedWALRecord {
     200              :     /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
     201              :     /// by reading other existing relations' data blocks.  This is more complex to apply than new-style database
     202              :     /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
     203        73532 :     pub fn is_dbase_create_copy(&self, pg_version: PgMajorVersion) -> bool {
     204        73532 :         if self.xl_rmid == pg_constants::RM_DBASE_ID {
     205            0 :             let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     206            0 :             match pg_version {
     207              :                 PgMajorVersion::PG14 => {
     208              :                     // Postgres 14 database creations are always the legacy kind
     209            0 :                     info == crate::v14::bindings::XLOG_DBASE_CREATE
     210              :                 }
     211            0 :                 PgMajorVersion::PG15 => info == crate::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     212            0 :                 PgMajorVersion::PG16 => info == crate::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     213            0 :                 PgMajorVersion::PG17 => info == crate::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     214              :             }
     215              :         } else {
     216        73532 :             false
     217              :         }
     218        73532 :     }
     219              : }
     220              : 
     221              : /// Main routine to decode a WAL record and figure out which blocks are modified
     222              : //
     223              : // See xlogrecord.h for details
     224              : // The overall layout of an XLOG record is:
     225              : //              Fixed-size header (XLogRecord struct)
     226              : //      XLogRecordBlockHeader struct
     227              : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     228              : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     229              : //                 XLogRecordBlockCompressHeader struct follows.
     230              : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     231              : //          BlockNumber follows
     232              : //      XLogRecordBlockHeader struct
     233              : //      ...
     234              : //      XLogRecordDataHeader[Short|Long] struct
     235              : //      block data
     236              : //      block data
     237              : //      ...
     238              : //      main data
     239              : //
     240              : //
     241              : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
     242              : // It would be more natural for this function to return a DecodedWALRecord as return value,
     243              : // but reusing the caller-supplied struct avoids an allocation.
     244              : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
     245              : //
     246        73532 : pub fn decode_wal_record(
     247        73532 :     record: Bytes,
     248        73532 :     decoded: &mut DecodedWALRecord,
     249        73532 :     pg_version: PgMajorVersion,
     250        73532 : ) -> anyhow::Result<()> {
     251        73532 :     let mut rnode_spcnode: u32 = 0;
     252        73532 :     let mut rnode_dbnode: u32 = 0;
     253        73532 :     let mut rnode_relnode: u32 = 0;
     254        73532 :     let mut got_rnode = false;
     255        73532 :     let mut origin_id: u16 = 0;
     256              : 
     257        73532 :     let mut buf = record.clone();
     258              : 
     259              :     // 1. Parse XLogRecord struct
     260              : 
     261              :     // FIXME: assume little-endian here
     262        73532 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     263              : 
     264        73532 :     tracing::trace!(
     265            0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
     266              :         xlogrec.xl_rmid,
     267              :         xlogrec.xl_info
     268              :     );
     269              : 
     270        73532 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
     271              : 
     272        73532 :     if buf.remaining() != remaining {
     273            0 :         //TODO error
     274        73532 :     }
     275              : 
     276        73532 :     let mut max_block_id = 0;
     277        73532 :     let mut blocks_total_len: u32 = 0;
     278        73532 :     let mut main_data_len = 0;
     279        73532 :     let mut datatotal: u32 = 0;
     280        73532 :     decoded.blocks.clear();
     281              : 
     282              :     // 2. Decode the headers.
     283              :     // XLogRecordBlockHeaders if any,
     284              :     // XLogRecordDataHeader[Short|Long]
     285       219873 :     while buf.remaining() > datatotal as usize {
     286       146341 :         let block_id = buf.get_u8();
     287              : 
     288       146341 :         match block_id {
     289        72906 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
     290        72906 :                 /* XLogRecordDataHeaderShort */
     291        72906 :                 main_data_len = buf.get_u8() as u32;
     292        72906 :                 datatotal += main_data_len;
     293        72906 :             }
     294              : 
     295          614 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
     296          614 :                 /* XLogRecordDataHeaderLong */
     297          614 :                 main_data_len = buf.get_u32_le();
     298          614 :                 datatotal += main_data_len;
     299          614 :             }
     300              : 
     301            0 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
     302            0 :                 // RepOriginId is uint16
     303            0 :                 origin_id = buf.get_u16_le();
     304            0 :             }
     305              : 
     306            0 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
     307            0 :                 // TransactionId is uint32
     308            0 :                 buf.advance(4);
     309            0 :             }
     310              : 
     311        72821 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
     312              :                 /* XLogRecordBlockHeader */
     313        72821 :                 let mut blk = DecodedBkpBlock::new();
     314              : 
     315        72821 :                 if block_id <= max_block_id {
     316        72821 :                     // TODO
     317        72821 :                     //report_invalid_record(state,
     318        72821 :                     //                    "out-of-order block_id %u at %X/%X",
     319        72821 :                     //                    block_id,
     320        72821 :                     //                    (uint32) (state->ReadRecPtr >> 32),
     321        72821 :                     //                    (uint32) state->ReadRecPtr);
     322        72821 :                     //    goto err;
     323        72821 :                 }
     324        72821 :                 max_block_id = block_id;
     325              : 
     326        72821 :                 let fork_flags: u8 = buf.get_u8();
     327        72821 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
     328        72821 :                 blk.flags = fork_flags;
     329        72821 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
     330        72821 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
     331        72821 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
     332        72821 :                 blk.data_len = buf.get_u16_le();
     333              : 
     334              :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
     335              : 
     336        72821 :                 datatotal += blk.data_len as u32;
     337        72821 :                 blocks_total_len += blk.data_len as u32;
     338              : 
     339        72821 :                 if blk.has_image {
     340           30 :                     blk.bimg_len = buf.get_u16_le();
     341           30 :                     blk.hole_offset = buf.get_u16_le();
     342           30 :                     blk.bimg_info = buf.get_u8();
     343              : 
     344           30 :                     blk.apply_image = dispatch_pgversion!(
     345           30 :                         pg_version,
     346            0 :                         (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
     347              :                     );
     348              : 
     349           30 :                     let blk_img_is_compressed =
     350           30 :                         crate::bkpimage_is_compressed(blk.bimg_info, pg_version);
     351              : 
     352           30 :                     if blk_img_is_compressed {
     353            0 :                         tracing::debug!("compressed block image , pg_version = {}", pg_version);
     354           30 :                     }
     355              : 
     356           30 :                     if blk_img_is_compressed {
     357            0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
     358            0 :                             blk.hole_length = buf.get_u16_le();
     359            0 :                         } else {
     360            0 :                             blk.hole_length = 0;
     361            0 :                         }
     362           30 :                     } else {
     363           30 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
     364           30 :                     }
     365           30 :                     datatotal += blk.bimg_len as u32;
     366           30 :                     blocks_total_len += blk.bimg_len as u32;
     367              : 
     368              :                     /*
     369              :                      * cross-check that hole_offset > 0, hole_length > 0 and
     370              :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
     371              :                      */
     372           30 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
     373           18 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
     374            0 :                     {
     375            0 :                         // TODO
     376            0 :                         /*
     377            0 :                         report_invalid_record(state,
     378            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
     379            0 :                                       (unsigned int) blk->hole_offset,
     380            0 :                                       (unsigned int) blk->hole_length,
     381            0 :                                       (unsigned int) blk->bimg_len,
     382            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     383            0 :                         goto err;
     384            0 :                                      */
     385           30 :                     }
     386              : 
     387              :                     /*
     388              :                      * cross-check that hole_offset == 0 and hole_length == 0 if
     389              :                      * the HAS_HOLE flag is not set.
     390              :                      */
     391           30 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     392           12 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
     393            0 :                     {
     394            0 :                         // TODO
     395            0 :                         /*
     396            0 :                         report_invalid_record(state,
     397            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
     398            0 :                                       (unsigned int) blk->hole_offset,
     399            0 :                                       (unsigned int) blk->hole_length,
     400            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     401            0 :                         goto err;
     402            0 :                                      */
     403           30 :                     }
     404              : 
     405              :                     /*
     406              :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
     407              :                      * flag is set.
     408              :                      */
     409           30 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
     410           12 :                         // TODO
     411           12 :                         /*
     412           12 :                         report_invalid_record(state,
     413           12 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
     414           12 :                                       (unsigned int) blk->bimg_len,
     415           12 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     416           12 :                         goto err;
     417           12 :                                      */
     418           18 :                     }
     419              : 
     420              :                     /*
     421              :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
     422              :                      * IS_COMPRESSED flag is set.
     423              :                      */
     424           30 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     425           12 :                         && !blk_img_is_compressed
     426           12 :                         && blk.bimg_len != BLCKSZ
     427            0 :                     {
     428            0 :                         // TODO
     429            0 :                         /*
     430            0 :                         report_invalid_record(state,
     431            0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
     432            0 :                                       (unsigned int) blk->data_len,
     433            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     434            0 :                         goto err;
     435            0 :                                      */
     436           30 :                     }
     437        72791 :                 }
     438        72821 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
     439        72821 :                     rnode_spcnode = buf.get_u32_le();
     440        72821 :                     rnode_dbnode = buf.get_u32_le();
     441        72821 :                     rnode_relnode = buf.get_u32_le();
     442        72821 :                     got_rnode = true;
     443        72821 :                 } else if !got_rnode {
     444            0 :                     // TODO
     445            0 :                     /*
     446            0 :                     report_invalid_record(state,
     447            0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
     448            0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     449            0 :                     goto err;           */
     450            0 :                 }
     451              : 
     452        72821 :                 blk.rnode_spcnode = rnode_spcnode;
     453        72821 :                 blk.rnode_dbnode = rnode_dbnode;
     454        72821 :                 blk.rnode_relnode = rnode_relnode;
     455              : 
     456        72821 :                 blk.blkno = buf.get_u32_le();
     457        72821 :                 tracing::trace!(
     458            0 :                     "this record affects {}/{}/{} blk {}",
     459              :                     rnode_spcnode,
     460              :                     rnode_dbnode,
     461              :                     rnode_relnode,
     462              :                     blk.blkno
     463              :                 );
     464              : 
     465        72821 :                 decoded.blocks.push(blk);
     466              :             }
     467              : 
     468            0 :             _ => {
     469            0 :                 // TODO: invalid block_id
     470            0 :             }
     471              :         }
     472              :     }
     473              : 
     474              :     // 3. Decode blocks.
     475        73532 :     let mut ptr = record.len() - buf.remaining();
     476        73532 :     for blk in decoded.blocks.iter_mut() {
     477        72821 :         if blk.has_image {
     478           30 :             blk.bimg_offset = ptr as u32;
     479           30 :             ptr += blk.bimg_len as usize;
     480        72791 :         }
     481        72821 :         if blk.has_data {
     482        72791 :             ptr += blk.data_len as usize;
     483        72791 :         }
     484              :     }
     485              :     // We don't need them, so just skip blocks_total_len bytes
     486        73532 :     buf.advance(blocks_total_len as usize);
     487        73532 :     assert_eq!(ptr, record.len() - buf.remaining());
     488              : 
     489        73532 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
     490              : 
     491              :     // 4. Decode main_data
     492        73532 :     if main_data_len > 0 {
     493        73520 :         assert_eq!(buf.remaining(), main_data_len as usize);
     494           12 :     }
     495              : 
     496        73532 :     decoded.xl_xid = xlogrec.xl_xid;
     497        73532 :     decoded.xl_info = xlogrec.xl_info;
     498        73532 :     decoded.xl_rmid = xlogrec.xl_rmid;
     499        73532 :     decoded.record = record;
     500        73532 :     decoded.origin_id = origin_id;
     501        73532 :     decoded.main_data_offset = main_data_offset;
     502              : 
     503        73532 :     Ok(())
     504        73532 : }
     505              : 
     506              : pub mod v14 {
     507              :     use bytes::{Buf, Bytes};
     508              : 
     509              :     use crate::{OffsetNumber, TransactionId};
     510              : 
     511              :     #[repr(C)]
     512              :     #[derive(Debug)]
     513              :     pub struct XlHeapInsert {
     514              :         pub offnum: OffsetNumber,
     515              :         pub flags: u8,
     516              :     }
     517              : 
     518              :     impl XlHeapInsert {
     519        72638 :         pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
     520        72638 :             XlHeapInsert {
     521        72638 :                 offnum: buf.get_u16_le(),
     522        72638 :                 flags: buf.get_u8(),
     523        72638 :             }
     524        72638 :         }
     525              :     }
     526              : 
     527              :     #[repr(C)]
     528              :     #[derive(Debug)]
     529              :     pub struct XlHeapMultiInsert {
     530              :         pub flags: u8,
     531              :         pub _padding: u8,
     532              :         pub ntuples: u16,
     533              :     }
     534              : 
     535              :     impl XlHeapMultiInsert {
     536           21 :         pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
     537           21 :             XlHeapMultiInsert {
     538           21 :                 flags: buf.get_u8(),
     539           21 :                 _padding: buf.get_u8(),
     540           21 :                 ntuples: buf.get_u16_le(),
     541           21 :             }
     542           21 :         }
     543              :     }
     544              : 
     545              :     #[repr(C)]
     546              :     #[derive(Debug)]
     547              :     pub struct XlHeapDelete {
     548              :         pub xmax: TransactionId,
     549              :         pub offnum: OffsetNumber,
     550              :         pub _padding: u16,
     551              :         pub t_cid: u32,
     552              :         pub infobits_set: u8,
     553              :         pub flags: u8,
     554              :     }
     555              : 
     556              :     impl XlHeapDelete {
     557            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     558            0 :             XlHeapDelete {
     559            0 :                 xmax: buf.get_u32_le(),
     560            0 :                 offnum: buf.get_u16_le(),
     561            0 :                 _padding: buf.get_u16_le(),
     562            0 :                 t_cid: buf.get_u32_le(),
     563            0 :                 infobits_set: buf.get_u8(),
     564            0 :                 flags: buf.get_u8(),
     565            0 :             }
     566            0 :         }
     567              :     }
     568              : 
     569              :     #[repr(C)]
     570              :     #[derive(Debug)]
     571              :     pub struct XlHeapUpdate {
     572              :         pub old_xmax: TransactionId,
     573              :         pub old_offnum: OffsetNumber,
     574              :         pub old_infobits_set: u8,
     575              :         pub flags: u8,
     576              :         pub t_cid: u32,
     577              :         pub new_xmax: TransactionId,
     578              :         pub new_offnum: OffsetNumber,
     579              :     }
     580              : 
     581              :     impl XlHeapUpdate {
     582            4 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     583            4 :             XlHeapUpdate {
     584            4 :                 old_xmax: buf.get_u32_le(),
     585            4 :                 old_offnum: buf.get_u16_le(),
     586            4 :                 old_infobits_set: buf.get_u8(),
     587            4 :                 flags: buf.get_u8(),
     588            4 :                 t_cid: buf.get_u32_le(),
     589            4 :                 new_xmax: buf.get_u32_le(),
     590            4 :                 new_offnum: buf.get_u16_le(),
     591            4 :             }
     592            4 :         }
     593              :     }
     594              : 
     595              :     #[repr(C)]
     596              :     #[derive(Debug)]
     597              :     pub struct XlHeapLock {
     598              :         pub locking_xid: TransactionId,
     599              :         pub offnum: OffsetNumber,
     600              :         pub _padding: u16,
     601              :         pub t_cid: u32,
     602              :         pub infobits_set: u8,
     603              :         pub flags: u8,
     604              :     }
     605              : 
     606              :     impl XlHeapLock {
     607            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     608            0 :             XlHeapLock {
     609            0 :                 locking_xid: buf.get_u32_le(),
     610            0 :                 offnum: buf.get_u16_le(),
     611            0 :                 _padding: buf.get_u16_le(),
     612            0 :                 t_cid: buf.get_u32_le(),
     613            0 :                 infobits_set: buf.get_u8(),
     614            0 :                 flags: buf.get_u8(),
     615            0 :             }
     616            0 :         }
     617              :     }
     618              : 
     619              :     #[repr(C)]
     620              :     #[derive(Debug)]
     621              :     pub struct XlHeapLockUpdated {
     622              :         pub xmax: TransactionId,
     623              :         pub offnum: OffsetNumber,
     624              :         pub infobits_set: u8,
     625              :         pub flags: u8,
     626              :     }
     627              : 
     628              :     impl XlHeapLockUpdated {
     629            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
     630            0 :             XlHeapLockUpdated {
     631            0 :                 xmax: buf.get_u32_le(),
     632            0 :                 offnum: buf.get_u16_le(),
     633            0 :                 infobits_set: buf.get_u8(),
     634            0 :                 flags: buf.get_u8(),
     635            0 :             }
     636            0 :         }
     637              :     }
     638              : 
     639              :     #[repr(C)]
     640              :     #[derive(Debug)]
     641              :     pub struct XlParameterChange {
     642              :         pub max_connections: i32,
     643              :         pub max_worker_processes: i32,
     644              :         pub max_wal_senders: i32,
     645              :         pub max_prepared_xacts: i32,
     646              :         pub max_locks_per_xact: i32,
     647              :         pub wal_level: i32,
     648              :         pub wal_log_hints: bool,
     649              :         pub track_commit_timestamp: bool,
     650              :         pub _padding: [u8; 2],
     651              :     }
     652              : 
     653              :     impl XlParameterChange {
     654            0 :         pub fn decode(buf: &mut Bytes) -> XlParameterChange {
     655            0 :             XlParameterChange {
     656            0 :                 max_connections: buf.get_i32_le(),
     657            0 :                 max_worker_processes: buf.get_i32_le(),
     658            0 :                 max_wal_senders: buf.get_i32_le(),
     659            0 :                 max_prepared_xacts: buf.get_i32_le(),
     660            0 :                 max_locks_per_xact: buf.get_i32_le(),
     661            0 :                 wal_level: buf.get_i32_le(),
     662            0 :                 wal_log_hints: buf.get_u8() != 0,
     663            0 :                 track_commit_timestamp: buf.get_u8() != 0,
     664            0 :                 _padding: [buf.get_u8(), buf.get_u8()],
     665            0 :             }
     666            0 :         }
     667              :     }
     668              : }
     669              : 
     670              : pub mod v15 {
     671              :     pub use super::v14::{
     672              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
     673              :         XlParameterChange,
     674              :     };
     675              : }
     676              : 
     677              : pub mod v16 {
     678              :     use bytes::{Buf, Bytes};
     679              : 
     680              :     pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
     681              :     use crate::{OffsetNumber, TransactionId};
     682              : 
     683              :     pub struct XlHeapDelete {
     684              :         pub xmax: TransactionId,
     685              :         pub offnum: OffsetNumber,
     686              :         pub infobits_set: u8,
     687              :         pub flags: u8,
     688              :     }
     689              : 
     690              :     impl XlHeapDelete {
     691            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     692            0 :             XlHeapDelete {
     693            0 :                 xmax: buf.get_u32_le(),
     694            0 :                 offnum: buf.get_u16_le(),
     695            0 :                 infobits_set: buf.get_u8(),
     696            0 :                 flags: buf.get_u8(),
     697            0 :             }
     698            0 :         }
     699              :     }
     700              : 
     701              :     #[repr(C)]
     702              :     #[derive(Debug)]
     703              :     pub struct XlHeapUpdate {
     704              :         pub old_xmax: TransactionId,
     705              :         pub old_offnum: OffsetNumber,
     706              :         pub old_infobits_set: u8,
     707              :         pub flags: u8,
     708              :         pub new_xmax: TransactionId,
     709              :         pub new_offnum: OffsetNumber,
     710              :     }
     711              : 
     712              :     impl XlHeapUpdate {
     713            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     714            0 :             XlHeapUpdate {
     715            0 :                 old_xmax: buf.get_u32_le(),
     716            0 :                 old_offnum: buf.get_u16_le(),
     717            0 :                 old_infobits_set: buf.get_u8(),
     718            0 :                 flags: buf.get_u8(),
     719            0 :                 new_xmax: buf.get_u32_le(),
     720            0 :                 new_offnum: buf.get_u16_le(),
     721            0 :             }
     722            0 :         }
     723              :     }
     724              : 
     725              :     #[repr(C)]
     726              :     #[derive(Debug)]
     727              :     pub struct XlHeapLock {
     728              :         pub locking_xid: TransactionId,
     729              :         pub offnum: OffsetNumber,
     730              :         pub infobits_set: u8,
     731              :         pub flags: u8,
     732              :     }
     733              : 
     734              :     impl XlHeapLock {
     735            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     736            0 :             XlHeapLock {
     737            0 :                 locking_xid: buf.get_u32_le(),
     738            0 :                 offnum: buf.get_u16_le(),
     739            0 :                 infobits_set: buf.get_u8(),
     740            0 :                 flags: buf.get_u8(),
     741            0 :             }
     742            0 :         }
     743              :     }
     744              : 
     745              :     /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
     746              :     pub mod rm_neon {
     747              :         use bytes::{Buf, Bytes};
     748              : 
     749              :         use crate::{OffsetNumber, TransactionId};
     750              : 
     751              :         #[repr(C)]
     752              :         #[derive(Debug)]
     753              :         pub struct XlNeonHeapInsert {
     754              :             pub offnum: OffsetNumber,
     755              :             pub flags: u8,
     756              :         }
     757              : 
     758              :         impl XlNeonHeapInsert {
     759            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
     760            0 :                 XlNeonHeapInsert {
     761            0 :                     offnum: buf.get_u16_le(),
     762            0 :                     flags: buf.get_u8(),
     763            0 :                 }
     764            0 :             }
     765              :         }
     766              : 
     767              :         #[repr(C)]
     768              :         #[derive(Debug)]
     769              :         pub struct XlNeonHeapMultiInsert {
     770              :             pub flags: u8,
     771              :             pub _padding: u8,
     772              :             pub ntuples: u16,
     773              :             pub t_cid: u32,
     774              :         }
     775              : 
     776              :         impl XlNeonHeapMultiInsert {
     777            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
     778            0 :                 XlNeonHeapMultiInsert {
     779            0 :                     flags: buf.get_u8(),
     780            0 :                     _padding: buf.get_u8(),
     781            0 :                     ntuples: buf.get_u16_le(),
     782            0 :                     t_cid: buf.get_u32_le(),
     783            0 :                 }
     784            0 :             }
     785              :         }
     786              : 
     787              :         #[repr(C)]
     788              :         #[derive(Debug)]
     789              :         pub struct XlNeonHeapDelete {
     790              :             pub xmax: TransactionId,
     791              :             pub offnum: OffsetNumber,
     792              :             pub infobits_set: u8,
     793              :             pub flags: u8,
     794              :             pub t_cid: u32,
     795              :         }
     796              : 
     797              :         impl XlNeonHeapDelete {
     798            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
     799            0 :                 XlNeonHeapDelete {
     800            0 :                     xmax: buf.get_u32_le(),
     801            0 :                     offnum: buf.get_u16_le(),
     802            0 :                     infobits_set: buf.get_u8(),
     803            0 :                     flags: buf.get_u8(),
     804            0 :                     t_cid: buf.get_u32_le(),
     805            0 :                 }
     806            0 :             }
     807              :         }
     808              : 
     809              :         #[repr(C)]
     810              :         #[derive(Debug)]
     811              :         pub struct XlNeonHeapUpdate {
     812              :             pub old_xmax: TransactionId,
     813              :             pub old_offnum: OffsetNumber,
     814              :             pub old_infobits_set: u8,
     815              :             pub flags: u8,
     816              :             pub t_cid: u32,
     817              :             pub new_xmax: TransactionId,
     818              :             pub new_offnum: OffsetNumber,
     819              :         }
     820              : 
     821              :         impl XlNeonHeapUpdate {
     822            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
     823            0 :                 XlNeonHeapUpdate {
     824            0 :                     old_xmax: buf.get_u32_le(),
     825            0 :                     old_offnum: buf.get_u16_le(),
     826            0 :                     old_infobits_set: buf.get_u8(),
     827            0 :                     flags: buf.get_u8(),
     828            0 :                     t_cid: buf.get_u32(),
     829            0 :                     new_xmax: buf.get_u32_le(),
     830            0 :                     new_offnum: buf.get_u16_le(),
     831            0 :                 }
     832            0 :             }
     833              :         }
     834              : 
     835              :         #[repr(C)]
     836              :         #[derive(Debug)]
     837              :         pub struct XlNeonHeapLock {
     838              :             pub locking_xid: TransactionId,
     839              :             pub t_cid: u32,
     840              :             pub offnum: OffsetNumber,
     841              :             pub infobits_set: u8,
     842              :             pub flags: u8,
     843              :         }
     844              : 
     845              :         impl XlNeonHeapLock {
     846            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
     847            0 :                 XlNeonHeapLock {
     848            0 :                     locking_xid: buf.get_u32_le(),
     849            0 :                     t_cid: buf.get_u32_le(),
     850            0 :                     offnum: buf.get_u16_le(),
     851            0 :                     infobits_set: buf.get_u8(),
     852            0 :                     flags: buf.get_u8(),
     853            0 :                 }
     854            0 :             }
     855              :         }
     856              :     }
     857              : }
     858              : 
     859              : pub mod v17 {
     860              :     use bytes::{Buf, Bytes};
     861              : 
     862              :     pub use super::v14::XlHeapLockUpdated;
     863              :     pub use super::v16::{
     864              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
     865              :         rm_neon,
     866              :     };
     867              :     pub use crate::TimeLineID;
     868              :     pub use postgres_ffi_types::TimestampTz;
     869              : 
     870              :     #[repr(C)]
     871              :     #[derive(Debug)]
     872              :     pub struct XlEndOfRecovery {
     873              :         pub end_time: TimestampTz,
     874              :         pub this_time_line_id: TimeLineID,
     875              :         pub prev_time_line_id: TimeLineID,
     876              :         pub wal_level: i32,
     877              :     }
     878              : 
     879              :     impl XlEndOfRecovery {
     880            0 :         pub fn decode(buf: &mut Bytes) -> XlEndOfRecovery {
     881            0 :             XlEndOfRecovery {
     882            0 :                 end_time: buf.get_i64_le(),
     883            0 :                 this_time_line_id: buf.get_u32_le(),
     884            0 :                 prev_time_line_id: buf.get_u32_le(),
     885            0 :                 wal_level: buf.get_i32_le(),
     886            0 :             }
     887            0 :         }
     888              :     }
     889              : }
     890              : 
     891              : #[repr(C)]
     892              : #[derive(Debug)]
     893              : pub struct XlSmgrCreate {
     894              :     pub rnode: RelFileNode,
     895              :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
     896              :     // well-defined size?
     897              :     pub forknum: u8,
     898              : }
     899              : 
     900              : impl XlSmgrCreate {
     901            8 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
     902            8 :         XlSmgrCreate {
     903            8 :             rnode: RelFileNode {
     904            8 :                 spcnode: buf.get_u32_le(), /* tablespace */
     905            8 :                 dbnode: buf.get_u32_le(),  /* database */
     906            8 :                 relnode: buf.get_u32_le(), /* relation */
     907            8 :             },
     908            8 :             forknum: buf.get_u32_le() as u8,
     909            8 :         }
     910            8 :     }
     911              : }
     912              : 
     913              : #[repr(C)]
     914            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
     915              : pub struct XlSmgrTruncate {
     916              :     pub blkno: BlockNumber,
     917              :     pub rnode: RelFileNode,
     918              :     pub flags: u32,
     919              : }
     920              : 
     921              : impl XlSmgrTruncate {
     922            0 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
     923            0 :         XlSmgrTruncate {
     924            0 :             blkno: buf.get_u32_le(),
     925            0 :             rnode: RelFileNode {
     926            0 :                 spcnode: buf.get_u32_le(), /* tablespace */
     927            0 :                 dbnode: buf.get_u32_le(),  /* database */
     928            0 :                 relnode: buf.get_u32_le(), /* relation */
     929            0 :             },
     930            0 :             flags: buf.get_u32_le(),
     931            0 :         }
     932            0 :     }
     933              : }
     934              : 
     935              : #[repr(C)]
     936              : #[derive(Debug)]
     937              : pub struct XlCreateDatabase {
     938              :     pub db_id: Oid,
     939              :     pub tablespace_id: Oid,
     940              :     pub src_db_id: Oid,
     941              :     pub src_tablespace_id: Oid,
     942              : }
     943              : 
     944              : impl XlCreateDatabase {
     945            0 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
     946            0 :         XlCreateDatabase {
     947            0 :             db_id: buf.get_u32_le(),
     948            0 :             tablespace_id: buf.get_u32_le(),
     949            0 :             src_db_id: buf.get_u32_le(),
     950            0 :             src_tablespace_id: buf.get_u32_le(),
     951            0 :         }
     952            0 :     }
     953              : }
     954              : 
     955              : #[repr(C)]
     956              : #[derive(Debug)]
     957              : pub struct XlDropDatabase {
     958              :     pub db_id: Oid,
     959              :     pub n_tablespaces: Oid, /* number of tablespace IDs */
     960              :     pub tablespace_ids: Vec<Oid>,
     961              : }
     962              : 
     963              : impl XlDropDatabase {
     964            0 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
     965            0 :         let mut rec = XlDropDatabase {
     966            0 :             db_id: buf.get_u32_le(),
     967            0 :             n_tablespaces: buf.get_u32_le(),
     968            0 :             tablespace_ids: Vec::<Oid>::new(),
     969            0 :         };
     970              : 
     971            0 :         for _i in 0..rec.n_tablespaces {
     972            0 :             let id = buf.get_u32_le();
     973            0 :             rec.tablespace_ids.push(id);
     974            0 :         }
     975              : 
     976            0 :         rec
     977            0 :     }
     978              : }
     979              : 
     980              : ///
     981              : /// Note: Parsing some fields is missing, because they're not needed.
     982              : ///
     983              : /// This is similar to the xl_xact_parsed_commit and
     984              : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     985              : /// struct for commits and aborts.
     986              : ///
     987            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
     988              : pub struct XlXactParsedRecord {
     989              :     pub xid: TransactionId,
     990              :     pub info: u8,
     991              :     pub xact_time: TimestampTz,
     992              :     pub xinfo: u32,
     993              : 
     994              :     pub db_id: Oid,
     995              :     /* MyDatabaseId */
     996              :     pub ts_id: Oid,
     997              :     /* MyDatabaseTableSpace */
     998              :     pub subxacts: Vec<TransactionId>,
     999              : 
    1000              :     pub xnodes: Vec<RelFileNode>,
    1001              :     pub origin_lsn: Lsn,
    1002              : }
    1003              : 
    1004              : impl XlXactParsedRecord {
    1005              :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
    1006              :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
    1007              :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
    1008            4 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
    1009            4 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
    1010              :         // The record starts with time of commit/abort
    1011            4 :         let xact_time = buf.get_i64_le();
    1012            4 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
    1013            4 :             buf.get_u32_le()
    1014              :         } else {
    1015            0 :             0
    1016              :         };
    1017              :         let db_id;
    1018              :         let ts_id;
    1019            4 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
    1020            4 :             db_id = buf.get_u32_le();
    1021            4 :             ts_id = buf.get_u32_le();
    1022            4 :         } else {
    1023            0 :             db_id = 0;
    1024            0 :             ts_id = 0;
    1025            0 :         }
    1026            4 :         let mut subxacts = Vec::<TransactionId>::new();
    1027            4 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
    1028            0 :             let nsubxacts = buf.get_i32_le();
    1029            0 :             for _i in 0..nsubxacts {
    1030            0 :                 let subxact = buf.get_u32_le();
    1031            0 :                 subxacts.push(subxact);
    1032            0 :             }
    1033            4 :         }
    1034            4 :         let mut xnodes = Vec::<RelFileNode>::new();
    1035            4 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
    1036            0 :             let nrels = buf.get_i32_le();
    1037            0 :             for _i in 0..nrels {
    1038            0 :                 let spcnode = buf.get_u32_le();
    1039            0 :                 let dbnode = buf.get_u32_le();
    1040            0 :                 let relnode = buf.get_u32_le();
    1041            0 :                 tracing::trace!(
    1042            0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
    1043              :                     spcnode,
    1044              :                     dbnode,
    1045              :                     relnode
    1046              :                 );
    1047            0 :                 xnodes.push(RelFileNode {
    1048            0 :                     spcnode,
    1049            0 :                     dbnode,
    1050            0 :                     relnode,
    1051            0 :                 });
    1052              :             }
    1053            4 :         }
    1054              : 
    1055            4 :         if xinfo & crate::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
    1056            0 :             let nitems = buf.get_i32_le();
    1057            0 :             tracing::debug!(
    1058            0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
    1059              :                 nitems
    1060              :             );
    1061            0 :             let sizeof_xl_xact_stats_item = 12;
    1062            0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
    1063            4 :         }
    1064              : 
    1065            4 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
    1066            4 :             let nmsgs = buf.get_i32_le();
    1067            4 :             let sizeof_shared_invalidation_message = 16;
    1068            4 :             buf.advance(
    1069            4 :                 (nmsgs * sizeof_shared_invalidation_message)
    1070            4 :                     .try_into()
    1071            4 :                     .unwrap(),
    1072            4 :             );
    1073            4 :         }
    1074              : 
    1075            4 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
    1076            0 :             xid = buf.get_u32_le();
    1077            0 :             tracing::debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
    1078            4 :         }
    1079              : 
    1080            4 :         let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 {
    1081            0 :             Lsn(buf.get_u64_le())
    1082              :         } else {
    1083            4 :             Lsn::INVALID
    1084              :         };
    1085            4 :         XlXactParsedRecord {
    1086            4 :             xid,
    1087            4 :             info,
    1088            4 :             xact_time,
    1089            4 :             xinfo,
    1090            4 :             db_id,
    1091            4 :             ts_id,
    1092            4 :             subxacts,
    1093            4 :             xnodes,
    1094            4 :             origin_lsn,
    1095            4 :         }
    1096            4 :     }
    1097              : }
    1098              : 
    1099              : #[repr(C)]
    1100              : #[derive(Debug)]
    1101              : pub struct XlClogTruncate {
    1102              :     pub pageno: u32,
    1103              :     pub oldest_xid: TransactionId,
    1104              :     pub oldest_xid_db: Oid,
    1105              : }
    1106              : 
    1107              : impl XlClogTruncate {
    1108            0 :     pub fn decode(buf: &mut Bytes, pg_version: PgMajorVersion) -> XlClogTruncate {
    1109              :         XlClogTruncate {
    1110            0 :             pageno: if pg_version < PgMajorVersion::PG17 {
    1111            0 :                 buf.get_u32_le()
    1112              :             } else {
    1113            0 :                 buf.get_u64_le() as u32
    1114              :             },
    1115            0 :             oldest_xid: buf.get_u32_le(),
    1116            0 :             oldest_xid_db: buf.get_u32_le(),
    1117              :         }
    1118            0 :     }
    1119              : }
    1120              : 
    1121              : #[repr(C)]
    1122              : #[derive(Debug)]
    1123              : pub struct XlLogicalMessage {
    1124              :     pub db_id: Oid,
    1125              :     pub transactional: bool,
    1126              :     pub prefix_size: usize,
    1127              :     pub message_size: usize,
    1128              : }
    1129              : 
    1130              : impl XlLogicalMessage {
    1131          606 :     pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
    1132          606 :         XlLogicalMessage {
    1133          606 :             db_id: buf.get_u32_le(),
    1134          606 :             transactional: buf.get_u32_le() != 0, // 4-bytes alignment
    1135          606 :             prefix_size: buf.get_u64_le() as usize,
    1136          606 :             message_size: buf.get_u64_le() as usize,
    1137          606 :         }
    1138          606 :     }
    1139              : }
    1140              : 
    1141              : #[repr(C)]
    1142              : #[derive(Debug)]
    1143              : pub struct XlRunningXacts {
    1144              :     pub xcnt: u32,
    1145              :     pub subxcnt: u32,
    1146              :     pub subxid_overflow: bool,
    1147              :     pub next_xid: TransactionId,
    1148              :     pub oldest_running_xid: TransactionId,
    1149              :     pub latest_completed_xid: TransactionId,
    1150              :     pub xids: Vec<TransactionId>,
    1151              : }
    1152              : 
    1153              : impl XlRunningXacts {
    1154            0 :     pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
    1155            0 :         let xcnt = buf.get_u32_le();
    1156            0 :         let subxcnt = buf.get_u32_le();
    1157            0 :         let subxid_overflow = buf.get_u32_le() != 0;
    1158            0 :         let next_xid = buf.get_u32_le();
    1159            0 :         let oldest_running_xid = buf.get_u32_le();
    1160            0 :         let latest_completed_xid = buf.get_u32_le();
    1161            0 :         let mut xids = Vec::new();
    1162            0 :         for _ in 0..(xcnt + subxcnt) {
    1163            0 :             xids.push(buf.get_u32_le());
    1164            0 :         }
    1165            0 :         XlRunningXacts {
    1166            0 :             xcnt,
    1167            0 :             subxcnt,
    1168            0 :             subxid_overflow,
    1169            0 :             next_xid,
    1170            0 :             oldest_running_xid,
    1171            0 :             latest_completed_xid,
    1172            0 :             xids,
    1173            0 :         }
    1174            0 :     }
    1175              : }
    1176              : 
    1177            0 : pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
    1178              :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
    1179              :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
    1180              :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
    1181              :     // without worrying about security?
    1182              :     //
    1183              :     // But for now, we have a hand-written code for a few common WAL record types here.
    1184              : 
    1185            0 :     let mut buf = record.clone();
    1186              : 
    1187              :     // 1. Parse XLogRecord struct
    1188              : 
    1189              :     // FIXME: assume little-endian here
    1190            0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1191              : 
    1192              :     let unknown_str: String;
    1193              : 
    1194            0 :     let result: &str = match xlogrec.xl_rmid {
    1195              :         pg_constants::RM_HEAP2_ID => {
    1196            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1197            0 :             match info {
    1198            0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
    1199            0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
    1200              :                 _ => {
    1201            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
    1202            0 :                     &unknown_str
    1203              :                 }
    1204              :             }
    1205              :         }
    1206              :         pg_constants::RM_HEAP_ID => {
    1207            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1208            0 :             match info {
    1209            0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
    1210            0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
    1211            0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
    1212            0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
    1213              :                 _ => {
    1214            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
    1215            0 :                     &unknown_str
    1216              :                 }
    1217              :             }
    1218              :         }
    1219              :         pg_constants::RM_XLOG_ID => {
    1220            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1221            0 :             match info {
    1222            0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
    1223            0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
    1224              :                 _ => {
    1225            0 :                     unknown_str = format!("XLOG UNKNOWN_0x{info:02x}");
    1226            0 :                     &unknown_str
    1227              :                 }
    1228              :             }
    1229              :         }
    1230            0 :         rmid => {
    1231            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1232              : 
    1233            0 :             unknown_str = format!("UNKNOWN_RM_{rmid} INFO_0x{info:02x}");
    1234            0 :             &unknown_str
    1235              :         }
    1236              :     };
    1237              : 
    1238            0 :     Ok(String::from(result))
    1239            0 : }
        

Generated by: LCOV version 2.1-beta