LCOV - code coverage report
Current view: top level - pageserver/src - walrecord.rs (source / functions) Coverage Total Hit
Test: 2453312769e0b6b061a2008879e6693300d0b938.info Lines: 42.2 % 611 258
Test Date: 2024-09-06 16:40:18 Functions: 26.7 % 60 16

            Line data    Source code
       1              : //!
       2              : //! Functions for parsing WAL records.
       3              : //!
       4              : 
       5              : use anyhow::Result;
       6              : use bytes::{Buf, Bytes};
       7              : use postgres_ffi::dispatch_pgversion;
       8              : use postgres_ffi::pg_constants;
       9              : use postgres_ffi::BLCKSZ;
      10              : use postgres_ffi::{BlockNumber, TimestampTz};
      11              : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
      12              : use postgres_ffi::{RepOriginId, XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
      13              : use serde::{Deserialize, Serialize};
      14              : use tracing::*;
      15              : use utils::{bin_ser::DeserializeError, lsn::Lsn};
      16              : 
       17              : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
       18              : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
       19         5454 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
       20              : pub enum NeonWalRecord {
       21              :     /// Native PostgreSQL WAL record; `will_init` is returned as-is by [`NeonWalRecord::will_init`]
       22              :     Postgres { will_init: bool, rec: Bytes },
       23              : 
       24              :     /// Clear bits in the heap visibility map (`flags` is a bitmap of the bits to clear)
       25              :     ClearVisibilityMapFlags {
       26              :         new_heap_blkno: Option<u32>,
       27              :         old_heap_blkno: Option<u32>,
       28              :         flags: u8,
       29              :     },
       30              :     /// Mark transaction IDs as committed on a CLOG page
       31              :     ClogSetCommitted {
       32              :         xids: Vec<TransactionId>,
       33              :         timestamp: TimestampTz,
       34              :     },
       35              :     /// Mark transaction IDs as aborted on a CLOG page
       36              :     ClogSetAborted { xids: Vec<TransactionId> },
       37              :     /// Extend the multixact offsets SLRU
       38              :     MultixactOffsetCreate {
       39              :         mid: MultiXactId,
       40              :         moff: MultiXactOffset,
       41              :     },
       42              :     /// Extend the multixact members SLRU
       43              :     MultixactMembersCreate {
       44              :         moff: MultiXactOffset,
       45              :         members: Vec<MultiXactMember>,
       46              :     },
       47              :     /// Update the map of AUX files, either writing or dropping an entry
       48              :     AuxFile {
       49              :         file_path: String,
       50              :         content: Option<Bytes>, // NOTE(review): presumably `None` means "drop the entry" -- confirm against the redo code
       51              :     },
       52              : 
       53              :     /// A record used only by unit tests. It supports appending data to an existing image, or clearing it.
       54              :     #[cfg(test)]
       55              :     Test {
       56              :         /// Append a string to the image.
       57              :         append: String,
       58              :         /// Clear the image before appending.
       59              :         clear: bool,
       60              :         /// Treat this record as an init record. `clear` should be set to true if this field is set
       61              :         /// to true. An init record can be reconstructed without earlier WAL history. See
       62              :         /// [`NeonWalRecord::will_init`] and its references in `timeline.rs`.
       63              :         will_init: bool,
       64              :     },
       65              : }
      66              : 
       67              : impl NeonWalRecord {
       68              :     /// Does replaying this WAL record initialize the page from scratch, or does
       69              :     /// it need to be applied over the previous image of the page?
       70       439824 :     pub fn will_init(&self) -> bool {
       71       439824 :         // If you change this function, you'll also need to change ValueBytes::will_init
       72       439824 :         match self {
       73       436878 :             NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
       74              :             #[cfg(test)]
       75         2856 :             NeonWalRecord::Test { will_init, .. } => *will_init,
       76              :             // None of the special neon record types currently initialize the page
       77           90 :             _ => false,
       78              :         }
       79       439824 :     }
       80              : 
       81              :     #[cfg(test)]
       82          432 :     pub(crate) fn wal_append(s: impl AsRef<str>) -> Self { // Test record that appends `s` without clearing; not an init record.
       83          432 :         Self::Test {
       84          432 :             append: s.as_ref().to_string(),
       85          432 :             clear: false,
       86          432 :             will_init: false,
       87          432 :         }
       88          432 :     }
       89              : 
       90              :     #[cfg(test)]
       91            6 :     pub(crate) fn wal_clear() -> Self { // Test record that clears the image and appends nothing; not an init record.
       92            6 :         Self::Test {
       93            6 :             append: "".to_string(),
       94            6 :             clear: true,
       95            6 :             will_init: false,
       96            6 :         }
       97            6 :     }
       98              : 
       99              :     #[cfg(test)]
      100           18 :     pub(crate) fn wal_init() -> Self { // Test record that clears the image and is flagged as an init record.
      101           18 :         Self::Test {
      102           18 :             append: "".to_string(),
      103           18 :             clear: true,
      104           18 :             will_init: true,
      105           18 :         }
      106           18 :     }
      107              : }
     108              : 
      109              : /// DecodedBkpBlock represents per-page data contained in a WAL record.
      110              : #[derive(Default)]
      111              : pub struct DecodedBkpBlock {
      112              :     /* Is this block ref in use? (field kept only as a comment below) */
      113              :     //in_use: bool,
      114              : 
      115              :     /* Identify the block this refers to */
      116              :     pub rnode_spcnode: u32,
      117              :     pub rnode_dbnode: u32,
      118              :     pub rnode_relnode: u32,
      119              :     // Note that we have a few special forknum values for non-rel files.
      120              :     pub forknum: u8,
      121              :     pub blkno: u32,
      122              : 
      123              :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
      124              :     pub flags: u8,
      125              : 
      126              :     /* Information on full-page image, if any */
      127              :     pub has_image: bool,
      128              :     /* has image, even for consistency checking -- NOTE(review): seems to describe `has_image` above; confirm */
      129              :     pub apply_image: bool,
      130              :     /* has image that should be restored -- NOTE(review): seems to describe `apply_image` above; confirm */
      131              :     pub will_init: bool,
      132              :     /* record doesn't need previous page version to apply -- NOTE(review): seems to describe `will_init` above; confirm */
      133              :     //char         *bkp_image;
      134              :     pub hole_offset: u16,
      135              :     pub hole_length: u16,
      136              :     pub bimg_offset: u32,
      137              :     pub bimg_len: u16,
      138              :     pub bimg_info: u8,
      139              : 
      140              :     /* Buffer holding the rmgr-specific data associated with this block */
      141              :     has_data: bool, // not `pub`: visible only inside this module
      142              :     data_len: u16, // not `pub`: visible only inside this module
      143              : }
     144              : 
      145              : impl DecodedBkpBlock {
      146       436926 :     pub fn new() -> DecodedBkpBlock { // Same as `DecodedBkpBlock::default()`: numeric fields are 0, flags are false.
      147       436926 :         Default::default()
      148       436926 :     }
      149              : }
     150              : 
      151              : #[derive(Default)]
      152              : pub struct DecodedWALRecord { // Holds the xl_* header fields, the raw record bytes and the per-block references of one WAL record.
      153              :     pub xl_xid: TransactionId,
      154              :     pub xl_info: u8,
      155              :     pub xl_rmid: u8,
      156              :     pub record: Bytes, // raw XLogRecord
      157              : 
      158              :     pub blocks: Vec<DecodedBkpBlock>,
      159              :     pub main_data_offset: usize, // NOTE(review): presumably an offset into `record` -- confirm against the decoder
      160              :     pub origin_id: u16,
      161              : }
     162              : 
      163              : impl DecodedWALRecord {
      164              :     /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
      165              :     /// by reading other existing relations' data blocks.  This is more complex to apply than new-style database
      166              :     /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
      167       437556 :     pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
      168       437556 :         if self.xl_rmid == pg_constants::RM_DBASE_ID { // records of any other resource manager are never database creations
      169            0 :             let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK; // keep only the bits selected by XLR_RMGR_INFO_MASK
      170            0 :             match pg_version {
      171              :                 14 => {
      172              :                     // Postgres 14 database creations are always the legacy kind
      173            0 :                     info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
      174              :                 }
      175            0 :                 15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
      176            0 :                 16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
      177              :                 _ => {
      178            0 :                     panic!("Unsupported postgres version {pg_version}") // NOTE(review): any version other than 14/15/16 panics, but only for RM_DBASE_ID records
      179              :                 }
      180              :             }
      181              :         } else {
      182       437556 :             false
      183              :         }
      184       437556 :     }
      185              : }
     186              : 
      187              : #[repr(C)]
      188              : #[derive(Debug, Clone, Copy)]
      189              : pub struct RelFileNode { // Names a relation by its tablespace, database and relation OIDs.
      190              :     pub spcnode: Oid, /* tablespace */
      191              :     pub dbnode: Oid,  /* database */
      192              :     pub relnode: Oid, /* relation */
      193              : }
     194              : 
      195              : #[repr(C)]
      196              : #[derive(Debug)]
      197              : pub struct XlRelmapUpdate {
      198              :     pub dbid: Oid,   /* database ID, or 0 for shared map */
      199              :     pub tsid: Oid,   /* database's tablespace, or pg_global */
      200              :     pub nbytes: i32, /* size of relmap data */
      201              : }
      202              : 
      203              : impl XlRelmapUpdate {
      204            0 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { // Reads dbid, tsid and nbytes, in that order, as little-endian 32-bit values from `buf`.
      205            0 :         XlRelmapUpdate {
      206            0 :             dbid: buf.get_u32_le(),
      207            0 :             tsid: buf.get_u32_le(),
      208            0 :             nbytes: buf.get_i32_le(), // the relmap data itself is not read here
      209            0 :         }
      210            0 :     }
      211              : }
     212              : 
      213              : pub mod v14 { // Heap record headers and their decoders; each decode() reads the fields in declaration order from `buf`.
      214              :     use bytes::{Buf, Bytes};
      215              :     use postgres_ffi::{OffsetNumber, TransactionId};
      216              : 
      217              :     #[repr(C)]
      218              :     #[derive(Debug)]
      219              :     pub struct XlHeapInsert {
      220              :         pub offnum: OffsetNumber,
      221              :         pub flags: u8,
      222              :     }
      223              : 
      224              :     impl XlHeapInsert {
      225       435828 :         pub fn decode(buf: &mut Bytes) -> XlHeapInsert { // Consumes 3 bytes: u16 offnum (little-endian), u8 flags.
      226       435828 :             XlHeapInsert {
      227       435828 :                 offnum: buf.get_u16_le(),
      228       435828 :                 flags: buf.get_u8(),
      229       435828 :             }
      230       435828 :         }
      231              :     }
      232              : 
      233              :     #[repr(C)]
      234              :     #[derive(Debug)]
      235              :     pub struct XlHeapMultiInsert {
      236              :         pub flags: u8,
      237              :         pub _padding: u8, // filler byte read by decode(); presumably C struct alignment padding -- confirm
      238              :         pub ntuples: u16,
      239              :     }
      240              : 
      241              :     impl XlHeapMultiInsert {
      242          126 :         pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert { // Consumes 4 bytes: u8 flags, u8 padding, u16 ntuples (little-endian).
      243          126 :             XlHeapMultiInsert {
      244          126 :                 flags: buf.get_u8(),
      245          126 :                 _padding: buf.get_u8(),
      246          126 :                 ntuples: buf.get_u16_le(),
      247          126 :             }
      248          126 :         }
      249              :     }
      250              : 
      251              :     #[repr(C)]
      252              :     #[derive(Debug)]
      253              :     pub struct XlHeapDelete {
      254              :         pub xmax: TransactionId,
      255              :         pub offnum: OffsetNumber,
      256              :         pub _padding: u16, // filler read by decode(); presumably C struct alignment padding -- confirm
      257              :         pub t_cid: u32,
      258              :         pub infobits_set: u8,
      259              :         pub flags: u8,
      260              :     }
      261              : 
      262              :     impl XlHeapDelete {
      263            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete { // Consumes 14 bytes; all multi-byte fields are little-endian.
      264            0 :             XlHeapDelete {
      265            0 :                 xmax: buf.get_u32_le(),
      266            0 :                 offnum: buf.get_u16_le(),
      267            0 :                 _padding: buf.get_u16_le(),
      268            0 :                 t_cid: buf.get_u32_le(),
      269            0 :                 infobits_set: buf.get_u8(),
      270            0 :                 flags: buf.get_u8(),
      271            0 :             }
      272            0 :         }
      273              :     }
      274              : 
      275              :     #[repr(C)]
      276              :     #[derive(Debug)]
      277              :     pub struct XlHeapUpdate {
      278              :         pub old_xmax: TransactionId,
      279              :         pub old_offnum: OffsetNumber,
      280              :         pub old_infobits_set: u8,
      281              :         pub flags: u8,
      282              :         pub t_cid: u32,
      283              :         pub new_xmax: TransactionId,
      284              :         pub new_offnum: OffsetNumber,
      285              :     }
      286              : 
      287              :     impl XlHeapUpdate {
      288           24 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate { // Consumes 18 bytes; all multi-byte fields are little-endian.
      289           24 :             XlHeapUpdate {
      290           24 :                 old_xmax: buf.get_u32_le(),
      291           24 :                 old_offnum: buf.get_u16_le(),
      292           24 :                 old_infobits_set: buf.get_u8(),
      293           24 :                 flags: buf.get_u8(),
      294           24 :                 t_cid: buf.get_u32_le(),
      295           24 :                 new_xmax: buf.get_u32_le(),
      296           24 :                 new_offnum: buf.get_u16_le(),
      297           24 :             }
      298           24 :         }
      299              :     }
      300              : 
      301              :     #[repr(C)]
      302              :     #[derive(Debug)]
      303              :     pub struct XlHeapLock {
      304              :         pub locking_xid: TransactionId,
      305              :         pub offnum: OffsetNumber,
      306              :         pub _padding: u16, // filler read by decode(); presumably C struct alignment padding -- confirm
      307              :         pub t_cid: u32,
      308              :         pub infobits_set: u8,
      309              :         pub flags: u8,
      310              :     }
      311              : 
      312              :     impl XlHeapLock {
      313            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock { // Consumes 14 bytes; all multi-byte fields are little-endian.
      314            0 :             XlHeapLock {
      315            0 :                 locking_xid: buf.get_u32_le(),
      316            0 :                 offnum: buf.get_u16_le(),
      317            0 :                 _padding: buf.get_u16_le(),
      318            0 :                 t_cid: buf.get_u32_le(),
      319            0 :                 infobits_set: buf.get_u8(),
      320            0 :                 flags: buf.get_u8(),
      321            0 :             }
      322            0 :         }
      323              :     }
      324              : 
      325              :     #[repr(C)]
      326              :     #[derive(Debug)]
      327              :     pub struct XlHeapLockUpdated {
      328              :         pub xmax: TransactionId,
      329              :         pub offnum: OffsetNumber,
      330              :         pub infobits_set: u8,
      331              :         pub flags: u8,
      332              :     }
      333              : 
      334              :     impl XlHeapLockUpdated {
      335            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated { // Consumes 8 bytes; all multi-byte fields are little-endian.
      336            0 :             XlHeapLockUpdated {
      337            0 :                 xmax: buf.get_u32_le(),
      338            0 :                 offnum: buf.get_u16_le(),
      339            0 :                 infobits_set: buf.get_u8(),
      340            0 :                 flags: buf.get_u8(),
      341            0 :             }
      342            0 :         }
      343              :     }
      344              : }
     345              : 
      346              : pub mod v15 { // Defines nothing of its own: re-exports all six heap record types from `v14`.
      347              :     pub use super::v14::{
      348              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
      349              :     };
      350              : }
     351              : 
     352              : pub mod v16 {
     353              :     pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
     354              :     use bytes::{Buf, Bytes};
     355              :     use postgres_ffi::{OffsetNumber, TransactionId};
     356              : 
      357              :     pub struct XlHeapDelete { // NOTE(review): unlike the other records in this module, no #[repr(C)] / #[derive(Debug)] here -- intentional?
      358              :         pub xmax: TransactionId,
      359              :         pub offnum: OffsetNumber,
      360              :         pub infobits_set: u8,
      361              :         pub flags: u8,
      362              :     }
      363              : 
      364              :     impl XlHeapDelete {
      365            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete { // Consumes 8 bytes in field order; multi-byte fields are little-endian.
      366            0 :             XlHeapDelete {
      367            0 :                 xmax: buf.get_u32_le(),
      368            0 :                 offnum: buf.get_u16_le(),
      369            0 :                 infobits_set: buf.get_u8(),
      370            0 :                 flags: buf.get_u8(),
      371            0 :             }
      372            0 :         }
      373              :     }
     374              : 
      375              :     #[repr(C)]
      376              :     #[derive(Debug)]
      377              :     pub struct XlHeapUpdate { // Same fields as `v14::XlHeapUpdate` except that there is no `t_cid`.
      378              :         pub old_xmax: TransactionId,
      379              :         pub old_offnum: OffsetNumber,
      380              :         pub old_infobits_set: u8,
      381              :         pub flags: u8,
      382              :         pub new_xmax: TransactionId,
      383              :         pub new_offnum: OffsetNumber,
      384              :     }
      385              : 
      386              :     impl XlHeapUpdate {
      387            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate { // Consumes 14 bytes in field order; multi-byte fields are little-endian.
      388            0 :             XlHeapUpdate {
      389            0 :                 old_xmax: buf.get_u32_le(),
      390            0 :                 old_offnum: buf.get_u16_le(),
      391            0 :                 old_infobits_set: buf.get_u8(),
      392            0 :                 flags: buf.get_u8(),
      393            0 :                 new_xmax: buf.get_u32_le(),
      394            0 :                 new_offnum: buf.get_u16_le(),
      395            0 :             }
      396            0 :         }
      397              :     }
     398              : 
      399              :     #[repr(C)]
      400              :     #[derive(Debug)]
      401              :     pub struct XlHeapLock { // Compared with `v14::XlHeapLock`: no `_padding` and no `t_cid`.
      402              :         pub locking_xid: TransactionId,
      403              :         pub offnum: OffsetNumber,
      404              :         pub infobits_set: u8,
      405              :         pub flags: u8,
      406              :     }
      407              : 
      408              :     impl XlHeapLock {
      409            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock { // Consumes 8 bytes in field order; multi-byte fields are little-endian.
      410            0 :             XlHeapLock {
      411            0 :                 locking_xid: buf.get_u32_le(),
      412            0 :                 offnum: buf.get_u16_le(),
      413            0 :                 infobits_set: buf.get_u8(),
      414            0 :                 flags: buf.get_u8(),
      415            0 :             }
      416            0 :         }
      417              :     }
     418              : 
     419              :     /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
     420              :     pub mod rm_neon {
     421              :         use bytes::{Buf, Bytes};
     422              :         use postgres_ffi::{OffsetNumber, TransactionId};
     423              : 
      424              :         #[repr(C)]
      425              :         #[derive(Debug)]
      426              :         pub struct XlNeonHeapInsert { // Same two fields as `v14::XlHeapInsert`.
      427              :             pub offnum: OffsetNumber,
      428              :             pub flags: u8,
      429              :         }
      430              : 
      431              :         impl XlNeonHeapInsert {
      432            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert { // Consumes 3 bytes: u16 offnum (little-endian), u8 flags.
      433            0 :                 XlNeonHeapInsert {
      434            0 :                     offnum: buf.get_u16_le(),
      435            0 :                     flags: buf.get_u8(),
      436            0 :                 }
      437            0 :             }
      438              :         }
     439              : 
      440              :         #[repr(C)]
      441              :         #[derive(Debug)]
      442              :         pub struct XlNeonHeapMultiInsert {
      443              :             pub flags: u8,
      444              :             pub _padding: u8, // filler byte read by decode(); presumably C struct alignment padding -- confirm
      445              :             pub ntuples: u16,
      446              :             pub t_cid: u32,
      447              :         }
      448              : 
      449              :         impl XlNeonHeapMultiInsert {
      450            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert { // Consumes 8 bytes in field order; multi-byte fields are little-endian.
      451            0 :                 XlNeonHeapMultiInsert {
      452            0 :                     flags: buf.get_u8(),
      453            0 :                     _padding: buf.get_u8(),
      454            0 :                     ntuples: buf.get_u16_le(),
      455            0 :                     t_cid: buf.get_u32_le(),
      456            0 :                 }
      457            0 :             }
      458              :         }
     459              : 
      460              :         #[repr(C)]
      461              :         #[derive(Debug)]
      462              :         pub struct XlNeonHeapDelete {
      463              :             pub xmax: TransactionId,
      464              :             pub offnum: OffsetNumber,
      465              :             pub infobits_set: u8,
      466              :             pub flags: u8,
      467              :             pub t_cid: u32,
      468              :         }
      469              : 
      470              :         impl XlNeonHeapDelete {
      471            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete { // Consumes 12 bytes in field order; multi-byte fields are little-endian.
      472            0 :                 XlNeonHeapDelete {
      473            0 :                     xmax: buf.get_u32_le(),
      474            0 :                     offnum: buf.get_u16_le(),
      475            0 :                     infobits_set: buf.get_u8(),
      476            0 :                     flags: buf.get_u8(),
      477            0 :                     t_cid: buf.get_u32_le(),
      478            0 :                 }
      479            0 :             }
      480              :         }
     481              : 
     482              :         #[repr(C)]
     483              :         #[derive(Debug)]
     484              :         pub struct XlNeonHeapUpdate {
     485              :             pub old_xmax: TransactionId,
     486              :             pub old_offnum: OffsetNumber,
     487              :             pub old_infobits_set: u8,
     488              :             pub flags: u8,
     489              :             pub t_cid: u32,
     490              :             pub new_xmax: TransactionId,
     491              :             pub new_offnum: OffsetNumber,
     492              :         }
     493              : 
     494              :         impl XlNeonHeapUpdate {
     495            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
     496            0 :                 XlNeonHeapUpdate {
     497            0 :                     old_xmax: buf.get_u32_le(),
     498            0 :                     old_offnum: buf.get_u16_le(),
     499            0 :                     old_infobits_set: buf.get_u8(),
     500            0 :                     flags: buf.get_u8(),
     501            0 :                     t_cid: buf.get_u32(),
     502            0 :                     new_xmax: buf.get_u32_le(),
     503            0 :                     new_offnum: buf.get_u16_le(),
     504            0 :                 }
     505            0 :             }
     506              :         }
     507              : 
      508              :         #[repr(C)]
      509              :         #[derive(Debug)]
      510              :         pub struct XlNeonHeapLock { // Note the field order: `t_cid` comes directly after `locking_xid`, and decode() reads it in that position.
      511              :             pub locking_xid: TransactionId,
      512              :             pub t_cid: u32,
      513              :             pub offnum: OffsetNumber,
      514              :             pub infobits_set: u8,
      515              :             pub flags: u8,
      516              :         }
      517              : 
      518              :         impl XlNeonHeapLock {
      519            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock { // Consumes 12 bytes in field order; multi-byte fields are little-endian.
      520            0 :                 XlNeonHeapLock {
      521            0 :                     locking_xid: buf.get_u32_le(),
      522            0 :                     t_cid: buf.get_u32_le(),
      523            0 :                     offnum: buf.get_u16_le(),
      524            0 :                     infobits_set: buf.get_u8(),
      525            0 :                     flags: buf.get_u8(),
      526            0 :                 }
      527            0 :             }
      528              :         }
     529              :     }
     530              : }
     531              : 
      532              : #[repr(C)]
      533              : #[derive(Debug)]
      534              : pub struct XlSmgrCreate {
      535              :     pub rnode: RelFileNode,
      536              :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
      537              :     // a well-defined size? decode() below reads 4 bytes for it and keeps the low 8 bits.
      538              :     pub forknum: u8,
      539              : }
      540              : 
      541              : impl XlSmgrCreate {
      542           48 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate { // Consumes 16 bytes: three little-endian u32 OIDs, then a little-endian u32 fork number.
      543           48 :         XlSmgrCreate {
      544           48 :             rnode: RelFileNode {
      545           48 :                 spcnode: buf.get_u32_le(), /* tablespace */
      546           48 :                 dbnode: buf.get_u32_le(),  /* database */
      547           48 :                 relnode: buf.get_u32_le(), /* relation */
      548           48 :             },
      549           48 :             forknum: buf.get_u32_le() as u8, // 4 bytes are read, then silently truncated to u8 (see FIXME on the field)
      550           48 :         }
      551           48 :     }
      552              : }
     553              : 
      554              : #[repr(C)]
      555              : #[derive(Debug)]
      556              : pub struct XlSmgrTruncate {
      557              :     pub blkno: BlockNumber,
      558              :     pub rnode: RelFileNode,
      559              :     pub flags: u32,
      560              : }
      561              : 
      562              : impl XlSmgrTruncate {
      563            0 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate { // Consumes 20 bytes: five little-endian u32 values in field order.
      564            0 :         XlSmgrTruncate {
      565            0 :             blkno: buf.get_u32_le(),
      566            0 :             rnode: RelFileNode {
      567            0 :                 spcnode: buf.get_u32_le(), /* tablespace */
      568            0 :                 dbnode: buf.get_u32_le(),  /* database */
      569            0 :                 relnode: buf.get_u32_le(), /* relation */
      570            0 :             },
      571            0 :             flags: buf.get_u32_le(),
      572            0 :         }
      573            0 :     }
      574              : }
     575              : 
      576              : #[repr(C)]
      577              : #[derive(Debug)]
      578              : pub struct XlCreateDatabase { // OIDs of the new database and its tablespace, followed by those of the source database.
      579              :     pub db_id: Oid,
      580              :     pub tablespace_id: Oid,
      581              :     pub src_db_id: Oid,
      582              :     pub src_tablespace_id: Oid,
      583              : }
      584              : 
      585              : impl XlCreateDatabase {
      586            0 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase { // Consumes 16 bytes: four little-endian u32 OIDs in field order.
      587            0 :         XlCreateDatabase {
      588            0 :             db_id: buf.get_u32_le(),
      589            0 :             tablespace_id: buf.get_u32_le(),
      590            0 :             src_db_id: buf.get_u32_le(),
      591            0 :             src_tablespace_id: buf.get_u32_le(),
      592            0 :         }
      593            0 :     }
      594              : }
     595              : 
      596              : #[repr(C)]
      597              : #[derive(Debug)]
      598              : pub struct XlDropDatabase {
      599              :     pub db_id: Oid,
      600              :     pub n_tablespaces: Oid, /* number of tablespace IDs; a count, although it is typed as Oid */
      601              :     pub tablespace_ids: Vec<Oid>,
      602              : }
      603              : 
      604              : impl XlDropDatabase {
      605            0 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase { // Reads db_id and the count, then that many little-endian u32 tablespace ids.
      606            0 :         let mut rec = XlDropDatabase {
      607            0 :             db_id: buf.get_u32_le(),
      608            0 :             n_tablespaces: buf.get_u32_le(),
      609            0 :             tablespace_ids: Vec::<Oid>::new(),
      610            0 :         };
      611              : 
      612            0 :         for _i in 0..rec.n_tablespaces { // NOTE(review): the count comes straight from the record and is not checked against what is left in `buf`
      613            0 :             let id = buf.get_u32_le();
      614            0 :             rec.tablespace_ids.push(id);
      615            0 :         }
      616              : 
      617            0 :         rec
      618            0 :     }
      619              : }
     620              : 
     621              : ///
     622              : /// Note: Parsing some fields is missing, because they're not needed.
     623              : ///
     624              : /// This is similar to the xl_xact_parsed_commit and
     625              : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     626              : /// struct for commits and aborts.
     627              : ///
     628              : #[derive(Debug)]
     629              : pub struct XlXactParsedRecord {
     630              :     pub xid: TransactionId,
     631              :     pub info: u8,
     632              :     pub xact_time: TimestampTz,
     633              :     pub xinfo: u32,
     634              : 
     635              :     pub db_id: Oid,
     636              :     /* MyDatabaseId */
     637              :     pub ts_id: Oid,
     638              :     /* MyDatabaseTableSpace */
     639              :     pub subxacts: Vec<TransactionId>,
     640              : 
     641              :     pub xnodes: Vec<RelFileNode>,
     642              :     pub origin_lsn: Lsn,
     643              : }
     644              : 
     645              : impl XlXactParsedRecord {
     646              :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
     647              :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
     648              :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
     649           24 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
     650           24 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
     651           24 :         // The record starts with time of commit/abort
     652           24 :         let xact_time = buf.get_i64_le();
     653           24 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
     654           24 :             buf.get_u32_le()
     655              :         } else {
     656            0 :             0
     657              :         };
     658              :         let db_id;
     659              :         let ts_id;
     660           24 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
     661           24 :             db_id = buf.get_u32_le();
     662           24 :             ts_id = buf.get_u32_le();
     663           24 :         } else {
     664            0 :             db_id = 0;
     665            0 :             ts_id = 0;
     666            0 :         }
     667           24 :         let mut subxacts = Vec::<TransactionId>::new();
     668           24 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
     669            0 :             let nsubxacts = buf.get_i32_le();
     670            0 :             for _i in 0..nsubxacts {
     671            0 :                 let subxact = buf.get_u32_le();
     672            0 :                 subxacts.push(subxact);
     673            0 :             }
     674           24 :         }
     675           24 :         let mut xnodes = Vec::<RelFileNode>::new();
     676           24 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
     677            0 :             let nrels = buf.get_i32_le();
     678            0 :             for _i in 0..nrels {
     679            0 :                 let spcnode = buf.get_u32_le();
     680            0 :                 let dbnode = buf.get_u32_le();
     681            0 :                 let relnode = buf.get_u32_le();
     682            0 :                 trace!(
     683            0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
     684              :                     spcnode,
     685              :                     dbnode,
     686              :                     relnode
     687              :                 );
     688            0 :                 xnodes.push(RelFileNode {
     689            0 :                     spcnode,
     690            0 :                     dbnode,
     691            0 :                     relnode,
     692            0 :                 });
     693              :             }
     694           24 :         }
     695              : 
     696           24 :         if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
     697            0 :             let nitems = buf.get_i32_le();
     698            0 :             debug!(
     699            0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
     700              :                 nitems
     701              :             );
     702            0 :             let sizeof_xl_xact_stats_item = 12;
     703            0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
     704           24 :         }
     705              : 
     706           24 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
     707           24 :             let nmsgs = buf.get_i32_le();
     708           24 :             let sizeof_shared_invalidation_message = 16;
     709           24 :             buf.advance(
     710           24 :                 (nmsgs * sizeof_shared_invalidation_message)
     711           24 :                     .try_into()
     712           24 :                     .unwrap(),
     713           24 :             );
     714           24 :         }
     715              : 
     716           24 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
     717            0 :             xid = buf.get_u32_le();
     718            0 :             debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
     719           24 :         }
     720              : 
     721           24 :         let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 {
     722            0 :             Lsn(buf.get_u64_le())
     723              :         } else {
     724           24 :             Lsn::INVALID
     725              :         };
     726           24 :         XlXactParsedRecord {
     727           24 :             xid,
     728           24 :             info,
     729           24 :             xact_time,
     730           24 :             xinfo,
     731           24 :             db_id,
     732           24 :             ts_id,
     733           24 :             subxacts,
     734           24 :             xnodes,
     735           24 :             origin_lsn,
     736           24 :         }
     737           24 :     }
     738              : }
     739              : 
     740              : #[repr(C)]
     741              : #[derive(Debug)]
     742              : pub struct XlClogTruncate {
     743              :     pub pageno: u32,
     744              :     pub oldest_xid: TransactionId,
     745              :     pub oldest_xid_db: Oid,
     746              : }
     747              : 
     748              : impl XlClogTruncate {
     749            0 :     pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
     750            0 :         XlClogTruncate {
     751            0 :             pageno: buf.get_u32_le(),
     752            0 :             oldest_xid: buf.get_u32_le(),
     753            0 :             oldest_xid_db: buf.get_u32_le(),
     754            0 :         }
     755            0 :     }
     756              : }
     757              : 
     758              : #[repr(C)]
     759            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     760              : pub struct MultiXactMember {
     761              :     pub xid: TransactionId,
     762              :     pub status: MultiXactStatus,
     763              : }
     764              : 
     765              : impl MultiXactMember {
     766            0 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     767            0 :         MultiXactMember {
     768            0 :             xid: buf.get_u32_le(),
     769            0 :             status: buf.get_u32_le(),
     770            0 :         }
     771            0 :     }
     772              : }
     773              : 
     774              : #[repr(C)]
     775              : #[derive(Debug)]
     776              : pub struct XlMultiXactCreate {
     777              :     pub mid: MultiXactId,
     778              :     /* new MultiXact's ID */
     779              :     pub moff: MultiXactOffset,
     780              :     /* its starting offset in members file */
     781              :     pub nmembers: u32,
     782              :     /* number of member XIDs */
     783              :     pub members: Vec<MultiXactMember>,
     784              : }
     785              : 
     786              : impl XlMultiXactCreate {
     787            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
     788            0 :         let mid = buf.get_u32_le();
     789            0 :         let moff = buf.get_u32_le();
     790            0 :         let nmembers = buf.get_u32_le();
     791            0 :         let mut members = Vec::new();
     792            0 :         for _ in 0..nmembers {
     793            0 :             members.push(MultiXactMember::decode(buf));
     794            0 :         }
     795            0 :         XlMultiXactCreate {
     796            0 :             mid,
     797            0 :             moff,
     798            0 :             nmembers,
     799            0 :             members,
     800            0 :         }
     801            0 :     }
     802              : }
     803              : 
     804              : #[repr(C)]
     805              : #[derive(Debug)]
     806              : pub struct XlMultiXactTruncate {
     807              :     pub oldest_multi_db: Oid,
     808              :     /* to-be-truncated range of multixact offsets */
     809              :     pub start_trunc_off: MultiXactId,
     810              :     /* just for completeness' sake */
     811              :     pub end_trunc_off: MultiXactId,
     812              : 
     813              :     /* to-be-truncated range of multixact members */
     814              :     pub start_trunc_memb: MultiXactOffset,
     815              :     pub end_trunc_memb: MultiXactOffset,
     816              : }
     817              : 
     818              : impl XlMultiXactTruncate {
     819            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
     820            0 :         XlMultiXactTruncate {
     821            0 :             oldest_multi_db: buf.get_u32_le(),
     822            0 :             start_trunc_off: buf.get_u32_le(),
     823            0 :             end_trunc_off: buf.get_u32_le(),
     824            0 :             start_trunc_memb: buf.get_u32_le(),
     825            0 :             end_trunc_memb: buf.get_u32_le(),
     826            0 :         }
     827            0 :     }
     828              : }
     829              : 
     830              : #[repr(C)]
     831              : #[derive(Debug)]
     832              : pub struct XlLogicalMessage {
     833              :     pub db_id: Oid,
     834              :     pub transactional: bool,
     835              :     pub prefix_size: usize,
     836              :     pub message_size: usize,
     837              : }
     838              : 
     839              : impl XlLogicalMessage {
     840            0 :     pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
     841            0 :         XlLogicalMessage {
     842            0 :             db_id: buf.get_u32_le(),
     843            0 :             transactional: buf.get_u32_le() != 0, // 4-bytes alignment
     844            0 :             prefix_size: buf.get_u64_le() as usize,
     845            0 :             message_size: buf.get_u64_le() as usize,
     846            0 :         }
     847            0 :     }
     848              : }
     849              : 
     850              : #[repr(C)]
     851              : #[derive(Debug)]
     852              : pub struct XlRunningXacts {
     853              :     pub xcnt: u32,
     854              :     pub subxcnt: u32,
     855              :     pub subxid_overflow: bool,
     856              :     pub next_xid: TransactionId,
     857              :     pub oldest_running_xid: TransactionId,
     858              :     pub latest_completed_xid: TransactionId,
     859              :     pub xids: Vec<TransactionId>,
     860              : }
     861              : 
     862              : impl XlRunningXacts {
     863            0 :     pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
     864            0 :         let xcnt = buf.get_u32_le();
     865            0 :         let subxcnt = buf.get_u32_le();
     866            0 :         let subxid_overflow = buf.get_u32_le() != 0;
     867            0 :         let next_xid = buf.get_u32_le();
     868            0 :         let oldest_running_xid = buf.get_u32_le();
     869            0 :         let latest_completed_xid = buf.get_u32_le();
     870            0 :         let mut xids = Vec::new();
     871            0 :         for _ in 0..(xcnt + subxcnt) {
     872            0 :             xids.push(buf.get_u32_le());
     873            0 :         }
     874            0 :         XlRunningXacts {
     875            0 :             xcnt,
     876            0 :             subxcnt,
     877            0 :             subxid_overflow,
     878            0 :             next_xid,
     879            0 :             oldest_running_xid,
     880            0 :             latest_completed_xid,
     881            0 :             xids,
     882            0 :         }
     883            0 :     }
     884              : }
     885              : 
     886              : #[repr(C)]
     887              : #[derive(Debug)]
     888              : pub struct XlReploriginDrop {
     889              :     pub node_id: RepOriginId,
     890              : }
     891              : 
     892              : impl XlReploriginDrop {
     893            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginDrop {
     894            0 :         XlReploriginDrop {
     895            0 :             node_id: buf.get_u16_le(),
     896            0 :         }
     897            0 :     }
     898              : }
     899              : 
     900              : #[repr(C)]
     901              : #[derive(Debug)]
     902              : pub struct XlReploriginSet {
     903              :     pub remote_lsn: Lsn,
     904              :     pub node_id: RepOriginId,
     905              : }
     906              : 
     907              : impl XlReploriginSet {
     908            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginSet {
     909            0 :         XlReploriginSet {
     910            0 :             remote_lsn: Lsn(buf.get_u64_le()),
     911            0 :             node_id: buf.get_u16_le(),
     912            0 :         }
     913            0 :     }
     914              : }
     915              : 
     916              : /// Main routine to decode a WAL record and figure out which blocks are modified
     917              : //
     918              : // See xlogrecord.h for details
     919              : // The overall layout of an XLOG record is:
     920              : //              Fixed-size header (XLogRecord struct)
     921              : //      XLogRecordBlockHeader struct
     922              : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     923              : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     924              : //                 XLogRecordBlockCompressHeader struct follows.
     925              : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     926              : //          BlockNumber follows
     927              : //      XLogRecordBlockHeader struct
     928              : //      ...
     929              : //      XLogRecordDataHeader[Short|Long] struct
     930              : //      block data
     931              : //      block data
     932              : //      ...
     933              : //      main data
     934              : //
     935              : //
     936              : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
     937              : // It would be more natural for this function to return a DecodedWALRecord as return value,
     938              : // but reusing the caller-supplied struct avoids an allocation.
     939              : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
     940              : //
     941       437556 : pub fn decode_wal_record(
     942       437556 :     record: Bytes,
     943       437556 :     decoded: &mut DecodedWALRecord,
     944       437556 :     pg_version: u32,
     945       437556 : ) -> Result<()> {
     946       437556 :     let mut rnode_spcnode: u32 = 0;
     947       437556 :     let mut rnode_dbnode: u32 = 0;
     948       437556 :     let mut rnode_relnode: u32 = 0;
     949       437556 :     let mut got_rnode = false;
     950       437556 :     let mut origin_id: u16 = 0;
     951       437556 : 
     952       437556 :     let mut buf = record.clone();
     953              : 
     954              :     // 1. Parse XLogRecord struct
     955              : 
     956              :     // FIXME: assume little-endian here
     957       437556 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     958              : 
     959       437556 :     trace!(
     960            0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
     961              :         xlogrec.xl_rmid,
     962              :         xlogrec.xl_info
     963              :     );
     964              : 
     965       437556 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
     966       437556 : 
     967       437556 :     if buf.remaining() != remaining {
     968            0 :         //TODO error
     969       437556 :     }
     970              : 
     971       437556 :     let mut max_block_id = 0;
     972       437556 :     let mut blocks_total_len: u32 = 0;
     973       437556 :     let mut main_data_len = 0;
     974       437556 :     let mut datatotal: u32 = 0;
     975       437556 :     decoded.blocks.clear();
     976              : 
     977              :     // 2. Decode the headers.
     978              :     // XLogRecordBlockHeaders if any,
     979              :     // XLogRecordDataHeader[Short|Long]
     980      1311966 :     while buf.remaining() > datatotal as usize {
     981       874410 :         let block_id = buf.get_u8();
     982       874410 : 
     983       874410 :         match block_id {
     984       437436 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
     985       437436 :                 /* XLogRecordDataHeaderShort */
     986       437436 :                 main_data_len = buf.get_u8() as u32;
     987       437436 :                 datatotal += main_data_len;
     988       437436 :             }
     989              : 
     990           48 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
     991           48 :                 /* XLogRecordDataHeaderLong */
     992           48 :                 main_data_len = buf.get_u32_le();
     993           48 :                 datatotal += main_data_len;
     994           48 :             }
     995              : 
     996            0 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
     997            0 :                 // RepOriginId is uint16
     998            0 :                 origin_id = buf.get_u16_le();
     999            0 :             }
    1000              : 
    1001            0 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
    1002            0 :                 // TransactionId is uint32
    1003            0 :                 buf.advance(4);
    1004            0 :             }
    1005              : 
    1006       436926 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
    1007              :                 /* XLogRecordBlockHeader */
    1008       436926 :                 let mut blk = DecodedBkpBlock::new();
    1009       436926 : 
    1010       436926 :                 if block_id <= max_block_id {
    1011       436926 :                     // TODO
    1012       436926 :                     //report_invalid_record(state,
    1013       436926 :                     //                    "out-of-order block_id %u at %X/%X",
    1014       436926 :                     //                    block_id,
    1015       436926 :                     //                    (uint32) (state->ReadRecPtr >> 32),
    1016       436926 :                     //                    (uint32) state->ReadRecPtr);
    1017       436926 :                     //    goto err;
    1018       436926 :                 }
    1019       436926 :                 max_block_id = block_id;
    1020       436926 : 
    1021       436926 :                 let fork_flags: u8 = buf.get_u8();
    1022       436926 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
    1023       436926 :                 blk.flags = fork_flags;
    1024       436926 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
    1025       436926 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
    1026       436926 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
    1027       436926 :                 blk.data_len = buf.get_u16_le();
    1028       436926 : 
    1029       436926 :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
    1030       436926 : 
    1031       436926 :                 datatotal += blk.data_len as u32;
    1032       436926 :                 blocks_total_len += blk.data_len as u32;
    1033       436926 : 
    1034       436926 :                 if blk.has_image {
    1035          180 :                     blk.bimg_len = buf.get_u16_le();
    1036          180 :                     blk.hole_offset = buf.get_u16_le();
    1037          180 :                     blk.bimg_info = buf.get_u8();
    1038              : 
    1039            0 :                     blk.apply_image = dispatch_pgversion!(
    1040          180 :                         pg_version,
    1041            0 :                         (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
    1042              :                     );
    1043              : 
    1044          180 :                     let blk_img_is_compressed =
    1045          180 :                         postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version);
    1046          180 : 
    1047          180 :                     if blk_img_is_compressed {
    1048            0 :                         debug!("compressed block image , pg_version = {}", pg_version);
    1049          180 :                     }
    1050              : 
    1051          180 :                     if blk_img_is_compressed {
    1052            0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
    1053            0 :                             blk.hole_length = buf.get_u16_le();
    1054            0 :                         } else {
    1055            0 :                             blk.hole_length = 0;
    1056            0 :                         }
    1057          180 :                     } else {
    1058          180 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
    1059          180 :                     }
    1060          180 :                     datatotal += blk.bimg_len as u32;
    1061          180 :                     blocks_total_len += blk.bimg_len as u32;
    1062          180 : 
    1063          180 :                     /*
    1064          180 :                      * cross-check that hole_offset > 0, hole_length > 0 and
    1065          180 :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
    1066          180 :                      */
    1067          180 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
    1068          108 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
    1069            0 :                     {
    1070            0 :                         // TODO
    1071            0 :                         /*
    1072            0 :                         report_invalid_record(state,
    1073            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
    1074            0 :                                       (unsigned int) blk->hole_offset,
    1075            0 :                                       (unsigned int) blk->hole_length,
    1076            0 :                                       (unsigned int) blk->bimg_len,
    1077            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1078            0 :                         goto err;
    1079            0 :                                      */
    1080          180 :                     }
    1081              : 
    1082              :                     /*
    1083              :                      * cross-check that hole_offset == 0 and hole_length == 0 if
    1084              :                      * the HAS_HOLE flag is not set.
    1085              :                      */
    1086          180 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
    1087           72 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
    1088            0 :                     {
    1089            0 :                         // TODO
    1090            0 :                         /*
    1091            0 :                         report_invalid_record(state,
    1092            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
    1093            0 :                                       (unsigned int) blk->hole_offset,
    1094            0 :                                       (unsigned int) blk->hole_length,
    1095            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1096            0 :                         goto err;
    1097            0 :                                      */
    1098          180 :                     }
    1099              : 
    1100              :                     /*
    1101              :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
    1102              :                      * flag is set.
    1103              :                      */
    1104          180 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
    1105           72 :                         // TODO
    1106           72 :                         /*
    1107           72 :                         report_invalid_record(state,
    1108           72 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
    1109           72 :                                       (unsigned int) blk->bimg_len,
    1110           72 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1111           72 :                         goto err;
    1112           72 :                                      */
    1113          108 :                     }
    1114              : 
    1115              :                     /*
    1116              :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
    1117              :                      * IS_COMPRESSED flag is set.
    1118              :                      */
    1119          180 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
    1120           72 :                         && !blk_img_is_compressed
    1121           72 :                         && blk.bimg_len != BLCKSZ
    1122            0 :                     {
    1123            0 :                         // TODO
    1124            0 :                         /*
    1125            0 :                         report_invalid_record(state,
    1126            0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
    1127            0 :                                       (unsigned int) blk->data_len,
    1128            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1129            0 :                         goto err;
    1130            0 :                                      */
    1131          180 :                     }
    1132       436746 :                 }
    1133       436926 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
    1134       436926 :                     rnode_spcnode = buf.get_u32_le();
    1135       436926 :                     rnode_dbnode = buf.get_u32_le();
    1136       436926 :                     rnode_relnode = buf.get_u32_le();
    1137       436926 :                     got_rnode = true;
    1138       436926 :                 } else if !got_rnode {
    1139            0 :                     // TODO
    1140            0 :                     /*
    1141            0 :                     report_invalid_record(state,
    1142            0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
    1143            0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1144            0 :                     goto err;           */
    1145            0 :                 }
    1146              : 
    1147       436926 :                 blk.rnode_spcnode = rnode_spcnode;
    1148       436926 :                 blk.rnode_dbnode = rnode_dbnode;
    1149       436926 :                 blk.rnode_relnode = rnode_relnode;
    1150       436926 : 
    1151       436926 :                 blk.blkno = buf.get_u32_le();
    1152       436926 :                 trace!(
    1153            0 :                     "this record affects {}/{}/{} blk {}",
    1154              :                     rnode_spcnode,
    1155              :                     rnode_dbnode,
    1156              :                     rnode_relnode,
    1157              :                     blk.blkno
    1158              :                 );
    1159              : 
    1160       436926 :                 decoded.blocks.push(blk);
    1161              :             }
    1162              : 
    1163            0 :             _ => {
    1164            0 :                 // TODO: invalid block_id
    1165            0 :             }
    1166              :         }
    1167              :     }
    1168              : 
    1169              :     // 3. Decode blocks.
    1170       437556 :     let mut ptr = record.len() - buf.remaining();
    1171       437556 :     for blk in decoded.blocks.iter_mut() {
    1172       436926 :         if blk.has_image {
    1173          180 :             blk.bimg_offset = ptr as u32;
    1174          180 :             ptr += blk.bimg_len as usize;
    1175       436746 :         }
    1176       436926 :         if blk.has_data {
    1177       436746 :             ptr += blk.data_len as usize;
    1178       436746 :         }
    1179              :     }
    1180              :     // We don't need them, so just skip blocks_total_len bytes
    1181       437556 :     buf.advance(blocks_total_len as usize);
    1182       437556 :     assert_eq!(ptr, record.len() - buf.remaining());
    1183              : 
    1184       437556 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
    1185       437556 : 
    1186       437556 :     // 4. Decode main_data
    1187       437556 :     if main_data_len > 0 {
    1188       437484 :         assert_eq!(buf.remaining(), main_data_len as usize);
    1189           72 :     }
    1190              : 
    1191       437556 :     decoded.xl_xid = xlogrec.xl_xid;
    1192       437556 :     decoded.xl_info = xlogrec.xl_info;
    1193       437556 :     decoded.xl_rmid = xlogrec.xl_rmid;
    1194       437556 :     decoded.record = record;
    1195       437556 :     decoded.origin_id = origin_id;
    1196       437556 :     decoded.main_data_offset = main_data_offset;
    1197       437556 : 
    1198       437556 :     Ok(())
    1199       437556 : }
    1200              : 
    1201              : ///
    1202              : /// Build a human-readable string to describe a WAL record
    1203              : ///
    1204              : /// For debugging purposes
    1205            0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
    1206            0 :     match rec {
    1207            0 :         NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
    1208            0 :             "will_init: {}, {}",
    1209            0 :             will_init,
    1210            0 :             describe_postgres_wal_record(rec)?
    1211              :         )),
    1212            0 :         _ => Ok(format!("{:?}", rec)),
    1213              :     }
    1214            0 : }
    1215              : 
    1216            0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
    1217            0 :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
    1218            0 :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
    1219            0 :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
    1220            0 :     // without worrying about security?
    1221            0 :     //
    1222            0 :     // But for now, we have a hand-written code for a few common WAL record types here.
    1223            0 : 
    1224            0 :     let mut buf = record.clone();
    1225              : 
    1226              :     // 1. Parse XLogRecord struct
    1227              : 
    1228              :     // FIXME: assume little-endian here
    1229            0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1230              : 
    1231              :     let unknown_str: String;
    1232              : 
    1233            0 :     let result: &str = match xlogrec.xl_rmid {
    1234              :         pg_constants::RM_HEAP2_ID => {
    1235            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1236            0 :             match info {
    1237            0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
    1238            0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
    1239              :                 _ => {
    1240            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1241            0 :                     &unknown_str
    1242              :                 }
    1243              :             }
    1244              :         }
    1245              :         pg_constants::RM_HEAP_ID => {
    1246            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1247            0 :             match info {
    1248            0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
    1249            0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
    1250            0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
    1251            0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
    1252              :                 _ => {
    1253            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1254            0 :                     &unknown_str
    1255              :                 }
    1256              :             }
    1257              :         }
    1258              :         pg_constants::RM_XLOG_ID => {
    1259            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1260            0 :             match info {
    1261            0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
    1262            0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
    1263              :                 _ => {
    1264            0 :                     unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
    1265            0 :                     &unknown_str
    1266              :                 }
    1267              :             }
    1268              :         }
    1269            0 :         rmid => {
    1270            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1271            0 : 
    1272            0 :             unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
    1273            0 :             &unknown_str
    1274              :         }
    1275              :     };
    1276              : 
    1277            0 :     Ok(String::from(result))
    1278            0 : }
        

Generated by: LCOV version 2.1-beta