LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - models.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 12.8 % 39 5
Test Date: 2025-03-12 00:01:28 Functions: 0.7 % 137 1

            Line data    Source code
       1              : //! This module houses types which represent decoded PG WAL records
       2              : //! ready for the pageserver to interpret. They are derived from the original
       3              : //! WAL records, so that each struct corresponds closely to one WAL record of
       4              : //! a specific kind. They contain the same information as the original WAL records,
       5              : //! but the values are already serialized in a [`SerializedValueBatch`], which
       6              : //! is the format that the pageserver is expecting them in.
       7              : //!
       8              : //! The ingestion code uses these structs to help with parsing the WAL records,
       9              : //! and it splits them into a stream of modifications to the key-value pairs that
      10              : //! are ultimately stored in delta layers.  See also the split-out counterparts in
      11              : //! [`postgres_ffi::walrecord`].
      12              : //!
      13              : //! The pipeline which processes WAL records is not super obvious, so let's follow
      14              : //! the flow of an example XACT_COMMIT Postgres record:
      15              : //!
      16              : //! (Postgres XACT_COMMIT record)
      17              : //! |
      18              : //! |--> pageserver::walingest::WalIngest::decode_xact_record
      19              : //!      |
      20              : //!      |--> ([`XactRecord::Commit`])
      21              : //!           |
      22              : //!           |--> pageserver::walingest::WalIngest::ingest_xact_record
      23              : //!                |
      24              : //!                |--> (NeonWalRecord::ClogSetCommitted)
      25              : //!                     |
      26              : //!                     |--> write to KV store within the pageserver
      27              : 
      28              : use bytes::Bytes;
      29              : use pageserver_api::reltag::{RelTag, SlruKind};
      30              : use postgres_ffi::walrecord::{
      31              :     XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
      32              :     XlSmgrTruncate, XlXactParsedRecord,
      33              : };
      34              : use postgres_ffi::{Oid, TransactionId};
      35              : use serde::{Deserialize, Serialize};
      36              : use utils::lsn::Lsn;
      37              : 
      38              : use crate::serialized_batch::SerializedValueBatch;
      39              : 
      40              : // Code generated by protobuf.
      41              : pub mod proto {
      42              :     // Tonic derives these types as `#[derive(Clone, PartialEq, ::prost::Message)]`;
      43              :     // we don't use these types for anything but broker data transmission,
      44              :     // so it's ok to ignore this one.
      45              :     #![allow(clippy::derive_partial_eq_without_eq)]
      46              :     // The generated ValueMeta has a `len` method generated for its `len` field.
      47              :     #![allow(clippy::len_without_is_empty)]
      48              :     include!(concat!(env!("OUT_DIR"), concat!("/interpreted_wal.rs")));
      49              : }
      50              : 
      51            0 : #[derive(Copy, Clone, Serialize, Deserialize)]
      52              : pub enum FlushUncommittedRecords {
      53              :     Yes,
      54              :     No,
      55              : }
      56              : 
      57              : /// A batch of interpreted WAL records
      58            0 : #[derive(Serialize, Deserialize)]
      59              : pub struct InterpretedWalRecords {
      60              :     pub records: Vec<InterpretedWalRecord>,
      61              :     // Start LSN of the next record after the batch.
      62              :     // Note that said record may not belong to the current shard.
      63              :     pub next_record_lsn: Lsn,
      64              :     // Inclusive start LSN of the PG WAL from which the interpreted
      65              :     // WAL records were extracted. Note that this is not necessarily the
      66              :     // start LSN of the first interpreted record in the batch.
      67              :     pub raw_wal_start_lsn: Option<Lsn>,
      68              : }
      69              : 
      70              : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
      71            0 : #[derive(Serialize, Deserialize, Clone)]
      72              : pub struct InterpretedWalRecord {
      73              :     /// Optional metadata record - may cause writes to metadata keys
      74              :     /// in the storage engine
      75              :     pub metadata_record: Option<MetadataRecord>,
      76              :     /// A pre-serialized batch along with the required metadata for ingestion
      77              :     /// by the pageserver
      78              :     pub batch: SerializedValueBatch,
      79              :     /// Byte offset within WAL for the start of the next PG WAL record.
      80              :     /// Usually this is the end LSN of the current record, but in case of
      81              :     /// XLOG SWITCH records it will be within the next segment.
      82              :     pub next_record_lsn: Lsn,
      83              :     /// Whether to flush all uncommitted modifications to the storage engine
      84              :     /// before ingesting this record. This is currently only used for legacy PG
      85              :     /// database creations which read pages from a template database. Such WAL
      86              :     /// records require reading data blocks while ingesting, hence the need to flush.
      87              :     pub flush_uncommitted: FlushUncommittedRecords,
      88              :     /// Transaction id of the original PG WAL record
      89              :     pub xid: TransactionId,
      90              : }
      91              : 
      92              : impl InterpretedWalRecord {
      93              :     /// Checks if the WAL record is empty
      94              :     ///
      95              :     /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
      96              :     /// pageserver.
      97          200 :     pub fn is_empty(&self) -> bool {
      98          200 :         self.batch.is_empty()
      99          200 :             && self.metadata_record.is_none()
     100          200 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
     101          200 :     }
     102              : 
     103              :     /// Checks if the WAL record is observed (i.e. contains only metadata
     104              :     /// for observed values)
     105            0 :     pub fn is_observed(&self) -> bool {
     106            0 :         self.batch.is_observed()
     107            0 :             && self.metadata_record.is_none()
     108            0 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
     109            0 :     }
     110              : }
     111              : 
     112              : /// The interpreted part of the Postgres WAL record which requires metadata
     113              : /// writes to the underlying storage engine.
     114            0 : #[derive(Clone, Serialize, Deserialize)]
     115              : pub enum MetadataRecord {
     116              :     Heapam(HeapamRecord),
     117              :     Neonrmgr(NeonrmgrRecord),
     118              :     Smgr(SmgrRecord),
     119              :     Dbase(DbaseRecord),
     120              :     Clog(ClogRecord),
     121              :     Xact(XactRecord),
     122              :     MultiXact(MultiXactRecord),
     123              :     Relmap(RelmapRecord),
     124              :     Xlog(XlogRecord),
     125              :     LogicalMessage(LogicalMessageRecord),
     126              :     Standby(StandbyRecord),
     127              :     Replorigin(ReploriginRecord),
     128              : }
     129              : 
     130            0 : #[derive(Clone, Serialize, Deserialize)]
     131              : pub enum HeapamRecord {
     132              :     ClearVmBits(ClearVmBits),
     133              : }
     134              : 
     135            0 : #[derive(Clone, Serialize, Deserialize)]
     136              : pub struct ClearVmBits {
     137              :     pub new_heap_blkno: Option<u32>,
     138              :     pub old_heap_blkno: Option<u32>,
     139              :     pub vm_rel: RelTag,
     140              :     pub flags: u8,
     141              : }
     142              : 
     143            0 : #[derive(Clone, Serialize, Deserialize)]
     144              : pub enum NeonrmgrRecord {
     145              :     ClearVmBits(ClearVmBits),
     146              : }
     147              : 
     148            0 : #[derive(Clone, Serialize, Deserialize)]
     149              : pub enum SmgrRecord {
     150              :     Create(SmgrCreate),
     151              :     Truncate(XlSmgrTruncate),
     152              : }
     153              : 
     154            0 : #[derive(Clone, Serialize, Deserialize)]
     155              : pub struct SmgrCreate {
     156              :     pub rel: RelTag,
     157              : }
     158              : 
     159            0 : #[derive(Clone, Serialize, Deserialize)]
     160              : pub enum DbaseRecord {
     161              :     Create(DbaseCreate),
     162              :     Drop(DbaseDrop),
     163              : }
     164              : 
     165            0 : #[derive(Clone, Serialize, Deserialize)]
     166              : pub struct DbaseCreate {
     167              :     pub db_id: Oid,
     168              :     pub tablespace_id: Oid,
     169              :     pub src_db_id: Oid,
     170              :     pub src_tablespace_id: Oid,
     171              : }
     172              : 
     173            0 : #[derive(Clone, Serialize, Deserialize)]
     174              : pub struct DbaseDrop {
     175              :     pub db_id: Oid,
     176              :     pub tablespace_ids: Vec<Oid>,
     177              : }
     178              : 
     179            0 : #[derive(Clone, Serialize, Deserialize)]
     180              : pub enum ClogRecord {
     181              :     ZeroPage(ClogZeroPage),
     182              :     Truncate(ClogTruncate),
     183              : }
     184              : 
     185            0 : #[derive(Clone, Serialize, Deserialize)]
     186              : pub struct ClogZeroPage {
     187              :     pub segno: u32,
     188              :     pub rpageno: u32,
     189              : }
     190              : 
     191            0 : #[derive(Clone, Serialize, Deserialize)]
     192              : pub struct ClogTruncate {
     193              :     pub pageno: u32,
     194              :     pub oldest_xid: TransactionId,
     195              :     pub oldest_xid_db: Oid,
     196              : }
     197              : 
     198            0 : #[derive(Clone, Serialize, Deserialize)]
     199              : pub enum XactRecord {
     200              :     Commit(XactCommon),
     201              :     Abort(XactCommon),
     202              :     CommitPrepared(XactCommon),
     203              :     AbortPrepared(XactCommon),
     204              :     Prepare(XactPrepare),
     205              : }
     206              : 
     207            0 : #[derive(Clone, Serialize, Deserialize)]
     208              : pub struct XactCommon {
     209              :     pub parsed: XlXactParsedRecord,
     210              :     pub origin_id: u16,
     211              :     // Fields below are only used for logging
     212              :     pub xl_xid: TransactionId,
     213              :     pub lsn: Lsn,
     214              : }
     215              : 
     216            0 : #[derive(Clone, Serialize, Deserialize)]
     217              : pub struct XactPrepare {
     218              :     pub xl_xid: TransactionId,
     219              :     pub data: Bytes,
     220              : }
     221              : 
     222            0 : #[derive(Clone, Serialize, Deserialize)]
     223              : pub enum MultiXactRecord {
     224              :     ZeroPage(MultiXactZeroPage),
     225              :     Create(XlMultiXactCreate),
     226              :     Truncate(XlMultiXactTruncate),
     227              : }
     228              : 
     229            0 : #[derive(Clone, Serialize, Deserialize)]
     230              : pub struct MultiXactZeroPage {
     231              :     pub slru_kind: SlruKind,
     232              :     pub segno: u32,
     233              :     pub rpageno: u32,
     234              : }
     235              : 
     236            0 : #[derive(Clone, Serialize, Deserialize)]
     237              : pub enum RelmapRecord {
     238              :     Update(RelmapUpdate),
     239              : }
     240              : 
     241            0 : #[derive(Clone, Serialize, Deserialize)]
     242              : pub struct RelmapUpdate {
     243              :     pub update: XlRelmapUpdate,
     244              :     pub buf: Bytes,
     245              : }
     246              : 
     247            0 : #[derive(Clone, Serialize, Deserialize)]
     248              : pub enum XlogRecord {
     249              :     Raw(RawXlogRecord),
     250              : }
     251              : 
     252            0 : #[derive(Clone, Serialize, Deserialize)]
     253              : pub struct RawXlogRecord {
     254              :     pub info: u8,
     255              :     pub lsn: Lsn,
     256              :     pub buf: Bytes,
     257              : }
     258              : 
     259            0 : #[derive(Clone, Serialize, Deserialize)]
     260              : pub enum LogicalMessageRecord {
     261              :     Put(PutLogicalMessage),
     262              :     #[cfg(feature = "testing")]
     263              :     Failpoint,
     264              : }
     265              : 
     266            0 : #[derive(Clone, Serialize, Deserialize)]
     267              : pub struct PutLogicalMessage {
     268              :     pub path: String,
     269              :     pub buf: Bytes,
     270              : }
     271              : 
     272            0 : #[derive(Clone, Serialize, Deserialize)]
     273              : pub enum StandbyRecord {
     274              :     RunningXacts(StandbyRunningXacts),
     275              : }
     276              : 
     277            0 : #[derive(Clone, Serialize, Deserialize)]
     278              : pub struct StandbyRunningXacts {
     279              :     pub oldest_running_xid: TransactionId,
     280              : }
     281              : 
     282            0 : #[derive(Clone, Serialize, Deserialize)]
     283              : pub enum ReploriginRecord {
     284              :     Set(XlReploriginSet),
     285              :     Drop(XlReploriginDrop),
     286              : }
        

Generated by: LCOV version 2.1-beta