LCOV - differential code coverage report
Current view: top level - pageserver/src - walrecord.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 61.8 % 579 358 221 358
Current Date: 2023-10-19 02:04:12 Functions: 32.4 % 102 33 69 33
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Functions for parsing WAL records.
       3                 : //!
       4                 : 
       5                 : use anyhow::Result;
       6                 : use bytes::{Buf, Bytes};
       7                 : use postgres_ffi::dispatch_pgversion;
       8                 : use postgres_ffi::pg_constants;
       9                 : use postgres_ffi::BLCKSZ;
      10                 : use postgres_ffi::{BlockNumber, TimestampTz};
      11                 : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
      12                 : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
      13                 : use serde::{Deserialize, Serialize};
      14                 : use tracing::*;
      15                 : use utils::bin_ser::DeserializeError;
      16                 : 
      17                 : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
      18                 : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
      19 CBC   709714057 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
      20                 : pub enum NeonWalRecord {
      21                 :     /// Native PostgreSQL WAL record
      22                 :     Postgres { will_init: bool, rec: Bytes },
      23                 : 
      24                 :     /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
      25                 :     ClearVisibilityMapFlags {
      26                 :         new_heap_blkno: Option<u32>,
      27                 :         old_heap_blkno: Option<u32>,
      28                 :         flags: u8,
      29                 :     },
      30                 :     /// Mark transaction IDs as committed on a CLOG page
      31                 :     ClogSetCommitted {
      32                 :         xids: Vec<TransactionId>,
      33                 :         timestamp: TimestampTz,
      34                 :     },
      35                 :     /// Mark transaction IDs as aborted on a CLOG page
      36                 :     ClogSetAborted { xids: Vec<TransactionId> },
      37                 :     /// Extend multixact offsets SLRU
      38                 :     MultixactOffsetCreate {
      39                 :         mid: MultiXactId,
      40                 :         moff: MultiXactOffset,
      41                 :     },
      42                 :     /// Extend multixact members SLRU.
      43                 :     MultixactMembersCreate {
      44                 :         moff: MultiXactOffset,
      45                 :         members: Vec<MultiXactMember>,
      46                 :     },
      47                 : }
      48                 : 
      49                 : impl NeonWalRecord {
      50                 :     /// Does replaying this WAL record initialize the page from scratch, or does
      51                 :     /// it need to be applied over the previous image of the page?
      52       238323425 :     pub fn will_init(&self) -> bool {
      53       238323425 :         match self {
      54       214612145 :             NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
      55                 : 
      56                 :             // None of the special neon record types currently initialize the page
      57        23711280 :             _ => false,
      58                 :         }
      59       238323425 :     }
      60                 : }
      61                 : 
      62                 : /// DecodedBkpBlock represents per-page data contained in a WAL record.
      63        70056495 : #[derive(Default)]
      64                 : pub struct DecodedBkpBlock {
      65                 :     /* Is this block ref in use? */
      66                 :     //in_use: bool,
      67                 : 
      68                 :     /* Identify the block this refers to */
      69                 :     pub rnode_spcnode: u32,
      70                 :     pub rnode_dbnode: u32,
      71                 :     pub rnode_relnode: u32,
      72                 :     // Note that we have a few special forknum values for non-rel files.
      73                 :     pub forknum: u8,
      74                 :     pub blkno: u32,
      75                 : 
      76                 :     /* copy of the fork_flags field from the XLogRecordBlockHeader */
      77                 :     pub flags: u8,
      78                 : 
      79                 :     /* Information on full-page image, if any */
      80                 :     pub has_image: bool,
      81                 :     /* has image, even for consistency checking */
      82                 :     pub apply_image: bool,
      83                 :     /* has image that should be restored */
      84                 :     pub will_init: bool,
      85                 :     /* record doesn't need previous page version to apply */
      86                 :     //char         *bkp_image;
      87                 :     pub hole_offset: u16,
      88                 :     pub hole_length: u16,
      89                 :     pub bimg_offset: u32,
      90                 :     pub bimg_len: u16,
      91                 :     pub bimg_info: u8,
      92                 : 
      93                 :     /* Buffer holding the rmgr-specific data associated with this block */
      94                 :     has_data: bool,
      95                 :     data_len: u16,
      96                 : }
      97                 : 
      98                 : impl DecodedBkpBlock {
      99        70056495 :     pub fn new() -> DecodedBkpBlock {
     100        70056495 :         Default::default()
     101        70056495 :     }
     102                 : }
     103                 : 
     104          775555 : #[derive(Default)]
     105                 : pub struct DecodedWALRecord {
     106                 :     pub xl_xid: TransactionId,
     107                 :     pub xl_info: u8,
     108                 :     pub xl_rmid: u8,
     109                 :     pub record: Bytes, // raw XLogRecord
     110                 : 
     111                 :     pub blocks: Vec<DecodedBkpBlock>,
     112                 :     pub main_data_offset: usize,
     113                 : }
     114                 : 
     115                 : #[repr(C)]
     116 UBC           0 : #[derive(Debug, Clone, Copy)]
     117                 : pub struct RelFileNode {
     118                 :     pub spcnode: Oid, /* tablespace */
     119                 :     pub dbnode: Oid,  /* database */
     120                 :     pub relnode: Oid, /* relation */
     121                 : }
     122                 : 
     123                 : #[repr(C)]
     124               0 : #[derive(Debug)]
     125                 : pub struct XlRelmapUpdate {
     126                 :     pub dbid: Oid,   /* database ID, or 0 for shared map */
     127                 :     pub tsid: Oid,   /* database's tablespace, or pg_global */
     128                 :     pub nbytes: i32, /* size of relmap data */
     129                 : }
     130                 : 
     131                 : impl XlRelmapUpdate {
     132 CBC          54 :     pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
     133              54 :         XlRelmapUpdate {
     134              54 :             dbid: buf.get_u32_le(),
     135              54 :             tsid: buf.get_u32_le(),
     136              54 :             nbytes: buf.get_i32_le(),
     137              54 :         }
     138              54 :     }
     139                 : }
     140                 : 
     141                 : pub mod v14 {
     142                 :     use bytes::{Buf, Bytes};
     143                 :     use postgres_ffi::{OffsetNumber, TransactionId};
     144                 : 
     145                 :     #[repr(C)]
     146 UBC           0 :     #[derive(Debug)]
     147                 :     pub struct XlHeapInsert {
     148                 :         pub offnum: OffsetNumber,
     149                 :         pub flags: u8,
     150                 :     }
     151                 : 
     152                 :     impl XlHeapInsert {
     153 CBC    42667615 :         pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
     154        42667615 :             XlHeapInsert {
     155        42667615 :                 offnum: buf.get_u16_le(),
     156        42667615 :                 flags: buf.get_u8(),
     157        42667615 :             }
     158        42667615 :         }
     159                 :     }
     160                 : 
     161                 :     #[repr(C)]
     162 UBC           0 :     #[derive(Debug)]
     163                 :     pub struct XlHeapMultiInsert {
     164                 :         pub flags: u8,
     165                 :         pub _padding: u8,
     166                 :         pub ntuples: u16,
     167                 :     }
     168                 : 
     169                 :     impl XlHeapMultiInsert {
     170 CBC      710747 :         pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
     171          710747 :             XlHeapMultiInsert {
     172          710747 :                 flags: buf.get_u8(),
     173          710747 :                 _padding: buf.get_u8(),
     174          710747 :                 ntuples: buf.get_u16_le(),
     175          710747 :             }
     176          710747 :         }
     177                 :     }
     178                 : 
     179                 :     #[repr(C)]
     180 UBC           0 :     #[derive(Debug)]
     181                 :     pub struct XlHeapDelete {
     182                 :         pub xmax: TransactionId,
     183                 :         pub offnum: OffsetNumber,
     184                 :         pub _padding: u16,
     185                 :         pub t_cid: u32,
     186                 :         pub infobits_set: u8,
     187                 :         pub flags: u8,
     188                 :     }
     189                 : 
     190                 :     impl XlHeapDelete {
     191 CBC      535608 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     192          535608 :             XlHeapDelete {
     193          535608 :                 xmax: buf.get_u32_le(),
     194          535608 :                 offnum: buf.get_u16_le(),
     195          535608 :                 _padding: buf.get_u16_le(),
     196          535608 :                 t_cid: buf.get_u32_le(),
     197          535608 :                 infobits_set: buf.get_u8(),
     198          535608 :                 flags: buf.get_u8(),
     199          535608 :             }
     200          535608 :         }
     201                 :     }
     202                 : 
     203                 :     #[repr(C)]
     204 UBC           0 :     #[derive(Debug)]
     205                 :     pub struct XlHeapUpdate {
     206                 :         pub old_xmax: TransactionId,
     207                 :         pub old_offnum: OffsetNumber,
     208                 :         pub old_infobits_set: u8,
     209                 :         pub flags: u8,
     210                 :         pub t_cid: u32,
     211                 :         pub new_xmax: TransactionId,
     212                 :         pub new_offnum: OffsetNumber,
     213                 :     }
     214                 : 
     215                 :     impl XlHeapUpdate {
     216 CBC     5096417 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     217         5096417 :             XlHeapUpdate {
     218         5096417 :                 old_xmax: buf.get_u32_le(),
     219         5096417 :                 old_offnum: buf.get_u16_le(),
     220         5096417 :                 old_infobits_set: buf.get_u8(),
     221         5096417 :                 flags: buf.get_u8(),
     222         5096417 :                 t_cid: buf.get_u32_le(),
     223         5096417 :                 new_xmax: buf.get_u32_le(),
     224         5096417 :                 new_offnum: buf.get_u16_le(),
     225         5096417 :             }
     226         5096417 :         }
     227                 :     }
     228                 : 
     229                 :     #[repr(C)]
     230 UBC           0 :     #[derive(Debug)]
     231                 :     pub struct XlHeapLock {
     232                 :         pub locking_xid: TransactionId,
     233                 :         pub offnum: OffsetNumber,
     234                 :         pub _padding: u16,
     235                 :         pub t_cid: u32,
     236                 :         pub infobits_set: u8,
     237                 :         pub flags: u8,
     238                 :     }
     239                 : 
     240                 :     impl XlHeapLock {
     241 CBC     4668342 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     242         4668342 :             XlHeapLock {
     243         4668342 :                 locking_xid: buf.get_u32_le(),
     244         4668342 :                 offnum: buf.get_u16_le(),
     245         4668342 :                 _padding: buf.get_u16_le(),
     246         4668342 :                 t_cid: buf.get_u32_le(),
     247         4668342 :                 infobits_set: buf.get_u8(),
     248         4668342 :                 flags: buf.get_u8(),
     249         4668342 :             }
     250         4668342 :         }
     251                 :     }
     252                 : 
     253                 :     #[repr(C)]
     254 UBC           0 :     #[derive(Debug)]
     255                 :     pub struct XlHeapLockUpdated {
     256                 :         pub xmax: TransactionId,
     257                 :         pub offnum: OffsetNumber,
     258                 :         pub infobits_set: u8,
     259                 :         pub flags: u8,
     260                 :     }
     261                 : 
     262                 :     impl XlHeapLockUpdated {
     263 CBC        3898 :         pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
     264            3898 :             XlHeapLockUpdated {
     265            3898 :                 xmax: buf.get_u32_le(),
     266            3898 :                 offnum: buf.get_u16_le(),
     267            3898 :                 infobits_set: buf.get_u8(),
     268            3898 :                 flags: buf.get_u8(),
     269            3898 :             }
     270            3898 :         }
     271                 :     }
     272                 : }
     273                 : 
     274                 : pub mod v15 {
     275                 :     pub use super::v14::{
     276                 :         XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
     277                 :     };
     278                 : }
     279                 : 
     280                 : pub mod v16 {
     281                 :     pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
     282                 :     use bytes::{Buf, Bytes};
     283                 :     use postgres_ffi::{OffsetNumber, TransactionId};
     284                 : 
     285                 :     pub struct XlHeapDelete {
     286                 :         pub xmax: TransactionId,
     287                 :         pub offnum: OffsetNumber,
     288                 :         pub infobits_set: u8,
     289                 :         pub flags: u8,
     290                 :     }
     291                 : 
     292                 :     impl XlHeapDelete {
     293 UBC           0 :         pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
     294               0 :             XlHeapDelete {
     295               0 :                 xmax: buf.get_u32_le(),
     296               0 :                 offnum: buf.get_u16_le(),
     297               0 :                 infobits_set: buf.get_u8(),
     298               0 :                 flags: buf.get_u8(),
     299               0 :             }
     300               0 :         }
     301                 :     }
     302                 : 
     303                 :     #[repr(C)]
     304               0 :     #[derive(Debug)]
     305                 :     pub struct XlHeapUpdate {
     306                 :         pub old_xmax: TransactionId,
     307                 :         pub old_offnum: OffsetNumber,
     308                 :         pub old_infobits_set: u8,
     309                 :         pub flags: u8,
     310                 :         pub new_xmax: TransactionId,
     311                 :         pub new_offnum: OffsetNumber,
     312                 :     }
     313                 : 
     314                 :     impl XlHeapUpdate {
     315               0 :         pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
     316               0 :             XlHeapUpdate {
     317               0 :                 old_xmax: buf.get_u32_le(),
     318               0 :                 old_offnum: buf.get_u16_le(),
     319               0 :                 old_infobits_set: buf.get_u8(),
     320               0 :                 flags: buf.get_u8(),
     321               0 :                 new_xmax: buf.get_u32_le(),
     322               0 :                 new_offnum: buf.get_u16_le(),
     323               0 :             }
     324               0 :         }
     325                 :     }
     326                 : 
     327                 :     #[repr(C)]
     328               0 :     #[derive(Debug)]
     329                 :     pub struct XlHeapLock {
     330                 :         pub locking_xid: TransactionId,
     331                 :         pub offnum: OffsetNumber,
     332                 :         pub infobits_set: u8,
     333                 :         pub flags: u8,
     334                 :     }
     335                 : 
     336                 :     impl XlHeapLock {
     337               0 :         pub fn decode(buf: &mut Bytes) -> XlHeapLock {
     338               0 :             XlHeapLock {
     339               0 :                 locking_xid: buf.get_u32_le(),
     340               0 :                 offnum: buf.get_u16_le(),
     341               0 :                 infobits_set: buf.get_u8(),
     342               0 :                 flags: buf.get_u8(),
     343               0 :             }
     344               0 :         }
     345                 :     }
     346                 : 
     347                 :     /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
     348                 :     pub mod rm_neon {
     349                 :         use bytes::{Buf, Bytes};
     350                 :         use postgres_ffi::{OffsetNumber, TransactionId};
     351                 : 
     352                 :         #[repr(C)]
     353               0 :         #[derive(Debug)]
     354                 :         pub struct XlNeonHeapInsert {
     355                 :             pub offnum: OffsetNumber,
     356                 :             pub flags: u8,
     357                 :         }
     358                 : 
     359                 :         impl XlNeonHeapInsert {
     360               0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
     361               0 :                 XlNeonHeapInsert {
     362               0 :                     offnum: buf.get_u16_le(),
     363               0 :                     flags: buf.get_u8(),
     364               0 :                 }
     365               0 :             }
     366                 :         }
     367                 : 
     368                 :         #[repr(C)]
     369               0 :         #[derive(Debug)]
     370                 :         pub struct XlNeonHeapMultiInsert {
     371                 :             pub flags: u8,
     372                 :             pub _padding: u8,
     373                 :             pub ntuples: u16,
     374                 :             pub t_cid: u32,
     375                 :         }
     376                 : 
     377                 :         impl XlNeonHeapMultiInsert {
     378               0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
     379               0 :                 XlNeonHeapMultiInsert {
     380               0 :                     flags: buf.get_u8(),
     381               0 :                     _padding: buf.get_u8(),
     382               0 :                     ntuples: buf.get_u16_le(),
     383               0 :                     t_cid: buf.get_u32_le(),
     384               0 :                 }
     385               0 :             }
     386                 :         }
     387                 : 
     388                 :         #[repr(C)]
     389               0 :         #[derive(Debug)]
     390                 :         pub struct XlNeonHeapDelete {
     391                 :             pub xmax: TransactionId,
     392                 :             pub offnum: OffsetNumber,
     393                 :             pub infobits_set: u8,
     394                 :             pub flags: u8,
     395                 :             pub t_cid: u32,
     396                 :         }
     397                 : 
     398                 :         impl XlNeonHeapDelete {
     399               0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
     400               0 :                 XlNeonHeapDelete {
     401               0 :                     xmax: buf.get_u32_le(),
     402               0 :                     offnum: buf.get_u16_le(),
     403               0 :                     infobits_set: buf.get_u8(),
     404               0 :                     flags: buf.get_u8(),
     405               0 :                     t_cid: buf.get_u32_le(),
     406               0 :                 }
     407               0 :             }
     408                 :         }
     409                 : 
     410                 :         #[repr(C)]
     411               0 :         #[derive(Debug)]
     412                 :         pub struct XlNeonHeapUpdate {
     413                 :             pub old_xmax: TransactionId,
     414                 :             pub old_offnum: OffsetNumber,
     415                 :             pub old_infobits_set: u8,
     416                 :             pub flags: u8,
     417                 :             pub t_cid: u32,
     418                 :             pub new_xmax: TransactionId,
     419                 :             pub new_offnum: OffsetNumber,
     420                 :         }
     421                 : 
     422                 :         impl XlNeonHeapUpdate {
     423               0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
     424               0 :                 XlNeonHeapUpdate {
     425               0 :                     old_xmax: buf.get_u32_le(),
     426               0 :                     old_offnum: buf.get_u16_le(),
     427               0 :                     old_infobits_set: buf.get_u8(),
     428               0 :                     flags: buf.get_u8(),
     429               0 :                     t_cid: buf.get_u32(),
     430               0 :                     new_xmax: buf.get_u32_le(),
     431               0 :                     new_offnum: buf.get_u16_le(),
     432               0 :                 }
     433               0 :             }
     434                 :         }
     435                 : 
     436                 :         #[repr(C)]
     437               0 :         #[derive(Debug)]
     438                 :         pub struct XlNeonHeapLock {
     439                 :             pub locking_xid: TransactionId,
     440                 :             pub t_cid: u32,
     441                 :             pub offnum: OffsetNumber,
     442                 :             pub infobits_set: u8,
     443                 :             pub flags: u8,
     444                 :         }
     445                 : 
     446                 :         impl XlNeonHeapLock {
     447               0 :             pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
     448               0 :                 XlNeonHeapLock {
     449               0 :                     locking_xid: buf.get_u32_le(),
     450               0 :                     t_cid: buf.get_u32_le(),
     451               0 :                     offnum: buf.get_u16_le(),
     452               0 :                     infobits_set: buf.get_u8(),
     453               0 :                     flags: buf.get_u8(),
     454               0 :                 }
     455               0 :             }
     456                 :         }
     457                 :     }
     458                 : }
     459                 : 
     460                 : #[repr(C)]
     461               0 : #[derive(Debug)]
     462                 : pub struct XlSmgrCreate {
     463                 :     pub rnode: RelFileNode,
     464                 :     // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
     465                 :     // well-defined size?
     466                 :     pub forknum: u8,
     467                 : }
     468                 : 
     469                 : impl XlSmgrCreate {
     470 CBC       21617 :     pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
     471           21617 :         XlSmgrCreate {
     472           21617 :             rnode: RelFileNode {
     473           21617 :                 spcnode: buf.get_u32_le(), /* tablespace */
     474           21617 :                 dbnode: buf.get_u32_le(),  /* database */
     475           21617 :                 relnode: buf.get_u32_le(), /* relation */
     476           21617 :             },
     477           21617 :             forknum: buf.get_u32_le() as u8,
     478           21617 :         }
     479           21617 :     }
     480                 : }
     481                 : 
     482                 : #[repr(C)]
     483 UBC           0 : #[derive(Debug)]
     484                 : pub struct XlSmgrTruncate {
     485                 :     pub blkno: BlockNumber,
     486                 :     pub rnode: RelFileNode,
     487                 :     pub flags: u32,
     488                 : }
     489                 : 
     490                 : impl XlSmgrTruncate {
     491 CBC          44 :     pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
     492              44 :         XlSmgrTruncate {
     493              44 :             blkno: buf.get_u32_le(),
     494              44 :             rnode: RelFileNode {
     495              44 :                 spcnode: buf.get_u32_le(), /* tablespace */
     496              44 :                 dbnode: buf.get_u32_le(),  /* database */
     497              44 :                 relnode: buf.get_u32_le(), /* relation */
     498              44 :             },
     499              44 :             flags: buf.get_u32_le(),
     500              44 :         }
     501              44 :     }
     502                 : }
     503                 : 
     504                 : #[repr(C)]
     505 UBC           0 : #[derive(Debug)]
     506                 : pub struct XlCreateDatabase {
     507                 :     pub db_id: Oid,
     508                 :     pub tablespace_id: Oid,
     509                 :     pub src_db_id: Oid,
     510                 :     pub src_tablespace_id: Oid,
     511                 : }
     512                 : 
     513                 : impl XlCreateDatabase {
     514 CBC          13 :     pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
     515              13 :         XlCreateDatabase {
     516              13 :             db_id: buf.get_u32_le(),
     517              13 :             tablespace_id: buf.get_u32_le(),
     518              13 :             src_db_id: buf.get_u32_le(),
     519              13 :             src_tablespace_id: buf.get_u32_le(),
     520              13 :         }
     521              13 :     }
     522                 : }
     523                 : 
     524                 : #[repr(C)]
     525 UBC           0 : #[derive(Debug)]
     526                 : pub struct XlDropDatabase {
     527                 :     pub db_id: Oid,
     528                 :     pub n_tablespaces: Oid, /* number of tablespace IDs */
     529                 :     pub tablespace_ids: Vec<Oid>,
     530                 : }
     531                 : 
     532                 : impl XlDropDatabase {
     533 CBC           3 :     pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
     534               3 :         let mut rec = XlDropDatabase {
     535               3 :             db_id: buf.get_u32_le(),
     536               3 :             n_tablespaces: buf.get_u32_le(),
     537               3 :             tablespace_ids: Vec::<Oid>::new(),
     538               3 :         };
     539                 : 
     540               3 :         for _i in 0..rec.n_tablespaces {
     541               3 :             let id = buf.get_u32_le();
     542               3 :             rec.tablespace_ids.push(id);
     543               3 :         }
     544                 : 
     545               3 :         rec
     546               3 :     }
     547                 : }
     548                 : 
     549                 : ///
     550                 : /// Note: Parsing some fields is missing, because they're not needed.
     551                 : ///
     552                 : /// This is similar to the xl_xact_parsed_commit and
     553                 : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
     554                 : /// struct for commits and aborts.
     555                 : ///
     556 UBC           0 : #[derive(Debug)]
     557                 : pub struct XlXactParsedRecord {
     558                 :     pub xid: TransactionId,
     559                 :     pub info: u8,
     560                 :     pub xact_time: TimestampTz,
     561                 :     pub xinfo: u32,
     562                 : 
     563                 :     pub db_id: Oid,
     564                 :     /* MyDatabaseId */
     565                 :     pub ts_id: Oid,
     566                 :     /* MyDatabaseTableSpace */
     567                 :     pub subxacts: Vec<TransactionId>,
     568                 : 
     569                 :     pub xnodes: Vec<RelFileNode>,
     570                 : }
     571                 : 
     572                 : impl XlXactParsedRecord {
     573                 :     /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
     574                 :     /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
     575                 :     /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
     576 CBC     1950130 :     pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
     577         1950130 :         let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
     578         1950130 :         // The record starts with time of commit/abort
     579         1950130 :         let xact_time = buf.get_i64_le();
     580         1950130 :         let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
     581         1921783 :             buf.get_u32_le()
     582                 :         } else {
     583           28347 :             0
     584                 :         };
     585                 :         let db_id;
     586                 :         let ts_id;
     587         1950130 :         if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
     588         1920935 :             db_id = buf.get_u32_le();
     589         1920935 :             ts_id = buf.get_u32_le();
     590         1920935 :         } else {
     591           29195 :             db_id = 0;
     592           29195 :             ts_id = 0;
     593           29195 :         }
     594         1950130 :         let mut subxacts = Vec::<TransactionId>::new();
     595         1950130 :         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
     596             145 :             let nsubxacts = buf.get_i32_le();
     597          200049 :             for _i in 0..nsubxacts {
     598          200049 :                 let subxact = buf.get_u32_le();
     599          200049 :                 subxacts.push(subxact);
     600          200049 :             }
     601         1949985 :         }
     602         1950130 :         let mut xnodes = Vec::<RelFileNode>::new();
     603         1950130 :         if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
     604            6629 :             let nrels = buf.get_i32_le();
     605           16764 :             for _i in 0..nrels {
     606           16764 :                 let spcnode = buf.get_u32_le();
     607           16764 :                 let dbnode = buf.get_u32_le();
     608           16764 :                 let relnode = buf.get_u32_le();
     609           16764 :                 trace!(
     610 UBC           0 :                     "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
     611               0 :                     spcnode,
     612               0 :                     dbnode,
     613               0 :                     relnode
     614               0 :                 );
     615 CBC       16764 :                 xnodes.push(RelFileNode {
     616           16764 :                     spcnode,
     617           16764 :                     dbnode,
     618           16764 :                     relnode,
     619           16764 :                 });
     620                 :             }
     621         1943501 :         }
     622                 : 
     623         1950130 :         if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
     624 UBC           0 :             let nitems = buf.get_i32_le();
     625               0 :             debug!(
     626               0 :                 "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
     627               0 :                 nitems
     628               0 :             );
     629               0 :             let sizeof_xl_xact_stats_item = 12;
     630               0 :             buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
     631 CBC     1950130 :         }
     632                 : 
     633         1950130 :         if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
     634           22267 :             let nmsgs = buf.get_i32_le();
     635           22267 :             let sizeof_shared_invalidation_message = 16;
     636           22267 :             buf.advance(
     637           22267 :                 (nmsgs * sizeof_shared_invalidation_message)
     638           22267 :                     .try_into()
     639           22267 :                     .unwrap(),
     640           22267 :             );
     641         1927863 :         }
     642                 : 
     643         1950130 :         if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
     644               2 :             xid = buf.get_u32_le();
     645               2 :             debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
     646         1950128 :         }
     647                 : 
     648         1950130 :         XlXactParsedRecord {
     649         1950130 :             xid,
     650         1950130 :             info,
     651         1950130 :             xact_time,
     652         1950130 :             xinfo,
     653         1950130 :             db_id,
     654         1950130 :             ts_id,
     655         1950130 :             subxacts,
     656         1950130 :             xnodes,
     657         1950130 :         }
     658         1950130 :     }
     659                 : }
     660                 : 
     661                 : #[repr(C)]
     662 UBC           0 : #[derive(Debug)]
     663                 : pub struct XlClogTruncate {
     664                 :     pub pageno: u32,
     665                 :     pub oldest_xid: TransactionId,
     666                 :     pub oldest_xid_db: Oid,
     667                 : }
     668                 : 
     669                 : impl XlClogTruncate {
     670 CBC           1 :     pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
     671               1 :         XlClogTruncate {
     672               1 :             pageno: buf.get_u32_le(),
     673               1 :             oldest_xid: buf.get_u32_le(),
     674               1 :             oldest_xid_db: buf.get_u32_le(),
     675               1 :         }
     676               1 :     }
     677                 : }
     678                 : 
     679                 : #[repr(C)]
     680          945465 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     681                 : pub struct MultiXactMember {
     682                 :     pub xid: TransactionId,
     683                 :     pub status: MultiXactStatus,
     684                 : }
     685                 : 
     686                 : impl MultiXactMember {
     687          472948 :     pub fn decode(buf: &mut Bytes) -> MultiXactMember {
     688          472948 :         MultiXactMember {
     689          472948 :             xid: buf.get_u32_le(),
     690          472948 :             status: buf.get_u32_le(),
     691          472948 :         }
     692          472948 :     }
     693                 : }
     694                 : 
     695                 : #[repr(C)]
     696 UBC           0 : #[derive(Debug)]
     697                 : pub struct XlMultiXactCreate {
     698                 :     pub mid: MultiXactId,
     699                 :     /* new MultiXact's ID */
     700                 :     pub moff: MultiXactOffset,
     701                 :     /* its starting offset in members file */
     702                 :     pub nmembers: u32,
     703                 :     /* number of member XIDs */
     704                 :     pub members: Vec<MultiXactMember>,
     705                 : }
     706                 : 
     707                 : impl XlMultiXactCreate {
     708 CBC       24027 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
     709           24027 :         let mid = buf.get_u32_le();
     710           24027 :         let moff = buf.get_u32_le();
     711           24027 :         let nmembers = buf.get_u32_le();
     712           24027 :         let mut members = Vec::new();
     713          472948 :         for _ in 0..nmembers {
     714          472948 :             members.push(MultiXactMember::decode(buf));
     715          472948 :         }
     716           24027 :         XlMultiXactCreate {
     717           24027 :             mid,
     718           24027 :             moff,
     719           24027 :             nmembers,
     720           24027 :             members,
     721           24027 :         }
     722           24027 :     }
     723                 : }
     724                 : 
     725                 : #[repr(C)]
     726 UBC           0 : #[derive(Debug)]
     727                 : pub struct XlMultiXactTruncate {
     728                 :     pub oldest_multi_db: Oid,
     729                 :     /* to-be-truncated range of multixact offsets */
     730                 :     pub start_trunc_off: MultiXactId,
     731                 :     /* just for completeness' sake */
     732                 :     pub end_trunc_off: MultiXactId,
     733                 : 
     734                 :     /* to-be-truncated range of multixact members */
     735                 :     pub start_trunc_memb: MultiXactOffset,
     736                 :     pub end_trunc_memb: MultiXactOffset,
     737                 : }
     738                 : 
     739                 : impl XlMultiXactTruncate {
     740               0 :     pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
     741               0 :         XlMultiXactTruncate {
     742               0 :             oldest_multi_db: buf.get_u32_le(),
     743               0 :             start_trunc_off: buf.get_u32_le(),
     744               0 :             end_trunc_off: buf.get_u32_le(),
     745               0 :             start_trunc_memb: buf.get_u32_le(),
     746               0 :             end_trunc_memb: buf.get_u32_le(),
     747               0 :         }
     748               0 :     }
     749                 : }
     750                 : 
     751                 : #[repr(C)]
     752               0 : #[derive(Debug)]
     753                 : pub struct XlLogicalMessage {
     754                 :     pub db_id: Oid,
     755                 :     pub transactional: bool,
     756                 :     pub prefix_size: usize,
     757                 :     pub message_size: usize,
     758                 : }
     759                 : 
     760                 : impl XlLogicalMessage {
     761 CBC         109 :     pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
     762             109 :         XlLogicalMessage {
     763             109 :             db_id: buf.get_u32_le(),
     764             109 :             transactional: buf.get_u32_le() != 0, // 4-bytes alignment
     765             109 :             prefix_size: buf.get_u64_le() as usize,
     766             109 :             message_size: buf.get_u64_le() as usize,
     767             109 :         }
     768             109 :     }
     769                 : }
     770                 : 
     771                 : /// Main routine to decode a WAL record and figure out which blocks are modified
     772                 : //
     773                 : // See xlogrecord.h for details
     774                 : // The overall layout of an XLOG record is:
     775                 : //              Fixed-size header (XLogRecord struct)
     776                 : //      XLogRecordBlockHeader struct
     777                 : //          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
     778                 : //                 If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
     779                 : //                 XLogRecordBlockCompressHeader struct follows.
     780                 : //          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
     781                 : //          BlockNumber follows
     782                 : //      XLogRecordBlockHeader struct
     783                 : //      ...
     784                 : //      XLogRecordDataHeader[Short|Long] struct
     785                 : //      block data
     786                 : //      block data
     787                 : //      ...
     788                 : //      main data
     789                 : //
     790                 : //
     791                 : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
     792                 : // It would be more natural for this function to return a DecodedWALRecord as return value,
     793                 : // but reusing the caller-supplied struct avoids an allocation.
     794                 : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
     795                 : //
     796        68588765 : pub fn decode_wal_record(
     797        68588765 :     record: Bytes,
     798        68588765 :     decoded: &mut DecodedWALRecord,
     799        68588765 :     pg_version: u32,
     800        68588765 : ) -> Result<()> {
     801        68588765 :     let mut rnode_spcnode: u32 = 0;
     802        68588765 :     let mut rnode_dbnode: u32 = 0;
     803        68588765 :     let mut rnode_relnode: u32 = 0;
     804        68588765 :     let mut got_rnode = false;
     805        68588765 : 
     806        68588765 :     let mut buf = record.clone();
     807                 : 
     808                 :     // 1. Parse XLogRecord struct
     809                 : 
     810                 :     // FIXME: assume little-endian here
     811        68588765 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
     812                 : 
     813        68588765 :     trace!(
     814 UBC           0 :         "decode_wal_record xl_rmid = {} xl_info = {}",
     815               0 :         xlogrec.xl_rmid,
     816               0 :         xlogrec.xl_info
     817               0 :     );
     818                 : 
     819 CBC    68588764 :     let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
     820        68588764 : 
     821        68588764 :     if buf.remaining() != remaining {
     822 UBC           0 :         //TODO error
     823 CBC    68588764 :     }
     824                 : 
     825        68588764 :     let mut max_block_id = 0;
     826        68588764 :     let mut blocks_total_len: u32 = 0;
     827        68588764 :     let mut main_data_len = 0;
     828        68588764 :     let mut datatotal: u32 = 0;
     829        68588764 :     decoded.blocks.clear();
     830                 : 
     831                 :     // 2. Decode the headers.
     832                 :     // XLogRecordBlockHeaders if any,
     833                 :     // XLogRecordDataHeader[Short|Long]
     834       207204622 :     while buf.remaining() > datatotal as usize {
     835       138615858 :         let block_id = buf.get_u8();
     836       138615858 : 
     837       138615858 :         match block_id {
     838        68411691 :             pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
     839        68411691 :                 /* XLogRecordDataHeaderShort */
     840        68411691 :                 main_data_len = buf.get_u8() as u32;
     841        68411691 :                 datatotal += main_data_len;
     842        68411691 :             }
     843                 : 
     844           45880 :             pg_constants::XLR_BLOCK_ID_DATA_LONG => {
     845           45880 :                 /* XLogRecordDataHeaderLong */
     846           45880 :                 main_data_len = buf.get_u32_le();
     847           45880 :                 datatotal += main_data_len;
     848           45880 :             }
     849                 : 
     850 UBC           0 :             pg_constants::XLR_BLOCK_ID_ORIGIN => {
     851               0 :                 // RepOriginId is uint16
     852               0 :                 buf.advance(2);
     853               0 :             }
     854                 : 
     855 CBC      101792 :             pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
     856          101792 :                 // TransactionId is uint32
     857          101792 :                 buf.advance(4);
     858          101792 :             }
     859                 : 
     860        70056495 :             0..=pg_constants::XLR_MAX_BLOCK_ID => {
     861                 :                 /* XLogRecordBlockHeader */
     862        70056495 :                 let mut blk = DecodedBkpBlock::new();
     863        70056495 : 
     864        70056495 :                 if block_id <= max_block_id {
     865        65876793 :                     // TODO
     866        65876793 :                     //report_invalid_record(state,
     867        65876793 :                     //                    "out-of-order block_id %u at %X/%X",
     868        65876793 :                     //                    block_id,
     869        65876793 :                     //                    (uint32) (state->ReadRecPtr >> 32),
     870        65876793 :                     //                    (uint32) state->ReadRecPtr);
     871        65876793 :                     //    goto err;
     872        65876793 :                 }
     873        70056495 :                 max_block_id = block_id;
     874        70056495 : 
     875        70056495 :                 let fork_flags: u8 = buf.get_u8();
     876        70056495 :                 blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
     877        70056495 :                 blk.flags = fork_flags;
     878        70056495 :                 blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
     879        70056495 :                 blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
     880        70056495 :                 blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
     881        70056495 :                 blk.data_len = buf.get_u16_le();
     882        70056495 : 
     883        70056495 :                 /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
     884        70056495 : 
     885        70056495 :                 datatotal += blk.data_len as u32;
     886        70056495 :                 blocks_total_len += blk.data_len as u32;
     887        70056495 : 
     888        70056495 :                 if blk.has_image {
     889          172031 :                     blk.bimg_len = buf.get_u16_le();
     890          172031 :                     blk.hole_offset = buf.get_u16_le();
     891          172031 :                     blk.bimg_info = buf.get_u8();
     892                 : 
     893 UBC           0 :                     blk.apply_image = dispatch_pgversion!(
     894 CBC      172031 :                         pg_version,
     895          172031 :                         (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
     896                 :                     );
     897                 : 
     898          172031 :                     let blk_img_is_compressed =
     899          172031 :                         postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
     900                 : 
     901          172031 :                     if blk_img_is_compressed {
     902 UBC           0 :                         debug!("compressed block image , pg_version = {}", pg_version);
     903 CBC      172031 :                     }
     904                 : 
     905          172031 :                     if blk_img_is_compressed {
     906 UBC           0 :                         if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
     907               0 :                             blk.hole_length = buf.get_u16_le();
     908               0 :                         } else {
     909               0 :                             blk.hole_length = 0;
     910               0 :                         }
     911 CBC      172031 :                     } else {
     912          172031 :                         blk.hole_length = BLCKSZ - blk.bimg_len;
     913          172031 :                     }
     914          172031 :                     datatotal += blk.bimg_len as u32;
     915          172031 :                     blocks_total_len += blk.bimg_len as u32;
     916          172031 : 
     917          172031 :                     /*
     918          172031 :                      * cross-check that hole_offset > 0, hole_length > 0 and
     919          172031 :                      * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
     920          172031 :                      */
     921          172031 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
     922          147585 :                         && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
     923 UBC           0 :                     {
     924               0 :                         // TODO
     925               0 :                         /*
     926               0 :                         report_invalid_record(state,
     927               0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
     928               0 :                                       (unsigned int) blk->hole_offset,
     929               0 :                                       (unsigned int) blk->hole_length,
     930               0 :                                       (unsigned int) blk->bimg_len,
     931               0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     932               0 :                         goto err;
     933               0 :                                      */
     934 CBC      172031 :                     }
     935                 : 
     936                 :                     /*
     937                 :                      * cross-check that hole_offset == 0 and hole_length == 0 if
     938                 :                      * the HAS_HOLE flag is not set.
     939                 :                      */
     940          172031 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     941           24446 :                         && (blk.hole_offset != 0 || blk.hole_length != 0)
     942 UBC           0 :                     {
     943               0 :                         // TODO
     944               0 :                         /*
     945               0 :                         report_invalid_record(state,
     946               0 :                                       "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
     947               0 :                                       (unsigned int) blk->hole_offset,
     948               0 :                                       (unsigned int) blk->hole_length,
     949               0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     950               0 :                         goto err;
     951               0 :                                      */
     952 CBC      172031 :                     }
     953                 : 
     954                 :                     /*
     955                 :                      * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
     956                 :                      * flag is set.
     957                 :                      */
     958          172031 :                     if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
     959           24446 :                         // TODO
     960           24446 :                         /*
     961           24446 :                         report_invalid_record(state,
     962           24446 :                                       "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
     963           24446 :                                       (unsigned int) blk->bimg_len,
     964           24446 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     965           24446 :                         goto err;
     966           24446 :                                      */
     967          147585 :                     }
     968                 : 
     969                 :                     /*
     970                 :                      * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
     971                 :                      * IS_COMPRESSED flag is set.
     972                 :                      */
     973          172031 :                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
     974           24446 :                         && !blk_img_is_compressed
     975           24446 :                         && blk.bimg_len != BLCKSZ
     976 UBC           0 :                     {
     977               0 :                         // TODO
     978               0 :                         /*
     979               0 :                         report_invalid_record(state,
     980               0 :                                       "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
     981               0 :                                       (unsigned int) blk->data_len,
     982               0 :                                       (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     983               0 :                         goto err;
     984               0 :                                      */
     985 CBC      172031 :                     }
     986        69884464 :                 }
     987        70056495 :                 if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
     988        65877730 :                     rnode_spcnode = buf.get_u32_le();
     989        65877730 :                     rnode_dbnode = buf.get_u32_le();
     990        65877730 :                     rnode_relnode = buf.get_u32_le();
     991        65877730 :                     got_rnode = true;
     992        65877730 :                 } else if !got_rnode {
     993 UBC           0 :                     // TODO
     994               0 :                     /*
     995               0 :                     report_invalid_record(state,
     996               0 :                                     "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
     997               0 :                                     (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
     998               0 :                     goto err;           */
     999 CBC     4178765 :                 }
    1000                 : 
    1001        70056495 :                 blk.rnode_spcnode = rnode_spcnode;
    1002        70056495 :                 blk.rnode_dbnode = rnode_dbnode;
    1003        70056495 :                 blk.rnode_relnode = rnode_relnode;
    1004        70056495 : 
    1005        70056495 :                 blk.blkno = buf.get_u32_le();
    1006        70056495 :                 trace!(
    1007 UBC           0 :                     "this record affects {}/{}/{} blk {}",
    1008               0 :                     rnode_spcnode,
    1009               0 :                     rnode_dbnode,
    1010               0 :                     rnode_relnode,
    1011               0 :                     blk.blkno
    1012               0 :                 );
    1013                 : 
    1014 CBC    70056495 :                 decoded.blocks.push(blk);
    1015                 :             }
    1016                 : 
    1017 UBC           0 :             _ => {
    1018               0 :                 // TODO: invalid block_id
    1019               0 :             }
    1020                 :         }
    1021                 :     }
    1022                 : 
    1023                 :     // 3. Decode blocks.
    1024 CBC    68588764 :     let mut ptr = record.len() - buf.remaining();
    1025        70056495 :     for blk in decoded.blocks.iter_mut() {
    1026        70056495 :         if blk.has_image {
    1027          172031 :             blk.bimg_offset = ptr as u32;
    1028          172031 :             ptr += blk.bimg_len as usize;
    1029        69884464 :         }
    1030        70056495 :         if blk.has_data {
    1031        59267048 :             ptr += blk.data_len as usize;
    1032        59267048 :         }
    1033                 :     }
    1034                 :     // We don't need them, so just skip blocks_total_len bytes
    1035        68588764 :     buf.advance(blocks_total_len as usize);
    1036        68588764 :     assert_eq!(ptr, record.len() - buf.remaining());
    1037                 : 
    1038        68588764 :     let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
    1039        68588764 : 
    1040        68588764 :     // 4. Decode main_data
    1041        68588764 :     if main_data_len > 0 {
    1042        68457571 :         assert_eq!(buf.remaining(), main_data_len as usize);
    1043          131193 :     }
    1044                 : 
    1045        68588764 :     decoded.xl_xid = xlogrec.xl_xid;
    1046        68588764 :     decoded.xl_info = xlogrec.xl_info;
    1047        68588764 :     decoded.xl_rmid = xlogrec.xl_rmid;
    1048        68588764 :     decoded.record = record;
    1049        68588764 :     decoded.main_data_offset = main_data_offset;
    1050        68588764 : 
    1051        68588764 :     Ok(())
    1052        68588764 : }
    1053                 : 
    1054                 : ///
    1055                 : /// Build a human-readable string to describe a WAL record
    1056                 : ///
    1057                 : /// For debugging purposes
    1058 UBC           0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
    1059               0 :     match rec {
    1060               0 :         NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
    1061               0 :             "will_init: {}, {}",
    1062               0 :             will_init,
    1063               0 :             describe_postgres_wal_record(rec)?
    1064                 :         )),
    1065               0 :         _ => Ok(format!("{:?}", rec)),
    1066                 :     }
    1067               0 : }
    1068                 : 
    1069               0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
    1070               0 :     // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
    1071               0 :     // Maybe use the postgres wal redo process, the same used for replaying WAL records?
    1072               0 :     // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
    1073               0 :     // without worrying about security?
    1074               0 :     //
    1075               0 :     // But for now, we have a hand-written code for a few common WAL record types here.
    1076               0 : 
    1077               0 :     let mut buf = record.clone();
    1078                 : 
    1079                 :     // 1. Parse XLogRecord struct
    1080                 : 
    1081                 :     // FIXME: assume little-endian here
    1082               0 :     let xlogrec = XLogRecord::from_bytes(&mut buf)?;
    1083                 : 
    1084                 :     let unknown_str: String;
    1085                 : 
    1086               0 :     let result: &str = match xlogrec.xl_rmid {
    1087                 :         pg_constants::RM_HEAP2_ID => {
    1088               0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1089               0 :             match info {
    1090               0 :                 pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
    1091               0 :                 pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
    1092                 :                 _ => {
    1093               0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1094               0 :                     &unknown_str
    1095                 :                 }
    1096                 :             }
    1097                 :         }
    1098                 :         pg_constants::RM_HEAP_ID => {
    1099               0 :             let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
    1100               0 :             match info {
    1101               0 :                 pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
    1102               0 :                 pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
    1103               0 :                 pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
    1104               0 :                 pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
    1105                 :                 _ => {
    1106               0 :                     unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
    1107               0 :                     &unknown_str
    1108                 :                 }
    1109                 :             }
    1110                 :         }
    1111                 :         pg_constants::RM_XLOG_ID => {
    1112               0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1113               0 :             match info {
    1114               0 :                 pg_constants::XLOG_FPI => "XLOG FPI",
    1115               0 :                 pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
    1116                 :                 _ => {
    1117               0 :                     unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
    1118               0 :                     &unknown_str
    1119                 :                 }
    1120                 :             }
    1121                 :         }
    1122               0 :         rmid => {
    1123               0 :             let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    1124               0 : 
    1125               0 :             unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
    1126               0 :             &unknown_str
    1127                 :         }
    1128                 :     };
    1129                 : 
    1130               0 :     Ok(String::from(result))
    1131               0 : }
        

Generated by: LCOV version 2.1-beta