LCOV - code coverage report
Current view: top level - pageserver/src - walrecord.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 40.6 % 635 258
Test Date: 2024-10-22 22:13:45 Functions: 27.0 % 63 17

            Line data    Source code
       1              : //!
       2              : //! Functions for parsing WAL records.
       3              : //!
       4              : 
       5              : use anyhow::Result;
       6              : use bytes::{Buf, Bytes};
       7              : use postgres_ffi::dispatch_pgversion;
       8              : use postgres_ffi::pg_constants;
       9              : use postgres_ffi::BLCKSZ;
      10              : use postgres_ffi::{BlockNumber, TimestampTz};
      11              : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
      12              : use postgres_ffi::{RepOriginId, XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
      13              : use serde::{Deserialize, Serialize};
      14              : use tracing::*;
      15              : use utils::{bin_ser::DeserializeError, lsn::Lsn};
      16              : 
      17              : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
      18              : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
      19         1866 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
      20              : pub enum NeonWalRecord {
      21              :     /// Native PostgreSQL WAL record
      22              :     Postgres { will_init: bool, rec: Bytes },
      23              : 
      24              :     /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
      25              :     ClearVisibilityMapFlags {
      26              :         new_heap_blkno: Option<u32>,
      27              :         old_heap_blkno: Option<u32>,
      28              :         flags: u8,
      29              :     },
      30              :     /// Mark transaction IDs as committed on a CLOG page
      31              :     ClogSetCommitted {
      32              :         xids: Vec<TransactionId>,
      33              :         timestamp: TimestampTz,
      34              :     },
      35              :     /// Mark transaction IDs as aborted on a CLOG page
      36              :     ClogSetAborted { xids: Vec<TransactionId> },
      37              :     /// Extend multixact offsets SLRU
      38              :     MultixactOffsetCreate {
      39              :         mid: MultiXactId,
      40              :         moff: MultiXactOffset,
      41              :     },
      42              :     /// Extend multixact members SLRU.
      43              :     MultixactMembersCreate {
      44              :         moff: MultiXactOffset,
      45              :         members: Vec<MultiXactMember>,
      46              :     },
      47              :     /// Update the map of AUX files, either writing or dropping an entry
      48              :     AuxFile {
      49              :         file_path: String,
      50              :         content: Option<Bytes>,
      51              :     },
      52              : 
      53              :     /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
      54              :     #[cfg(test)]
      55              :     Test {
      56              :         /// Append a string to the image.
      57              :         append: String,
      58              :         /// Clear the image before appending.
      59              :         clear: bool,
      60              :         /// Treat this record as an init record. `clear` should be set to true if this field is set
      61              :         /// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
      62              :         /// its references in `timeline.rs`.
      63              :         will_init: bool,
      64              :     },
      65              : }
      66              : 
      67              : impl NeonWalRecord {
      68              :     /// Does replaying this WAL record initialize the page from scratch, or does
      69              :     /// it need to be applied over the previous image of the page?
      70       146646 :     pub fn will_init(&self) -> bool {
      71       146646 :         // If you change this function, you'll also need to change ValueBytes::will_init
      72       146646 :         match self {
      73       145626 :             NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
      74              :             #[cfg(test)]
      75         1000 :             NeonWalRecord::Test { will_init, .. } => *will_init,
      76              :             // None of the special neon record types currently initialize the page
      77           20 :             _ => false,
      78              :         }
      79       146646 :     }
      80              : 
      81              :     #[cfg(test)]
      82          160 :     pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
      83          160 :         Self::Test {
      84          160 :             append: s.as_ref().to_string(),
      85          160 :             clear: false,
      86          160 :             will_init: false,
      87          160 :         }
      88          160 :     }
      89              : 
      90              :     #[cfg(test)]
      91            2 :     pub(crate) fn wal_clear() -> Self {
      92            2 :         Self::Test {
      93            2 :             append: "".to_string(),
      94            2 :             clear: true,
      95            2 :             will_init: false,
      96            2 :         }
      97            2 :     }
      98              : 
      99              :     #[cfg(test)]
     100           10 :     pub(crate) fn wal_init() -> Self {
     101           10 :         Self::Test {
     102           10 :             append: "".to_string(),
     103           10 :             clear: true,
     104           10 :             will_init: true,
     105           10 :         }
     106           10 :     }
     107              : }
     108              : 
     109              : /// DecodedBkpBlock represents per-page data contained in a WAL record.
     110              : #[derive(Default)]
     111              : pub struct DecodedBkpBlock {
     112              :     /* Is this block ref in use? */
     113              :     //in_use: bool,
     114              : 
     115              :     /* Identify the block this refers to */
     116              :     pub rnode_spcnode: u32,
     117              :     pub rnode_dbnode: u32,
     118              :     pub rnode_relnode: u32,
     119              :     // Note that we have a few special forknum values for non-rel files.
     120              :     pub forknum: u8,
     121              :     pub blkno: u32,
     122              : 
     123              :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
     124              :     pub flags: u8,
     125              : 
     126              :     /* Information on full-page image, if any */
     127              :     pub has_image: bool,
     128              :     /* has image, even for consistency checking */
     129              :     pub apply_image: bool,
     130              :     /* has image that should be restored */
     131              :     pub will_init: bool,
     132              :     /* record doesn't need previous page version to apply */
     133              :     //char         *bkp_image;
     134              :     pub hole_offset: u16,
     135              :     pub hole_length: u16,
     136              :     pub bimg_offset: u32,
     137              :     pub bimg_len: u16,
     138              :     pub bimg_info: u8,
     139              : 
     140              :     /* Buffer holding the rmgr-specific data associated with this block */
     141              :     has_data: bool,
     142              :     data_len: u16,
     143              : }
     144              : 
     145              : impl DecodedBkpBlock {
     146       145642 :     pub fn new() -> DecodedBkpBlock {
     147       145642 :         Default::default()
     148       145642 :     }
     149              : }
     150              : 
     151              : #[derive(Default)]
     152              : pub struct DecodedWALRecord {
     153              :     pub xl_xid: TransactionId,
     154              :     pub xl_info: u8,
     155              :     pub xl_rmid: u8,
     156              :     pub record: Bytes, // raw XLogRecord
     157              : 
     158              :     pub blocks: Vec<DecodedBkpBlock>,
     159              :     pub main_data_offset: usize,
     160              :     pub origin_id: u16,
     161              : }
     162              : 
     163              : impl DecodedWALRecord {
     164              :     /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
     165              :     /// by reading other existing relations' data blocks.  This is more complex to apply than new-style database
     166              :     /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
     167       145852 :     pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
     168       145852 :         if self.xl_rmid == pg_constants::RM_DBASE_ID {
     169            0 :             let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     170            0 :             match pg_version {
     171              :                 14 => {
     172              :                     // Postgres 14 database creations are always the legacy kind
     173            0 :                     info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
     174              :                 }
     175            0 :                 15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     176            0 :                 16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     177            0 :                 17 => info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY,
     178              :                 _ => {
     179            0 :                     panic!("Unsupported postgres version {pg_version}")
     180              :                 }
     181              :             }
     182              :         } else {
     183       145852 :             false
     184              :         }
     185       145852 :     }
     186              : }
     187              : 
     188              : #[repr(C)]
     189              : #[derive(Debug, Clone, Copy)]
     190              : pub struct RelFileNode {
     191              :     pub spcnode: Oid, /* tablespace */
     192              :     pub dbnode: Oid,  /* database */
     193              :     pub relnode: Oid, /* relation */
     194              : }
     195              : 
     196              : #[repr(C)]
     197              : #[derive(Debug)]
     198              : pub struct XlRelmapUpdate {
     199              :     pub dbid: Oid,   /* database ID, or 0 for shared map */
     200              :     pub tsid: Oid,   /* database's tablespace, or pg_global */
     201              :     pub nbytes: i32, /* size of relmap data */
     202              : }
     203              : 
     204              : impl XlRelmapUpdate {
     205            0 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
     206            0 :         XlRelmapUpdate {
     207            0 :             dbid: buf.get_u32_le(),
     208            0 :             tsid: buf.get_u32_le(),
     209            0 :             nbytes: buf.get_i32_le(),
     210            0 :         }
     211            0 :     }
     212              : }
     213              : 
     214              : pub mod v14 {
     215              :     use bytes::{Buf, Bytes};
     216              :     use postgres_ffi::{OffsetNumber, TransactionId};
     217              : 
     218              :     #[repr(C)]
     219              :     #[derive(Debug)]
     220              :     pub struct XlHeapInsert {
     221              :         pub offnum: OffsetNumber,
     222              :         pub flags: u8,
     223              :     }
     224              : 
     225              :     impl XlHeapInsert {
     226       145276 :         pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
     227       145276 :             XlHeapInsert {
     228       145276 :                 offnum: buf.get_u16_le(),
     229       145276 :                 flags: buf.get_u8(),
     230       145276 :             }
     231       145276 :         }
     232              :     }
     233              : 
     234              :     #[repr(C)]
     235              :     #[derive(Debug)]
     236              :     pub struct XlHeapMultiInsert {
     237              :         pub flags: u8,
     238              :         pub _padding: u8,
     239              :         pub ntuples: u16,
     240              :     }
     241              : 
     242              :     impl XlHeapMultiInsert {
     243           42 :         pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
     244           42 :             XlHeapMultiInsert {
     245           42 :                 flags: buf.get_u8(),
     246           42 :                 _padding: buf.get_u8(),
     247           42 :                 ntuples: buf.get_u16_le(),
     248           42 :             }
     249           42 :         }
     250              :     }
     251              : 
     252              :     #[repr(C)]
     253              :     #[derive(Debug)]
     254              :     pub struct XlHeapDelete {
     255              :         pub xmax: TransactionId,
     256              :         pub offnum: OffsetNumber,
     257              :         pub _padding: u16,
     258              :         pub t_cid: u32,
     259              :         pub infobits_set: u8,
     260              :         pub flags: u8,
     261              :     }
     262              : 
     263              :     impl XlHeapDelete {
     264            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     265            0 :             XlHeapDelete {
     266            0 :                 xmax: buf.get_u32_le(),
     267            0 :                 offnum: buf.get_u16_le(),
     268            0 :                 _padding: buf.get_u16_le(),
     269            0 :                 t_cid: buf.get_u32_le(),
     270            0 :                 infobits_set: buf.get_u8(),
     271            0 :                 flags: buf.get_u8(),
     272            0 :             }
     273            0 :         }
     274              :     }
     275              : 
     276              :     #[repr(C)]
     277              :     #[derive(Debug)]
     278              :     pub struct XlHeapUpdate {
     279              :         pub old_xmax: TransactionId,
     280              :         pub old_offnum: OffsetNumber,
     281              :         pub old_infobits_set: u8,
     282              :         pub flags: u8,
     283              :         pub t_cid: u32,
     284              :         pub new_xmax: TransactionId,
     285              :         pub new_offnum: OffsetNumber,
     286              :     }
     287              : 
     288              :     impl XlHeapUpdate {
     289            8 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     290            8 :             XlHeapUpdate {
     291            8 :                 old_xmax: buf.get_u32_le(),
     292            8 :                 old_offnum: buf.get_u16_le(),
     293            8 :                 old_infobits_set: buf.get_u8(),
     294            8 :                 flags: buf.get_u8(),
     295            8 :                 t_cid: buf.get_u32_le(),
     296            8 :                 new_xmax: buf.get_u32_le(),
     297            8 :                 new_offnum: buf.get_u16_le(),
     298            8 :             }
     299            8 :         }
     300              :     }
     301              : 
     302              :     #[repr(C)]
     303              :     #[derive(Debug)]
     304              :     pub struct XlHeapLock {
     305              :         pub locking_xid: TransactionId,
     306              :         pub offnum: OffsetNumber,
     307              :         pub _padding: u16,
     308              :         pub t_cid: u32,
     309              :         pub infobits_set: u8,
     310              :         pub flags: u8,
     311              :     }
     312              : 
     313              :     impl XlHeapLock {
     314            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     315            0 :             XlHeapLock {
     316            0 :                 locking_xid: buf.get_u32_le(),
     317            0 :                 offnum: buf.get_u16_le(),
     318            0 :                 _padding: buf.get_u16_le(),
     319            0 :                 t_cid: buf.get_u32_le(),
     320            0 :                 infobits_set: buf.get_u8(),
     321            0 :                 flags: buf.get_u8(),
     322            0 :             }
     323            0 :         }
     324              :     }
     325              : 
     326              :     #[repr(C)]
     327              :     #[derive(Debug)]
     328              :     pub struct XlHeapLockUpdated {
     329              :         pub xmax: TransactionId,
     330              :         pub offnum: OffsetNumber,
     331              :         pub infobits_set: u8,
     332              :         pub flags: u8,
     333              :     }
     334              : 
     335              :     impl XlHeapLockUpdated {
     336            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
     337            0 :             XlHeapLockUpdated {
     338            0 :                 xmax: buf.get_u32_le(),
     339            0 :                 offnum: buf.get_u16_le(),
     340            0 :                 infobits_set: buf.get_u8(),
     341            0 :                 flags: buf.get_u8(),
     342            0 :             }
     343            0 :         }
     344              :     }
     345              : 
     346              :     #[repr(C)]
     347              :     #[derive(Debug)]
     348              :     pub struct XlParameterChange {
     349              :         pub max_connections: i32,
     350              :         pub max_worker_processes: i32,
     351              :         pub max_wal_senders: i32,
     352              :         pub max_prepared_xacts: i32,
     353              :         pub max_locks_per_xact: i32,
     354              :         pub wal_level: i32,
     355              :         pub wal_log_hints: bool,
     356              :         pub track_commit_timestamp: bool,
     357              :         pub _padding: [u8; 2],
     358              :     }
     359              : 
     360              :     impl XlParameterChange {
     361            0 :         pub fn decode(buf: &mut Bytes) -> XlParameterChange {
     362            0 :             XlParameterChange {
     363            0 :                 max_connections: buf.get_i32_le(),
     364            0 :                 max_worker_processes: buf.get_i32_le(),
     365            0 :                 max_wal_senders: buf.get_i32_le(),
     366            0 :                 max_prepared_xacts: buf.get_i32_le(),
     367            0 :                 max_locks_per_xact: buf.get_i32_le(),
     368            0 :                 wal_level: buf.get_i32_le(),
     369            0 :                 wal_log_hints: buf.get_u8() != 0,
     370            0 :                 track_commit_timestamp: buf.get_u8() != 0,
     371            0 :                 _padding: [buf.get_u8(), buf.get_u8()],
     372            0 :             }
     373            0 :         }
     374              :     }
     375              : }
     376              : 
     377              : pub mod v15 {
     378              :     pub use super::v14::{
     379              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
     380              :         XlParameterChange,
     381              :     };
     382              : }
     383              : 
     384              : pub mod v16 {
     385              :     pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
     386              :     use bytes::{Buf, Bytes};
     387              :     use postgres_ffi::{OffsetNumber, TransactionId};
     388              : 
     389              :     pub struct XlHeapDelete {
     390              :         pub xmax: TransactionId,
     391              :         pub offnum: OffsetNumber,
     392              :         pub infobits_set: u8,
     393              :         pub flags: u8,
     394              :     }
     395              : 
     396              :     impl XlHeapDelete {
     397            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     398            0 :             XlHeapDelete {
     399            0 :                 xmax: buf.get_u32_le(),
     400            0 :                 offnum: buf.get_u16_le(),
     401            0 :                 infobits_set: buf.get_u8(),
     402            0 :                 flags: buf.get_u8(),
     403            0 :             }
     404            0 :         }
     405              :     }
     406              : 
     407              :     #[repr(C)]
     408              :     #[derive(Debug)]
     409              :     pub struct XlHeapUpdate {
     410              :         pub old_xmax: TransactionId,
     411              :         pub old_offnum: OffsetNumber,
     412              :         pub old_infobits_set: u8,
     413              :         pub flags: u8,
     414              :         pub new_xmax: TransactionId,
     415              :         pub new_offnum: OffsetNumber,
     416              :     }
     417              : 
     418              :     impl XlHeapUpdate {
     419            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     420            0 :             XlHeapUpdate {
     421            0 :                 old_xmax: buf.get_u32_le(),
     422            0 :                 old_offnum: buf.get_u16_le(),
     423            0 :                 old_infobits_set: buf.get_u8(),
     424            0 :                 flags: buf.get_u8(),
     425            0 :                 new_xmax: buf.get_u32_le(),
     426            0 :                 new_offnum: buf.get_u16_le(),
     427            0 :             }
     428            0 :         }
     429              :     }
     430              : 
     431              :     #[repr(C)]
     432              :     #[derive(Debug)]
     433              :     pub struct XlHeapLock {
     434              :         pub locking_xid: TransactionId,
     435              :         pub offnum: OffsetNumber,
     436              :         pub infobits_set: u8,
     437              :         pub flags: u8,
     438              :     }
     439              : 
     440              :     impl XlHeapLock {
     441            0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     442            0 :             XlHeapLock {
     443            0 :                 locking_xid: buf.get_u32_le(),
     444            0 :                 offnum: buf.get_u16_le(),
     445            0 :                 infobits_set: buf.get_u8(),
     446            0 :                 flags: buf.get_u8(),
     447            0 :             }
     448            0 :         }
     449              :     }
     450              : 
     451              :     /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
     452              :     pub mod rm_neon {
     453              :         use bytes::{Buf, Bytes};
     454              :         use postgres_ffi::{OffsetNumber, TransactionId};
     455              : 
     456              :         #[repr(C)]
     457              :         #[derive(Debug)]
     458              :         pub struct XlNeonHeapInsert {
     459              :             pub offnum: OffsetNumber,
     460              :             pub flags: u8,
     461              :         }
     462              : 
     463              :         impl XlNeonHeapInsert {
     464            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
     465            0 :                 XlNeonHeapInsert {
     466            0 :                     offnum: buf.get_u16_le(),
     467            0 :                     flags: buf.get_u8(),
     468            0 :                 }
     469            0 :             }
     470              :         }
     471              : 
     472              :         #[repr(C)]
     473              :         #[derive(Debug)]
     474              :         pub struct XlNeonHeapMultiInsert {
     475              :             pub flags: u8,
     476              :             pub _padding: u8,
     477              :             pub ntuples: u16,
     478              :             pub t_cid: u32,
     479              :         }
     480              : 
     481              :         impl XlNeonHeapMultiInsert {
     482            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
     483            0 :                 XlNeonHeapMultiInsert {
     484            0 :                     flags: buf.get_u8(),
     485            0 :                     _padding: buf.get_u8(),
     486            0 :                     ntuples: buf.get_u16_le(),
     487            0 :                     t_cid: buf.get_u32_le(),
     488            0 :                 }
     489            0 :             }
     490              :         }
     491              : 
     492              :         #[repr(C)]
     493              :         #[derive(Debug)]
     494              :         pub struct XlNeonHeapDelete {
     495              :             pub xmax: TransactionId,
     496              :             pub offnum: OffsetNumber,
     497              :             pub infobits_set: u8,
     498              :             pub flags: u8,
     499              :             pub t_cid: u32,
     500              :         }
     501              : 
     502              :         impl XlNeonHeapDelete {
     503            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
     504            0 :                 XlNeonHeapDelete {
     505            0 :                     xmax: buf.get_u32_le(),
     506            0 :                     offnum: buf.get_u16_le(),
     507            0 :                     infobits_set: buf.get_u8(),
     508            0 :                     flags: buf.get_u8(),
     509            0 :                     t_cid: buf.get_u32_le(),
     510            0 :                 }
     511            0 :             }
     512              :         }
     513              : 
     514              :         #[repr(C)]
     515              :         #[derive(Debug)]
     516              :         pub struct XlNeonHeapUpdate {
     517              :             pub old_xmax: TransactionId,
     518              :             pub old_offnum: OffsetNumber,
     519              :             pub old_infobits_set: u8,
     520              :             pub flags: u8,
     521              :             pub t_cid: u32,
     522              :             pub new_xmax: TransactionId,
     523              :             pub new_offnum: OffsetNumber,
     524              :         }
     525              : 
     526              :         impl XlNeonHeapUpdate {
     527            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
     528            0 :                 XlNeonHeapUpdate {
     529            0 :                     old_xmax: buf.get_u32_le(),
     530            0 :                     old_offnum: buf.get_u16_le(),
     531            0 :                     old_infobits_set: buf.get_u8(),
     532            0 :                     flags: buf.get_u8(),
     533            0 :                     t_cid: buf.get_u32(),
     534            0 :                     new_xmax: buf.get_u32_le(),
     535            0 :                     new_offnum: buf.get_u16_le(),
     536            0 :                 }
     537            0 :             }
     538              :         }
     539              : 
     540              :         #[repr(C)]
     541              :         #[derive(Debug)]
     542              :         pub struct XlNeonHeapLock {
     543              :             pub locking_xid: TransactionId,
     544              :             pub t_cid: u32,
     545              :             pub offnum: OffsetNumber,
     546              :             pub infobits_set: u8,
     547              :             pub flags: u8,
     548              :         }
     549              : 
     550              :         impl XlNeonHeapLock {
     551            0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
     552            0 :                 XlNeonHeapLock {
     553            0 :                     locking_xid: buf.get_u32_le(),
     554            0 :                     t_cid: buf.get_u32_le(),
     555            0 :                     offnum: buf.get_u16_le(),
     556            0 :                     infobits_set: buf.get_u8(),
     557            0 :                     flags: buf.get_u8(),
     558            0 :                 }
     559            0 :             }
     560              :         }
     561              :     }
     562              : }
     563              : 
     564              : pub mod v17 {
     565              :     pub use super::v14::XlHeapLockUpdated;
     566              :     use bytes::{Buf, Bytes};
     567              :     pub use postgres_ffi::{TimeLineID, TimestampTz};
     568              : 
     569              :     pub use super::v16::rm_neon;
     570              :     pub use super::v16::{
     571              :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
     572              :     };
     573              : 
     574              :     #[repr(C)]
     575              :     #[derive(Debug)]
     576              :     pub struct XlEndOfRecovery {
     577              :         pub end_time: TimestampTz,
     578              :         pub this_time_line_id: TimeLineID,
     579              :         pub prev_time_line_id: TimeLineID,
     580              :         pub wal_level: i32,
     581              :     }
     582              : 
     583              :     impl XlEndOfRecovery {
     584            0 :         pub fn decode(buf: &mut Bytes) -> XlEndOfRecovery {
     585            0 :             XlEndOfRecovery {
     586            0 :                 end_time: buf.get_i64_le(),
     587            0 :                 this_time_line_id: buf.get_u32_le(),
     588            0 :                 prev_time_line_id: buf.get_u32_le(),
     589            0 :                 wal_level: buf.get_i32_le(),
     590            0 :             }
     591            0 :         }
     592              :     }
     593              : }
     594              : 
     595              : #[repr(C)]
     596              : #[derive(Debug)]
     597              : pub struct XlSmgrCreate {
     598              :     pub rnode: RelFileNode,
     599              :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
     600              :     // well-defined size?
     601              :     pub forknum: u8,
     602              : }
     603              : 
     604              : impl XlSmgrCreate {
     605           16 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
     606           16 :         XlSmgrCreate {
     607           16 :             rnode: RelFileNode {
     608           16 :                 spcnode: buf.get_u32_le(), /* tablespace */
     609           16 :                 dbnode: buf.get_u32_le(),  /* database */
     610           16 :                 relnode: buf.get_u32_le(), /* relation */
     611           16 :             },
     612           16 :             forknum: buf.get_u32_le() as u8,
     613           16 :         }
     614           16 :     }
     615              : }
     616              : 
     617              : #[repr(C)]
     618              : #[derive(Debug)]
     619              : pub struct XlSmgrTruncate {
     620              :     pub blkno: BlockNumber,
     621              :     pub rnode: RelFileNode,
     622              :     pub flags: u32,
     623              : }
     624              : 
     625              : impl XlSmgrTruncate {
     626            0 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
     627            0 :         XlSmgrTruncate {
     628            0 :             blkno: buf.get_u32_le(),
     629            0 :             rnode: RelFileNode {
     630            0 :                 spcnode: buf.get_u32_le(), /* tablespace */
     631            0 :                 dbnode: buf.get_u32_le(),  /* database */
     632            0 :                 relnode: buf.get_u32_le(), /* relation */
     633            0 :             },
     634            0 :             flags: buf.get_u32_le(),
     635            0 :         }
     636            0 :     }
     637              : }
     638              : 
     639              : #[repr(C)]
     640              : #[derive(Debug)]
     641              : pub struct XlCreateDatabase {
     642              :     pub db_id: Oid,
     643              :     pub tablespace_id: Oid,
     644              :     pub src_db_id: Oid,
     645              :     pub src_tablespace_id: Oid,
     646              : }
     647              : 
     648              : impl XlCreateDatabase {
     649            0 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
     650            0 :         XlCreateDatabase {
     651            0 :             db_id: buf.get_u32_le(),
     652            0 :             tablespace_id: buf.get_u32_le(),
     653            0 :             src_db_id: buf.get_u32_le(),
     654            0 :             src_tablespace_id: buf.get_u32_le(),
     655            0 :         }
     656            0 :     }
     657              : }
     658              : 
     659              : #[repr(C)]
     660              : #[derive(Debug)]
     661              : pub struct XlDropDatabase {
     662              :     pub db_id: Oid,
     663              :     pub n_tablespaces: Oid, /* number of tablespace IDs */
     664              :     pub tablespace_ids: Vec<Oid>,
     665              : }
     666              : 
     667              : impl XlDropDatabase {
     668            0 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
     669            0 :         let mut rec = XlDropDatabase {
     670            0 :             db_id: buf.get_u32_le(),
     671            0 :             n_tablespaces: buf.get_u32_le(),
     672            0 :             tablespace_ids: Vec::<Oid>::new(),
     673            0 :         };
     674              : 
     675            0 :         for _i in 0..rec.n_tablespaces {
     676            0 :             let id = buf.get_u32_le();
     677            0 :             rec.tablespace_ids.push(id);
     678            0 :         }
     679              : 
     680            0 :         rec
     681            0 :     }
     682              : }
     683              : 
     684              : ///
     685              : /// Note: Parsing some fields is missing, because they're not needed.
     686              : ///
     687              : /// This is similar to the xl_xact_parsed_commit and
     688              : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     689              : /// struct for commits and aborts.
     690              : ///
     691              : #[derive(Debug)]
     692              : pub struct XlXactParsedRecord {
     693              :     pub xid: TransactionId,
     694              :     pub info: u8,
     695              :     pub xact_time: TimestampTz,
     696              :     pub xinfo: u32,
     697              : 
     698              :     pub db_id: Oid,
     699              :     /* MyDatabaseId */
     700              :     pub ts_id: Oid,
     701              :     /* MyDatabaseTableSpace */
     702              :     pub subxacts: Vec<TransactionId>,
     703              : 
     704              :     pub xnodes: Vec<RelFileNode>,
     705              :     pub origin_lsn: Lsn,
     706              : }
     707              : 
     708              : impl XlXactParsedRecord {
     709              :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
     710              :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
     711              :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
     712            8 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
     713            8 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
     714            8 :         // The record starts with time of commit/abort
     715            8 :         let xact_time = buf.get_i64_le();
     716            8 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
     717            8 :             buf.get_u32_le()
     718              :         } else {
     719            0 :             0
     720              :         };
     721              :         let db_id;
     722              :         let ts_id;
     723            8 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
     724            8 :             db_id = buf.get_u32_le();
     725            8 :             ts_id = buf.get_u32_le();
     726            8 :         } else {
     727            0 :             db_id = 0;
     728            0 :             ts_id = 0;
     729            0 :         }
     730            8 :         let mut subxacts = Vec::<TransactionId>::new();
     731            8 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
     732            0 :             let nsubxacts = buf.get_i32_le();
     733            0 :             for _i in 0..nsubxacts {
     734            0 :                 let subxact = buf.get_u32_le();
     735            0 :                 subxacts.push(subxact);
     736            0 :             }
     737            8 :         }
     738            8 :         let mut xnodes = Vec::<RelFileNode>::new();
     739            8 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
     740            0 :             let nrels = buf.get_i32_le();
     741            0 :             for _i in 0..nrels {
     742            0 :                 let spcnode = buf.get_u32_le();
     743            0 :                 let dbnode = buf.get_u32_le();
     744            0 :                 let relnode = buf.get_u32_le();
     745            0 :                 trace!(
     746            0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
     747              :                     spcnode,
     748              :                     dbnode,
     749              :                     relnode
     750              :                 );
     751            0 :                 xnodes.push(RelFileNode {
     752            0 :                     spcnode,
     753            0 :                     dbnode,
     754            0 :                     relnode,
     755            0 :                 });
     756              :             }
     757            8 :         }
     758              : 
     759            8 :         if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
     760            0 :             let nitems = buf.get_i32_le();
     761            0 :             debug!(
     762            0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
     763              :                 nitems
     764              :             );
     765            0 :             let sizeof_xl_xact_stats_item = 12;
     766            0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
     767            8 :         }
     768              : 
     769            8 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
     770            8 :             let nmsgs = buf.get_i32_le();
     771            8 :             let sizeof_shared_invalidation_message = 16;
     772            8 :             buf.advance(
     773            8 :                 (nmsgs * sizeof_shared_invalidation_message)
     774            8 :                     .try_into()
     775            8 :                     .unwrap(),
     776            8 :             );
     777            8 :         }
     778              : 
     779            8 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
     780            0 :             xid = buf.get_u32_le();
     781            0 :             debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
     782            8 :         }
     783              : 
     784            8 :         let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 {
     785            0 :             Lsn(buf.get_u64_le())
     786              :         } else {
     787            8 :             Lsn::INVALID
     788              :         };
     789            8 :         XlXactParsedRecord {
     790            8 :             xid,
     791            8 :             info,
     792            8 :             xact_time,
     793            8 :             xinfo,
     794            8 :             db_id,
     795            8 :             ts_id,
     796            8 :             subxacts,
     797            8 :             xnodes,
     798            8 :             origin_lsn,
     799            8 :         }
     800            8 :     }
     801              : }
     802              : 
     803              : #[repr(C)]
     804              : #[derive(Debug)]
     805              : pub struct XlClogTruncate {
     806              :     pub pageno: u32,
     807              :     pub oldest_xid: TransactionId,
     808              :     pub oldest_xid_db: Oid,
     809              : }
     810              : 
     811              : impl XlClogTruncate {
     812            0 :     pub fn decode(buf: &mut Bytes, pg_version: u32) -> XlClogTruncate {
     813            0 :         XlClogTruncate {
     814            0 :             pageno: if pg_version < 17 {
     815            0 :                 buf.get_u32_le()
     816              :             } else {
     817            0 :                 buf.get_u64_le() as u32
     818              :             },
     819            0 :             oldest_xid: buf.get_u32_le(),
     820            0 :             oldest_xid_db: buf.get_u32_le(),
     821            0 :         }
     822            0 :     }
     823              : }
     824              : 
     825              : #[repr(C)]
     826            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     827              : pub struct MultiXactMember {
     828              :     pub xid: TransactionId,
     829              :     pub status: MultiXactStatus,
     830              : }
     831              : 
     832              : impl MultiXactMember {
     833            0 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     834            0 :         MultiXactMember {
     835            0 :             xid: buf.get_u32_le(),
     836            0 :             status: buf.get_u32_le(),
     837            0 :         }
     838            0 :     }
     839              : }
     840              : 
     841              : #[repr(C)]
     842              : #[derive(Debug)]
     843              : pub struct XlMultiXactCreate {
     844              :     pub mid: MultiXactId,
     845              :     /* new MultiXact's ID */
     846              :     pub moff: MultiXactOffset,
     847              :     /* its starting offset in members file */
     848              :     pub nmembers: u32,
     849              :     /* number of member XIDs */
     850              :     pub members: Vec<MultiXactMember>,
     851              : }
     852              : 
     853              : impl XlMultiXactCreate {
     854            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
     855            0 :         let mid = buf.get_u32_le();
     856            0 :         let moff = buf.get_u32_le();
     857            0 :         let nmembers = buf.get_u32_le();
     858            0 :         let mut members = Vec::new();
     859            0 :         for _ in 0..nmembers {
     860            0 :             members.push(MultiXactMember::decode(buf));
     861            0 :         }
     862            0 :         XlMultiXactCreate {
     863            0 :             mid,
     864            0 :             moff,
     865            0 :             nmembers,
     866            0 :             members,
     867            0 :         }
     868            0 :     }
     869              : }
     870              : 
     871              : #[repr(C)]
     872              : #[derive(Debug)]
     873              : pub struct XlMultiXactTruncate {
     874              :     pub oldest_multi_db: Oid,
     875              :     /* to-be-truncated range of multixact offsets */
     876              :     pub start_trunc_off: MultiXactId,
     877              :     /* just for completeness' sake */
     878              :     pub end_trunc_off: MultiXactId,
     879              : 
     880              :     /* to-be-truncated range of multixact members */
     881              :     pub start_trunc_memb: MultiXactOffset,
     882              :     pub end_trunc_memb: MultiXactOffset,
     883              : }
     884              : 
     885              : impl XlMultiXactTruncate {
     886            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
     887            0 :         XlMultiXactTruncate {
     888            0 :             oldest_multi_db: buf.get_u32_le(),
     889            0 :             start_trunc_off: buf.get_u32_le(),
     890            0 :             end_trunc_off: buf.get_u32_le(),
     891            0 :             start_trunc_memb: buf.get_u32_le(),
     892            0 :             end_trunc_memb: buf.get_u32_le(),
     893            0 :         }
     894            0 :     }
     895              : }
     896              : 
     897              : #[repr(C)]
     898              : #[derive(Debug)]
     899              : pub struct XlLogicalMessage {
     900              :     pub db_id: Oid,
     901              :     pub transactional: bool,
     902              :     pub prefix_size: usize,
     903              :     pub message_size: usize,
     904              : }
     905              : 
     906              : impl XlLogicalMessage {
     907            0 :     pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
     908            0 :         XlLogicalMessage {
     909            0 :             db_id: buf.get_u32_le(),
     910            0 :             transactional: buf.get_u32_le() != 0, // 4-bytes alignment
     911            0 :             prefix_size: buf.get_u64_le() as usize,
     912            0 :             message_size: buf.get_u64_le() as usize,
     913            0 :         }
     914            0 :     }
     915              : }
     916              : 
     917              : #[repr(C)]
     918              : #[derive(Debug)]
     919              : pub struct XlRunningXacts {
     920              :     pub xcnt: u32,
     921              :     pub subxcnt: u32,
     922              :     pub subxid_overflow: bool,
     923              :     pub next_xid: TransactionId,
     924              :     pub oldest_running_xid: TransactionId,
     925              :     pub latest_completed_xid: TransactionId,
     926              :     pub xids: Vec<TransactionId>,
     927              : }
     928              : 
     929              : impl XlRunningXacts {
     930            0 :     pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
     931            0 :         let xcnt = buf.get_u32_le();
     932            0 :         let subxcnt = buf.get_u32_le();
     933            0 :         let subxid_overflow = buf.get_u32_le() != 0;
     934            0 :         let next_xid = buf.get_u32_le();
     935            0 :         let oldest_running_xid = buf.get_u32_le();
     936            0 :         let latest_completed_xid = buf.get_u32_le();
     937            0 :         let mut xids = Vec::new();
     938            0 :         for _ in 0..(xcnt + subxcnt) {
     939            0 :             xids.push(buf.get_u32_le());
     940            0 :         }
     941            0 :         XlRunningXacts {
     942            0 :             xcnt,
     943            0 :             subxcnt,
     944            0 :             subxid_overflow,
     945            0 :             next_xid,
     946            0 :             oldest_running_xid,
     947            0 :             latest_completed_xid,
     948            0 :             xids,
     949            0 :         }
     950            0 :     }
     951              : }
     952              : 
     953              : #[repr(C)]
     954              : #[derive(Debug)]
     955              : pub struct XlReploriginDrop {
     956              :     pub node_id: RepOriginId,
     957              : }
     958              : 
     959              : impl XlReploriginDrop {
     960            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginDrop {
     961            0 :         XlReploriginDrop {
     962            0 :             node_id: buf.get_u16_le(),
     963            0 :         }
     964            0 :     }
     965              : }
     966              : 
     967              : #[repr(C)]
     968              : #[derive(Debug)]
     969              : pub struct XlReploriginSet {
     970              :     pub remote_lsn: Lsn,
     971              :     pub node_id: RepOriginId,
     972              : }
     973              : 
     974              : impl XlReploriginSet {
     975            0 :     pub fn decode(buf: &mut Bytes) -> XlReploriginSet {
     976            0 :         XlReploriginSet {
     977            0 :             remote_lsn: Lsn(buf.get_u64_le()),
     978            0 :             node_id: buf.get_u16_le(),
     979            0 :         }
     980            0 :     }
     981              : }
     982              : 
     983              : /// Main routine to decode a WAL record and figure out which blocks are modified
     984              : //
     985              : // See xlogrecord.h for details
     986              : // The overall layout of an XLOG record is:
     987              : //              Fixed-size header (XLogRecord struct)
     988              : //      XLogRecordBlockHeader struct
     989              : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     990              : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     991              : //                 XLogRecordBlockCompressHeader struct follows.
     992              : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     993              : //          BlockNumber follows
     994              : //      XLogRecordBlockHeader struct
     995              : //      ...
     996              : //      XLogRecordDataHeader[Short|Long] struct
     997              : //      block data
     998              : //      block data
     999              : //      ...
    1000              : //      main data
    1001              : //
    1002              : //
    1003              : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
    1004              : // It would be more natural for this function to return a DecodedWALRecord as return value,
    1005              : // but reusing the caller-supplied struct avoids an allocation.
    1006              : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
    1007              : //
    1008       145852 : pub fn decode_wal_record(
    1009       145852 :     record: Bytes,
    1010       145852 :     decoded: &mut DecodedWALRecord,
    1011       145852 :     pg_version: u32,
    1012       145852 : ) -> Result<()> {
    1013       145852 :     let mut rnode_spcnode: u32 = 0;
    1014       145852 :     let mut rnode_dbnode: u32 = 0;
    1015       145852 :     let mut rnode_relnode: u32 = 0;
    1016       145852 :     let mut got_rnode = false;
    1017       145852 :     let mut origin_id: u16 = 0;
    1018       145852 : 
    1019       145852 :     let mut buf = record.clone();
    1020              : 
    1021              :     // 1. Parse XLogRecord struct
    1022              : 
    1023              :     // FIXME: assume little-endian here
    1024       145852 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1025              : 
    1026       145852 :     trace!(
    1027            0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
    1028              :         xlogrec.xl_rmid,
    1029              :         xlogrec.xl_info
    1030              :     );
    1031              : 
    1032       145852 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
    1033       145852 : 
    1034       145852 :     if buf.remaining() != remaining {
    1035            0 :         //TODO error
    1036       145852 :     }
    1037              : 
    1038       145852 :     let mut max_block_id = 0;
    1039       145852 :     let mut blocks_total_len: u32 = 0;
    1040       145852 :     let mut main_data_len = 0;
    1041       145852 :     let mut datatotal: u32 = 0;
    1042       145852 :     decoded.blocks.clear();
    1043              : 
    1044              :     // 2. Decode the headers.
    1045              :     // XLogRecordBlockHeaders if any,
    1046              :     // XLogRecordDataHeader[Short|Long]
    1047       437322 :     while buf.remaining() > datatotal as usize {
    1048       291470 :         let block_id = buf.get_u8();
    1049       291470 : 
    1050       291470 :         match block_id {
    1051       145812 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
    1052       145812 :                 /* XLogRecordDataHeaderShort */
    1053       145812 :                 main_data_len = buf.get_u8() as u32;
    1054       145812 :                 datatotal += main_data_len;
    1055       145812 :             }
    1056              : 
    1057           16 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
    1058           16 :                 /* XLogRecordDataHeaderLong */
    1059           16 :                 main_data_len = buf.get_u32_le();
    1060           16 :                 datatotal += main_data_len;
    1061           16 :             }
    1062              : 
    1063            0 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
    1064            0 :                 // RepOriginId is uint16
    1065            0 :                 origin_id = buf.get_u16_le();
    1066            0 :             }
    1067              : 
    1068            0 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
    1069            0 :                 // TransactionId is uint32
    1070            0 :                 buf.advance(4);
    1071            0 :             }
    1072              : 
    1073       145642 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
    1074              :                 /* XLogRecordBlockHeader */
    1075       145642 :                 let mut blk = DecodedBkpBlock::new();
    1076       145642 : 
    1077       145642 :                 if block_id <= max_block_id {
    1078       145642 :                     // TODO
    1079       145642 :                     //report_invalid_record(state,
    1080       145642 :                     //                    "out-of-order block_id %u at %X/%X",
    1081       145642 :                     //                    block_id,
    1082       145642 :                     //                    (uint32) (state->ReadRecPtr >> 32),
    1083       145642 :                     //                    (uint32) state->ReadRecPtr);
    1084       145642 :                     //    goto err;
    1085       145642 :                 }
    1086       145642 :                 max_block_id = block_id;
    1087       145642 : 
    1088       145642 :                 let fork_flags: u8 = buf.get_u8();
    1089       145642 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
    1090       145642 :                 blk.flags = fork_flags;
    1091       145642 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
    1092       145642 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
    1093       145642 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
    1094       145642 :                 blk.data_len = buf.get_u16_le();
    1095       145642 : 
    1096       145642 :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
    1097       145642 : 
    1098       145642 :                 datatotal += blk.data_len as u32;
    1099       145642 :                 blocks_total_len += blk.data_len as u32;
    1100       145642 : 
    1101       145642 :                 if blk.has_image {
    1102           60 :                     blk.bimg_len = buf.get_u16_le();
    1103           60 :                     blk.hole_offset = buf.get_u16_le();
    1104           60 :                     blk.bimg_info = buf.get_u8();
    1105              : 
    1106            0 :                     blk.apply_image = dispatch_pgversion!(
    1107           60 :                         pg_version,
    1108            0 :                         (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
    1109              :                     );
    1110              : 
    1111           60 :                     let blk_img_is_compressed =
    1112           60 :                         postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version);
    1113           60 : 
    1114           60 :                     if blk_img_is_compressed {
    1115            0 :                         debug!("compressed block image , pg_version = {}", pg_version);
    1116           60 :                     }
    1117              : 
    1118           60 :                     if blk_img_is_compressed {
    1119            0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
    1120            0 :                             blk.hole_length = buf.get_u16_le();
    1121            0 :                         } else {
    1122            0 :                             blk.hole_length = 0;
    1123            0 :                         }
    1124           60 :                     } else {
    1125           60 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
    1126           60 :                     }
    1127           60 :                     datatotal += blk.bimg_len as u32;
    1128           60 :                     blocks_total_len += blk.bimg_len as u32;
    1129           60 : 
    1130           60 :                     /*
    1131           60 :                      * cross-check that hole_offset > 0, hole_length > 0 and
    1132           60 :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
    1133           60 :                      */
    1134           60 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
    1135           36 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
    1136            0 :                     {
    1137            0 :                         // TODO
    1138            0 :                         /*
    1139            0 :                         report_invalid_record(state,
    1140            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
    1141            0 :                                       (unsigned int) blk->hole_offset,
    1142            0 :                                       (unsigned int) blk->hole_length,
    1143            0 :                                       (unsigned int) blk->bimg_len,
    1144            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1145            0 :                         goto err;
    1146            0 :                                      */
    1147           60 :                     }
    1148              : 
    1149              :                     /*
    1150              :                      * cross-check that hole_offset == 0 and hole_length == 0 if
    1151              :                      * the HAS_HOLE flag is not set.
    1152              :                      */
    1153           60 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
    1154           24 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
    1155            0 :                     {
    1156            0 :                         // TODO
    1157            0 :                         /*
    1158            0 :                         report_invalid_record(state,
    1159            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
    1160            0 :                                       (unsigned int) blk->hole_offset,
    1161            0 :                                       (unsigned int) blk->hole_length,
    1162            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1163            0 :                         goto err;
    1164            0 :                                      */
    1165           60 :                     }
    1166              : 
    1167              :                     /*
    1168              :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
    1169              :                      * flag is set.
    1170              :                      */
    1171           60 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
    1172           24 :                         // TODO
    1173           24 :                         /*
    1174           24 :                         report_invalid_record(state,
    1175           24 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
    1176           24 :                                       (unsigned int) blk->bimg_len,
    1177           24 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1178           24 :                         goto err;
    1179           24 :                                      */
    1180           36 :                     }
    1181              : 
    1182              :                     /*
    1183              :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
    1184              :                      * IS_COMPRESSED flag is set.
    1185              :                      */
    1186           60 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
    1187           24 :                         && !blk_img_is_compressed
    1188           24 :                         && blk.bimg_len != BLCKSZ
    1189            0 :                     {
    1190            0 :                         // TODO
    1191            0 :                         /*
    1192            0 :                         report_invalid_record(state,
    1193            0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
    1194            0 :                                       (unsigned int) blk->data_len,
    1195            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1196            0 :                         goto err;
    1197            0 :                                      */
    1198           60 :                     }
    1199       145582 :                 }
    1200       145642 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
    1201       145642 :                     rnode_spcnode = buf.get_u32_le();
    1202       145642 :                     rnode_dbnode = buf.get_u32_le();
    1203       145642 :                     rnode_relnode = buf.get_u32_le();
    1204       145642 :                     got_rnode = true;
    1205       145642 :                 } else if !got_rnode {
    1206            0 :                     // TODO
    1207            0 :                     /*
    1208            0 :                     report_invalid_record(state,
    1209            0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
    1210            0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
    1211            0 :                     goto err;           */
    1212            0 :                 }
    1213              : 
    1214       145642 :                 blk.rnode_spcnode = rnode_spcnode;
    1215       145642 :                 blk.rnode_dbnode = rnode_dbnode;
    1216       145642 :                 blk.rnode_relnode = rnode_relnode;
    1217       145642 : 
    1218       145642 :                 blk.blkno = buf.get_u32_le();
    1219       145642 :                 trace!(
    1220            0 :                     "this record affects {}/{}/{} blk {}",
    1221              :                     rnode_spcnode,
    1222              :                     rnode_dbnode,
    1223              :                     rnode_relnode,
    1224              :                     blk.blkno
    1225              :                 );
    1226              : 
    1227       145642 :                 decoded.blocks.push(blk);
    1228              :             }
    1229              : 
    1230            0 :             _ => {
    1231            0 :                 // TODO: invalid block_id
    1232            0 :             }
    1233              :         }
    1234              :     }
    1235              : 
    1236              :     // 3. Decode blocks.
    1237       145852 :     let mut ptr = record.len() - buf.remaining();
    1238       145852 :     for blk in decoded.blocks.iter_mut() {
    1239       145642 :         if blk.has_image {
    1240           60 :             blk.bimg_offset = ptr as u32;
    1241           60 :             ptr += blk.bimg_len as usize;
    1242       145582 :         }
    1243       145642 :         if blk.has_data {
    1244       145582 :             ptr += blk.data_len as usize;
    1245       145582 :         }
    1246              :     }
    1247              :     // We don't need them, so just skip blocks_total_len bytes
    1248       145852 :     buf.advance(blocks_total_len as usize);
    1249       145852 :     assert_eq!(ptr, record.len() - buf.remaining());
    1250              : 
    1251       145852 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
    1252       145852 : 
    1253       145852 :     // 4. Decode main_data
    1254       145852 :     if main_data_len > 0 {
    1255       145828 :         assert_eq!(buf.remaining(), main_data_len as usize);
    1256           24 :     }
    1257              : 
    1258       145852 :     decoded.xl_xid = xlogrec.xl_xid;
    1259       145852 :     decoded.xl_info = xlogrec.xl_info;
    1260       145852 :     decoded.xl_rmid = xlogrec.xl_rmid;
    1261       145852 :     decoded.record = record;
    1262       145852 :     decoded.origin_id = origin_id;
    1263       145852 :     decoded.main_data_offset = main_data_offset;
    1264       145852 : 
    1265       145852 :     Ok(())
    1266       145852 : }
    1267              : 
    1268              : ///
    1269              : /// Build a human-readable string to describe a WAL record
    1270              : ///
    1271              : /// For debugging purposes
    1272            0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
    1273            0 :     match rec {
    1274            0 :         NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
    1275            0 :             "will_init: {}, {}",
    1276            0 :             will_init,
    1277            0 :             describe_postgres_wal_record(rec)?
    1278              :         )),
    1279            0 :         _ => Ok(format!("{:?}", rec)),
    1280              :     }
    1281            0 : }
    1282              : 
    1283            0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
    1284            0 :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
    1285            0 :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
    1286            0 :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
    1287            0 :     // without worrying about security?
    1288            0 :     //
    1289            0 :     // But for now, we have a hand-written code for a few common WAL record types here.
    1290            0 : 
    1291            0 :     let mut buf = record.clone();
    1292              : 
    1293              :     // 1. Parse XLogRecord struct
    1294              : 
    1295              :     // FIXME: assume little-endian here
    1296            0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1297              : 
    1298              :     let unknown_str: String;
    1299              : 
    1300            0 :     let result: &str = match xlogrec.xl_rmid {
    1301              :         pg_constants::RM_HEAP2_ID => {
    1302            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1303            0 :             match info {
    1304            0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
    1305            0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
    1306              :                 _ => {
    1307            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1308            0 :                     &unknown_str
    1309              :                 }
    1310              :             }
    1311              :         }
    1312              :         pg_constants::RM_HEAP_ID => {
    1313            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1314            0 :             match info {
    1315            0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
    1316            0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
    1317            0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
    1318            0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
    1319              :                 _ => {
    1320            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1321            0 :                     &unknown_str
    1322              :                 }
    1323              :             }
    1324              :         }
    1325              :         pg_constants::RM_XLOG_ID => {
    1326            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1327            0 :             match info {
    1328            0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
    1329            0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
    1330              :                 _ => {
    1331            0 :                     unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
    1332            0 :                     &unknown_str
    1333              :                 }
    1334              :             }
    1335              :         }
    1336            0 :         rmid => {
    1337            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1338            0 : 
    1339            0 :             unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
    1340            0 :             &unknown_str
    1341              :         }
    1342              :     };
    1343              : 
    1344            0 :     Ok(String::from(result))
    1345            0 : }
        

Generated by: LCOV version 2.1-beta