LCOV - code coverage report
Current view: top level - pageserver/src - walrecord.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 69.1 % 476 329
Test Date: 2023-09-06 10:18:01 Functions: 37.0 % 81 30

            Line data    Source code
       1              : //!
       2              : //! Functions for parsing WAL records.
       3              : //!
       4              : 
       5              : use anyhow::Result;
       6              : use bytes::{Buf, Bytes};
       7              : use postgres_ffi::pg_constants;
       8              : use postgres_ffi::BLCKSZ;
       9              : use postgres_ffi::{BlockNumber, OffsetNumber, TimestampTz};
      10              : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
      11              : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
      12              : use serde::{Deserialize, Serialize};
      13              : use tracing::*;
      14              : use utils::bin_ser::DeserializeError;
      15              : 
      16              : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
      17              : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
      18    878685652 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
      19              : pub enum NeonWalRecord {
      20              :     /// Native PostgreSQL WAL record
      21              :     Postgres { will_init: bool, rec: Bytes },
      22              : 
      23              :     /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
      24              :     ClearVisibilityMapFlags {
      25              :         new_heap_blkno: Option<u32>,
      26              :         old_heap_blkno: Option<u32>,
      27              :         flags: u8,
      28              :     },
      29              :     /// Mark transaction IDs as committed on a CLOG page
      30              :     ClogSetCommitted {
      31              :         xids: Vec<TransactionId>,
      32              :         timestamp: TimestampTz,
      33              :     },
      34              :     /// Mark transaction IDs as aborted on a CLOG page
      35              :     ClogSetAborted { xids: Vec<TransactionId> },
      36              :     /// Extend multixact offsets SLRU
      37              :     MultixactOffsetCreate {
      38              :         mid: MultiXactId,
      39              :         moff: MultiXactOffset,
      40              :     },
      41              :     /// Extend multixact members SLRU.
      42              :     MultixactMembersCreate {
      43              :         moff: MultiXactOffset,
      44              :         members: Vec<MultiXactMember>,
      45              :     },
      46              : }
      47              : 
      48              : impl NeonWalRecord {
      49              :     /// Does replaying this WAL record initialize the page from scratch, or does
      50              :     /// it need to be applied over the previous image of the page?
      51    294763596 :     pub fn will_init(&self) -> bool {
      52    294763596 :         match self {
      53    265556861 :             NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
      54              : 
      55              :             // None of the special neon record types currently initialize the page
      56     29206735 :             _ => false,
      57              :         }
      58    294763596 :     }
      59              : }
      60              : 
      61              : /// DecodedBkpBlock represents per-page data contained in a WAL record.
      62     75336586 : #[derive(Default)]
      63              : pub struct DecodedBkpBlock {
      64              :     /* Is this block ref in use? */
      65              :     //in_use: bool,
      66              : 
      67              :     /* Identify the block this refers to */
      68              :     pub rnode_spcnode: u32,
      69              :     pub rnode_dbnode: u32,
      70              :     pub rnode_relnode: u32,
      71              :     // Note that we have a few special forknum values for non-rel files.
      72              :     pub forknum: u8,
      73              :     pub blkno: u32,
      74              : 
      75              :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
      76              :     pub flags: u8,
      77              : 
      78              :     /* Information on full-page image, if any */
      79              :     pub has_image: bool,   /* has image, even for consistency checking */
      80              :     pub apply_image: bool, /* has image that should be restored */
      81              :     pub will_init: bool,   /* record doesn't need previous page version to apply */
      82              :     //char         *bkp_image;
      83              :     pub hole_offset: u16,
      84              :     pub hole_length: u16,
      85              :     pub bimg_offset: u32,
      86              :     pub bimg_len: u16,
      87              :     pub bimg_info: u8,
      88              : 
      89              :     /* Buffer holding the rmgr-specific data associated with this block */
      90              :     has_data: bool,
      91              :     data_len: u16,
      92              : }
      93              : 
      94              : impl DecodedBkpBlock {
      95     75336583 :     pub fn new() -> DecodedBkpBlock {
      96     75336583 :         Default::default()
      97     75336583 :     }
      98              : }
      99              : 
     100       733013 : #[derive(Default)]
     101              : pub struct DecodedWALRecord {
     102              :     pub xl_xid: TransactionId,
     103              :     pub xl_info: u8,
     104              :     pub xl_rmid: u8,
     105              :     pub record: Bytes, // raw XLogRecord
     106              : 
     107              :     pub blocks: Vec<DecodedBkpBlock>,
     108              :     pub main_data_offset: usize,
     109              : }
     110              : 
     111              : #[repr(C)]
     112            0 : #[derive(Debug, Clone, Copy)]
     113              : pub struct RelFileNode {
     114              :     pub spcnode: Oid, /* tablespace */
     115              :     pub dbnode: Oid,  /* database */
     116              :     pub relnode: Oid, /* relation */
     117              : }
     118              : 
     119              : #[repr(C)]
     120            0 : #[derive(Debug)]
     121              : pub struct XlRelmapUpdate {
     122              :     pub dbid: Oid,   /* database ID, or 0 for shared map */
     123              :     pub tsid: Oid,   /* database's tablespace, or pg_global */
     124              :     pub nbytes: i32, /* size of relmap data */
     125              : }
     126              : 
     127              : impl XlRelmapUpdate {
     128           45 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
     129           45 :         XlRelmapUpdate {
     130           45 :             dbid: buf.get_u32_le(),
     131           45 :             tsid: buf.get_u32_le(),
     132           45 :             nbytes: buf.get_i32_le(),
     133           45 :         }
     134           45 :     }
     135              : }
     136              : 
     137              : #[repr(C)]
     138            0 : #[derive(Debug)]
     139              : pub struct XlSmgrCreate {
     140              :     pub rnode: RelFileNode,
     141              :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
     142              :     // well-defined size?
     143              :     pub forknum: u8,
     144              : }
     145              : 
     146              : impl XlSmgrCreate {
     147        21950 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
     148        21950 :         XlSmgrCreate {
     149        21950 :             rnode: RelFileNode {
     150        21950 :                 spcnode: buf.get_u32_le(), /* tablespace */
     151        21950 :                 dbnode: buf.get_u32_le(),  /* database */
     152        21950 :                 relnode: buf.get_u32_le(), /* relation */
     153        21950 :             },
     154        21950 :             forknum: buf.get_u32_le() as u8,
     155        21950 :         }
     156        21950 :     }
     157              : }
     158              : 
     159              : #[repr(C)]
     160            0 : #[derive(Debug)]
     161              : pub struct XlSmgrTruncate {
     162              :     pub blkno: BlockNumber,
     163              :     pub rnode: RelFileNode,
     164              :     pub flags: u32,
     165              : }
     166              : 
     167              : impl XlSmgrTruncate {
     168           45 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
     169           45 :         XlSmgrTruncate {
     170           45 :             blkno: buf.get_u32_le(),
     171           45 :             rnode: RelFileNode {
     172           45 :                 spcnode: buf.get_u32_le(), /* tablespace */
     173           45 :                 dbnode: buf.get_u32_le(),  /* database */
     174           45 :                 relnode: buf.get_u32_le(), /* relation */
     175           45 :             },
     176           45 :             flags: buf.get_u32_le(),
     177           45 :         }
     178           45 :     }
     179              : }
     180              : 
     181              : #[repr(C)]
     182            0 : #[derive(Debug)]
     183              : pub struct XlCreateDatabase {
     184              :     pub db_id: Oid,
     185              :     pub tablespace_id: Oid,
     186              :     pub src_db_id: Oid,
     187              :     pub src_tablespace_id: Oid,
     188              : }
     189              : 
     190              : impl XlCreateDatabase {
     191           12 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
     192           12 :         XlCreateDatabase {
     193           12 :             db_id: buf.get_u32_le(),
     194           12 :             tablespace_id: buf.get_u32_le(),
     195           12 :             src_db_id: buf.get_u32_le(),
     196           12 :             src_tablespace_id: buf.get_u32_le(),
     197           12 :         }
     198           12 :     }
     199              : }
     200              : 
     201              : #[repr(C)]
     202            0 : #[derive(Debug)]
     203              : pub struct XlDropDatabase {
     204              :     pub db_id: Oid,
     205              :     pub n_tablespaces: Oid, /* number of tablespace IDs */
     206              :     pub tablespace_ids: Vec<Oid>,
     207              : }
     208              : 
     209              : impl XlDropDatabase {
     210            2 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
     211            2 :         let mut rec = XlDropDatabase {
     212            2 :             db_id: buf.get_u32_le(),
     213            2 :             n_tablespaces: buf.get_u32_le(),
     214            2 :             tablespace_ids: Vec::<Oid>::new(),
     215            2 :         };
     216              : 
     217            2 :         for _i in 0..rec.n_tablespaces {
     218            2 :             let id = buf.get_u32_le();
     219            2 :             rec.tablespace_ids.push(id);
     220            2 :         }
     221              : 
     222            2 :         rec
     223            2 :     }
     224              : }
     225              : 
     226              : #[repr(C)]
     227            0 : #[derive(Debug)]
     228              : pub struct XlHeapInsert {
     229              :     pub offnum: OffsetNumber,
     230              :     pub flags: u8,
     231              : }
     232              : 
     233              : impl XlHeapInsert {
     234     47303205 :     pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
     235     47303205 :         XlHeapInsert {
     236     47303205 :             offnum: buf.get_u16_le(),
     237     47303205 :             flags: buf.get_u8(),
     238     47303205 :         }
     239     47303205 :     }
     240              : }
     241              : 
     242              : #[repr(C)]
     243            0 : #[derive(Debug)]
     244              : pub struct XlHeapMultiInsert {
     245              :     pub flags: u8,
     246              :     pub _padding: u8,
     247              :     pub ntuples: u16,
     248              : }
     249              : 
     250              : impl XlHeapMultiInsert {
     251       692488 :     pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
     252       692488 :         XlHeapMultiInsert {
     253       692488 :             flags: buf.get_u8(),
     254       692488 :             _padding: buf.get_u8(),
     255       692488 :             ntuples: buf.get_u16_le(),
     256       692488 :         }
     257       692488 :     }
     258              : }
     259              : 
     260              : #[repr(C)]
     261            0 : #[derive(Debug)]
     262              : pub struct XlHeapDelete {
     263              :     pub xmax: TransactionId,
     264              :     pub offnum: OffsetNumber,
     265              :     pub _padding: u16,
     266              :     pub t_cid: u32,
     267              :     pub infobits_set: u8,
     268              :     pub flags: u8,
     269              : }
     270              : 
     271              : impl XlHeapDelete {
     272       535296 :     pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     273       535296 :         XlHeapDelete {
     274       535296 :             xmax: buf.get_u32_le(),
     275       535296 :             offnum: buf.get_u16_le(),
     276       535296 :             _padding: buf.get_u16_le(),
     277       535296 :             t_cid: buf.get_u32_le(),
     278       535296 :             infobits_set: buf.get_u8(),
     279       535296 :             flags: buf.get_u8(),
     280       535296 :         }
     281       535296 :     }
     282              : }
     283              : 
     284              : #[repr(C)]
     285            0 : #[derive(Debug)]
     286              : pub struct XlHeapUpdate {
     287              :     pub old_xmax: TransactionId,
     288              :     pub old_offnum: OffsetNumber,
     289              :     pub old_infobits_set: u8,
     290              :     pub flags: u8,
     291              :     pub t_cid: u32,
     292              :     pub new_xmax: TransactionId,
     293              :     pub new_offnum: OffsetNumber,
     294              : }
     295              : 
     296              : impl XlHeapUpdate {
     297      5355792 :     pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     298      5355792 :         XlHeapUpdate {
     299      5355792 :             old_xmax: buf.get_u32_le(),
     300      5355792 :             old_offnum: buf.get_u16_le(),
     301      5355792 :             old_infobits_set: buf.get_u8(),
     302      5355792 :             flags: buf.get_u8(),
     303      5355792 :             t_cid: buf.get_u32(),
     304      5355792 :             new_xmax: buf.get_u32_le(),
     305      5355792 :             new_offnum: buf.get_u16_le(),
     306      5355792 :         }
     307      5355792 :     }
     308              : }
     309              : 
     310              : ///
     311              : /// Note: Parsing some fields is missing, because they're not needed.
     312              : ///
     313              : /// This is similar to the xl_xact_parsed_commit and
     314              : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     315              : /// struct for commits and aborts.
     316              : ///
     317            0 : #[derive(Debug)]
     318              : pub struct XlXactParsedRecord {
     319              :     pub xid: TransactionId,
     320              :     pub info: u8,
     321              :     pub xact_time: TimestampTz,
     322              :     pub xinfo: u32,
     323              : 
     324              :     pub db_id: Oid, /* MyDatabaseId */
     325              :     pub ts_id: Oid, /* MyDatabaseTableSpace */
     326              : 
     327              :     pub subxacts: Vec<TransactionId>,
     328              : 
     329              :     pub xnodes: Vec<RelFileNode>,
     330              : }
     331              : 
     332              : impl XlXactParsedRecord {
     333              :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
     334              :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
     335              :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
     336      2265099 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
     337      2265099 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
     338      2265099 :         // The record starts with time of commit/abort
     339      2265099 :         let xact_time = buf.get_i64_le();
     340      2265099 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
     341        23705 :             buf.get_u32_le()
     342              :         } else {
     343      2241394 :             0
     344              :         };
     345              :         let db_id;
     346              :         let ts_id;
     347      2265099 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
     348        22656 :             db_id = buf.get_u32_le();
     349        22656 :             ts_id = buf.get_u32_le();
     350      2242443 :         } else {
     351      2242443 :             db_id = 0;
     352      2242443 :             ts_id = 0;
     353      2242443 :         }
     354      2265099 :         let mut subxacts = Vec::<TransactionId>::new();
     355      2265099 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
     356          144 :             let nsubxacts = buf.get_i32_le();
     357       100049 :             for _i in 0..nsubxacts {
     358       100049 :                 let subxact = buf.get_u32_le();
     359       100049 :                 subxacts.push(subxact);
     360       100049 :             }
     361      2264955 :         }
     362      2265099 :         let mut xnodes = Vec::<RelFileNode>::new();
     363      2265099 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
     364         6656 :             let nrels = buf.get_i32_le();
     365        16872 :             for _i in 0..nrels {
     366        16872 :                 let spcnode = buf.get_u32_le();
     367        16872 :                 let dbnode = buf.get_u32_le();
     368        16872 :                 let relnode = buf.get_u32_le();
     369        16872 :                 trace!(
     370            0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
     371            0 :                     spcnode,
     372            0 :                     dbnode,
     373            0 :                     relnode
     374            0 :                 );
     375        16872 :                 xnodes.push(RelFileNode {
     376        16872 :                     spcnode,
     377        16872 :                     dbnode,
     378        16872 :                     relnode,
     379        16872 :                 });
     380              :             }
     381      2258443 :         }
     382              : 
     383      2265099 :         if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
     384            0 :             let nitems = buf.get_i32_le();
     385            0 :             debug!(
     386            0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
     387            0 :                 nitems
     388            0 :             );
     389            0 :             let sizeof_xl_xact_stats_item = 12;
     390            0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
     391      2265099 :         }
     392              : 
     393      2265099 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
     394        22656 :             let nmsgs = buf.get_i32_le();
     395        22656 :             let sizeof_shared_invalidation_message = 16;
     396        22656 :             buf.advance(
     397        22656 :                 (nmsgs * sizeof_shared_invalidation_message)
     398        22656 :                     .try_into()
     399        22656 :                     .unwrap(),
     400        22656 :             );
     401      2242443 :         }
     402              : 
     403      2265099 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
     404            2 :             xid = buf.get_u32_le();
     405            2 :             debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
     406      2265097 :         }
     407              : 
     408      2265099 :         XlXactParsedRecord {
     409      2265099 :             xid,
     410      2265099 :             info,
     411      2265099 :             xact_time,
     412      2265099 :             xinfo,
     413      2265099 :             db_id,
     414      2265099 :             ts_id,
     415      2265099 :             subxacts,
     416      2265099 :             xnodes,
     417      2265099 :         }
     418      2265099 :     }
     419              : }
     420              : 
     421              : #[repr(C)]
     422            0 : #[derive(Debug)]
     423              : pub struct XlClogTruncate {
     424              :     pub pageno: u32,
     425              :     pub oldest_xid: TransactionId,
     426              :     pub oldest_xid_db: Oid,
     427              : }
     428              : 
     429              : impl XlClogTruncate {
     430            1 :     pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
     431            1 :         XlClogTruncate {
     432            1 :             pageno: buf.get_u32_le(),
     433            1 :             oldest_xid: buf.get_u32_le(),
     434            1 :             oldest_xid_db: buf.get_u32_le(),
     435            1 :         }
     436            1 :     }
     437              : }
     438              : 
     439              : #[repr(C)]
     440       945465 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     441              : pub struct MultiXactMember {
     442              :     pub xid: TransactionId,
     443              :     pub status: MultiXactStatus,
     444              : }
     445              : 
     446              : impl MultiXactMember {
     447       472948 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     448       472948 :         MultiXactMember {
     449       472948 :             xid: buf.get_u32_le(),
     450       472948 :             status: buf.get_u32_le(),
     451       472948 :         }
     452       472948 :     }
     453              : }
     454              : 
     455              : #[repr(C)]
     456            0 : #[derive(Debug)]
     457              : pub struct XlMultiXactCreate {
     458              :     pub mid: MultiXactId,      /* new MultiXact's ID */
     459              :     pub moff: MultiXactOffset, /* its starting offset in members file */
     460              :     pub nmembers: u32,         /* number of member XIDs */
     461              :     pub members: Vec<MultiXactMember>,
     462              : }
     463              : 
     464              : impl XlMultiXactCreate {
     465        24027 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
     466        24027 :         let mid = buf.get_u32_le();
     467        24027 :         let moff = buf.get_u32_le();
     468        24027 :         let nmembers = buf.get_u32_le();
     469        24027 :         let mut members = Vec::new();
     470       472948 :         for _ in 0..nmembers {
     471       472948 :             members.push(MultiXactMember::decode(buf));
     472       472948 :         }
     473        24027 :         XlMultiXactCreate {
     474        24027 :             mid,
     475        24027 :             moff,
     476        24027 :             nmembers,
     477        24027 :             members,
     478        24027 :         }
     479        24027 :     }
     480              : }
     481              : 
     482              : #[repr(C)]
     483            0 : #[derive(Debug)]
     484              : pub struct XlMultiXactTruncate {
     485              :     pub oldest_multi_db: Oid,
     486              :     /* to-be-truncated range of multixact offsets */
     487              :     pub start_trunc_off: MultiXactId, /* just for completeness' sake */
     488              :     pub end_trunc_off: MultiXactId,
     489              : 
     490              :     /* to-be-truncated range of multixact members */
     491              :     pub start_trunc_memb: MultiXactOffset,
     492              :     pub end_trunc_memb: MultiXactOffset,
     493              : }
     494              : 
     495              : impl XlMultiXactTruncate {
     496            0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
     497            0 :         XlMultiXactTruncate {
     498            0 :             oldest_multi_db: buf.get_u32_le(),
     499            0 :             start_trunc_off: buf.get_u32_le(),
     500            0 :             end_trunc_off: buf.get_u32_le(),
     501            0 :             start_trunc_memb: buf.get_u32_le(),
     502            0 :             end_trunc_memb: buf.get_u32_le(),
     503            0 :         }
     504            0 :     }
     505              : }
     506              : 
     507              : /// Main routine to decode a WAL record and figure out which blocks are modified
     508              : //
     509              : // See xlogrecord.h for details
     510              : // The overall layout of an XLOG record is:
     511              : //              Fixed-size header (XLogRecord struct)
     512              : //      XLogRecordBlockHeader struct
     513              : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     514              : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     515              : //                 XLogRecordBlockCompressHeader struct follows.
     516              : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     517              : //          BlockNumber follows
     518              : //      XLogRecordBlockHeader struct
     519              : //      ...
     520              : //      XLogRecordDataHeader[Short|Long] struct
     521              : //      block data
     522              : //      block data
     523              : //      ...
     524              : //      main data
     525              : //
     526              : //
     527              : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
     528              : // It would be more natural for this function to return a DecodedWALRecord as return value,
     529              : // but reusing the caller-supplied struct avoids an allocation.
     530              : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
     531              : //
     532     73526713 : pub fn decode_wal_record(
     533     73526713 :     record: Bytes,
     534     73526713 :     decoded: &mut DecodedWALRecord,
     535     73526713 :     pg_version: u32,
     536     73526713 : ) -> Result<()> {
     537     73526713 :     let mut rnode_spcnode: u32 = 0;
     538     73526713 :     let mut rnode_dbnode: u32 = 0;
     539     73526713 :     let mut rnode_relnode: u32 = 0;
     540     73526713 :     let mut got_rnode = false;
     541     73526713 : 
     542     73526713 :     let mut buf = record.clone();
     543              : 
     544              :     // 1. Parse XLogRecord struct
     545              : 
     546              :     // FIXME: assume little-endian here
     547     73526713 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     548              : 
     549     73526713 :     trace!(
     550            0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
     551            0 :         xlogrec.xl_rmid,
     552            0 :         xlogrec.xl_info
     553            0 :     );
     554              : 
     555     73526712 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
     556     73526712 : 
     557     73526712 :     if buf.remaining() != remaining {
     558            0 :         //TODO error
     559     73526712 :     }
     560              : 
     561     73526712 :     let mut max_block_id = 0;
     562     73526712 :     let mut blocks_total_len: u32 = 0;
     563     73526712 :     let mut main_data_len = 0;
     564     73526712 :     let mut datatotal: u32 = 0;
     565     73526712 :     decoded.blocks.clear();
     566              : 
     567              :     // 2. Decode the headers.
     568              :     // XLogRecordBlockHeaders if any,
     569              :     // XLogRecordDataHeader[Short|Long]
     570    222248244 :     while buf.remaining() > datatotal as usize {
     571    148721530 :         let block_id = buf.get_u8();
     572    148721530 : 
     573    148721530 :         match block_id {
     574     73367364 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
     575     73367364 :                 /* XLogRecordDataHeaderShort */
     576     73367364 :                 main_data_len = buf.get_u8() as u32;
     577     73367364 :                 datatotal += main_data_len;
     578     73367364 :             }
     579              : 
     580        17585 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
     581        17585 :                 /* XLogRecordDataHeaderLong */
     582        17585 :                 main_data_len = buf.get_u32_le();
     583        17585 :                 datatotal += main_data_len;
     584        17585 :             }
     585              : 
     586            0 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
     587            0 :                 // RepOriginId is uint16
     588            0 :                 buf.advance(2);
     589            0 :             }
     590              : 
     591            0 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
     592            0 :                 // TransactionId is uint32
     593            0 :                 buf.advance(4);
     594            0 :             }
     595              : 
     596     75336586 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
     597              :                 /* XLogRecordBlockHeader */
     598     75336586 :                 let mut blk = DecodedBkpBlock::new();
     599     75336586 : 
     600     75336586 :                 if block_id <= max_block_id {
     601     71163567 :                     // TODO
     602     71163567 :                     //report_invalid_record(state,
     603     71163567 :                     //                    "out-of-order block_id %u at %X/%X",
     604     71163567 :                     //                    block_id,
     605     71163567 :                     //                    (uint32) (state->ReadRecPtr >> 32),
     606     71163567 :                     //                    (uint32) state->ReadRecPtr);
     607     71163567 :                     //    goto err;
     608     71163567 :                 }
     609     75336586 :                 max_block_id = block_id;
     610     75336586 : 
     611     75336586 :                 let fork_flags: u8 = buf.get_u8();
     612     75336586 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
     613     75336586 :                 blk.flags = fork_flags;
     614     75336586 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
     615     75336586 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
     616     75336586 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
     617     75336586 :                 blk.data_len = buf.get_u16_le();
     618     75336586 : 
     619     75336586 :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
     620     75336586 : 
     621     75336586 :                 datatotal += blk.data_len as u32;
     622     75336586 :                 blocks_total_len += blk.data_len as u32;
     623     75336586 : 
     624     75336586 :                 if blk.has_image {
     625       170593 :                     blk.bimg_len = buf.get_u16_le();
     626       170593 :                     blk.hole_offset = buf.get_u16_le();
     627       170593 :                     blk.bimg_info = buf.get_u8();
     628       170593 : 
     629       170593 :                     blk.apply_image = if pg_version == 14 {
     630       170593 :                         (blk.bimg_info & postgres_ffi::v14::bindings::BKPIMAGE_APPLY) != 0
     631              :                     } else {
     632            0 :                         assert_eq!(pg_version, 15);
     633            0 :                         (blk.bimg_info & postgres_ffi::v15::bindings::BKPIMAGE_APPLY) != 0
     634              :                     };
     635              : 
     636       170593 :                     let blk_img_is_compressed =
     637       170593 :                         postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
     638              : 
     639       170593 :                     if blk_img_is_compressed {
     640            0 :                         debug!("compressed block image , pg_version = {}", pg_version);
     641       170593 :                     }
     642              : 
     643       170593 :                     if blk_img_is_compressed {
     644            0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
     645            0 :                             blk.hole_length = buf.get_u16_le();
     646            0 :                         } else {
     647            0 :                             blk.hole_length = 0;
     648            0 :                         }
     649       170593 :                     } else {
     650       170593 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
     651       170593 :                     }
     652       170593 :                     datatotal += blk.bimg_len as u32;
     653       170593 :                     blocks_total_len += blk.bimg_len as u32;
     654       170593 : 
     655       170593 :                     /*
     656       170593 :                      * cross-check that hole_offset > 0, hole_length > 0 and
     657       170593 :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
     658       170593 :                      */
     659       170593 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
     660       143247 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
     661            0 :                     {
     662            0 :                         // TODO
     663            0 :                         /*
     664            0 :                         report_invalid_record(state,
     665            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
     666            0 :                                       (unsigned int) blk->hole_offset,
     667            0 :                                       (unsigned int) blk->hole_length,
     668            0 :                                       (unsigned int) blk->bimg_len,
     669            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     670            0 :                         goto err;
     671            0 :                                      */
     672       170593 :                     }
     673              : 
     674              :                     /*
     675              :                      * cross-check that hole_offset == 0 and hole_length == 0 if
     676              :                      * the HAS_HOLE flag is not set.
     677              :                      */
     678       170593 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     679        27346 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
     680            0 :                     {
     681            0 :                         // TODO
     682            0 :                         /*
     683            0 :                         report_invalid_record(state,
     684            0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
     685            0 :                                       (unsigned int) blk->hole_offset,
     686            0 :                                       (unsigned int) blk->hole_length,
     687            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     688            0 :                         goto err;
     689            0 :                                      */
     690       170593 :                     }
     691              : 
     692              :                     /*
     693              :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
     694              :                      * flag is set.
     695              :                      */
     696       170593 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
     697        27346 :                         // TODO
     698        27346 :                         /*
     699        27346 :                         report_invalid_record(state,
     700        27346 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
     701        27346 :                                       (unsigned int) blk->bimg_len,
     702        27346 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     703        27346 :                         goto err;
     704        27346 :                                      */
     705       143247 :                     }
     706              : 
     707              :                     /*
     708              :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
     709              :                      * IS_COMPRESSED flag is set.
     710              :                      */
     711       170593 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     712        27346 :                         && !blk_img_is_compressed
     713        27346 :                         && blk.bimg_len != BLCKSZ
     714            0 :                     {
     715            0 :                         // TODO
     716            0 :                         /*
     717            0 :                         report_invalid_record(state,
     718            0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
     719            0 :                                       (unsigned int) blk->data_len,
     720            0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     721            0 :                         goto err;
     722            0 :                                      */
     723       170593 :                     }
     724     75165993 :                 }
     725     75336586 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
     726     71164517 :                     rnode_spcnode = buf.get_u32_le();
     727     71164517 :                     rnode_dbnode = buf.get_u32_le();
     728     71164517 :                     rnode_relnode = buf.get_u32_le();
     729     71164517 :                     got_rnode = true;
     730     71164517 :                 } else if !got_rnode {
     731            0 :                     // TODO
     732            0 :                     /*
     733            0 :                     report_invalid_record(state,
     734            0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
     735            0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     736            0 :                     goto err;           */
     737      4172069 :                 }
     738              : 
     739     75336586 :                 blk.rnode_spcnode = rnode_spcnode;
     740     75336586 :                 blk.rnode_dbnode = rnode_dbnode;
     741     75336586 :                 blk.rnode_relnode = rnode_relnode;
     742     75336586 : 
     743     75336586 :                 blk.blkno = buf.get_u32_le();
     744     75336586 :                 trace!(
     745            0 :                     "this record affects {}/{}/{} blk {}",
     746            0 :                     rnode_spcnode,
     747            0 :                     rnode_dbnode,
     748            0 :                     rnode_relnode,
     749            0 :                     blk.blkno
     750            0 :                 );
     751              : 
     752     75336583 :                 decoded.blocks.push(blk);
     753              :             }
     754              : 
     755            0 :             _ => {
     756            0 :                 // TODO: invalid block_id
     757            0 :             }
     758              :         }
     759              :     }
     760              : 
     761              :     // 3. Decode blocks.
     762     73526714 :     let mut ptr = record.len() - buf.remaining();
     763     75336585 :     for blk in decoded.blocks.iter_mut() {
     764     75336585 :         if blk.has_image {
     765       170593 :             blk.bimg_offset = ptr as u32;
     766       170593 :             ptr += blk.bimg_len as usize;
     767     75165992 :         }
     768     75336585 :         if blk.has_data {
     769     64564645 :             ptr += blk.data_len as usize;
     770     64564645 :         }
     771              :     }
     772              :     // We don't need them, so just skip blocks_total_len bytes
     773     73526714 :     buf.advance(blocks_total_len as usize);
     774     73526714 :     assert_eq!(ptr, record.len() - buf.remaining());
     775              : 
     776     73526714 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
     777     73526714 : 
     778     73526714 :     // 4. Decode main_data
     779     73526714 :     if main_data_len > 0 {
     780     73384951 :         assert_eq!(buf.remaining(), main_data_len as usize);
     781       141763 :     }
     782              : 
     783     73526712 :     decoded.xl_xid = xlogrec.xl_xid;
     784     73526712 :     decoded.xl_info = xlogrec.xl_info;
     785     73526712 :     decoded.xl_rmid = xlogrec.xl_rmid;
     786     73526712 :     decoded.record = record;
     787     73526712 :     decoded.main_data_offset = main_data_offset;
     788     73526712 : 
     789     73526712 :     Ok(())
     790     73526712 : }
     791              : 
     792              : ///
     793              : /// Build a human-readable string to describe a WAL record
     794              : ///
     795              : /// For debugging purposes
     796            0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
     797            0 :     match rec {
     798            0 :         NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
     799            0 :             "will_init: {}, {}",
     800            0 :             will_init,
     801            0 :             describe_postgres_wal_record(rec)?
     802              :         )),
     803            0 :         _ => Ok(format!("{:?}", rec)),
     804              :     }
     805            0 : }
     806              : 
     807            0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
     808            0 :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
     809            0 :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
     810            0 :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
     811            0 :     // without worrying about security?
     812            0 :     //
     813            0 :     // But for now, we have a hand-written code for a few common WAL record types here.
     814            0 : 
     815            0 :     let mut buf = record.clone();
     816              : 
     817              :     // 1. Parse XLogRecord struct
     818              : 
     819              :     // FIXME: assume little-endian here
     820            0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     821              : 
     822              :     let unknown_str: String;
     823              : 
     824            0 :     let result: &str = match xlogrec.xl_rmid {
     825              :         pg_constants::RM_HEAP2_ID => {
     826            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     827            0 :             match info {
     828            0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
     829            0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
     830              :                 _ => {
     831            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
     832            0 :                     &unknown_str
     833              :                 }
     834              :             }
     835              :         }
     836              :         pg_constants::RM_HEAP_ID => {
     837            0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
     838            0 :             match info {
     839            0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
     840            0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
     841            0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
     842            0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
     843              :                 _ => {
     844            0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
     845            0 :                     &unknown_str
     846              :                 }
     847              :             }
     848              :         }
     849              :         pg_constants::RM_XLOG_ID => {
     850            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     851            0 :             match info {
     852            0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
     853            0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
     854              :                 _ => {
     855            0 :                     unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
     856            0 :                     &unknown_str
     857              :                 }
     858              :             }
     859              :         }
     860            0 :         rmid => {
     861            0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
     862            0 : 
     863            0 :             unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
     864            0 :             &unknown_str
     865              :         }
     866              :     };
     867              : 
     868            0 :     Ok(String::from(result))
     869            0 : }
        

Generated by: LCOV version 2.1-beta