LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - models.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 12.8 % 39 5
Test Date: 2025-07-16 12:29:03 Functions: 2.2 % 46 1

            Line data    Source code
       1              : //! This module houses types which represent decoded PG WAL records
       2              : //! ready for the pageserver to interpret. They are derived from the original
       3              : //! WAL records, so that each struct corresponds closely to one WAL record of
       4              : //! a specific kind. They contain the same information as the original WAL records,
       5              : //! but the values are already serialized in a [`SerializedValueBatch`], which
       6              : //! is the format that the pageserver is expecting them in.
       7              : //!
       8              : //! The ingestion code uses these structs to help with parsing the WAL records,
       9              : //! and it splits them into a stream of modifications to the key-value pairs that
      10              : //! are ultimately stored in delta layers.  See also the split-out counterparts in
      11              : //! [`postgres_ffi::walrecord`].
      12              : //!
      13              : //! The pipeline which processes WAL records is not super obvious, so let's follow
      14              : //! the flow of an example XACT_COMMIT Postgres record:
      15              : //!
      16              : //! (Postgres XACT_COMMIT record)
      17              : //! |
      18              : //! |--> pageserver::walingest::WalIngest::decode_xact_record
      19              : //!      |
      20              : //!      |--> ([`XactRecord::Commit`])
      21              : //!           |
      22              : //!           |--> pageserver::walingest::WalIngest::ingest_xact_record
      23              : //!                |
      24              : //!                |--> (NeonWalRecord::ClogSetCommitted)
      25              : //!                     |
      26              : //!                     |--> write to KV store within the pageserver
      27              : 
      28              : pub mod record;
      29              : pub mod value;
      30              : 
      31              : use bytes::Bytes;
      32              : use pageserver_api::reltag::{RelTag, SlruKind};
      33              : use postgres_ffi::walrecord::{
      34              :     XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
      35              :     XlSmgrTruncate, XlXactParsedRecord,
      36              : };
      37              : use postgres_ffi::{Oid, TransactionId};
      38              : use serde::{Deserialize, Serialize};
      39              : use utils::lsn::Lsn;
      40              : 
      41              : use crate::serialized_batch::SerializedValueBatch;
      42              : 
      43              : // Code generated by protobuf.
      44              : pub mod proto {
      45              :     // Tonic generates derives as `#[derive(Clone, PartialEq, ::prost::Message)]`.
      46              :     // We don't use these types for anything but broker data transmission,
      47              :     // so it's ok to ignore this lint.
      48              :     #![allow(clippy::derive_partial_eq_without_eq)]
      49              :     // The generated ValueMeta has a `len` method generated for its `len` field.
      50              :     #![allow(clippy::len_without_is_empty)]
      51              :     include!(concat!(env!("OUT_DIR"), concat!("/interpreted_wal.rs")));
      52              : }
      53              : 
      54            0 : #[derive(Copy, Clone, Serialize, Deserialize)]
      55              : pub enum FlushUncommittedRecords {
      56              :     Yes,
      57              :     No,
      58              : }
      59              : 
      60              : /// A batch of interpreted WAL records
      61            0 : #[derive(Serialize, Deserialize)]
      62              : pub struct InterpretedWalRecords {
      63              :     pub records: Vec<InterpretedWalRecord>,
      64              :     // Start LSN of the next record after the batch.
      65              :     // Note that said record may not belong to the current shard.
      66              :     pub next_record_lsn: Lsn,
      67              :     // Inclusive start LSN of the PG WAL from which the interpreted
      68              :     // WAL records were extracted. Note that this is not necessarily the
      69              :     // start LSN of the first interpreted record in the batch.
      70              :     pub raw_wal_start_lsn: Option<Lsn>,
      71              : }
      72              : 
      73              : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
      74            0 : #[derive(Serialize, Deserialize, Clone)]
      75              : pub struct InterpretedWalRecord {
      76              :     /// Optional metadata record - may cause writes to metadata keys
      77              :     /// in the storage engine
      78              :     pub metadata_record: Option<MetadataRecord>,
      79              :     /// A pre-serialized batch along with the required metadata for ingestion
      80              :     /// by the pageserver
      81              :     pub batch: SerializedValueBatch,
      82              :     /// Byte offset within WAL for the start of the next PG WAL record.
      83              :     /// Usually this is the end LSN of the current record, but in case of
      84              :     /// XLOG SWITCH records it will be within the next segment.
      85              :     pub next_record_lsn: Lsn,
      86              :     /// Whether to flush all uncommitted modifications to the storage engine
      87              :     /// before ingesting this record. This is currently only used for legacy PG
      88              :     /// database creations which read pages from a template database. Such WAL
      89              :     /// records require reading data blocks while ingesting, hence the need to flush.
      90              :     pub flush_uncommitted: FlushUncommittedRecords,
      91              :     /// Transaction id of the original PG WAL record
      92              :     pub xid: TransactionId,
      93              : }
      94              : 
      95              : impl InterpretedWalRecord {
      96              :     /// Checks if the WAL record is empty
      97              :     ///
      98              :     /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
      99              :     /// pageserver.
     100          200 :     pub fn is_empty(&self) -> bool {
     101          200 :         self.batch.is_empty()
     102          200 :             && self.metadata_record.is_none()
     103          200 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
     104          200 :     }
     105              : 
     106              :     /// Checks if the WAL record is observed (i.e. contains only metadata
     107              :     /// for observed values)
     108            0 :     pub fn is_observed(&self) -> bool {
     109            0 :         self.batch.is_observed()
     110            0 :             && self.metadata_record.is_none()
     111            0 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
     112            0 :     }
     113              : }
     114              : 
     115              : /// The interpreted part of the Postgres WAL record which requires metadata
     116              : /// writes to the underlying storage engine.
     117            0 : #[derive(Clone, Serialize, Deserialize)]
     118              : pub enum MetadataRecord {
     119              :     Heapam(HeapamRecord),
     120              :     Neonrmgr(NeonrmgrRecord),
     121              :     Smgr(SmgrRecord),
     122              :     Dbase(DbaseRecord),
     123              :     Clog(ClogRecord),
     124              :     Xact(XactRecord),
     125              :     MultiXact(MultiXactRecord),
     126              :     Relmap(RelmapRecord),
     127              :     Xlog(XlogRecord),
     128              :     LogicalMessage(LogicalMessageRecord),
     129              :     Standby(StandbyRecord),
     130              :     Replorigin(ReploriginRecord),
     131              : }
     132              : 
     133            0 : #[derive(Clone, Serialize, Deserialize)]
     134              : pub enum HeapamRecord {
     135              :     ClearVmBits(ClearVmBits),
     136              : }
     137              : 
     138            0 : #[derive(Clone, Serialize, Deserialize)]
     139              : pub struct ClearVmBits {
     140              :     pub new_heap_blkno: Option<u32>,
     141              :     pub old_heap_blkno: Option<u32>,
     142              :     pub vm_rel: RelTag,
     143              :     pub flags: u8,
     144              : }
     145              : 
     146            0 : #[derive(Clone, Serialize, Deserialize)]
     147              : pub enum NeonrmgrRecord {
     148              :     ClearVmBits(ClearVmBits),
     149              : }
     150              : 
     151            0 : #[derive(Clone, Serialize, Deserialize)]
     152              : pub enum SmgrRecord {
     153              :     Create(SmgrCreate),
     154              :     Truncate(XlSmgrTruncate),
     155              : }
     156              : 
     157            0 : #[derive(Clone, Serialize, Deserialize)]
     158              : pub struct SmgrCreate {
     159              :     pub rel: RelTag,
     160              : }
     161              : 
     162            0 : #[derive(Clone, Serialize, Deserialize)]
     163              : pub enum DbaseRecord {
     164              :     Create(DbaseCreate),
     165              :     Drop(DbaseDrop),
     166              : }
     167              : 
     168            0 : #[derive(Clone, Serialize, Deserialize)]
     169              : pub struct DbaseCreate {
     170              :     pub db_id: Oid,
     171              :     pub tablespace_id: Oid,
     172              :     pub src_db_id: Oid,
     173              :     pub src_tablespace_id: Oid,
     174              : }
     175              : 
     176            0 : #[derive(Clone, Serialize, Deserialize)]
     177              : pub struct DbaseDrop {
     178              :     pub db_id: Oid,
     179              :     pub tablespace_ids: Vec<Oid>,
     180              : }
     181              : 
     182            0 : #[derive(Clone, Serialize, Deserialize)]
     183              : pub enum ClogRecord {
     184              :     ZeroPage(ClogZeroPage),
     185              :     Truncate(ClogTruncate),
     186              : }
     187              : 
     188            0 : #[derive(Clone, Serialize, Deserialize)]
     189              : pub struct ClogZeroPage {
     190              :     pub segno: u32,
     191              :     pub rpageno: u32,
     192              : }
     193              : 
     194            0 : #[derive(Clone, Serialize, Deserialize)]
     195              : pub struct ClogTruncate {
     196              :     pub pageno: u32,
     197              :     pub oldest_xid: TransactionId,
     198              :     pub oldest_xid_db: Oid,
     199              : }
     200              : 
     201            0 : #[derive(Clone, Serialize, Deserialize)]
     202              : pub enum XactRecord {
     203              :     Commit(XactCommon),
     204              :     Abort(XactCommon),
     205              :     CommitPrepared(XactCommon),
     206              :     AbortPrepared(XactCommon),
     207              :     Prepare(XactPrepare),
     208              : }
     209              : 
     210            0 : #[derive(Clone, Serialize, Deserialize)]
     211              : pub struct XactCommon {
     212              :     pub parsed: XlXactParsedRecord,
     213              :     pub origin_id: u16,
     214              :     // Fields below are only used for logging
     215              :     pub xl_xid: TransactionId,
     216              :     pub lsn: Lsn,
     217              : }
     218              : 
     219            0 : #[derive(Clone, Serialize, Deserialize)]
     220              : pub struct XactPrepare {
     221              :     pub xl_xid: TransactionId,
     222              :     pub data: Bytes,
     223              : }
     224              : 
     225            0 : #[derive(Clone, Serialize, Deserialize)]
     226              : pub enum MultiXactRecord {
     227              :     ZeroPage(MultiXactZeroPage),
     228              :     Create(XlMultiXactCreate),
     229              :     Truncate(XlMultiXactTruncate),
     230              : }
     231              : 
     232            0 : #[derive(Clone, Serialize, Deserialize)]
     233              : pub struct MultiXactZeroPage {
     234              :     pub slru_kind: SlruKind,
     235              :     pub segno: u32,
     236              :     pub rpageno: u32,
     237              : }
     238              : 
     239            0 : #[derive(Clone, Serialize, Deserialize)]
     240              : pub enum RelmapRecord {
     241              :     Update(RelmapUpdate),
     242              : }
     243              : 
     244            0 : #[derive(Clone, Serialize, Deserialize)]
     245              : pub struct RelmapUpdate {
     246              :     pub update: XlRelmapUpdate,
     247              :     pub buf: Bytes,
     248              : }
     249              : 
     250            0 : #[derive(Clone, Serialize, Deserialize)]
     251              : pub enum XlogRecord {
     252              :     Raw(RawXlogRecord),
     253              : }
     254              : 
     255            0 : #[derive(Clone, Serialize, Deserialize)]
     256              : pub struct RawXlogRecord {
     257              :     pub info: u8,
     258              :     pub lsn: Lsn,
     259              :     pub buf: Bytes,
     260              : }
     261              : 
     262            0 : #[derive(Clone, Serialize, Deserialize)]
     263              : pub enum LogicalMessageRecord {
     264              :     Put(PutLogicalMessage),
     265              :     #[cfg(feature = "testing")]
     266              :     Failpoint,
     267              : }
     268              : 
     269            0 : #[derive(Clone, Serialize, Deserialize)]
     270              : pub struct PutLogicalMessage {
     271              :     pub path: String,
     272              :     pub buf: Bytes,
     273              : }
     274              : 
     275            0 : #[derive(Clone, Serialize, Deserialize)]
     276              : pub enum StandbyRecord {
     277              :     RunningXacts(StandbyRunningXacts),
     278              : }
     279              : 
     280            0 : #[derive(Clone, Serialize, Deserialize)]
     281              : pub struct StandbyRunningXacts {
     282              :     pub oldest_running_xid: TransactionId,
     283              : }
     284              : 
     285            0 : #[derive(Clone, Serialize, Deserialize)]
     286              : pub enum ReploriginRecord {
     287              :     Set(XlReploriginSet),
     288              :     Drop(XlReploriginDrop),
     289              : }
        

Generated by: LCOV version 2.1-beta