LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - models.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 12.8 % 39 5
Test Date: 2025-02-20 13:11:02 Functions: 0.7 % 137 1

            Line data    Source code
       1              : //! This module houses types which represent decoded PG WAL records
       2              : //! ready for the pageserver to interpret. They are derived from the original
       3              : //! WAL records, so that each struct corresponds closely to one WAL record of
       4              : //! a specific kind. They contain the same information as the original WAL records,
       5              : //! but the values are already serialized in a [`SerializedValueBatch`], which
       6              : //! is the format that the pageserver is expecting them in.
       7              : //!
       8              : //! The ingestion code uses these structs to help with parsing the WAL records,
       9              : //! and it splits them into a stream of modifications to the key-value pairs that
      10              : //! are ultimately stored in delta layers.  See also the split-out counterparts in
      11              : //! [`postgres_ffi::walrecord`].
      12              : //!
      13              : //! The pipeline which processes WAL records is not super obvious, so let's follow
      14              : //! the flow of an example XACT_COMMIT Postgres record:
      15              : //!
      16              : //! (Postgres XACT_COMMIT record)
      17              : //! |
      18              : //! |--> pageserver::walingest::WalIngest::decode_xact_record
      19              : //!      |
      20              : //!      |--> ([`XactRecord::Commit`])
      21              : //!           |
      22              : //!           |--> pageserver::walingest::WalIngest::ingest_xact_record
      23              : //!                |
      24              : //!                |--> (NeonWalRecord::ClogSetCommitted)
      25              : //!                     |
      26              : //!                     |--> write to KV store within the pageserver
      27              : 
      28              : use bytes::Bytes;
      29              : use pageserver_api::reltag::{RelTag, SlruKind};
      30              : use postgres_ffi::walrecord::{
      31              :     XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
      32              :     XlSmgrTruncate, XlXactParsedRecord,
      33              : };
      34              : use postgres_ffi::{Oid, TransactionId};
      35              : use serde::{Deserialize, Serialize};
      36              : use utils::lsn::Lsn;
      37              : 
      38              : use crate::serialized_batch::SerializedValueBatch;
      39              : 
      40              : // Code generated by protobuf.
      41              : pub mod proto {
      42              :     // Tonic derives these types as `#[derive(Clone, PartialEq, ::prost::Message)]`.
      43              :     // We don't use them for anything but broker data transmission,
      44              :     // so it's ok to ignore this lint.
      45              :     #![allow(clippy::derive_partial_eq_without_eq)]
      46              :     // The generated ValueMeta has a `len` method generated for its `len` field.
      47              :     #![allow(clippy::len_without_is_empty)]
      48              :     include!(concat!(env!("OUT_DIR"), concat!("/interpreted_wal.rs")));
      49              : }
      50              : 
      51            0 : #[derive(Copy, Clone, Serialize, Deserialize)]
      52              : pub enum FlushUncommittedRecords {
      53              :     Yes,
      54              :     No,
      55              : }
      56              : 
      57              : /// A batch of interpreted WAL records
      58            0 : #[derive(Serialize, Deserialize)]
      59              : pub struct InterpretedWalRecords {
      60              :     pub records: Vec<InterpretedWalRecord>,
      61              :     // Start LSN of the next record after the batch.
      62              :     // Note that said record may not belong to the current shard.
      63              :     pub next_record_lsn: Option<Lsn>,
      64              : }
      65              : 
      66              : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
      67            0 : #[derive(Serialize, Deserialize, Clone)]
      68              : pub struct InterpretedWalRecord {
      69              :     /// Optional metadata record - may cause writes to metadata keys
      70              :     /// in the storage engine
      71              :     pub metadata_record: Option<MetadataRecord>,
      72              :     /// A pre-serialized batch along with the required metadata for ingestion
      73              :     /// by the pageserver
      74              :     pub batch: SerializedValueBatch,
      75              :     /// Byte offset within WAL for the start of the next PG WAL record.
      76              :     /// Usually this is the end LSN of the current record, but in case of
      77              :     /// XLOG SWITCH records it will be within the next segment.
      78              :     pub next_record_lsn: Lsn,
      79              :     /// Whether to flush all uncommitted modifications to the storage engine
      80              :     /// before ingesting this record. This is currently only used for legacy PG
      81              :     /// database creations which read pages from a template database. Such WAL
      82              :     /// records require reading data blocks while ingesting, hence the need to flush.
      83              :     pub flush_uncommitted: FlushUncommittedRecords,
      84              :     /// Transaction id of the original PG WAL record
      85              :     pub xid: TransactionId,
      86              : }
      87              : 
      88              : impl InterpretedWalRecord {
      89              :     /// Checks if the WAL record is empty
      90              :     ///
      91              :     /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
      92              :     /// pageserver.
      93          795 :     pub fn is_empty(&self) -> bool {
      94          795 :         self.batch.is_empty()
      95          795 :             && self.metadata_record.is_none()
      96          199 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
      97          795 :     }
      98              : 
      99              :     /// Checks if the WAL record is observed (i.e. contains only metadata
     100              :     /// for observed values)
     101            0 :     pub fn is_observed(&self) -> bool {
     102            0 :         self.batch.is_observed()
     103            0 :             && self.metadata_record.is_none()
     104            0 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
     105            0 :     }
     106              : }
     107              : 
     108              : /// The interpreted part of the Postgres WAL record which requires metadata
     109              : /// writes to the underlying storage engine.
     110            0 : #[derive(Clone, Serialize, Deserialize)]
     111              : pub enum MetadataRecord {
     112              :     Heapam(HeapamRecord),
     113              :     Neonrmgr(NeonrmgrRecord),
     114              :     Smgr(SmgrRecord),
     115              :     Dbase(DbaseRecord),
     116              :     Clog(ClogRecord),
     117              :     Xact(XactRecord),
     118              :     MultiXact(MultiXactRecord),
     119              :     Relmap(RelmapRecord),
     120              :     Xlog(XlogRecord),
     121              :     LogicalMessage(LogicalMessageRecord),
     122              :     Standby(StandbyRecord),
     123              :     Replorigin(ReploriginRecord),
     124              : }
     125              : 
     126            0 : #[derive(Clone, Serialize, Deserialize)]
     127              : pub enum HeapamRecord {
     128              :     ClearVmBits(ClearVmBits),
     129              : }
     130              : 
     131            0 : #[derive(Clone, Serialize, Deserialize)]
     132              : pub struct ClearVmBits {
     133              :     pub new_heap_blkno: Option<u32>,
     134              :     pub old_heap_blkno: Option<u32>,
     135              :     pub vm_rel: RelTag,
     136              :     pub flags: u8,
     137              : }
     138              : 
     139            0 : #[derive(Clone, Serialize, Deserialize)]
     140              : pub enum NeonrmgrRecord {
     141              :     ClearVmBits(ClearVmBits),
     142              : }
     143              : 
     144            0 : #[derive(Clone, Serialize, Deserialize)]
     145              : pub enum SmgrRecord {
     146              :     Create(SmgrCreate),
     147              :     Truncate(XlSmgrTruncate),
     148              : }
     149              : 
     150            0 : #[derive(Clone, Serialize, Deserialize)]
     151              : pub struct SmgrCreate {
     152              :     pub rel: RelTag,
     153              : }
     154              : 
     155            0 : #[derive(Clone, Serialize, Deserialize)]
     156              : pub enum DbaseRecord {
     157              :     Create(DbaseCreate),
     158              :     Drop(DbaseDrop),
     159              : }
     160              : 
     161            0 : #[derive(Clone, Serialize, Deserialize)]
     162              : pub struct DbaseCreate {
     163              :     pub db_id: Oid,
     164              :     pub tablespace_id: Oid,
     165              :     pub src_db_id: Oid,
     166              :     pub src_tablespace_id: Oid,
     167              : }
     168              : 
     169            0 : #[derive(Clone, Serialize, Deserialize)]
     170              : pub struct DbaseDrop {
     171              :     pub db_id: Oid,
     172              :     pub tablespace_ids: Vec<Oid>,
     173              : }
     174              : 
     175            0 : #[derive(Clone, Serialize, Deserialize)]
     176              : pub enum ClogRecord {
     177              :     ZeroPage(ClogZeroPage),
     178              :     Truncate(ClogTruncate),
     179              : }
     180              : 
     181            0 : #[derive(Clone, Serialize, Deserialize)]
     182              : pub struct ClogZeroPage {
     183              :     pub segno: u32,
     184              :     pub rpageno: u32,
     185              : }
     186              : 
     187            0 : #[derive(Clone, Serialize, Deserialize)]
     188              : pub struct ClogTruncate {
     189              :     pub pageno: u32,
     190              :     pub oldest_xid: TransactionId,
     191              :     pub oldest_xid_db: Oid,
     192              : }
     193              : 
     194            0 : #[derive(Clone, Serialize, Deserialize)]
     195              : pub enum XactRecord {
     196              :     Commit(XactCommon),
     197              :     Abort(XactCommon),
     198              :     CommitPrepared(XactCommon),
     199              :     AbortPrepared(XactCommon),
     200              :     Prepare(XactPrepare),
     201              : }
     202              : 
     203            0 : #[derive(Clone, Serialize, Deserialize)]
     204              : pub struct XactCommon {
     205              :     pub parsed: XlXactParsedRecord,
     206              :     pub origin_id: u16,
     207              :     // Fields below are only used for logging
     208              :     pub xl_xid: TransactionId,
     209              :     pub lsn: Lsn,
     210              : }
     211              : 
     212            0 : #[derive(Clone, Serialize, Deserialize)]
     213              : pub struct XactPrepare {
     214              :     pub xl_xid: TransactionId,
     215              :     pub data: Bytes,
     216              : }
     217              : 
     218            0 : #[derive(Clone, Serialize, Deserialize)]
     219              : pub enum MultiXactRecord {
     220              :     ZeroPage(MultiXactZeroPage),
     221              :     Create(XlMultiXactCreate),
     222              :     Truncate(XlMultiXactTruncate),
     223              : }
     224              : 
     225            0 : #[derive(Clone, Serialize, Deserialize)]
     226              : pub struct MultiXactZeroPage {
     227              :     pub slru_kind: SlruKind,
     228              :     pub segno: u32,
     229              :     pub rpageno: u32,
     230              : }
     231              : 
     232            0 : #[derive(Clone, Serialize, Deserialize)]
     233              : pub enum RelmapRecord {
     234              :     Update(RelmapUpdate),
     235              : }
     236              : 
     237            0 : #[derive(Clone, Serialize, Deserialize)]
     238              : pub struct RelmapUpdate {
     239              :     pub update: XlRelmapUpdate,
     240              :     pub buf: Bytes,
     241              : }
     242              : 
     243            0 : #[derive(Clone, Serialize, Deserialize)]
     244              : pub enum XlogRecord {
     245              :     Raw(RawXlogRecord),
     246              : }
     247              : 
     248            0 : #[derive(Clone, Serialize, Deserialize)]
     249              : pub struct RawXlogRecord {
     250              :     pub info: u8,
     251              :     pub lsn: Lsn,
     252              :     pub buf: Bytes,
     253              : }
     254              : 
     255            0 : #[derive(Clone, Serialize, Deserialize)]
     256              : pub enum LogicalMessageRecord {
     257              :     Put(PutLogicalMessage),
     258              :     #[cfg(feature = "testing")]
     259              :     Failpoint,
     260              : }
     261              : 
     262            0 : #[derive(Clone, Serialize, Deserialize)]
     263              : pub struct PutLogicalMessage {
     264              :     pub path: String,
     265              :     pub buf: Bytes,
     266              : }
     267              : 
     268            0 : #[derive(Clone, Serialize, Deserialize)]
     269              : pub enum StandbyRecord {
     270              :     RunningXacts(StandbyRunningXacts),
     271              : }
     272              : 
     273            0 : #[derive(Clone, Serialize, Deserialize)]
     274              : pub struct StandbyRunningXacts {
     275              :     pub oldest_running_xid: TransactionId,
     276              : }
     277              : 
     278            0 : #[derive(Clone, Serialize, Deserialize)]
     279              : pub enum ReploriginRecord {
     280              :     Set(XlReploriginSet),
     281              :     Drop(XlReploriginDrop),
     282              : }
        

Generated by: LCOV version 2.1-beta