LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - models.rs (source / functions) Coverage Total Hit
Test: 3eba1babe267649f8cebefc91c236589db030548.info Lines: 0.0 % 28 0
Test Date: 2024-11-22 12:36:12 Functions: 0.0 % 210 0

            Line data    Source code
       1              : //! This module houses types which represent decoded PG WAL records
       2              : //! ready for the pageserver to interpret. They are derived from the original
       3              : //! WAL records, so that each struct corresponds closely to one WAL record of
       4              : //! a specific kind. They contain the same information as the original WAL records,
       5              : //! but the values are already serialized in a [`SerializedValueBatch`], which
       6              : //! is the format that the pageserver is expecting them in.
       7              : //!
       8              : //! The ingestion code uses these structs to help with parsing the WAL records,
       9              : //! and it splits them into a stream of modifications to the key-value pairs that
      10              : //! are ultimately stored in delta layers.  See also the split-out counterparts in
      11              : //! [`postgres_ffi::walrecord`].
      12              : //!
      13              : //! The pipeline which processes WAL records is not super obvious, so let's follow
      14              : //! the flow of an example XACT_COMMIT Postgres record:
      15              : //!
      16              : //! (Postgres XACT_COMMIT record)
      17              : //! |
      18              : //! |--> pageserver::walingest::WalIngest::decode_xact_record
      19              : //!      |
      20              : //!      |--> ([`XactRecord::Commit`])
      21              : //!           |
      22              : //!           |--> pageserver::walingest::WalIngest::ingest_xact_record
      23              : //!                |
      24              : //!                |--> (NeonWalRecord::ClogSetCommitted)
      25              : //!                     |
      26              : //!                     |--> write to KV store within the pageserver
      27              : 
      28              : use bytes::Bytes;
      29              : use pageserver_api::reltag::{RelTag, SlruKind};
      30              : use postgres_ffi::walrecord::{
      31              :     XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
      32              :     XlSmgrTruncate, XlXactParsedRecord,
      33              : };
      34              : use postgres_ffi::{Oid, TransactionId};
      35              : use serde::{Deserialize, Serialize};
      36              : use utils::lsn::Lsn;
      37              : 
      38              : use crate::serialized_batch::SerializedValueBatch;
      39              : 
      40            0 : #[derive(Serialize, Deserialize)]
                        /// Yes/no flag used as the type of
                        /// [`InterpretedWalRecord::flush_uncommitted`]: whether uncommitted
                        /// modifications must be flushed to the storage engine before the record
                        /// carrying this flag is ingested.
      41              : pub enum FlushUncommittedRecords {
                            /// Flush uncommitted modifications before ingesting the record.
      42              :     Yes,
                            /// No flush is needed before ingesting the record.
      43              :     No,
      44              : }
      45              : 
      46              : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
                        ///
                        /// Pairs an optional [`MetadataRecord`] with a pre-serialized
                        /// [`SerializedValueBatch`], plus the position (`next_record_lsn`) and
                        /// transaction id (`xid`) taken from the original record.
      47            0 : #[derive(Serialize, Deserialize)]
      48              : pub struct InterpretedWalRecord {
      49              :     /// Optional metadata record - may cause writes to metadata keys
      50              :     /// in the storage engine
      51              :     pub metadata_record: Option<MetadataRecord>,
      52              :     /// A pre-serialized batch along with the required metadata for ingestion
      53              :     /// by the pageserver
      54              :     pub batch: SerializedValueBatch,
      55              :     /// Byte offset within WAL for the start of the next PG WAL record.
      56              :     /// Usually this is the end LSN of the current record, but in case of
      57              :     /// XLOG SWITCH records it will be within the next segment.
      58              :     pub next_record_lsn: Lsn,
      59              :     /// Whether to flush all uncommitted modifications to the storage engine
      60              :     /// before ingesting this record. This is currently only used for legacy PG
      61              :     /// database creations which read pages from a template database. Such WAL
      62              :     /// records require reading data blocks while ingesting, hence the need to flush.
      63              :     pub flush_uncommitted: FlushUncommittedRecords,
      64              :     /// Transaction id of the original PG WAL record
      65              :     pub xid: TransactionId,
      66              : }
      67              : 
      68              : /// The interpreted part of the Postgres WAL record which requires metadata
      69              : /// writes to the underlying storage engine.
                        ///
                        /// Each variant wraps the record type of one family of records defined in
                        /// this module. A value of this type is carried in
                        /// [`InterpretedWalRecord::metadata_record`].
      70            0 : #[derive(Serialize, Deserialize)]
      71              : pub enum MetadataRecord {
      72              :     Heapam(HeapamRecord),
      73              :     Neonrmgr(NeonrmgrRecord),
      74              :     Smgr(SmgrRecord),
      75              :     Dbase(DbaseRecord),
      76              :     Clog(ClogRecord),
      77              :     Xact(XactRecord),
      78              :     MultiXact(MultiXactRecord),
      79              :     Relmap(RelmapRecord),
      80              :     Xlog(XlogRecord),
      81              :     LogicalMessage(LogicalMessageRecord),
      82              :     Standby(StandbyRecord),
      83              :     Replorigin(ReploriginRecord),
      84              : }
      85              : 
      86            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Heapam`].
                        ///
                        /// Currently has a single variant, which carries a [`ClearVmBits`] value.
      87              : pub enum HeapamRecord {
      88              :     ClearVmBits(ClearVmBits),
      89              : }
      90              : 
      91            0 : #[derive(Serialize, Deserialize)]
                        /// Payload shared by [`HeapamRecord::ClearVmBits`] and
                        /// [`NeonrmgrRecord::ClearVmBits`].
                        ///
                        /// NOTE(review): judging by the names, this describes visibility-map bits
                        /// to clear in `vm_rel` for up to two heap blocks - confirm against the
                        /// code that builds and ingests this struct.
      92              : pub struct ClearVmBits {
                            /// Block number of the "new" heap block, if the record has one.
      93              :     pub new_heap_blkno: Option<u32>,
                            /// Block number of the "old" heap block, if the record has one.
      94              :     pub old_heap_blkno: Option<u32>,
                            /// Relation tag; presumably identifies the visibility map to update -
                            /// TODO confirm.
      95              :     pub vm_rel: RelTag,
                            /// Bit flags; presumably selects which bits to clear - TODO confirm.
      96              :     pub flags: u8,
      97              : }
      98              : 
      99            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Neonrmgr`].
                        ///
                        /// Currently has a single variant, which carries the same [`ClearVmBits`]
                        /// payload type that [`HeapamRecord::ClearVmBits`] uses.
     100              : pub enum NeonrmgrRecord {
     101              :     ClearVmBits(ClearVmBits),
     102              : }
     103              : 
     104            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Smgr`].
     105              : pub enum SmgrRecord {
                            /// Carries the locally defined [`SmgrCreate`] payload.
     106              :     Create(SmgrCreate),
                            /// Carries the `XlSmgrTruncate` type imported from
                            /// [`postgres_ffi::walrecord`].
     107              :     Truncate(XlSmgrTruncate),
     108              : }
     109              : 
     110            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`SmgrRecord::Create`]: a single relation tag.
     111              : pub struct SmgrCreate {
                            /// Relation tag the record refers to; presumably the relation being
                            /// created - TODO confirm.
     112              :     pub rel: RelTag,
     113              : }
     114              : 
     115            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Dbase`]: either a [`DbaseCreate`] or a
                        /// [`DbaseDrop`].
     116              : pub enum DbaseRecord {
     117              :     Create(DbaseCreate),
     118              :     Drop(DbaseDrop),
     119              : }
     120              : 
     121            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`DbaseRecord::Create`]: the ids of a database/tablespace
                        /// pair and of a source database/tablespace pair.
                        ///
                        /// NOTE(review): presumably the `src_*` ids name the database the new one
                        /// is copied from - confirm against the decoder.
     122              : pub struct DbaseCreate {
     123              :     pub db_id: Oid,
     124              :     pub tablespace_id: Oid,
     125              :     pub src_db_id: Oid,
     126              :     pub src_tablespace_id: Oid,
     127              : }
     128              : 
     129            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`DbaseRecord::Drop`]: one database id together with a list
                        /// of tablespace ids.
     130              : pub struct DbaseDrop {
     131              :     pub db_id: Oid,
                            /// NOTE(review): presumably every tablespace the database is dropped
                            /// from - confirm.
     132              :     pub tablespace_ids: Vec<Oid>,
     133              : }
     134              : 
     135            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Clog`]: either a [`ClogZeroPage`] or a
                        /// [`ClogTruncate`].
     136              : pub enum ClogRecord {
     137              :     ZeroPage(ClogZeroPage),
     138              :     Truncate(ClogTruncate),
     139              : }
     140              : 
     141            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`ClogRecord::ZeroPage`].
                        ///
                        /// NOTE(review): `segno`/`rpageno` presumably address one page as
                        /// (segment number, page number relative to that segment) - confirm.
     142              : pub struct ClogZeroPage {
     143              :     pub segno: u32,
     144              :     pub rpageno: u32,
     145              : }
     146              : 
     147            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`ClogRecord::Truncate`]: a page number plus an "oldest"
                        /// transaction id and a database id.
     148              : pub struct ClogTruncate {
     149              :     pub pageno: u32,
     150              :     pub oldest_xid: TransactionId,
                            /// NOTE(review): presumably the database `oldest_xid` belongs to -
                            /// confirm.
     151              :     pub oldest_xid_db: Oid,
     152              : }
     153              : 
     154            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Xact`].
                        ///
                        /// The four commit/abort variants all carry an [`XactCommon`]; `Prepare`
                        /// carries an [`XactPrepare`] instead.
     155              : pub enum XactRecord {
     156              :     Commit(XactCommon),
     157              :     Abort(XactCommon),
     158              :     CommitPrepared(XactCommon),
     159              :     AbortPrepared(XactCommon),
     160              :     Prepare(XactPrepare),
     161              : }
     162              : 
     163            0 : #[derive(Serialize, Deserialize)]
                        /// Payload shared by the `Commit`, `Abort`, `CommitPrepared` and
                        /// `AbortPrepared` variants of [`XactRecord`].
     164              : pub struct XactCommon {
                            /// Parsed record, using the `XlXactParsedRecord` type imported from
                            /// [`postgres_ffi::walrecord`].
     165              :     pub parsed: XlXactParsedRecord,
                            /// NOTE(review): presumably the replication origin id of the record -
                            /// confirm.
     166              :     pub origin_id: u16,
     167              :     // Fields below are only used for logging
     168              :     pub xl_xid: TransactionId,
     169              :     pub lsn: Lsn,
     170              : }
     171              : 
     172            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`XactRecord::Prepare`]: a transaction id and a byte buffer.
     173              : pub struct XactPrepare {
     174              :     pub xl_xid: TransactionId,
                            /// Opaque bytes; presumably the raw body of the prepare record -
                            /// TODO confirm.
     175              :     pub data: Bytes,
     176              : }
     177              : 
     178            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::MultiXact`].
                        ///
                        /// `ZeroPage` carries the locally defined [`MultiXactZeroPage`]; `Create`
                        /// and `Truncate` carry types imported from [`postgres_ffi::walrecord`].
     179              : pub enum MultiXactRecord {
     180              :     ZeroPage(MultiXactZeroPage),
     181              :     Create(XlMultiXactCreate),
     182              :     Truncate(XlMultiXactTruncate),
     183              : }
     184              : 
     185            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MultiXactRecord::ZeroPage`].
                        ///
                        /// Has the same `segno`/`rpageno` fields as [`ClogZeroPage`], plus an
                        /// [`SlruKind`].
                        ///
                        /// NOTE(review): `segno`/`rpageno` presumably address one page as
                        /// (segment number, page number relative to that segment) - confirm.
     186              : pub struct MultiXactZeroPage {
     187              :     pub slru_kind: SlruKind,
     188              :     pub segno: u32,
     189              :     pub rpageno: u32,
     190              : }
     191              : 
     192            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Relmap`].
                        ///
                        /// Currently has a single variant, which carries a [`RelmapUpdate`].
     193              : pub enum RelmapRecord {
     194              :     Update(RelmapUpdate),
     195              : }
     196              : 
     197            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`RelmapRecord::Update`]: an `XlRelmapUpdate` (imported from
                        /// [`postgres_ffi::walrecord`]) together with a byte buffer.
     198              : pub struct RelmapUpdate {
     199              :     pub update: XlRelmapUpdate,
                            /// Opaque bytes; presumably the relation-map contents that accompany
                            /// `update` - TODO confirm.
     200              :     pub buf: Bytes,
     201              : }
     202              : 
     203            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Xlog`].
                        ///
                        /// Currently has a single variant, which carries a [`RawXlogRecord`].
     204              : pub enum XlogRecord {
     205              :     Raw(RawXlogRecord),
     206              : }
     207              : 
     208            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`XlogRecord::Raw`]: an info byte, an LSN and a byte buffer.
                        ///
                        /// NOTE(review): presumably `info` is the record's info byte, `lsn` its
                        /// position and `buf` its still-undecoded body - confirm.
     209              : pub struct RawXlogRecord {
     210              :     pub info: u8,
     211              :     pub lsn: Lsn,
     212              :     pub buf: Bytes,
     213              : }
     214              : 
     215            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::LogicalMessage`].
     216              : pub enum LogicalMessageRecord {
                            /// Carries a [`PutLogicalMessage`].
     217              :     Put(PutLogicalMessage),
                            /// Data-less variant that only exists when the crate is built with the
                            /// `testing` feature.
     218              :     #[cfg(feature = "testing")]
     219              :     Failpoint,
     220              : }
     221              : 
     222            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`LogicalMessageRecord::Put`]: a string path and a byte
                        /// buffer.
                        ///
                        /// NOTE(review): presumably `buf` is the content to store under `path` -
                        /// confirm against the ingest code.
     223              : pub struct PutLogicalMessage {
     224              :     pub path: String,
     225              :     pub buf: Bytes,
     226              : }
     227              : 
     228            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Standby`].
                        ///
                        /// Currently has a single variant, which carries a
                        /// [`StandbyRunningXacts`].
     229              : pub enum StandbyRecord {
     230              :     RunningXacts(StandbyRunningXacts),
     231              : }
     232              : 
     233            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`StandbyRecord::RunningXacts`]: a single transaction id.
     234              : pub struct StandbyRunningXacts {
                            /// NOTE(review): presumably the oldest transaction id still running
                            /// when the record was written - confirm.
     235              :     pub oldest_running_xid: TransactionId,
     236              : }
     237              : 
     238            0 : #[derive(Serialize, Deserialize)]
                        /// Payload of [`MetadataRecord::Replorigin`].
                        ///
                        /// Both variants carry types imported from [`postgres_ffi::walrecord`].
     239              : pub enum ReploriginRecord {
     240              :     Set(XlReploriginSet),
     241              :     Drop(XlReploriginDrop),
     242              : }
        

Generated by: LCOV version 2.1-beta