LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - models.rs (source / functions) Coverage Total Hit
Test: 685df7483efdc579d44aa7093bca9796bb9d088e.info Lines: 0.0 % 33 0
Test Date: 2024-11-25 17:08:35 Functions: 0.0 % 267 0

            Line data    Source code
       1              : //! This module houses types which represent decoded PG WAL records
       2              : //! ready for the pageserver to interpret. They are derived from the original
       3              : //! WAL records, so that each struct corresponds closely to one WAL record of
       4              : //! a specific kind. They contain the same information as the original WAL records,
       5              : //! but the values are already serialized in a [`SerializedValueBatch`], which
       6              : //! is the format that the pageserver is expecting them in.
       7              : //!
       8              : //! The ingestion code uses these structs to help with parsing the WAL records,
       9              : //! and it splits them into a stream of modifications to the key-value pairs that
      10              : //! are ultimately stored in delta layers.  See also the split-out counterparts in
      11              : //! [`postgres_ffi::walrecord`].
      12              : //!
      13              : //! The pipeline which processes WAL records is not super obvious, so let's follow
      14              : //! the flow of an example XACT_COMMIT Postgres record:
      15              : //!
      16              : //! (Postgres XACT_COMMIT record)
      17              : //! |
      18              : //! |--> pageserver::walingest::WalIngest::decode_xact_record
      19              : //!      |
      20              : //!      |--> ([`XactRecord::Commit`])
      21              : //!           |
      22              : //!           |--> pageserver::walingest::WalIngest::ingest_xact_record
      23              : //!                |
      24              : //!                |--> (NeonWalRecord::ClogSetCommitted)
      25              : //!                     |
      26              : //!                     |--> write to KV store within the pageserver
      27              : 
      28              : use bytes::Bytes;
      29              : use pageserver_api::reltag::{RelTag, SlruKind};
      30              : use postgres_ffi::walrecord::{
      31              :     XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
      32              :     XlSmgrTruncate, XlXactParsedRecord,
      33              : };
      34              : use postgres_ffi::{Oid, TransactionId};
      35              : use serde::{Deserialize, Serialize};
      36              : use utils::lsn::Lsn;
      37              : 
      38              : use crate::serialized_batch::SerializedValueBatch;
      39              : 
      40            0 : #[derive(Serialize, Deserialize)]
      41              : pub enum FlushUncommittedRecords {
      42              :     Yes,
      43              :     No,
      44              : }
      45              : 
      46              : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
      47            0 : #[derive(Serialize, Deserialize)]
      48              : pub struct InterpretedWalRecord {
      49              :     /// Optional metadata record - may cause writes to metadata keys
      50              :     /// in the storage engine
      51              :     pub metadata_record: Option<MetadataRecord>,
      52              :     /// A pre-serialized batch along with the required metadata for ingestion
      53              :     /// by the pageserver
      54              :     pub batch: SerializedValueBatch,
      55              :     /// Byte offset within WAL for the start of the next PG WAL record.
      56              :     /// Usually this is the end LSN of the current record, but in case of
      57              :     /// XLOG SWITCH records it will be within the next segment.
      58              :     pub next_record_lsn: Lsn,
      59              :     /// Whether to flush all uncommitted modifications to the storage engine
      60              :     /// before ingesting this record. This is currently only used for legacy PG
      61              :     /// database creations which read pages from a template database. Such WAL
      62              :     /// records require reading data blocks while ingesting, hence the need to flush.
      63              :     pub flush_uncommitted: FlushUncommittedRecords,
      64              :     /// Transaction id of the original PG WAL record
      65              :     pub xid: TransactionId,
      66              : }
      67              : 
      68              : impl InterpretedWalRecord {
      69              :     /// Checks if the WAL record is empty
      70              :     ///
      71              :     /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
      72              :     /// pageserver.
      73            0 :     pub fn is_empty(&self) -> bool {
      74            0 :         self.batch.is_empty()
      75            0 :             && self.metadata_record.is_none()
      76            0 :             && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
      77            0 :     }
      78              : }
      79              : 
      80              : /// The interpreted part of the Postgres WAL record which requires metadata
      81              : /// writes to the underlying storage engine.
      82            0 : #[derive(Serialize, Deserialize)]
      83              : pub enum MetadataRecord {
      84              :     Heapam(HeapamRecord),
      85              :     Neonrmgr(NeonrmgrRecord),
      86              :     Smgr(SmgrRecord),
      87              :     Dbase(DbaseRecord),
      88              :     Clog(ClogRecord),
      89              :     Xact(XactRecord),
      90              :     MultiXact(MultiXactRecord),
      91              :     Relmap(RelmapRecord),
      92              :     Xlog(XlogRecord),
      93              :     LogicalMessage(LogicalMessageRecord),
      94              :     Standby(StandbyRecord),
      95              :     Replorigin(ReploriginRecord),
      96              : }
      97              : 
      98            0 : #[derive(Serialize, Deserialize)]
      99              : pub enum HeapamRecord {
     100              :     ClearVmBits(ClearVmBits),
     101              : }
     102              : 
     103            0 : #[derive(Serialize, Deserialize)]
     104              : pub struct ClearVmBits {
     105              :     pub new_heap_blkno: Option<u32>,
     106              :     pub old_heap_blkno: Option<u32>,
     107              :     pub vm_rel: RelTag,
     108              :     pub flags: u8,
     109              : }
     110              : 
     111            0 : #[derive(Serialize, Deserialize)]
     112              : pub enum NeonrmgrRecord {
     113              :     ClearVmBits(ClearVmBits),
     114              : }
     115              : 
     116            0 : #[derive(Serialize, Deserialize)]
     117              : pub enum SmgrRecord {
     118              :     Create(SmgrCreate),
     119              :     Truncate(XlSmgrTruncate),
     120              : }
     121              : 
     122            0 : #[derive(Serialize, Deserialize)]
     123              : pub struct SmgrCreate {
     124              :     pub rel: RelTag,
     125              : }
     126              : 
     127            0 : #[derive(Serialize, Deserialize)]
     128              : pub enum DbaseRecord {
     129              :     Create(DbaseCreate),
     130              :     Drop(DbaseDrop),
     131              : }
     132              : 
     133            0 : #[derive(Serialize, Deserialize)]
     134              : pub struct DbaseCreate {
     135              :     pub db_id: Oid,
     136              :     pub tablespace_id: Oid,
     137              :     pub src_db_id: Oid,
     138              :     pub src_tablespace_id: Oid,
     139              : }
     140              : 
     141            0 : #[derive(Serialize, Deserialize)]
     142              : pub struct DbaseDrop {
     143              :     pub db_id: Oid,
     144              :     pub tablespace_ids: Vec<Oid>,
     145              : }
     146              : 
     147            0 : #[derive(Serialize, Deserialize)]
     148              : pub enum ClogRecord {
     149              :     ZeroPage(ClogZeroPage),
     150              :     Truncate(ClogTruncate),
     151              : }
     152              : 
     153            0 : #[derive(Serialize, Deserialize)]
     154              : pub struct ClogZeroPage {
     155              :     pub segno: u32,
     156              :     pub rpageno: u32,
     157              : }
     158              : 
     159            0 : #[derive(Serialize, Deserialize)]
     160              : pub struct ClogTruncate {
     161              :     pub pageno: u32,
     162              :     pub oldest_xid: TransactionId,
     163              :     pub oldest_xid_db: Oid,
     164              : }
     165              : 
     166            0 : #[derive(Serialize, Deserialize)]
     167              : pub enum XactRecord {
     168              :     Commit(XactCommon),
     169              :     Abort(XactCommon),
     170              :     CommitPrepared(XactCommon),
     171              :     AbortPrepared(XactCommon),
     172              :     Prepare(XactPrepare),
     173              : }
     174              : 
     175            0 : #[derive(Serialize, Deserialize)]
     176              : pub struct XactCommon {
     177              :     pub parsed: XlXactParsedRecord,
     178              :     pub origin_id: u16,
     179              :     // Fields below are only used for logging
     180              :     pub xl_xid: TransactionId,
     181              :     pub lsn: Lsn,
     182              : }
     183              : 
     184            0 : #[derive(Serialize, Deserialize)]
     185              : pub struct XactPrepare {
     186              :     pub xl_xid: TransactionId,
     187              :     pub data: Bytes,
     188              : }
     189              : 
     190            0 : #[derive(Serialize, Deserialize)]
     191              : pub enum MultiXactRecord {
     192              :     ZeroPage(MultiXactZeroPage),
     193              :     Create(XlMultiXactCreate),
     194              :     Truncate(XlMultiXactTruncate),
     195              : }
     196              : 
     197            0 : #[derive(Serialize, Deserialize)]
     198              : pub struct MultiXactZeroPage {
     199              :     pub slru_kind: SlruKind,
     200              :     pub segno: u32,
     201              :     pub rpageno: u32,
     202              : }
     203              : 
     204            0 : #[derive(Serialize, Deserialize)]
     205              : pub enum RelmapRecord {
     206              :     Update(RelmapUpdate),
     207              : }
     208              : 
     209            0 : #[derive(Serialize, Deserialize)]
     210              : pub struct RelmapUpdate {
     211              :     pub update: XlRelmapUpdate,
     212              :     pub buf: Bytes,
     213              : }
     214              : 
     215            0 : #[derive(Serialize, Deserialize)]
     216              : pub enum XlogRecord {
     217              :     Raw(RawXlogRecord),
     218              : }
     219              : 
     220            0 : #[derive(Serialize, Deserialize)]
     221              : pub struct RawXlogRecord {
     222              :     pub info: u8,
     223              :     pub lsn: Lsn,
     224              :     pub buf: Bytes,
     225              : }
     226              : 
     227            0 : #[derive(Serialize, Deserialize)]
     228              : pub enum LogicalMessageRecord {
     229              :     Put(PutLogicalMessage),
     230              :     #[cfg(feature = "testing")]
     231              :     Failpoint,
     232              : }
     233              : 
     234            0 : #[derive(Serialize, Deserialize)]
     235              : pub struct PutLogicalMessage {
     236              :     pub path: String,
     237              :     pub buf: Bytes,
     238              : }
     239              : 
     240            0 : #[derive(Serialize, Deserialize)]
     241              : pub enum StandbyRecord {
     242              :     RunningXacts(StandbyRunningXacts),
     243              : }
     244              : 
     245            0 : #[derive(Serialize, Deserialize)]
     246              : pub struct StandbyRunningXacts {
     247              :     pub oldest_running_xid: TransactionId,
     248              : }
     249              : 
     250            0 : #[derive(Serialize, Deserialize)]
     251              : pub enum ReploriginRecord {
     252              :     Set(XlReploriginSet),
     253              :     Drop(XlReploriginDrop),
     254              : }
        

Generated by: LCOV version 2.1-beta