LCOV - code coverage report
Current view: top level - pageserver/src - walrecord.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 62.5 % 579 362
Test Date: 2024-02-14 18:05:35 Functions: 33.3 % 102 34

            Line data    Source code
       1              : //!
       2              : //! Functions for parsing WAL records.
       3              : //!
       4              : 
       5              : use anyhow::Result;
       6              : use bytes::{Buf, Bytes};
       7              : use postgres_ffi::dispatch_pgversion;
       8              : use postgres_ffi::pg_constants;
       9              : use postgres_ffi::BLCKSZ;
      10              : use postgres_ffi::{BlockNumber, TimestampTz};
      11              : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
      12              : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
      13              : use serde::{Deserialize, Serialize};
      14              : use tracing::*;
      15              : use utils::bin_ser::DeserializeError;
      16              : 
      17              : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
      18              : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
      19    379215474 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
      20              : pub enum NeonWalRecord {
      21              :     /// Native PostgreSQL WAL record
      22              :     Postgres { will_init: bool, rec: Bytes },
      23              : 
      24              :     /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
      25              :     ClearVisibilityMapFlags {
      26              :         new_heap_blkno: Option<u32>,
      27              :         old_heap_blkno: Option<u32>,
      28              :         flags: u8,
      29              :     },
      30              :     /// Mark transaction IDs as committed on a CLOG page
      31              :     ClogSetCommitted {
      32              :         xids: Vec<TransactionId>,
      33              :         timestamp: TimestampTz,
      34              :     },
      35              :     /// Mark transaction IDs as aborted on a CLOG page
      36              :     ClogSetAborted { xids: Vec<TransactionId> },
      37              :     /// Extend multixact offsets SLRU
      38              :     MultixactOffsetCreate {
      39              :         mid: MultiXactId,
      40              :         moff: MultiXactOffset,
      41              :     },
      42              :     /// Extend multixact members SLRU.
      43              :     MultixactMembersCreate {
      44              :         moff: MultiXactOffset,
      45              :         members: Vec<MultiXactMember>,
      46              :     },
      47              :     /// Update the map of AUX files, either writing or dropping an entry
      48              :     AuxFile {
      49              :         file_path: String,
      50              :         content: Option<Bytes>,
      51              :     },
      52              : }
      53              : 
      54              : impl NeonWalRecord {
      55              :     /// Does replaying this WAL record initialize the page from scratch, or does
      56              :     /// it need to be applied over the previous image of the page?
      57    128218110 :     pub fn will_init(&self) -> bool {
      58    128218110 :         match self {
      59    108164222 :             NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
      60              : 
      61              :             // None of the special neon record types currently initialize the page
      62     20053888 :             _ => false,
      63              :         }
      64    128218110 :     }
      65              : }
      66              : 
      67              : /// DecodedBkpBlock represents per-page data contained in a WAL record.
      68     66719524 : #[derive(Default)]
      69              : pub struct DecodedBkpBlock {
      70              :     /* Is this block ref in use? */
      71              :     //in_use: bool,
      72              : 
      73              :     /* Identify the block this refers to */
      74              :     pub rnode_spcnode: u32,
      75              :     pub rnode_dbnode: u32,
      76              :     pub rnode_relnode: u32,
      77              :     // Note that we have a few special forknum values for non-rel files.
      78              :     pub forknum: u8,
      79              :     pub blkno: u32,
      80              : 
      81              :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
      82              :     pub flags: u8,
      83              : 
      84              :     /* Information on full-page image, if any */
      85              :     pub has_image: bool,
      86              :     /* has image, even for consistency checking */
      87              :     pub apply_image: bool,
      88              :     /* has image that should be restored */
      89              :     pub will_init: bool,
      90              :     /* record doesn't need previous page version to apply */
      91              :     //char         *bkp_image;
      92              :     pub hole_offset: u16,
      93              :     pub hole_length: u16,
      94              :     pub bimg_offset: u32,
      95              :     pub bimg_len: u16,
      96              :     pub bimg_info: u8,
      97              : 
      98              :     /* Buffer holding the rmgr-specific data associated with this block */
      99              :     has_data: bool,
     100              :     data_len: u16,
     101              : }
     102              : 
     103              : impl DecodedBkpBlock {
     104     66719524 :     pub fn new() -> DecodedBkpBlock {
     105     66719524 :         Default::default()
     106     66719524 :     }
     107              : }
     108              : 
     109       749223 : #[derive(Default)]
     110              : pub struct DecodedWALRecord {
     111              :     pub xl_xid: TransactionId,
     112              :     pub xl_info: u8,
     113              :     pub xl_rmid: u8,
     114              :     pub record: Bytes, // raw XLogRecord
     115              : 
     116              :     pub blocks: Vec<DecodedBkpBlock>,
     117              :     pub main_data_offset: usize,
     118              : }
     119              : 
     120              : #[repr(C)]
     121            0 : #[derive(Debug, Clone, Copy)]
     122              : pub struct RelFileNode {
     123              :     pub spcnode: Oid, /* tablespace */
     124              :     pub dbnode: Oid,  /* database */
     125              :     pub relnode: Oid, /* relation */
     126              : }
     127              : 
     128              : #[repr(C)]
     129            0 : #[derive(Debug)]
     130              : pub struct XlRelmapUpdate {
     131              :     pub dbid: Oid,   /* database ID, or 0 for shared map */
     132              :     pub tsid: Oid,   /* database's tablespace, or pg_global */
     133              :     pub nbytes: i32, /* size of relmap data */
     134              : }
     135              : 
     136              : impl XlRelmapUpdate {
     137           62 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
     138           62 :         XlRelmapUpdate {
     139           62 :             dbid: buf.get_u32_le(),
     140           62 :             tsid: buf.get_u32_le(),
     141           62 :             nbytes: buf.get_i32_le(),
     142           62 :         }
     143           62 :     }
     144              : }
     145              : 
     146              : pub mod v14 {
     147              :     use bytes::{Buf, Bytes};
     148              :     use postgres_ffi::{OffsetNumber, TransactionId};
     149              : 
     150              :     #[repr(C)]
     151            0 :     #[derive(Debug)]
     152              :     pub struct XlHeapInsert {
     153              :         pub offnum: OffsetNumber,
     154              :         pub flags: u8,
     155              :     }
     156              : 
     157              :     impl XlHeapInsert {
     158     41797477 :         pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
     159     41797477 :             XlHeapInsert {
     160     41797477 :                 offnum: buf.get_u16_le(),
     161     41797477 :                 flags: buf.get_u8(),
     162     41797477 :             }
     163     41797477 :         }
     164              :     }
     165              : 
     166              :     #[repr(C)]
     167            0 :     #[derive(Debug)]
     168              :     pub struct XlHeapMultiInsert {
     169              :         pub flags: u8,
     170              :         pub _padding: u8,
     171              :         pub ntuples: u16,
     172              :     }
     173              : 
     174              :     impl XlHeapMultiInsert {
     175      1126297 :         pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
     176      1126297 :             XlHeapMultiInsert {
     177      1126297 :                 flags: buf.get_u8(),
     178      1126297 :                 _padding: buf.get_u8(),
     179      1126297 :                 ntuples: buf.get_u16_le(),
     180      1126297 :             }
     181      1126297 :         }
     182              :     }
     183              : 
     184              :     #[repr(C)]
     185            0 :     #[derive(Debug)]
     186              :     pub struct XlHeapDelete {
     187              :         pub xmax: TransactionId,
     188              :         pub offnum: OffsetNumber,
     189              :         pub _padding: u16,
     190              :         pub t_cid: u32,
     191              :         pub infobits_set: u8,
     192              :         pub flags: u8,
     193              :     }
     194              : 
     195              :     impl XlHeapDelete {
     196      2303978 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     197      2303978 :             XlHeapDelete {
     198      2303978 :                 xmax: buf.get_u32_le(),
     199      2303978 :                 offnum: buf.get_u16_le(),
     200      2303978 :                 _padding: buf.get_u16_le(),
     201      2303978 :                 t_cid: buf.get_u32_le(),
     202      2303978 :                 infobits_set: buf.get_u8(),
     203      2303978 :                 flags: buf.get_u8(),
     204      2303978 :             }
     205      2303978 :         }
     206              :     }
     207              : 
     208              :     #[repr(C)]
     209            0 :     #[derive(Debug)]
     210              :     pub struct XlHeapUpdate {
     211              :         pub old_xmax: TransactionId,
     212              :         pub old_offnum: OffsetNumber,
     213              :         pub old_infobits_set: u8,
     214              :         pub flags: u8,
     215              :         pub t_cid: u32,
     216              :         pub new_xmax: TransactionId,
     217              :         pub new_offnum: OffsetNumber,
     218              :     }
     219              : 
     220              :     impl XlHeapUpdate {
     221      3248613 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     222      3248613 :             XlHeapUpdate {
     223      3248613 :                 old_xmax: buf.get_u32_le(),
     224      3248613 :                 old_offnum: buf.get_u16_le(),
     225      3248613 :                 old_infobits_set: buf.get_u8(),
     226      3248613 :                 flags: buf.get_u8(),
     227      3248613 :                 t_cid: buf.get_u32_le(),
     228      3248613 :                 new_xmax: buf.get_u32_le(),
     229      3248613 :                 new_offnum: buf.get_u16_le(),
     230      3248613 :             }
     231      3248613 :         }
     232              :     }
     233              : 
     234              :     #[repr(C)]
     235            0 :     #[derive(Debug)]
     236              :     pub struct XlHeapLock {
     237              :         pub locking_xid: TransactionId,
     238              :         pub offnum: OffsetNumber,
     239              :         pub _padding: u16,
     240              :         pub t_cid: u32,
     241              :         pub infobits_set: u8,
     242              :         pub flags: u8,
     243              :     }
     244              : 
     245              :     impl XlHeapLock {
     246      2740111 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     247      2740111 :             XlHeapLock {
     248      2740111 :                 locking_xid: buf.get_u32_le(),
     249      2740111 :                 offnum: buf.get_u16_le(),
     250      2740111 :                 _padding: buf.get_u16_le(),
     251      2740111 :                 t_cid: buf.get_u32_le(),
     252      2740111 :                 infobits_set: buf.get_u8(),
     253      2740111 :                 flags: buf.get_u8(),
     254      2740111 :             }
     255      2740111 :         }
     256              :     }
     257              : 
     258              :     #[repr(C)]
     259            0 :     #[derive(Debug)]
     260              :     pub struct XlHeapLockUpdated {
     261              :         pub xmax: TransactionId,
     262              :         pub offnum: OffsetNumber,
     263              :         pub infobits_set: u8,
     264              :         pub flags: u8,
     265              :     }
     266              : 
     267              :     impl XlHeapLockUpdated {
     268         4146 :         pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
     269         4146 :             XlHeapLockUpdated {
     270         4146 :                 xmax: buf.get_u32_le(),
     271         4146 :                 offnum: buf.get_u16_le(),
     272         4146 :                 infobits_set: buf.get_u8(),
     273         4146 :                 flags: buf.get_u8(),
     274         4146 :             }
     275         4146 :         }
     276              :     }
     277              : }
     278              : 
     279              : pub mod v15 {
     280              :     pub use super::v14::{
     281              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
     282              :     };
     283              : }
     284              : 
     285              : pub mod v16 {
     286              :     pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
     287              :     use bytes::{Buf, Bytes};
     288              :     use postgres_ffi::{OffsetNumber, TransactionId};
     289              : 
     290              :     pub struct XlHeapDelete {
     291              :         pub xmax: TransactionId,
     292              :         pub offnum: OffsetNumber,
     293              :         pub infobits_set: u8,
     294              :         pub flags: u8,
     295              :     }
     296              : 
     297              :     impl XlHeapDelete {
     298            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     299            0 :             XlHeapDelete {
     300            0 :                 xmax: buf.get_u32_le(),
     301            0 :                 offnum: buf.get_u16_le(),
     302            0 :                 infobits_set: buf.get_u8(),
     303            0 :                 flags: buf.get_u8(),
     304            0 :             }
     305            0 :         }
     306              :     }
     307              : 
     308              :     #[repr(C)]
     309            0 :     #[derive(Debug)]
     310              :     pub struct XlHeapUpdate {
     311              :         pub old_xmax: TransactionId,
     312              :         pub old_offnum: OffsetNumber,
     313              :         pub old_infobits_set: u8,
     314              :         pub flags: u8,
     315              :         pub new_xmax: TransactionId,
     316              :         pub new_offnum: OffsetNumber,
     317              :     }
     318              : 
     319              :     impl XlHeapUpdate {
     320            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     321            0 :             XlHeapUpdate {
     322            0 :                 old_xmax: buf.get_u32_le(),
     323            0 :                 old_offnum: buf.get_u16_le(),
     324            0 :                 old_infobits_set: buf.get_u8(),
     325            0 :                 flags: buf.get_u8(),
     326            0 :                 new_xmax: buf.get_u32_le(),
     327            0 :                 new_offnum: buf.get_u16_le(),
     328            0 :             }
     329            0 :         }
     330              :     }
     331              : 
     332              :     #[repr(C)]
     333            0 :     #[derive(Debug)]
     334              :     pub struct XlHeapLock {
     335              :         pub locking_xid: TransactionId,
     336              :         pub offnum: OffsetNumber,
     337              :         pub infobits_set: u8,
     338              :         pub flags: u8,
     339              :     }
     340              : 
     341              :     impl XlHeapLock {
     342            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     343            0 :             XlHeapLock {
     344            0 :                 locking_xid: buf.get_u32_le(),
     345            0 :                 offnum: buf.get_u16_le(),
     346            0 :                 infobits_set: buf.get_u8(),
     347            0 :                 flags: buf.get_u8(),
     348            0 :             }
     349            0 :         }
     350              :     }
     351              : 
     352              :     /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
     353              :     pub mod rm_neon {
     354              :         use bytes::{Buf, Bytes};
     355              :         use postgres_ffi::{OffsetNumber, TransactionId};
     356              : 
     357              :         #[repr(C)]
     358            0 :         #[derive(Debug)]
     359              :         pub struct XlNeonHeapInsert {
     360              :             pub offnum: OffsetNumber,
     361              :             pub flags: u8,
     362              :         }
     363              : 
     364              :         impl XlNeonHeapInsert {
     365            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
     366            0 :                 XlNeonHeapInsert {
     367            0 :                     offnum: buf.get_u16_le(),
     368            0 :                     flags: buf.get_u8(),
     369            0 :                 }
     370            0 :             }
     371              :         }
     372              : 
     373              :         #[repr(C)]
     374            0 :         #[derive(Debug)]
     375              :         pub struct XlNeonHeapMultiInsert {
     376              :             pub flags: u8,
     377              :             pub _padding: u8,
     378              :             pub ntuples: u16,
     379              :             pub t_cid: u32,
     380              :         }
     381              : 
     382              :         impl XlNeonHeapMultiInsert {
     383            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
     384            0 :                 XlNeonHeapMultiInsert {
     385            0 :                     flags: buf.get_u8(),
     386            0 :                     _padding: buf.get_u8(),
     387            0 :                     ntuples: buf.get_u16_le(),
     388            0 :                     t_cid: buf.get_u32_le(),
     389            0 :                 }
     390            0 :             }
     391              :         }
     392              : 
     393              :         #[repr(C)]
     394            0 :         #[derive(Debug)]
     395              :         pub struct XlNeonHeapDelete {
     396              :             pub xmax: TransactionId,
     397              :             pub offnum: OffsetNumber,
     398              :             pub infobits_set: u8,
     399              :             pub flags: u8,
     400              :             pub t_cid: u32,
     401              :         }
     402              : 
     403              :         impl XlNeonHeapDelete {
     404            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
     405            0 :                 XlNeonHeapDelete {
     406            0 :                     xmax: buf.get_u32_le(),
     407            0 :                     offnum: buf.get_u16_le(),
     408            0 :                     infobits_set: buf.get_u8(),
     409            0 :                     flags: buf.get_u8(),
     410            0 :                     t_cid: buf.get_u32_le(),
     411            0 :                 }
     412            0 :             }
     413              :         }
     414              : 
     415              :         #[repr(C)]
     416            0 :         #[derive(Debug)]
     417              :         pub struct XlNeonHeapUpdate {
     418              :             pub old_xmax: TransactionId,
     419              :             pub old_offnum: OffsetNumber,
     420              :             pub old_infobits_set: u8,
     421              :             pub flags: u8,
     422              :             pub t_cid: u32,
     423              :             pub new_xmax: TransactionId,
     424              :             pub new_offnum: OffsetNumber,
     425              :         }
     426              : 
     427              :         impl XlNeonHeapUpdate {
     428            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
     429            0 :                 XlNeonHeapUpdate {
     430            0 :                     old_xmax: buf.get_u32_le(),
     431            0 :                     old_offnum: buf.get_u16_le(),
     432            0 :                     old_infobits_set: buf.get_u8(),
     433            0 :                     flags: buf.get_u8(),
     434            0 :                     t_cid: buf.get_u32(),
     435            0 :                     new_xmax: buf.get_u32_le(),
     436            0 :                     new_offnum: buf.get_u16_le(),
     437            0 :                 }
     438            0 :             }
     439              :         }
     440              : 
     441              :         #[repr(C)]
     442            0 :         #[derive(Debug)]
     443              :         pub struct XlNeonHeapLock {
     444              :             pub locking_xid: TransactionId,
     445              :             pub t_cid: u32,
     446              :             pub offnum: OffsetNumber,
     447              :             pub infobits_set: u8,
     448              :             pub flags: u8,
     449              :         }
     450              : 
     451              :         impl XlNeonHeapLock {
     452            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
     453            0 :                 XlNeonHeapLock {
     454            0 :                     locking_xid: buf.get_u32_le(),
     455            0 :                     t_cid: buf.get_u32_le(),
     456            0 :                     offnum: buf.get_u16_le(),
     457            0 :                     infobits_set: buf.get_u8(),
     458            0 :                     flags: buf.get_u8(),
     459            0 :                 }
     460            0 :             }
     461              :         }
     462              :     }
     463              : }
     464              : 
     465              : #[repr(C)]
     466            0 : #[derive(Debug)]
     467              : pub struct XlSmgrCreate {
     468              :     pub rnode: RelFileNode,
     469              :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
     470              :     // well-defined size?
     471              :     pub forknum: u8,
     472              : }
     473              : 
     474              : impl XlSmgrCreate {
     475        75385 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
     476        75385 :         XlSmgrCreate {
     477        75385 :             rnode: RelFileNode {
     478        75385 :                 spcnode: buf.get_u32_le(), /* tablespace */
     479        75385 :                 dbnode: buf.get_u32_le(),  /* database */
     480        75385 :                 relnode: buf.get_u32_le(), /* relation */
     481        75385 :             },
     482        75385 :             forknum: buf.get_u32_le() as u8,
     483        75385 :         }
     484        75385 :     }
     485              : }
     486              : 
     487              : #[repr(C)]
     488            0 : #[derive(Debug)]
     489              : pub struct XlSmgrTruncate {
     490              :     pub blkno: BlockNumber,
     491              :     pub rnode: RelFileNode,
     492              :     pub flags: u32,
     493              : }
     494              : 
     495              : impl XlSmgrTruncate {
     496          212 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
     497          212 :         XlSmgrTruncate {
     498          212 :             blkno: buf.get_u32_le(),
     499          212 :             rnode: RelFileNode {
     500          212 :                 spcnode: buf.get_u32_le(), /* tablespace */
     501          212 :                 dbnode: buf.get_u32_le(),  /* database */
     502          212 :                 relnode: buf.get_u32_le(), /* relation */
     503          212 :             },
     504          212 :             flags: buf.get_u32_le(),
     505          212 :         }
     506          212 :     }
     507              : }
     508              : 
     509              : #[repr(C)]
     510            0 : #[derive(Debug)]
     511              : pub struct XlCreateDatabase {
     512              :     pub db_id: Oid,
     513              :     pub tablespace_id: Oid,
     514              :     pub src_db_id: Oid,
     515              :     pub src_tablespace_id: Oid,
     516              : }
     517              : 
     518              : impl XlCreateDatabase {
     519           24 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
     520           24 :         XlCreateDatabase {
     521           24 :             db_id: buf.get_u32_le(),
     522           24 :             tablespace_id: buf.get_u32_le(),
     523           24 :             src_db_id: buf.get_u32_le(),
     524           24 :             src_tablespace_id: buf.get_u32_le(),
     525           24 :         }
     526           24 :     }
     527              : }
     528              : 
     529              : #[repr(C)]
     530            0 : #[derive(Debug)]
     531              : pub struct XlDropDatabase {
     532              :     pub db_id: Oid,
     533              :     pub n_tablespaces: Oid, /* number of tablespace IDs */
     534              :     pub tablespace_ids: Vec<Oid>,
     535              : }
     536              : 
     537              : impl XlDropDatabase {
     538            3 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
     539            3 :         let mut rec = XlDropDatabase {
     540            3 :             db_id: buf.get_u32_le(),
     541            3 :             n_tablespaces: buf.get_u32_le(),
     542            3 :             tablespace_ids: Vec::<Oid>::new(),
     543            3 :         };
     544              : 
     545            3 :         for _i in 0..rec.n_tablespaces {
     546            3 :             let id = buf.get_u32_le();
     547            3 :             rec.tablespace_ids.push(id);
     548            3 :         }
     549              : 
     550            3 :         rec
     551            3 :     }
     552              : }
     553              : 
     554              : ///
     555              : /// Note: Parsing some fields is missing, because they're not needed.
     556              : ///
     557              : /// This is similar to the xl_xact_parsed_commit and
     558              : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     559              : /// struct for commits and aborts.
     560              : ///
     561            0 : #[derive(Debug)]
     562              : pub struct XlXactParsedRecord {
     563              :     pub xid: TransactionId,
     564              :     pub info: u8,
     565              :     pub xact_time: TimestampTz,
     566              :     pub xinfo: u32,
     567              : 
     568              :     pub db_id: Oid,
     569              :     /* MyDatabaseId */
     570              :     pub ts_id: Oid,
     571              :     /* MyDatabaseTableSpace */
     572              :     pub subxacts: Vec<TransactionId>,
     573              : 
     574              :     pub xnodes: Vec<RelFileNode>,
     575              : }
     576              : 
     577              : impl XlXactParsedRecord {
     578              :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
     579              :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
     580              :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
     581      6041345 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
     582      6041345 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
     583      6041345 :         // The record starts with time of commit/abort
     584      6041345 :         let xact_time = buf.get_i64_le();
     585      6041345 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
     586      6036723 :             buf.get_u32_le()
     587              :         } else {
     588         4622 :             0
     589              :         };
     590              :         let db_id;
     591              :         let ts_id;
     592      6041345 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
     593      6032476 :             db_id = buf.get_u32_le();
     594      6032476 :             ts_id = buf.get_u32_le();
     595      6032476 :         } else {
     596         8869 :             db_id = 0;
     597         8869 :             ts_id = 0;
     598         8869 :         }
     599      6041345 :         let mut subxacts = Vec::<TransactionId>::new();
     600      6041345 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
     601          321 :             let nsubxacts = buf.get_i32_le();
     602       110245 :             for _i in 0..nsubxacts {
     603       110245 :                 let subxact = buf.get_u32_le();
     604       110245 :                 subxacts.push(subxact);
     605       110245 :             }
     606      6041024 :         }
     607      6041345 :         let mut xnodes = Vec::<RelFileNode>::new();
     608      6041345 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
     609        19498 :             let nrels = buf.get_i32_le();
     610        65435 :             for _i in 0..nrels {
     611        65435 :                 let spcnode = buf.get_u32_le();
     612        65435 :                 let dbnode = buf.get_u32_le();
     613        65435 :                 let relnode = buf.get_u32_le();
     614        65435 :                 trace!(
     615            0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
     616            0 :                     spcnode,
     617            0 :                     dbnode,
     618            0 :                     relnode
     619            0 :                 );
     620        65435 :                 xnodes.push(RelFileNode {
     621        65435 :                     spcnode,
     622        65435 :                     dbnode,
     623        65435 :                     relnode,
     624        65435 :                 });
     625              :             }
     626      6021847 :         }
     627              : 
     628      6041345 :         if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
     629            0 :             let nitems = buf.get_i32_le();
     630            0 :             debug!(
     631            0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
     632            0 :                 nitems
     633            0 :             );
     634            0 :             let sizeof_xl_xact_stats_item = 12;
     635            0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
     636      6041345 :         }
     637              : 
     638      6041345 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
     639        82770 :             let nmsgs = buf.get_i32_le();
     640        82770 :             let sizeof_shared_invalidation_message = 16;
     641        82770 :             buf.advance(
     642        82770 :                 (nmsgs * sizeof_shared_invalidation_message)
     643        82770 :                     .try_into()
     644        82770 :                     .unwrap(),
     645        82770 :             );
     646      5958575 :         }
     647              : 
     648      6041345 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
     649            2 :             xid = buf.get_u32_le();
     650            2 :             debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
     651      6041343 :         }
     652              : 
     653      6041345 :         XlXactParsedRecord {
     654      6041345 :             xid,
     655      6041345 :             info,
     656      6041345 :             xact_time,
     657      6041345 :             xinfo,
     658      6041345 :             db_id,
     659      6041345 :             ts_id,
     660      6041345 :             subxacts,
     661      6041345 :             xnodes,
     662      6041345 :         }
     663      6041345 :     }
     664              : }
     665              : 
     666              : #[repr(C)]
     667            0 : #[derive(Debug)]
     668              : pub struct XlClogTruncate {
     669              :     pub pageno: u32,
     670              :     pub oldest_xid: TransactionId,
     671              :     pub oldest_xid_db: Oid,
     672              : }
     673              : 
     674              : impl XlClogTruncate {
     675            3 :     pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
     676            3 :         XlClogTruncate {
     677            3 :             pageno: buf.get_u32_le(),
     678            3 :             oldest_xid: buf.get_u32_le(),
     679            3 :             oldest_xid_db: buf.get_u32_le(),
     680            3 :         }
     681            3 :     }
     682              : }
     683              : 
     684              : #[repr(C)]
     685       945060 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     686              : pub struct MultiXactMember {
     687              :     pub xid: TransactionId,
     688              :     pub status: MultiXactStatus,
     689              : }
     690              : 
     691              : impl MultiXactMember {
     692       474674 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     693       474674 :         MultiXactMember {
     694       474674 :             xid: buf.get_u32_le(),
     695       474674 :             status: buf.get_u32_le(),
     696       474674 :         }
     697       474674 :     }
     698              : }
     699              : 
     700              : #[repr(C)]
     701            0 : #[derive(Debug)]
     702              : pub struct XlMultiXactCreate {
     703              :     pub mid: MultiXactId,
     704              :     /* new MultiXact's ID */
     705              :     pub moff: MultiXactOffset,
     706              :     /* its starting offset in members file */
     707              :     pub nmembers: u32,
     708              :     /* number of member XIDs */
     709              :     pub members: Vec<MultiXactMember>,
     710              : }
     711              : 
     712              : impl XlMultiXactCreate {
     713        24816 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
     714        24816 :         let mid = buf.get_u32_le();
     715        24816 :         let moff = buf.get_u32_le();
     716        24816 :         let nmembers = buf.get_u32_le();
     717        24816 :         let mut members = Vec::new();
     718       474674 :         for _ in 0..nmembers {
     719       474674 :             members.push(MultiXactMember::decode(buf));
     720       474674 :         }
     721        24816 :         XlMultiXactCreate {
     722        24816 :             mid,
     723        24816 :             moff,
     724        24816 :             nmembers,
     725        24816 :             members,
     726        24816 :         }
     727        24816 :     }
     728              : }
     729              : 
     730              : #[repr(C)]
     731            0 : #[derive(Debug)]
     732              : pub struct XlMultiXactTruncate {
     733              :     pub oldest_multi_db: Oid,
     734              :     /* to-be-truncated range of multixact offsets */
     735              :     pub start_trunc_off: MultiXactId,
     736              :     /* just for completeness' sake */
     737              :     pub end_trunc_off: MultiXactId,
     738              : 
     739              :     /* to-be-truncated range of multixact members */
     740              :     pub start_trunc_memb: MultiXactOffset,
     741              :     pub end_trunc_memb: MultiXactOffset,
     742              : }
     743              : 
     744              : impl XlMultiXactTruncate {
     745            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
     746            0 :         XlMultiXactTruncate {
     747            0 :             oldest_multi_db: buf.get_u32_le(),
     748            0 :             start_trunc_off: buf.get_u32_le(),
     749            0 :             end_trunc_off: buf.get_u32_le(),
     750            0 :             start_trunc_memb: buf.get_u32_le(),
     751            0 :             end_trunc_memb: buf.get_u32_le(),
     752            0 :         }
     753            0 :     }
     754              : }
     755              : 
     756              : #[repr(C)]
     757            0 : #[derive(Debug)]
     758              : pub struct XlLogicalMessage {
     759              :     pub db_id: Oid,
     760              :     pub transactional: bool,
     761              :     pub prefix_size: usize,
     762              :     pub message_size: usize,
     763              : }
     764              : 
     765              : impl XlLogicalMessage {
     766          127 :     pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
     767          127 :         XlLogicalMessage {
     768          127 :             db_id: buf.get_u32_le(),
     769          127 :             transactional: buf.get_u32_le() != 0, // 4-bytes alignment
     770          127 :             prefix_size: buf.get_u64_le() as usize,
     771          127 :             message_size: buf.get_u64_le() as usize,
     772          127 :         }
     773          127 :     }
     774              : }
     775              : 
     776              : /// Main routine to decode a WAL record and figure out which blocks are modified
     777              : //
     778              : // See xlogrecord.h for details
     779              : // The overall layout of an XLOG record is:
     780              : //              Fixed-size header (XLogRecord struct)
     781              : //      XLogRecordBlockHeader struct
     782              : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     783              : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     784              : //                 XLogRecordBlockCompressHeader struct follows.
     785              : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     786              : //          BlockNumber follows
     787              : //      XLogRecordBlockHeader struct
     788              : //      ...
     789              : //      XLogRecordDataHeader[Short|Long] struct
     790              : //      block data
     791              : //      block data
     792              : //      ...
     793              : //      main data
     794              : //
     795              : //
     796              : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
     797              : // It would be more natural for this function to return a DecodedWALRecord as return value,
     798              : // but reusing the caller-supplied struct avoids an allocation.
     799              : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
     800              : //
     801     72693846 : pub fn decode_wal_record(
     802     72693846 :     record: Bytes,
     803     72693846 :     decoded: &mut DecodedWALRecord,
     804     72693846 :     pg_version: u32,
     805     72693846 : ) -> Result<()> {
     806     72693846 :     let mut rnode_spcnode: u32 = 0;
     807     72693846 :     let mut rnode_dbnode: u32 = 0;
     808     72693846 :     let mut rnode_relnode: u32 = 0;
     809     72693846 :     let mut got_rnode = false;
     810     72693846 : 
     811     72693846 :     let mut buf = record.clone();
     812              : 
     813              :     // 1. Parse XLogRecord struct
     814              : 
     815              :     // FIXME: assume little-endian here
     816     72693846 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     817              : 
     818     72693846 :     trace!(
     819            0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
     820            0 :         xlogrec.xl_rmid,
     821            0 :         xlogrec.xl_info
     822            0 :     );
     823              : 
     824     72693846 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
     825     72693846 : 
     826     72693846 :     if buf.remaining() != remaining {
     827            0 :         //TODO error
     828     72693846 :     }
     829              : 
     830     72693846 :     let mut max_block_id = 0;
     831     72693846 :     let mut blocks_total_len: u32 = 0;
     832     72693846 :     let mut main_data_len = 0;
     833     72693846 :     let mut datatotal: u32 = 0;
     834     72693846 :     decoded.blocks.clear();
     835              : 
     836              :     // 2. Decode the headers.
     837              :     // XLogRecordBlockHeaders if any,
     838              :     // XLogRecordDataHeader[Short|Long]
     839    211978415 :     while buf.remaining() > datatotal as usize {
     840    139284569 :         let block_id = buf.get_u8();
     841    139284569 : 
     842    139284569 :         match block_id {
     843     72304744 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
     844     72304744 :                 /* XLogRecordDataHeaderShort */
     845     72304744 :                 main_data_len = buf.get_u8() as u32;
     846     72304744 :                 datatotal += main_data_len;
     847     72304744 :             }
     848              : 
     849       149143 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
     850       149143 :                 /* XLogRecordDataHeaderLong */
     851       149143 :                 main_data_len = buf.get_u32_le();
     852       149143 :                 datatotal += main_data_len;
     853       149143 :             }
     854              : 
     855            8 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
     856            8 :                 // RepOriginId is uint16
     857            8 :                 buf.advance(2);
     858            8 :             }
     859              : 
     860       111150 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
     861       111150 :                 // TransactionId is uint32
     862       111150 :                 buf.advance(4);
     863       111150 :             }
     864              : 
     865     66719524 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
     866              :                 /* XLogRecordBlockHeader */
     867     66719524 :                 let mut blk = DecodedBkpBlock::new();
     868     66719524 : 
     869     66719524 :                 if block_id <= max_block_id {
     870     63858708 :                     // TODO
     871     63858708 :                     //report_invalid_record(state,
     872     63858708 :                     //                    "out-of-order block_id %u at %X/%X",
     873     63858708 :                     //                    block_id,
     874     63858708 :                     //                    (uint32) (state->ReadRecPtr >> 32),
     875     63858708 :                     //                    (uint32) state->ReadRecPtr);
     876     63858708 :                     //    goto err;
     877     63858708 :                 }
     878     66719524 :                 max_block_id = block_id;
     879     66719524 : 
     880     66719524 :                 let fork_flags: u8 = buf.get_u8();
     881     66719524 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
     882     66719524 :                 blk.flags = fork_flags;
     883     66719524 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
     884     66719524 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
     885     66719524 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
     886     66719524 :                 blk.data_len = buf.get_u16_le();
     887     66719524 : 
     888     66719524 :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
     889     66719524 : 
     890     66719524 :                 datatotal += blk.data_len as u32;
     891     66719524 :                 blocks_total_len += blk.data_len as u32;
     892     66719524 : 
     893     66719524 :                 if blk.has_image {
     894       275690 :                     blk.bimg_len = buf.get_u16_le();
     895       275690 :                     blk.hole_offset = buf.get_u16_le();
     896       275690 :                     blk.bimg_info = buf.get_u8();
     897              : 
     898            0 :                     blk.apply_image = dispatch_pgversion!(
     899       275690 :                         pg_version,
     900       275690 :                         (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
     901              :                     );
     902              : 
     903       275690 :                     let blk_img_is_compressed =
     904       275690 :                         postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
     905              : 
     906       275690 :                     if blk_img_is_compressed {
     907            0 :                         debug!("compressed block image , pg_version = {}", pg_version);
     908       275690 :                     }
     909              : 
     910       275690 :                     if blk_img_is_compressed {
     911            0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
     912            0 :                             blk.hole_length = buf.get_u16_le();
     913            0 :                         } else {
     914            0 :                             blk.hole_length = 0;
     915            0 :                         }
     916       275690 :                     } else {
     917       275690 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
     918       275690 :                     }
     919       275690 :                     datatotal += blk.bimg_len as u32;
     920       275690 :                     blocks_total_len += blk.bimg_len as u32;
     921       275690 : 
     922       275690 :                     /*
     923       275690 :                      * cross-check that hole_offset > 0, hole_length > 0 and
     924       275690 :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
     925       275690 :                      */
     926       275690 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
     927       212337 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
     928            0 :                     {
     929            0 :                         // TODO
     930            0 :                         /*
     931            0 :                         report_invalid_record(state,
     932            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
     933            0 :                                       (unsigned int) blk->hole_offset,
     934            0 :                                       (unsigned int) blk->hole_length,
     935            0 :                                       (unsigned int) blk->bimg_len,
     936            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     937            0 :                         goto err;
     938            0 :                                      */
     939       275690 :                     }
     940              : 
     941              :                     /*
     942              :                      * cross-check that hole_offset == 0 and hole_length == 0 if
     943              :                      * the HAS_HOLE flag is not set.
     944              :                      */
     945       275690 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     946        63353 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
     947            0 :                     {
     948            0 :                         // TODO
     949            0 :                         /*
     950            0 :                         report_invalid_record(state,
     951            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
     952            0 :                                       (unsigned int) blk->hole_offset,
     953            0 :                                       (unsigned int) blk->hole_length,
     954            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     955            0 :                         goto err;
     956            0 :                                      */
     957       275690 :                     }
     958              : 
     959              :                     /*
     960              :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
     961              :                      * flag is set.
     962              :                      */
     963       275690 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
     964        63353 :                         // TODO
     965        63353 :                         /*
     966        63353 :                         report_invalid_record(state,
     967        63353 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
     968        63353 :                                       (unsigned int) blk->bimg_len,
     969        63353 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     970        63353 :                         goto err;
     971        63353 :                                      */
     972       212337 :                     }
     973              : 
     974              :                     /*
     975              :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
     976              :                      * IS_COMPRESSED flag is set.
     977              :                      */
     978       275690 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     979        63353 :                         && !blk_img_is_compressed
     980        63353 :                         && blk.bimg_len != BLCKSZ
     981            0 :                     {
     982            0 :                         // TODO
     983            0 :                         /*
     984            0 :                         report_invalid_record(state,
     985            0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
     986            0 :                                       (unsigned int) blk->data_len,
     987            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     988            0 :                         goto err;
     989            0 :                                      */
     990       275690 :                     }
     991     66443834 :                 }
     992     66719524 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
     993     63863445 :                     rnode_spcnode = buf.get_u32_le();
     994     63863445 :                     rnode_dbnode = buf.get_u32_le();
     995     63863445 :                     rnode_relnode = buf.get_u32_le();
     996     63863445 :                     got_rnode = true;
     997     63863445 :                 } else if !got_rnode {
     998            0 :                     // TODO
     999            0 :                     /*
    1000            0 :                     report_invalid_record(state,
    1001            0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
    1002            0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1003            0 :                     goto err;           */
    1004      2856079 :                 }
    1005              : 
    1006     66719524 :                 blk.rnode_spcnode = rnode_spcnode;
    1007     66719524 :                 blk.rnode_dbnode = rnode_dbnode;
    1008     66719524 :                 blk.rnode_relnode = rnode_relnode;
    1009     66719524 : 
    1010     66719524 :                 blk.blkno = buf.get_u32_le();
    1011     66719524 :                 trace!(
    1012            0 :                     "this record affects {}/{}/{} blk {}",
    1013            0 :                     rnode_spcnode,
    1014            0 :                     rnode_dbnode,
    1015            0 :                     rnode_relnode,
    1016            0 :                     blk.blkno
    1017            0 :                 );
    1018              : 
    1019     66719524 :                 decoded.blocks.push(blk);
    1020              :             }
    1021              : 
    1022            0 :             _ => {
    1023            0 :                 // TODO: invalid block_id
    1024            0 :             }
    1025              :         }
    1026              :     }
    1027              : 
    1028              :     // 3. Decode blocks.
    1029     72693846 :     let mut ptr = record.len() - buf.remaining();
    1030     72693846 :     for blk in decoded.blocks.iter_mut() {
    1031     66719524 :         if blk.has_image {
    1032       275690 :             blk.bimg_offset = ptr as u32;
    1033       275690 :             ptr += blk.bimg_len as usize;
    1034     66443834 :         }
    1035     66719524 :         if blk.has_data {
    1036     57372387 :             ptr += blk.data_len as usize;
    1037     57372387 :         }
    1038              :     }
    1039              :     // We don't need them, so just skip blocks_total_len bytes
    1040     72693846 :     buf.advance(blocks_total_len as usize);
    1041     72693846 :     assert_eq!(ptr, record.len() - buf.remaining());
    1042              : 
    1043     72693846 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
    1044     72693846 : 
    1045     72693846 :     // 4. Decode main_data
    1046     72693846 :     if main_data_len > 0 {
    1047     72453887 :         assert_eq!(buf.remaining(), main_data_len as usize);
    1048       239959 :     }
    1049              : 
    1050     72693846 :     decoded.xl_xid = xlogrec.xl_xid;
    1051     72693846 :     decoded.xl_info = xlogrec.xl_info;
    1052     72693846 :     decoded.xl_rmid = xlogrec.xl_rmid;
    1053     72693846 :     decoded.record = record;
    1054     72693846 :     decoded.main_data_offset = main_data_offset;
    1055     72693846 : 
    1056     72693846 :     Ok(())
    1057     72693846 : }
    1058              : 
    1059              : ///
    1060              : /// Build a human-readable string to describe a WAL record
    1061              : ///
    1062              : /// For debugging purposes
    1063            0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
    1064            0 :     match rec {
    1065            0 :         NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
    1066            0 :             "will_init: {}, {}",
    1067            0 :             will_init,
    1068            0 :             describe_postgres_wal_record(rec)?
    1069              :         )),
    1070            0 :         _ => Ok(format!("{:?}", rec)),
    1071              :     }
    1072            0 : }
    1073              : 
    1074            0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
    1075            0 :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
    1076            0 :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
    1077            0 :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
    1078            0 :     // without worrying about security?
    1079            0 :     //
    1080            0 :     // But for now, we have a hand-written code for a few common WAL record types here.
    1081            0 : 
    1082            0 :     let mut buf = record.clone();
    1083              : 
    1084              :     // 1. Parse XLogRecord struct
    1085              : 
    1086              :     // FIXME: assume little-endian here
    1087            0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1088              : 
    1089              :     let unknown_str: String;
    1090              : 
    1091            0 :     let result: &str = match xlogrec.xl_rmid {
    1092              :         pg_constants::RM_HEAP2_ID => {
    1093            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1094            0 :             match info {
    1095            0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
    1096            0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
    1097              :                 _ => {
    1098            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1099            0 :                     &unknown_str
    1100              :                 }
    1101              :             }
    1102              :         }
    1103              :         pg_constants::RM_HEAP_ID => {
    1104            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1105            0 :             match info {
    1106            0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
    1107            0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
    1108            0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
    1109            0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
    1110              :                 _ => {
    1111            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1112            0 :                     &unknown_str
    1113              :                 }
    1114              :             }
    1115              :         }
    1116              :         pg_constants::RM_XLOG_ID => {
    1117            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1118            0 :             match info {
    1119            0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
    1120            0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
    1121              :                 _ => {
    1122            0 :                     unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
    1123            0 :                     &unknown_str
    1124              :                 }
    1125              :             }
    1126              :         }
    1127            0 :         rmid => {
    1128            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1129            0 : 
    1130            0 :             unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
    1131            0 :             &unknown_str
    1132              :         }
    1133              :     };
    1134              : 
    1135            0 :     Ok(String::from(result))
    1136            0 : }
        

Generated by: LCOV version 2.1-beta