LCOV - code coverage report
Current view: top level - pageserver/src - walrecord.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 62.5 % 579 362
Test Date: 2024-02-12 20:26:03 Functions: 33.3 % 102 34

            Line data    Source code
       1              : //!
       2              : //! Functions for parsing WAL records.
       3              : //!
       4              : 
       5              : use anyhow::Result;
       6              : use bytes::{Buf, Bytes};
       7              : use postgres_ffi::dispatch_pgversion;
       8              : use postgres_ffi::pg_constants;
       9              : use postgres_ffi::BLCKSZ;
      10              : use postgres_ffi::{BlockNumber, TimestampTz};
      11              : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
      12              : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
      13              : use serde::{Deserialize, Serialize};
      14              : use tracing::*;
      15              : use utils::bin_ser::DeserializeError;
      16              : 
      17              : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
      18              : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
      19    378535554 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
      20              : pub enum NeonWalRecord {
      21              :     /// Native PostgreSQL WAL record
      22              :     Postgres { will_init: bool, rec: Bytes },
      23              : 
      24              :     /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
      25              :     ClearVisibilityMapFlags {
      26              :         new_heap_blkno: Option<u32>,
      27              :         old_heap_blkno: Option<u32>,
      28              :         flags: u8,
      29              :     },
      30              :     /// Mark transaction IDs as committed on a CLOG page
      31              :     ClogSetCommitted {
      32              :         xids: Vec<TransactionId>,
      33              :         timestamp: TimestampTz,
      34              :     },
      35              :     /// Mark transaction IDs as aborted on a CLOG page
      36              :     ClogSetAborted { xids: Vec<TransactionId> },
      37              :     /// Extend multixact offsets SLRU
      38              :     MultixactOffsetCreate {
      39              :         mid: MultiXactId,
      40              :         moff: MultiXactOffset,
      41              :     },
      42              :     /// Extend multixact members SLRU.
      43              :     MultixactMembersCreate {
      44              :         moff: MultiXactOffset,
      45              :         members: Vec<MultiXactMember>,
      46              :     },
      47              : }
      48              : 
      49              : impl NeonWalRecord {
      50              :     /// Does replaying this WAL record initialize the page from scratch, or does
      51              :     /// it need to be applied over the previous image of the page?
      52    128007188 :     pub fn will_init(&self) -> bool {
      53    128007188 :         match self {
      54    108542424 :             NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
      55              : 
      56              :             // None of the special neon record types currently initialize the page
      57     19464764 :             _ => false,
      58              :         }
      59    128007188 :     }
      60              : }
      61              : 
      62              : /// DecodedBkpBlock represents per-page data contained in a WAL record.
      63     66986358 : #[derive(Default)]
      64              : pub struct DecodedBkpBlock {
      65              :     /* Is this block ref in use? */
      66              :     //in_use: bool,
      67              : 
      68              :     /* Identify the block this refers to */
      69              :     pub rnode_spcnode: u32,
      70              :     pub rnode_dbnode: u32,
      71              :     pub rnode_relnode: u32,
      72              :     // Note that we have a few special forknum values for non-rel files.
      73              :     pub forknum: u8,
      74              :     pub blkno: u32,
      75              : 
      76              :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
      77              :     pub flags: u8,
      78              : 
      79              :     /* Information on full-page image, if any */
      80              :     pub has_image: bool,
      81              :     /* has image, even for consistency checking */
      82              :     pub apply_image: bool,
      83              :     /* has image that should be restored */
      84              :     pub will_init: bool,
      85              :     /* record doesn't need previous page version to apply */
      86              :     //char         *bkp_image;
      87              :     pub hole_offset: u16,
      88              :     pub hole_length: u16,
      89              :     pub bimg_offset: u32,
      90              :     pub bimg_len: u16,
      91              :     pub bimg_info: u8,
      92              : 
      93              :     /* Buffer holding the rmgr-specific data associated with this block */
      94              :     has_data: bool,
      95              :     data_len: u16,
      96              : }
      97              : 
      98              : impl DecodedBkpBlock {
      99     66986358 :     pub fn new() -> DecodedBkpBlock {
     100     66986358 :         Default::default()
     101     66986358 :     }
     102              : }
     103              : 
     104       790192 : #[derive(Default)]
     105              : pub struct DecodedWALRecord {
     106              :     pub xl_xid: TransactionId,
     107              :     pub xl_info: u8,
     108              :     pub xl_rmid: u8,
     109              :     pub record: Bytes, // raw XLogRecord
     110              : 
     111              :     pub blocks: Vec<DecodedBkpBlock>,
     112              :     pub main_data_offset: usize,
     113              : }
     114              : 
     115              : #[repr(C)]
     116            0 : #[derive(Debug, Clone, Copy)]
     117              : pub struct RelFileNode {
     118              :     pub spcnode: Oid, /* tablespace */
     119              :     pub dbnode: Oid,  /* database */
     120              :     pub relnode: Oid, /* relation */
     121              : }
     122              : 
     123              : #[repr(C)]
     124            0 : #[derive(Debug)]
     125              : pub struct XlRelmapUpdate {
     126              :     pub dbid: Oid,   /* database ID, or 0 for shared map */
     127              :     pub tsid: Oid,   /* database's tablespace, or pg_global */
     128              :     pub nbytes: i32, /* size of relmap data */
     129              : }
     130              : 
     131              : impl XlRelmapUpdate {
     132           62 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
     133           62 :         XlRelmapUpdate {
     134           62 :             dbid: buf.get_u32_le(),
     135           62 :             tsid: buf.get_u32_le(),
     136           62 :             nbytes: buf.get_i32_le(),
     137           62 :         }
     138           62 :     }
     139              : }
     140              : 
     141              : pub mod v14 {
     142              :     use bytes::{Buf, Bytes};
     143              :     use postgres_ffi::{OffsetNumber, TransactionId};
     144              : 
     145              :     #[repr(C)]
     146            0 :     #[derive(Debug)]
     147              :     pub struct XlHeapInsert {
     148              :         pub offnum: OffsetNumber,
     149              :         pub flags: u8,
     150              :     }
     151              : 
     152              :     impl XlHeapInsert {
     153     41821648 :         pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
     154     41821648 :             XlHeapInsert {
     155     41821648 :                 offnum: buf.get_u16_le(),
     156     41821648 :                 flags: buf.get_u8(),
     157     41821648 :             }
     158     41821648 :         }
     159              :     }
     160              : 
     161              :     #[repr(C)]
     162            0 :     #[derive(Debug)]
     163              :     pub struct XlHeapMultiInsert {
     164              :         pub flags: u8,
     165              :         pub _padding: u8,
     166              :         pub ntuples: u16,
     167              :     }
     168              : 
     169              :     impl XlHeapMultiInsert {
     170      1126181 :         pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
     171      1126181 :             XlHeapMultiInsert {
     172      1126181 :                 flags: buf.get_u8(),
     173      1126181 :                 _padding: buf.get_u8(),
     174      1126181 :                 ntuples: buf.get_u16_le(),
     175      1126181 :             }
     176      1126181 :         }
     177              :     }
     178              : 
     179              :     #[repr(C)]
     180            0 :     #[derive(Debug)]
     181              :     pub struct XlHeapDelete {
     182              :         pub xmax: TransactionId,
     183              :         pub offnum: OffsetNumber,
     184              :         pub _padding: u16,
     185              :         pub t_cid: u32,
     186              :         pub infobits_set: u8,
     187              :         pub flags: u8,
     188              :     }
     189              : 
     190              :     impl XlHeapDelete {
     191      2303981 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     192      2303981 :             XlHeapDelete {
     193      2303981 :                 xmax: buf.get_u32_le(),
     194      2303981 :                 offnum: buf.get_u16_le(),
     195      2303981 :                 _padding: buf.get_u16_le(),
     196      2303981 :                 t_cid: buf.get_u32_le(),
     197      2303981 :                 infobits_set: buf.get_u8(),
     198      2303981 :                 flags: buf.get_u8(),
     199      2303981 :             }
     200      2303981 :         }
     201              :     }
     202              : 
     203              :     #[repr(C)]
     204            0 :     #[derive(Debug)]
     205              :     pub struct XlHeapUpdate {
     206              :         pub old_xmax: TransactionId,
     207              :         pub old_offnum: OffsetNumber,
     208              :         pub old_infobits_set: u8,
     209              :         pub flags: u8,
     210              :         pub t_cid: u32,
     211              :         pub new_xmax: TransactionId,
     212              :         pub new_offnum: OffsetNumber,
     213              :     }
     214              : 
     215              :     impl XlHeapUpdate {
     216      3450018 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     217      3450018 :             XlHeapUpdate {
     218      3450018 :                 old_xmax: buf.get_u32_le(),
     219      3450018 :                 old_offnum: buf.get_u16_le(),
     220      3450018 :                 old_infobits_set: buf.get_u8(),
     221      3450018 :                 flags: buf.get_u8(),
     222      3450018 :                 t_cid: buf.get_u32_le(),
     223      3450018 :                 new_xmax: buf.get_u32_le(),
     224      3450018 :                 new_offnum: buf.get_u16_le(),
     225      3450018 :             }
     226      3450018 :         }
     227              :     }
     228              : 
     229              :     #[repr(C)]
     230            0 :     #[derive(Debug)]
     231              :     pub struct XlHeapLock {
     232              :         pub locking_xid: TransactionId,
     233              :         pub offnum: OffsetNumber,
     234              :         pub _padding: u16,
     235              :         pub t_cid: u32,
     236              :         pub infobits_set: u8,
     237              :         pub flags: u8,
     238              :     }
     239              : 
     240              :     impl XlHeapLock {
     241      2760101 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     242      2760101 :             XlHeapLock {
     243      2760101 :                 locking_xid: buf.get_u32_le(),
     244      2760101 :                 offnum: buf.get_u16_le(),
     245      2760101 :                 _padding: buf.get_u16_le(),
     246      2760101 :                 t_cid: buf.get_u32_le(),
     247      2760101 :                 infobits_set: buf.get_u8(),
     248      2760101 :                 flags: buf.get_u8(),
     249      2760101 :             }
     250      2760101 :         }
     251              :     }
     252              : 
     253              :     #[repr(C)]
     254            0 :     #[derive(Debug)]
     255              :     pub struct XlHeapLockUpdated {
     256              :         pub xmax: TransactionId,
     257              :         pub offnum: OffsetNumber,
     258              :         pub infobits_set: u8,
     259              :         pub flags: u8,
     260              :     }
     261              : 
     262              :     impl XlHeapLockUpdated {
     263         4146 :         pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
     264         4146 :             XlHeapLockUpdated {
     265         4146 :                 xmax: buf.get_u32_le(),
     266         4146 :                 offnum: buf.get_u16_le(),
     267         4146 :                 infobits_set: buf.get_u8(),
     268         4146 :                 flags: buf.get_u8(),
     269         4146 :             }
     270         4146 :         }
     271              :     }
     272              : }
     273              : 
     274              : pub mod v15 {
     275              :     pub use super::v14::{
     276              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
     277              :     };
     278              : }
     279              : 
     280              : pub mod v16 {
     281              :     pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
     282              :     use bytes::{Buf, Bytes};
     283              :     use postgres_ffi::{OffsetNumber, TransactionId};
     284              : 
     285              :     pub struct XlHeapDelete {
     286              :         pub xmax: TransactionId,
     287              :         pub offnum: OffsetNumber,
     288              :         pub infobits_set: u8,
     289              :         pub flags: u8,
     290              :     }
     291              : 
     292              :     impl XlHeapDelete {
     293            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     294            0 :             XlHeapDelete {
     295            0 :                 xmax: buf.get_u32_le(),
     296            0 :                 offnum: buf.get_u16_le(),
     297            0 :                 infobits_set: buf.get_u8(),
     298            0 :                 flags: buf.get_u8(),
     299            0 :             }
     300            0 :         }
     301              :     }
     302              : 
     303              :     #[repr(C)]
     304            0 :     #[derive(Debug)]
     305              :     pub struct XlHeapUpdate {
     306              :         pub old_xmax: TransactionId,
     307              :         pub old_offnum: OffsetNumber,
     308              :         pub old_infobits_set: u8,
     309              :         pub flags: u8,
     310              :         pub new_xmax: TransactionId,
     311              :         pub new_offnum: OffsetNumber,
     312              :     }
     313              : 
     314              :     impl XlHeapUpdate {
     315            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     316            0 :             XlHeapUpdate {
     317            0 :                 old_xmax: buf.get_u32_le(),
     318            0 :                 old_offnum: buf.get_u16_le(),
     319            0 :                 old_infobits_set: buf.get_u8(),
     320            0 :                 flags: buf.get_u8(),
     321            0 :                 new_xmax: buf.get_u32_le(),
     322            0 :                 new_offnum: buf.get_u16_le(),
     323            0 :             }
     324            0 :         }
     325              :     }
     326              : 
     327              :     #[repr(C)]
     328            0 :     #[derive(Debug)]
     329              :     pub struct XlHeapLock {
     330              :         pub locking_xid: TransactionId,
     331              :         pub offnum: OffsetNumber,
     332              :         pub infobits_set: u8,
     333              :         pub flags: u8,
     334              :     }
     335              : 
     336              :     impl XlHeapLock {
     337            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     338            0 :             XlHeapLock {
     339            0 :                 locking_xid: buf.get_u32_le(),
     340            0 :                 offnum: buf.get_u16_le(),
     341            0 :                 infobits_set: buf.get_u8(),
     342            0 :                 flags: buf.get_u8(),
     343            0 :             }
     344            0 :         }
     345              :     }
     346              : 
     347              :     /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
     348              :     pub mod rm_neon {
     349              :         use bytes::{Buf, Bytes};
     350              :         use postgres_ffi::{OffsetNumber, TransactionId};
     351              : 
     352              :         #[repr(C)]
     353            0 :         #[derive(Debug)]
     354              :         pub struct XlNeonHeapInsert {
     355              :             pub offnum: OffsetNumber,
     356              :             pub flags: u8,
     357              :         }
     358              : 
     359              :         impl XlNeonHeapInsert {
     360            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
     361            0 :                 XlNeonHeapInsert {
     362            0 :                     offnum: buf.get_u16_le(),
     363            0 :                     flags: buf.get_u8(),
     364            0 :                 }
     365            0 :             }
     366              :         }
     367              : 
     368              :         #[repr(C)]
     369            0 :         #[derive(Debug)]
     370              :         pub struct XlNeonHeapMultiInsert {
     371              :             pub flags: u8,
     372              :             pub _padding: u8,
     373              :             pub ntuples: u16,
     374              :             pub t_cid: u32,
     375              :         }
     376              : 
     377              :         impl XlNeonHeapMultiInsert {
     378            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
     379            0 :                 XlNeonHeapMultiInsert {
     380            0 :                     flags: buf.get_u8(),
     381            0 :                     _padding: buf.get_u8(),
     382            0 :                     ntuples: buf.get_u16_le(),
     383            0 :                     t_cid: buf.get_u32_le(),
     384            0 :                 }
     385            0 :             }
     386              :         }
     387              : 
     388              :         #[repr(C)]
     389            0 :         #[derive(Debug)]
     390              :         pub struct XlNeonHeapDelete {
     391              :             pub xmax: TransactionId,
     392              :             pub offnum: OffsetNumber,
     393              :             pub infobits_set: u8,
     394              :             pub flags: u8,
     395              :             pub t_cid: u32,
     396              :         }
     397              : 
     398              :         impl XlNeonHeapDelete {
     399            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
     400            0 :                 XlNeonHeapDelete {
     401            0 :                     xmax: buf.get_u32_le(),
     402            0 :                     offnum: buf.get_u16_le(),
     403            0 :                     infobits_set: buf.get_u8(),
     404            0 :                     flags: buf.get_u8(),
     405            0 :                     t_cid: buf.get_u32_le(),
     406            0 :                 }
     407            0 :             }
     408              :         }
     409              : 
     410              :         #[repr(C)]
     411            0 :         #[derive(Debug)]
     412              :         pub struct XlNeonHeapUpdate {
     413              :             pub old_xmax: TransactionId,
     414              :             pub old_offnum: OffsetNumber,
     415              :             pub old_infobits_set: u8,
     416              :             pub flags: u8,
     417              :             pub t_cid: u32,
     418              :             pub new_xmax: TransactionId,
     419              :             pub new_offnum: OffsetNumber,
     420              :         }
     421              : 
     422              :         impl XlNeonHeapUpdate {
     423            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
     424            0 :                 XlNeonHeapUpdate {
     425            0 :                     old_xmax: buf.get_u32_le(),
     426            0 :                     old_offnum: buf.get_u16_le(),
     427            0 :                     old_infobits_set: buf.get_u8(),
     428            0 :                     flags: buf.get_u8(),
     429            0 :                     t_cid: buf.get_u32(),
     430            0 :                     new_xmax: buf.get_u32_le(),
     431            0 :                     new_offnum: buf.get_u16_le(),
     432            0 :                 }
     433            0 :             }
     434              :         }
     435              : 
     436              :         #[repr(C)]
     437            0 :         #[derive(Debug)]
     438              :         pub struct XlNeonHeapLock {
     439              :             pub locking_xid: TransactionId,
     440              :             pub t_cid: u32,
     441              :             pub offnum: OffsetNumber,
     442              :             pub infobits_set: u8,
     443              :             pub flags: u8,
     444              :         }
     445              : 
     446              :         impl XlNeonHeapLock {
     447            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
     448            0 :                 XlNeonHeapLock {
     449            0 :                     locking_xid: buf.get_u32_le(),
     450            0 :                     t_cid: buf.get_u32_le(),
     451            0 :                     offnum: buf.get_u16_le(),
     452            0 :                     infobits_set: buf.get_u8(),
     453            0 :                     flags: buf.get_u8(),
     454            0 :                 }
     455            0 :             }
     456              :         }
     457              :     }
     458              : }
     459              : 
     460              : #[repr(C)]
     461            0 : #[derive(Debug)]
     462              : pub struct XlSmgrCreate {
     463              :     pub rnode: RelFileNode,
     464              :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
     465              :     // well-defined size?
     466              :     pub forknum: u8,
     467              : }
     468              : 
     469              : impl XlSmgrCreate {
     470        75378 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
     471        75378 :         XlSmgrCreate {
     472        75378 :             rnode: RelFileNode {
     473        75378 :                 spcnode: buf.get_u32_le(), /* tablespace */
     474        75378 :                 dbnode: buf.get_u32_le(),  /* database */
     475        75378 :                 relnode: buf.get_u32_le(), /* relation */
     476        75378 :             },
     477        75378 :             forknum: buf.get_u32_le() as u8,
     478        75378 :         }
     479        75378 :     }
     480              : }
     481              : 
     482              : #[repr(C)]
     483            0 : #[derive(Debug)]
     484              : pub struct XlSmgrTruncate {
     485              :     pub blkno: BlockNumber,
     486              :     pub rnode: RelFileNode,
     487              :     pub flags: u32,
     488              : }
     489              : 
     490              : impl XlSmgrTruncate {
     491          225 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
     492          225 :         XlSmgrTruncate {
     493          225 :             blkno: buf.get_u32_le(),
     494          225 :             rnode: RelFileNode {
     495          225 :                 spcnode: buf.get_u32_le(), /* tablespace */
     496          225 :                 dbnode: buf.get_u32_le(),  /* database */
     497          225 :                 relnode: buf.get_u32_le(), /* relation */
     498          225 :             },
     499          225 :             flags: buf.get_u32_le(),
     500          225 :         }
     501          225 :     }
     502              : }
     503              : 
     504              : #[repr(C)]
     505            0 : #[derive(Debug)]
     506              : pub struct XlCreateDatabase {
     507              :     pub db_id: Oid,
     508              :     pub tablespace_id: Oid,
     509              :     pub src_db_id: Oid,
     510              :     pub src_tablespace_id: Oid,
     511              : }
     512              : 
     513              : impl XlCreateDatabase {
     514           24 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
     515           24 :         XlCreateDatabase {
     516           24 :             db_id: buf.get_u32_le(),
     517           24 :             tablespace_id: buf.get_u32_le(),
     518           24 :             src_db_id: buf.get_u32_le(),
     519           24 :             src_tablespace_id: buf.get_u32_le(),
     520           24 :         }
     521           24 :     }
     522              : }
     523              : 
     524              : #[repr(C)]
     525            0 : #[derive(Debug)]
     526              : pub struct XlDropDatabase {
     527              :     pub db_id: Oid,
     528              :     pub n_tablespaces: Oid, /* number of tablespace IDs */
     529              :     pub tablespace_ids: Vec<Oid>,
     530              : }
     531              : 
     532              : impl XlDropDatabase {
     533            3 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
     534            3 :         let mut rec = XlDropDatabase {
     535            3 :             db_id: buf.get_u32_le(),
     536            3 :             n_tablespaces: buf.get_u32_le(),
     537            3 :             tablespace_ids: Vec::<Oid>::new(),
     538            3 :         };
     539              : 
     540            3 :         for _i in 0..rec.n_tablespaces {
     541            3 :             let id = buf.get_u32_le();
     542            3 :             rec.tablespace_ids.push(id);
     543            3 :         }
     544              : 
     545            3 :         rec
     546            3 :     }
     547              : }
     548              : 
     549              : ///
     550              : /// Note: Parsing some fields is missing, because they're not needed.
     551              : ///
     552              : /// This is similar to the xl_xact_parsed_commit and
     553              : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     554              : /// struct for commits and aborts.
     555              : ///
     556            0 : #[derive(Debug)]
     557              : pub struct XlXactParsedRecord {
     558              :     pub xid: TransactionId,
     559              :     pub info: u8,
     560              :     pub xact_time: TimestampTz,
     561              :     pub xinfo: u32,
     562              : 
     563              :     pub db_id: Oid,
     564              :     /* MyDatabaseId */
     565              :     pub ts_id: Oid,
     566              :     /* MyDatabaseTableSpace */
     567              :     pub subxacts: Vec<TransactionId>,
     568              : 
     569              :     pub xnodes: Vec<RelFileNode>,
     570              : }
     571              : 
     572              : impl XlXactParsedRecord {
     573              :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
     574              :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
     575              :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
     576      6132817 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
     577      6132817 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
     578      6132817 :         // The record starts with time of commit/abort
     579      6132817 :         let xact_time = buf.get_i64_le();
     580      6132817 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
     581      6128195 :             buf.get_u32_le()
     582              :         } else {
     583         4622 :             0
     584              :         };
     585              :         let db_id;
     586              :         let ts_id;
     587      6132817 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
     588      6123948 :             db_id = buf.get_u32_le();
     589      6123948 :             ts_id = buf.get_u32_le();
     590      6123948 :         } else {
     591         8869 :             db_id = 0;
     592         8869 :             ts_id = 0;
     593         8869 :         }
     594      6132817 :         let mut subxacts = Vec::<TransactionId>::new();
     595      6132817 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
     596          321 :             let nsubxacts = buf.get_i32_le();
     597       110245 :             for _i in 0..nsubxacts {
     598       110245 :                 let subxact = buf.get_u32_le();
     599       110245 :                 subxacts.push(subxact);
     600       110245 :             }
     601      6132496 :         }
     602      6132817 :         let mut xnodes = Vec::<RelFileNode>::new();
     603      6132817 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
     604        19498 :             let nrels = buf.get_i32_le();
     605        65436 :             for _i in 0..nrels {
     606        65436 :                 let spcnode = buf.get_u32_le();
     607        65436 :                 let dbnode = buf.get_u32_le();
     608        65436 :                 let relnode = buf.get_u32_le();
     609        65436 :                 trace!(
     610            0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
     611            0 :                     spcnode,
     612            0 :                     dbnode,
     613            0 :                     relnode
     614            0 :                 );
     615        65436 :                 xnodes.push(RelFileNode {
     616        65436 :                     spcnode,
     617        65436 :                     dbnode,
     618        65436 :                     relnode,
     619        65436 :                 });
     620              :             }
     621      6113319 :         }
     622              : 
     623      6132817 :         if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
     624            0 :             let nitems = buf.get_i32_le();
     625            0 :             debug!(
     626            0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
     627            0 :                 nitems
     628            0 :             );
     629            0 :             let sizeof_xl_xact_stats_item = 12;
     630            0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
     631      6132817 :         }
     632              : 
     633      6132817 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
     634        82785 :             let nmsgs = buf.get_i32_le();
     635        82785 :             let sizeof_shared_invalidation_message = 16;
     636        82785 :             buf.advance(
     637        82785 :                 (nmsgs * sizeof_shared_invalidation_message)
     638        82785 :                     .try_into()
     639        82785 :                     .unwrap(),
     640        82785 :             );
     641      6050032 :         }
     642              : 
     643      6132817 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
     644            2 :             xid = buf.get_u32_le();
     645            2 :             debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
     646      6132815 :         }
     647              : 
     648      6132817 :         XlXactParsedRecord {
     649      6132817 :             xid,
     650      6132817 :             info,
     651      6132817 :             xact_time,
     652      6132817 :             xinfo,
     653      6132817 :             db_id,
     654      6132817 :             ts_id,
     655      6132817 :             subxacts,
     656      6132817 :             xnodes,
     657      6132817 :         }
     658      6132817 :     }
     659              : }
     660              : 
     661              : #[repr(C)]
     662            0 : #[derive(Debug)]
     663              : pub struct XlClogTruncate {
     664              :     pub pageno: u32,
     665              :     pub oldest_xid: TransactionId,
     666              :     pub oldest_xid_db: Oid,
     667              : }
     668              : 
     669              : impl XlClogTruncate {
     670            3 :     pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
     671            3 :         XlClogTruncate {
     672            3 :             pageno: buf.get_u32_le(),
     673            3 :             oldest_xid: buf.get_u32_le(),
     674            3 :             oldest_xid_db: buf.get_u32_le(),
     675            3 :         }
     676            3 :     }
     677              : }
     678              : 
     679              : #[repr(C)]
     680       945060 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     681              : pub struct MultiXactMember {
     682              :     pub xid: TransactionId,
     683              :     pub status: MultiXactStatus,
     684              : }
     685              : 
     686              : impl MultiXactMember {
     687       474674 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     688       474674 :         MultiXactMember {
     689       474674 :             xid: buf.get_u32_le(),
     690       474674 :             status: buf.get_u32_le(),
     691       474674 :         }
     692       474674 :     }
     693              : }
     694              : 
     695              : #[repr(C)]
     696            0 : #[derive(Debug)]
     697              : pub struct XlMultiXactCreate {
     698              :     pub mid: MultiXactId,
     699              :     /* new MultiXact's ID */
     700              :     pub moff: MultiXactOffset,
     701              :     /* its starting offset in members file */
     702              :     pub nmembers: u32,
     703              :     /* number of member XIDs */
     704              :     pub members: Vec<MultiXactMember>,
     705              : }
     706              : 
     707              : impl XlMultiXactCreate {
     708        24816 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
     709        24816 :         let mid = buf.get_u32_le();
     710        24816 :         let moff = buf.get_u32_le();
     711        24816 :         let nmembers = buf.get_u32_le();
     712        24816 :         let mut members = Vec::new();
     713       474674 :         for _ in 0..nmembers {
     714       474674 :             members.push(MultiXactMember::decode(buf));
     715       474674 :         }
     716        24816 :         XlMultiXactCreate {
     717        24816 :             mid,
     718        24816 :             moff,
     719        24816 :             nmembers,
     720        24816 :             members,
     721        24816 :         }
     722        24816 :     }
     723              : }
     724              : 
     725              : #[repr(C)]
     726            0 : #[derive(Debug)]
     727              : pub struct XlMultiXactTruncate {
     728              :     pub oldest_multi_db: Oid,
     729              :     /* to-be-truncated range of multixact offsets */
     730              :     pub start_trunc_off: MultiXactId,
     731              :     /* just for completeness' sake */
     732              :     pub end_trunc_off: MultiXactId,
     733              : 
     734              :     /* to-be-truncated range of multixact members */
     735              :     pub start_trunc_memb: MultiXactOffset,
     736              :     pub end_trunc_memb: MultiXactOffset,
     737              : }
     738              : 
     739              : impl XlMultiXactTruncate {
     740            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
     741            0 :         XlMultiXactTruncate {
     742            0 :             oldest_multi_db: buf.get_u32_le(),
     743            0 :             start_trunc_off: buf.get_u32_le(),
     744            0 :             end_trunc_off: buf.get_u32_le(),
     745            0 :             start_trunc_memb: buf.get_u32_le(),
     746            0 :             end_trunc_memb: buf.get_u32_le(),
     747            0 :         }
     748            0 :     }
     749              : }
     750              : 
     751              : #[repr(C)]
     752            0 : #[derive(Debug)]
     753              : pub struct XlLogicalMessage {
     754              :     pub db_id: Oid,
     755              :     pub transactional: bool,
     756              :     pub prefix_size: usize,
     757              :     pub message_size: usize,
     758              : }
     759              : 
     760              : impl XlLogicalMessage {
     761          129 :     pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
     762          129 :         XlLogicalMessage {
     763          129 :             db_id: buf.get_u32_le(),
     764          129 :             transactional: buf.get_u32_le() != 0, // 4-bytes alignment
     765          129 :             prefix_size: buf.get_u64_le() as usize,
     766          129 :             message_size: buf.get_u64_le() as usize,
     767          129 :         }
     768          129 :     }
     769              : }
     770              : 
     771              : /// Main routine to decode a WAL record and figure out which blocks are modified
     772              : //
     773              : // See xlogrecord.h for details
     774              : // The overall layout of an XLOG record is:
     775              : //              Fixed-size header (XLogRecord struct)
     776              : //      XLogRecordBlockHeader struct
     777              : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     778              : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     779              : //                 XLogRecordBlockCompressHeader struct follows.
     780              : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     781              : //          BlockNumber follows
     782              : //      XLogRecordBlockHeader struct
     783              : //      ...
     784              : //      XLogRecordDataHeader[Short|Long] struct
     785              : //      block data
     786              : //      block data
     787              : //      ...
     788              : //      main data
     789              : //
     790              : //
     791              : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
     792              : // It would be more natural for this function to return a DecodedWALRecord as return value,
     793              : // but reusing the caller-supplied struct avoids an allocation.
     794              : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
     795              : //
     796     73047334 : pub fn decode_wal_record(
     797     73047334 :     record: Bytes,
     798     73047334 :     decoded: &mut DecodedWALRecord,
     799     73047334 :     pg_version: u32,
     800     73047334 : ) -> Result<()> {
     801     73047334 :     let mut rnode_spcnode: u32 = 0;
     802     73047334 :     let mut rnode_dbnode: u32 = 0;
     803     73047334 :     let mut rnode_relnode: u32 = 0;
     804     73047334 :     let mut got_rnode = false;
     805     73047334 : 
     806     73047334 :     let mut buf = record.clone();
     807              : 
     808              :     // 1. Parse XLogRecord struct
     809              : 
     810              :     // FIXME: assume little-endian here
     811     73047334 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     812              : 
     813     73047334 :     trace!(
     814            0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
     815            0 :         xlogrec.xl_rmid,
     816            0 :         xlogrec.xl_info
     817            0 :     );
     818              : 
     819     73047334 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
     820     73047334 : 
     821     73047334 :     if buf.remaining() != remaining {
     822            0 :         //TODO error
     823     73047334 :     }
     824              : 
     825     73047334 :     let mut max_block_id = 0;
     826     73047334 :     let mut blocks_total_len: u32 = 0;
     827     73047334 :     let mut main_data_len = 0;
     828     73047334 :     let mut datatotal: u32 = 0;
     829     73047334 :     decoded.blocks.clear();
     830              : 
     831              :     // 2. Decode the headers.
     832              :     // XLogRecordBlockHeaders if any,
     833              :     // XLogRecordDataHeader[Short|Long]
     834    212952724 :     while buf.remaining() > datatotal as usize {
     835    139905390 :         let block_id = buf.get_u8();
     836    139905390 : 
     837    139905390 :         match block_id {
     838     72658313 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
     839     72658313 :                 /* XLogRecordDataHeaderShort */
     840     72658313 :                 main_data_len = buf.get_u8() as u32;
     841     72658313 :                 datatotal += main_data_len;
     842     72658313 :             }
     843              : 
     844       149562 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
     845       149562 :                 /* XLogRecordDataHeaderLong */
     846       149562 :                 main_data_len = buf.get_u32_le();
     847       149562 :                 datatotal += main_data_len;
     848       149562 :             }
     849              : 
     850            8 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
     851            8 :                 // RepOriginId is uint16
     852            8 :                 buf.advance(2);
     853            8 :             }
     854              : 
     855       111149 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
     856       111149 :                 // TransactionId is uint32
     857       111149 :                 buf.advance(4);
     858       111149 :             }
     859              : 
     860     66986358 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
     861              :                 /* XLogRecordBlockHeader */
     862     66986358 :                 let mut blk = DecodedBkpBlock::new();
     863     66986358 : 
     864     66986358 :                 if block_id <= max_block_id {
     865     64121244 :                     // TODO
     866     64121244 :                     //report_invalid_record(state,
     867     64121244 :                     //                    "out-of-order block_id %u at %X/%X",
     868     64121244 :                     //                    block_id,
     869     64121244 :                     //                    (uint32) (state->ReadRecPtr >> 32),
     870     64121244 :                     //                    (uint32) state->ReadRecPtr);
     871     64121244 :                     //    goto err;
     872     64121244 :                 }
     873     66986358 :                 max_block_id = block_id;
     874     66986358 : 
     875     66986358 :                 let fork_flags: u8 = buf.get_u8();
     876     66986358 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
     877     66986358 :                 blk.flags = fork_flags;
     878     66986358 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
     879     66986358 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
     880     66986358 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
     881     66986358 :                 blk.data_len = buf.get_u16_le();
     882     66986358 : 
     883     66986358 :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
     884     66986358 : 
     885     66986358 :                 datatotal += blk.data_len as u32;
     886     66986358 :                 blocks_total_len += blk.data_len as u32;
     887     66986358 : 
     888     66986358 :                 if blk.has_image {
     889       275578 :                     blk.bimg_len = buf.get_u16_le();
     890       275578 :                     blk.hole_offset = buf.get_u16_le();
     891       275578 :                     blk.bimg_info = buf.get_u8();
     892              : 
     893            0 :                     blk.apply_image = dispatch_pgversion!(
     894       275578 :                         pg_version,
     895       275578 :                         (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
     896              :                     );
     897              : 
     898       275578 :                     let blk_img_is_compressed =
     899       275578 :                         postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
     900              : 
     901       275578 :                     if blk_img_is_compressed {
     902            0 :                         debug!("compressed block image , pg_version = {}", pg_version);
     903       275578 :                     }
     904              : 
     905       275578 :                     if blk_img_is_compressed {
     906            0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
     907            0 :                             blk.hole_length = buf.get_u16_le();
     908            0 :                         } else {
     909            0 :                             blk.hole_length = 0;
     910            0 :                         }
     911       275578 :                     } else {
     912       275578 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
     913       275578 :                     }
     914       275578 :                     datatotal += blk.bimg_len as u32;
     915       275578 :                     blocks_total_len += blk.bimg_len as u32;
     916       275578 : 
     917       275578 :                     /*
     918       275578 :                      * cross-check that hole_offset > 0, hole_length > 0 and
     919       275578 :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
     920       275578 :                      */
     921       275578 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
     922       212391 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
     923            0 :                     {
     924            0 :                         // TODO
     925            0 :                         /*
     926            0 :                         report_invalid_record(state,
     927            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
     928            0 :                                       (unsigned int) blk->hole_offset,
     929            0 :                                       (unsigned int) blk->hole_length,
     930            0 :                                       (unsigned int) blk->bimg_len,
     931            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     932            0 :                         goto err;
     933            0 :                                      */
     934       275578 :                     }
     935              : 
     936              :                     /*
     937              :                      * cross-check that hole_offset == 0 and hole_length == 0 if
     938              :                      * the HAS_HOLE flag is not set.
     939              :                      */
     940       275578 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     941        63187 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
     942            0 :                     {
     943            0 :                         // TODO
     944            0 :                         /*
     945            0 :                         report_invalid_record(state,
     946            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
     947            0 :                                       (unsigned int) blk->hole_offset,
     948            0 :                                       (unsigned int) blk->hole_length,
     949            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     950            0 :                         goto err;
     951            0 :                                      */
     952       275578 :                     }
     953              : 
     954              :                     /*
     955              :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
     956              :                      * flag is set.
     957              :                      */
     958       275578 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
     959        63187 :                         // TODO
     960        63187 :                         /*
     961        63187 :                         report_invalid_record(state,
     962        63187 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
     963        63187 :                                       (unsigned int) blk->bimg_len,
     964        63187 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     965        63187 :                         goto err;
     966        63187 :                                      */
     967       212391 :                     }
     968              : 
     969              :                     /*
     970              :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
     971              :                      * IS_COMPRESSED flag is set.
     972              :                      */
     973       275578 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     974        63187 :                         && !blk_img_is_compressed
     975        63187 :                         && blk.bimg_len != BLCKSZ
     976            0 :                     {
     977            0 :                         // TODO
     978            0 :                         /*
     979            0 :                         report_invalid_record(state,
     980            0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
     981            0 :                                       (unsigned int) blk->data_len,
     982            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     983            0 :                         goto err;
     984            0 :                                      */
     985       275578 :                     }
     986     66710780 :                 }
     987     66986358 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
     988     64125929 :                     rnode_spcnode = buf.get_u32_le();
     989     64125929 :                     rnode_dbnode = buf.get_u32_le();
     990     64125929 :                     rnode_relnode = buf.get_u32_le();
     991     64125929 :                     got_rnode = true;
     992     64125929 :                 } else if !got_rnode {
     993            0 :                     // TODO
     994            0 :                     /*
     995            0 :                     report_invalid_record(state,
     996            0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
     997            0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     998            0 :                     goto err;           */
     999      2860429 :                 }
    1000              : 
    1001     66986358 :                 blk.rnode_spcnode = rnode_spcnode;
    1002     66986358 :                 blk.rnode_dbnode = rnode_dbnode;
    1003     66986358 :                 blk.rnode_relnode = rnode_relnode;
    1004     66986358 : 
    1005     66986358 :                 blk.blkno = buf.get_u32_le();
    1006     66986358 :                 trace!(
    1007            0 :                     "this record affects {}/{}/{} blk {}",
    1008            0 :                     rnode_spcnode,
    1009            0 :                     rnode_dbnode,
    1010            0 :                     rnode_relnode,
    1011            0 :                     blk.blkno
    1012            0 :                 );
    1013              : 
    1014     66986358 :                 decoded.blocks.push(blk);
    1015              :             }
    1016              : 
    1017            0 :             _ => {
    1018            0 :                 // TODO: invalid block_id
    1019            0 :             }
    1020              :         }
    1021              :     }
    1022              : 
    1023              :     // 3. Decode blocks.
    1024     73047334 :     let mut ptr = record.len() - buf.remaining();
    1025     73047334 :     for blk in decoded.blocks.iter_mut() {
    1026     66986358 :         if blk.has_image {
    1027       275578 :             blk.bimg_offset = ptr as u32;
    1028       275578 :             ptr += blk.bimg_len as usize;
    1029     66710780 :         }
    1030     66986358 :         if blk.has_data {
    1031     57614293 :             ptr += blk.data_len as usize;
    1032     57614293 :         }
    1033              :     }
    1034              :     // We don't need them, so just skip blocks_total_len bytes
    1035     73047334 :     buf.advance(blocks_total_len as usize);
    1036     73047334 :     assert_eq!(ptr, record.len() - buf.remaining());
    1037              : 
    1038     73047334 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
    1039     73047334 : 
    1040     73047334 :     // 4. Decode main_data
    1041     73047334 :     if main_data_len > 0 {
    1042     72807875 :         assert_eq!(buf.remaining(), main_data_len as usize);
    1043       239459 :     }
    1044              : 
    1045     73047334 :     decoded.xl_xid = xlogrec.xl_xid;
    1046     73047334 :     decoded.xl_info = xlogrec.xl_info;
    1047     73047334 :     decoded.xl_rmid = xlogrec.xl_rmid;
    1048     73047334 :     decoded.record = record;
    1049     73047334 :     decoded.main_data_offset = main_data_offset;
    1050     73047334 : 
    1051     73047334 :     Ok(())
    1052     73047334 : }
    1053              : 
    1054              : ///
    1055              : /// Build a human-readable string to describe a WAL record
    1056              : ///
    1057              : /// For debugging purposes
    1058            0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
    1059            0 :     match rec {
    1060            0 :         NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
    1061            0 :             "will_init: {}, {}",
    1062            0 :             will_init,
    1063            0 :             describe_postgres_wal_record(rec)?
    1064              :         )),
    1065            0 :         _ => Ok(format!("{:?}", rec)),
    1066              :     }
    1067            0 : }
    1068              : 
    1069            0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
    1070            0 :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
    1071            0 :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
    1072            0 :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
    1073            0 :     // without worrying about security?
    1074            0 :     //
    1075            0 :     // But for now, we have a hand-written code for a few common WAL record types here.
    1076            0 : 
    1077            0 :     let mut buf = record.clone();
    1078              : 
    1079              :     // 1. Parse XLogRecord struct
    1080              : 
    1081              :     // FIXME: assume little-endian here
    1082            0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1083              : 
    1084              :     let unknown_str: String;
    1085              : 
    1086            0 :     let result: &str = match xlogrec.xl_rmid {
    1087              :         pg_constants::RM_HEAP2_ID => {
    1088            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1089            0 :             match info {
    1090            0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
    1091            0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
    1092              :                 _ => {
    1093            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1094            0 :                     &unknown_str
    1095              :                 }
    1096              :             }
    1097              :         }
    1098              :         pg_constants::RM_HEAP_ID => {
    1099            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1100            0 :             match info {
    1101            0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
    1102            0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
    1103            0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
    1104            0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
    1105              :                 _ => {
    1106            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1107            0 :                     &unknown_str
    1108              :                 }
    1109              :             }
    1110              :         }
    1111              :         pg_constants::RM_XLOG_ID => {
    1112            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1113            0 :             match info {
    1114            0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
    1115            0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
    1116              :                 _ => {
    1117            0 :                     unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
    1118            0 :                     &unknown_str
    1119              :                 }
    1120              :             }
    1121              :         }
    1122            0 :         rmid => {
    1123            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1124            0 : 
    1125            0 :             unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
    1126            0 :             &unknown_str
    1127              :         }
    1128              :     };
    1129              : 
    1130            0 :     Ok(String::from(result))
    1131            0 : }
        

Generated by: LCOV version 2.1-beta