LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - models.rs (source / functions) Coverage Total Hit
Test: a2f0f8a80fbf1089336086fa360ce27fa555cb1a.info Lines: 0.0 % 34 0
Test Date: 2024-11-20 17:59:39 Functions: 0.0 % 224 0

            Line data    Source code
       1              : //! This module houses types which represent decoded PG WAL records
       2              : //! ready for the pageserver to interpret. They are derived from the original
       3              : //! WAL records, so that each struct corresponds closely to one WAL record of
       4              : //! a specific kind. They contain the same information as the original WAL records,
       5              : //! but the values are already serialized in a [`SerializedValueBatch`], which
       6              : //! is the format that the pageserver is expecting them in.
       7              : //!
       8              : //! The ingestion code uses these structs to help with parsing the WAL records,
       9              : //! and it splits them into a stream of modifications to the key-value pairs that
      10              : //! are ultimately stored in delta layers.  See also the split-out counterparts in
      11              : //! [`postgres_ffi::walrecord`].
      12              : //!
      13              : //! The pipeline which processes WAL records is not super obvious, so let's follow
      14              : //! the flow of an example XACT_COMMIT Postgres record:
      15              : //!
      16              : //! (Postgres XACT_COMMIT record)
      17              : //! |
      18              : //! |--> pageserver::walingest::WalIngest::decode_xact_record
      19              : //!      |
      20              : //!      |--> ([`XactRecord::Commit`])
      21              : //!           |
      22              : //!           |--> pageserver::walingest::WalIngest::ingest_xact_record
      23              : //!                |
      24              : //!                |--> (NeonWalRecord::ClogSetCommitted)
      25              : //!                     |
      26              : //!                     |--> write to KV store within the pageserver
      27              : 
      28              : use bytes::Bytes;
      29              : use pageserver_api::reltag::{RelTag, SlruKind};
      30              : use postgres_ffi::walrecord::{
      31              :     XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
      32              :     XlSmgrTruncate, XlXactParsedRecord,
      33              : };
      34              : use postgres_ffi::{Oid, TransactionId};
      35              : use serde::{Deserialize, Serialize};
      36              : use utils::lsn::Lsn;
      37              : 
      38              : use crate::serialized_batch::SerializedValueBatch;
      39              : 
      40              : // Code generated by protobuf.
      41              : pub mod proto {
      42              :     // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
      43              :     // we don't use these types for anything but broker data transmission,
      44              :     // so it's ok to ignore this one.
      45              :     #![allow(clippy::derive_partial_eq_without_eq)]
      46              :     // The generated ValueMeta has a `len` method generate for its `len` field.
      47              :     #![allow(clippy::len_without_is_empty)]
      48              :     tonic::include_proto!("interpreted_wal");
      49              : }
      50              : 
      51            0 : #[derive(Serialize, Deserialize)]
      52              : pub enum FlushUncommittedRecords {
      53              :     Yes,
      54              :     No,
      55              : }
      56              : 
      57              : /// A batch of interpreted WAL records
      58            0 : #[derive(Serialize, Deserialize)]
      59              : pub struct InterpretedWalRecords {
      60              :     pub records: Vec<InterpretedWalRecord>,
      61              :     // Start LSN of the next record after the batch.
      62              :     // Note that said record may belong to the current shard.
      63              :     pub next_record_lsn: Option<Lsn>,
      64              : }
      65              : 
      66              : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
      67            0 : #[derive(Serialize, Deserialize)]
      68              : pub struct InterpretedWalRecord {
      69              :     /// Optional metadata record - may cause writes to metadata keys
      70              :     /// in the storage engine
      71              :     pub metadata_record: Option<MetadataRecord>,
      72              :     /// A pre-serialized batch along with the required metadata for ingestion
      73              :     /// by the pageserver
      74              :     pub batch: SerializedValueBatch,
      75              :     /// Byte offset within WAL for the start of the next PG WAL record.
      76              :     /// Usually this is the end LSN of the current record, but in case of
      77              :     /// XLOG SWITCH records it will be within the next segment.
      78              :     pub next_record_lsn: Lsn,
      79              :     /// Whether to flush all uncommitted modifications to the storage engine
      80              :     /// before ingesting this record. This is currently only used for legacy PG
      81              :     /// database creations which read pages from a template database. Such WAL
      82              :     /// records require reading data blocks while ingesting, hence the need to flush.
      83              :     pub flush_uncommitted: FlushUncommittedRecords,
      84              :     /// Transaction id of the original PG WAL record
      85              :     pub xid: TransactionId,
      86              : }
      87              : 
      88              : impl InterpretedWalRecord {
      89              :     /// Checks if the WAL record is empty
      90              :     ///
      91              :     /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
      92              :     /// pageserver.
      93            0 :     pub fn is_empty(&self) -> bool {
      94            0 :         self.batch.is_empty()
      95            0 :             && self.metadata_record.is_none()
      96            0 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
      97            0 :     }
      98              : }
      99              : 
     100              : /// The interpreted part of the Postgres WAL record which requires metadata
     101              : /// writes to the underlying storage engine.
     102            0 : #[derive(Serialize, Deserialize)]
     103              : pub enum MetadataRecord {
     104              :     Heapam(HeapamRecord),
     105              :     Neonrmgr(NeonrmgrRecord),
     106              :     Smgr(SmgrRecord),
     107              :     Dbase(DbaseRecord),
     108              :     Clog(ClogRecord),
     109              :     Xact(XactRecord),
     110              :     MultiXact(MultiXactRecord),
     111              :     Relmap(RelmapRecord),
     112              :     Xlog(XlogRecord),
     113              :     LogicalMessage(LogicalMessageRecord),
     114              :     Standby(StandbyRecord),
     115              :     Replorigin(ReploriginRecord),
     116              : }
     117              : 
     118            0 : #[derive(Serialize, Deserialize)]
     119              : pub enum HeapamRecord {
     120              :     ClearVmBits(ClearVmBits),
     121              : }
     122              : 
     123            0 : #[derive(Serialize, Deserialize)]
     124              : pub struct ClearVmBits {
     125              :     pub new_heap_blkno: Option<u32>,
     126              :     pub old_heap_blkno: Option<u32>,
     127              :     pub vm_rel: RelTag,
     128              :     pub flags: u8,
     129              : }
     130              : 
     131            0 : #[derive(Serialize, Deserialize)]
     132              : pub enum NeonrmgrRecord {
     133              :     ClearVmBits(ClearVmBits),
     134              : }
     135              : 
     136            0 : #[derive(Serialize, Deserialize)]
     137              : pub enum SmgrRecord {
     138              :     Create(SmgrCreate),
     139              :     Truncate(XlSmgrTruncate),
     140              : }
     141              : 
     142            0 : #[derive(Serialize, Deserialize)]
     143              : pub struct SmgrCreate {
     144              :     pub rel: RelTag,
     145              : }
     146              : 
     147            0 : #[derive(Serialize, Deserialize)]
     148              : pub enum DbaseRecord {
     149              :     Create(DbaseCreate),
     150              :     Drop(DbaseDrop),
     151              : }
     152              : 
     153            0 : #[derive(Serialize, Deserialize)]
     154              : pub struct DbaseCreate {
     155              :     pub db_id: Oid,
     156              :     pub tablespace_id: Oid,
     157              :     pub src_db_id: Oid,
     158              :     pub src_tablespace_id: Oid,
     159              : }
     160              : 
     161            0 : #[derive(Serialize, Deserialize)]
     162              : pub struct DbaseDrop {
     163              :     pub db_id: Oid,
     164              :     pub tablespace_ids: Vec<Oid>,
     165              : }
     166              : 
     167            0 : #[derive(Serialize, Deserialize)]
     168              : pub enum ClogRecord {
     169              :     ZeroPage(ClogZeroPage),
     170              :     Truncate(ClogTruncate),
     171              : }
     172              : 
     173            0 : #[derive(Serialize, Deserialize)]
     174              : pub struct ClogZeroPage {
     175              :     pub segno: u32,
     176              :     pub rpageno: u32,
     177              : }
     178              : 
     179            0 : #[derive(Serialize, Deserialize)]
     180              : pub struct ClogTruncate {
     181              :     pub pageno: u32,
     182              :     pub oldest_xid: TransactionId,
     183              :     pub oldest_xid_db: Oid,
     184              : }
     185              : 
     186            0 : #[derive(Serialize, Deserialize)]
     187              : pub enum XactRecord {
     188              :     Commit(XactCommon),
     189              :     Abort(XactCommon),
     190              :     CommitPrepared(XactCommon),
     191              :     AbortPrepared(XactCommon),
     192              :     Prepare(XactPrepare),
     193              : }
     194              : 
     195            0 : #[derive(Serialize, Deserialize)]
     196              : pub struct XactCommon {
     197              :     pub parsed: XlXactParsedRecord,
     198              :     pub origin_id: u16,
     199              :     // Fields below are only used for logging
     200              :     pub xl_xid: TransactionId,
     201              :     pub lsn: Lsn,
     202              : }
     203              : 
     204            0 : #[derive(Serialize, Deserialize)]
     205              : pub struct XactPrepare {
     206              :     pub xl_xid: TransactionId,
     207              :     pub data: Bytes,
     208              : }
     209              : 
     210            0 : #[derive(Serialize, Deserialize)]
     211              : pub enum MultiXactRecord {
     212              :     ZeroPage(MultiXactZeroPage),
     213              :     Create(XlMultiXactCreate),
     214              :     Truncate(XlMultiXactTruncate),
     215              : }
     216              : 
     217            0 : #[derive(Serialize, Deserialize)]
     218              : pub struct MultiXactZeroPage {
     219              :     pub slru_kind: SlruKind,
     220              :     pub segno: u32,
     221              :     pub rpageno: u32,
     222              : }
     223              : 
     224            0 : #[derive(Serialize, Deserialize)]
     225              : pub enum RelmapRecord {
     226              :     Update(RelmapUpdate),
     227              : }
     228              : 
     229            0 : #[derive(Serialize, Deserialize)]
     230              : pub struct RelmapUpdate {
     231              :     pub update: XlRelmapUpdate,
     232              :     pub buf: Bytes,
     233              : }
     234              : 
     235            0 : #[derive(Serialize, Deserialize)]
     236              : pub enum XlogRecord {
     237              :     Raw(RawXlogRecord),
     238              : }
     239              : 
     240            0 : #[derive(Serialize, Deserialize)]
     241              : pub struct RawXlogRecord {
     242              :     pub info: u8,
     243              :     pub lsn: Lsn,
     244              :     pub buf: Bytes,
     245              : }
     246              : 
     247            0 : #[derive(Serialize, Deserialize)]
     248              : pub enum LogicalMessageRecord {
     249              :     Put(PutLogicalMessage),
     250              :     #[cfg(feature = "testing")]
     251              :     Failpoint,
     252              : }
     253              : 
     254            0 : #[derive(Serialize, Deserialize)]
     255              : pub struct PutLogicalMessage {
     256              :     pub path: String,
     257              :     pub buf: Bytes,
     258              : }
     259              : 
     260            0 : #[derive(Serialize, Deserialize)]
     261              : pub enum StandbyRecord {
     262              :     RunningXacts(StandbyRunningXacts),
     263              : }
     264              : 
     265            0 : #[derive(Serialize, Deserialize)]
     266              : pub struct StandbyRunningXacts {
     267              :     pub oldest_running_xid: TransactionId,
     268              : }
     269              : 
     270            0 : #[derive(Serialize, Deserialize)]
     271              : pub enum ReploriginRecord {
     272              :     Set(XlReploriginSet),
     273              :     Drop(XlReploriginDrop),
     274              : }
        

Generated by: LCOV version 2.1-beta