Line data Source code
1 : //! This module houses types which represent decoded PG WAL records
2 : //! ready for the pageserver to interpret. They are derived from the original
3 : //! WAL records, so that each struct corresponds closely to one WAL record of
4 : //! a specific kind. They contain the same information as the original WAL records,
5 : //! but the values are already serialized in a [`SerializedValueBatch`], which
6 : //! is the format that the pageserver is expecting them in.
7 : //!
8 : //! The ingestion code uses these structs to help with parsing the WAL records,
9 : //! and it splits them into a stream of modifications to the key-value pairs that
10 : //! are ultimately stored in delta layers. See also the split-out counterparts in
11 : //! [`postgres_ffi::walrecord`].
12 : //!
13 : //! The pipeline which processes WAL records is not super obvious, so let's follow
14 : //! the flow of an example XACT_COMMIT Postgres record:
15 : //!
16 : //! (Postgres XACT_COMMIT record)
17 : //! |
18 : //! |--> pageserver::walingest::WalIngest::decode_xact_record
19 : //! |
20 : //! |--> ([`XactRecord::Commit`])
21 : //! |
22 : //! |--> pageserver::walingest::WalIngest::ingest_xact_record
23 : //! |
24 : //! |--> (NeonWalRecord::ClogSetCommitted)
25 : //! |
26 : //! |--> write to KV store within the pageserver
27 :
28 : use bytes::Bytes;
29 : use pageserver_api::reltag::{RelTag, SlruKind};
30 : use postgres_ffi::walrecord::{
31 : XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
32 : XlSmgrTruncate, XlXactParsedRecord,
33 : };
34 : use postgres_ffi::{Oid, TransactionId};
35 : use serde::{Deserialize, Serialize};
36 : use utils::lsn::Lsn;
37 :
38 : use crate::serialized_batch::SerializedValueBatch;
39 :
/// Two-state flag stored in [`InterpretedWalRecord::flush_uncommitted`]: says
/// whether uncommitted modifications have to be flushed to the storage engine
/// before the record carrying the flag is ingested.
40 0 : #[derive(Serialize, Deserialize)]
41 : pub enum FlushUncommittedRecords {
    /// A flush is required before ingesting the record.
42 : Yes,
    /// No flush is required.
43 : No,
44 : }
45 :
46 : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
///
/// Bundles an optional [`MetadataRecord`] with a pre-serialized
/// [`SerializedValueBatch`] and the bookkeeping that travels with them
/// (next record LSN, flush flag, transaction id).
///
/// NOTE(review): the type derives serde traits, so field order may be part of
/// a serialized layout - confirm before reordering fields.
47 0 : #[derive(Serialize, Deserialize)]
48 : pub struct InterpretedWalRecord {
49 : /// Optional metadata record - may cause writes to metadata keys
50 : /// in the storage engine
51 : pub metadata_record: Option<MetadataRecord>,
52 : /// A pre-serialized batch along with the required metadata for ingestion
53 : /// by the pageserver
54 : pub batch: SerializedValueBatch,
55 : /// Byte offset within WAL for the start of the next PG WAL record.
56 : /// Usually this is the end LSN of the current record, but in case of
57 : /// XLOG SWITCH records it will be within the next segment.
58 : pub next_record_lsn: Lsn,
59 : /// Whether to flush all uncommitted modifications to the storage engine
60 : /// before ingesting this record. This is currently only used for legacy PG
61 : /// database creations which read pages from a template database. Such WAL
62 : /// records require reading data blocks while ingesting, hence the need to flush.
63 : pub flush_uncommitted: FlushUncommittedRecords,
64 : /// Transaction id of the original PG WAL record
65 : pub xid: TransactionId,
66 : }
67 :
68 : /// The interpreted part of the Postgres WAL record which requires metadata
69 : /// writes to the underlying storage engine.
///
/// Each variant wraps the payload type for one category of record; the
/// payload types are declared in this module. An instance is carried in
/// [`InterpretedWalRecord::metadata_record`].
///
/// NOTE(review): the type derives serde traits, so variant order may be part
/// of a serialized layout - confirm before reordering or inserting variants.
70 0 : #[derive(Serialize, Deserialize)]
71 : pub enum MetadataRecord {
72 : Heapam(HeapamRecord),
73 : Neonrmgr(NeonrmgrRecord),
74 : Smgr(SmgrRecord),
75 : Dbase(DbaseRecord),
76 : Clog(ClogRecord),
77 : Xact(XactRecord),
78 : MultiXact(MultiXactRecord),
79 : Relmap(RelmapRecord),
80 : Xlog(XlogRecord),
81 : LogicalMessage(LogicalMessageRecord),
82 : Standby(StandbyRecord),
83 : Replorigin(ReploriginRecord),
84 : }
85 :
/// Payload of [`MetadataRecord::Heapam`].
///
/// Has a single variant, which carries the same [`ClearVmBits`] payload as
/// [`NeonrmgrRecord::ClearVmBits`].
86 0 : #[derive(Serialize, Deserialize)]
87 : pub enum HeapamRecord {
88 : ClearVmBits(ClearVmBits),
89 : }
90 :
/// Payload shared by [`HeapamRecord::ClearVmBits`] and
/// [`NeonrmgrRecord::ClearVmBits`].
///
/// NOTE(review): going by the name, this describes visibility-map bits to
/// clear; the exact semantics are implemented by the ingest code, not here.
91 0 : #[derive(Serialize, Deserialize)]
92 : pub struct ClearVmBits {
    /// Presumably the block number of the new heap page, when there is one
    /// - TODO confirm against the decoder.
93 : pub new_heap_blkno: Option<u32>,
    /// Presumably the block number of the old heap page, when there is one
    /// - TODO confirm against the decoder.
94 : pub old_heap_blkno: Option<u32>,
    /// Relation tag; by its name, the visibility-map relation - TODO confirm.
95 : pub vm_rel: RelTag,
    /// Bit flags; their meaning is not defined in this module.
96 : pub flags: u8,
97 : }
98 :
/// Payload of [`MetadataRecord::Neonrmgr`].
///
/// Has a single variant, which carries the same [`ClearVmBits`] payload as
/// [`HeapamRecord::ClearVmBits`].
99 0 : #[derive(Serialize, Deserialize)]
100 : pub enum NeonrmgrRecord {
101 : ClearVmBits(ClearVmBits),
102 : }
103 :
/// Payload of [`MetadataRecord::Smgr`].
104 0 : #[derive(Serialize, Deserialize)]
105 : pub enum SmgrRecord {
    /// Carries the locally declared [`SmgrCreate`] payload.
106 : Create(SmgrCreate),
    /// Carries [`XlSmgrTruncate`] from [`postgres_ffi::walrecord`] unchanged.
107 : Truncate(XlSmgrTruncate),
108 : }
109 :
/// Payload of [`SmgrRecord::Create`].
110 0 : #[derive(Serialize, Deserialize)]
111 : pub struct SmgrCreate {
    /// Tag of the relation the record refers to (presumably the relation
    /// being created - confirm against the ingest code).
112 : pub rel: RelTag,
113 : }
114 :
/// Payload of [`MetadataRecord::Dbase`]: either a [`DbaseCreate`] or a
/// [`DbaseDrop`].
115 0 : #[derive(Serialize, Deserialize)]
116 : pub enum DbaseRecord {
117 : Create(DbaseCreate),
118 : Drop(DbaseDrop),
119 : }
120 :
/// Payload of [`DbaseRecord::Create`].
///
/// NOTE(review): field meanings below are read off the field names; confirm
/// against the decoder that fills them in.
121 0 : #[derive(Serialize, Deserialize)]
122 : pub struct DbaseCreate {
    /// Presumably the OID of the database being created.
123 : pub db_id: Oid,
    /// Presumably the OID of the tablespace of the new database.
124 : pub tablespace_id: Oid,
    /// Presumably the OID of the source database it is created from.
125 : pub src_db_id: Oid,
    /// Presumably the OID of the source database's tablespace.
126 : pub src_tablespace_id: Oid,
127 : }
128 :
/// Payload of [`DbaseRecord::Drop`].
129 0 : #[derive(Serialize, Deserialize)]
130 : pub struct DbaseDrop {
    /// Presumably the OID of the database being dropped - TODO confirm.
131 : pub db_id: Oid,
    /// List of tablespace OIDs attached to the record; presumably the
    /// tablespaces the database is dropped from - TODO confirm.
132 : pub tablespace_ids: Vec<Oid>,
133 : }
134 :
/// Payload of [`MetadataRecord::Clog`]: either a [`ClogZeroPage`] or a
/// [`ClogTruncate`].
135 0 : #[derive(Serialize, Deserialize)]
136 : pub enum ClogRecord {
137 : ZeroPage(ClogZeroPage),
138 : Truncate(ClogTruncate),
139 : }
140 :
/// Payload of [`ClogRecord::ZeroPage`].
141 0 : #[derive(Serialize, Deserialize)]
142 : pub struct ClogZeroPage {
    /// By name, a segment number - TODO confirm.
143 : pub segno: u32,
    /// By name, a page number relative to the segment - TODO confirm.
144 : pub rpageno: u32,
145 : }
146 :
/// Payload of [`ClogRecord::Truncate`].
///
/// NOTE(review): field meanings below are read off the field names; confirm
/// against the decoder that fills them in.
147 0 : #[derive(Serialize, Deserialize)]
148 : pub struct ClogTruncate {
    /// By name, a page number; presumably the truncation point.
149 : pub pageno: u32,
    /// By name, the oldest transaction id associated with the truncation.
150 : pub oldest_xid: TransactionId,
    /// By name, the OID of the database that `oldest_xid` belongs to.
151 : pub oldest_xid_db: Oid,
152 : }
153 :
/// Payload of [`MetadataRecord::Xact`].
///
/// `Commit`, `Abort`, `CommitPrepared` and `AbortPrepared` all carry an
/// [`XactCommon`]; `Prepare` carries an [`XactPrepare`]. The module-level
/// documentation traces how a `Commit` travels through the pageserver.
154 0 : #[derive(Serialize, Deserialize)]
155 : pub enum XactRecord {
156 : Commit(XactCommon),
157 : Abort(XactCommon),
158 : CommitPrepared(XactCommon),
159 : AbortPrepared(XactCommon),
160 : Prepare(XactPrepare),
161 : }
162 :
/// Payload shared by the `Commit`, `Abort`, `CommitPrepared` and
/// `AbortPrepared` variants of [`XactRecord`].
163 0 : #[derive(Serialize, Deserialize)]
164 : pub struct XactCommon {
    /// The record as an [`XlXactParsedRecord`] from [`postgres_ffi::walrecord`].
165 : pub parsed: XlXactParsedRecord,
    /// NOTE(review): presumably a replication origin id - confirm.
166 : pub origin_id: u16,
167 : // Fields below are only used for logging
168 : pub xl_xid: TransactionId,
169 : pub lsn: Lsn,
170 : }
171 :
/// Payload of [`XactRecord::Prepare`].
172 0 : #[derive(Serialize, Deserialize)]
173 : pub struct XactPrepare {
    /// Transaction id attached to the record.
174 : pub xl_xid: TransactionId,
    /// Opaque byte payload; its layout is not described in this module.
175 : pub data: Bytes,
176 : }
177 :
/// Payload of [`MetadataRecord::MultiXact`].
178 0 : #[derive(Serialize, Deserialize)]
179 : pub enum MultiXactRecord {
    /// Carries the locally declared [`MultiXactZeroPage`] payload.
180 : ZeroPage(MultiXactZeroPage),
    /// Carries [`XlMultiXactCreate`] from [`postgres_ffi::walrecord`] unchanged.
181 : Create(XlMultiXactCreate),
    /// Carries [`XlMultiXactTruncate`] from [`postgres_ffi::walrecord`] unchanged.
182 : Truncate(XlMultiXactTruncate),
183 : }
184 :
/// Payload of [`MultiXactRecord::ZeroPage`].
185 0 : #[derive(Serialize, Deserialize)]
186 : pub struct MultiXactZeroPage {
    /// Which SLRU the page belongs to.
187 : pub slru_kind: SlruKind,
    /// By name, a segment number - TODO confirm.
188 : pub segno: u32,
    /// By name, a page number relative to the segment - TODO confirm.
189 : pub rpageno: u32,
190 : }
191 :
/// Payload of [`MetadataRecord::Relmap`]; its only variant carries a
/// [`RelmapUpdate`].
192 0 : #[derive(Serialize, Deserialize)]
193 : pub enum RelmapRecord {
194 : Update(RelmapUpdate),
195 : }
196 :
/// Payload of [`RelmapRecord::Update`].
197 0 : #[derive(Serialize, Deserialize)]
198 : pub struct RelmapUpdate {
    /// The update as an [`XlRelmapUpdate`] from [`postgres_ffi::walrecord`].
199 : pub update: XlRelmapUpdate,
    /// Byte payload accompanying the update; presumably the relmap data
    /// itself - TODO confirm.
200 : pub buf: Bytes,
201 : }
202 :
/// Payload of [`MetadataRecord::Xlog`]; its only variant carries a
/// [`RawXlogRecord`].
203 0 : #[derive(Serialize, Deserialize)]
204 : pub enum XlogRecord {
205 : Raw(RawXlogRecord),
206 : }
207 :
/// Payload of [`XlogRecord::Raw`].
///
/// NOTE(review): by the `Raw` name the contents look undecoded at this
/// stage - confirm against the ingest code.
208 0 : #[derive(Serialize, Deserialize)]
209 : pub struct RawXlogRecord {
    /// Presumably the info byte of the original record - TODO confirm.
210 : pub info: u8,
    /// LSN associated with the record.
211 : pub lsn: Lsn,
    /// Byte payload of the record.
212 : pub buf: Bytes,
213 : }
214 :
/// Payload of [`MetadataRecord::LogicalMessage`].
215 0 : #[derive(Serialize, Deserialize)]
216 : pub enum LogicalMessageRecord {
    /// Carries a [`PutLogicalMessage`].
217 : Put(PutLogicalMessage),
    /// Only exists when the crate is built with the `testing` feature.
218 : #[cfg(feature = "testing")]
219 : Failpoint,
220 : }
221 :
/// Payload of [`LogicalMessageRecord::Put`].
222 0 : #[derive(Serialize, Deserialize)]
223 : pub struct PutLogicalMessage {
    /// Path string attached to the message; presumably the key the payload
    /// is stored under - TODO confirm.
224 : pub path: String,
    /// Byte payload of the message.
225 : pub buf: Bytes,
226 : }
227 :
/// Payload of [`MetadataRecord::Standby`]; its only variant carries a
/// [`StandbyRunningXacts`].
228 0 : #[derive(Serialize, Deserialize)]
229 : pub enum StandbyRecord {
230 : RunningXacts(StandbyRunningXacts),
231 : }
232 :
/// Payload of [`StandbyRecord::RunningXacts`].
233 0 : #[derive(Serialize, Deserialize)]
234 : pub struct StandbyRunningXacts {
    /// By name, the oldest transaction id still running when the record was
    /// written - TODO confirm.
235 : pub oldest_running_xid: TransactionId,
236 : }
237 :
/// Payload of [`MetadataRecord::Replorigin`].
///
/// Both variants carry types from [`postgres_ffi::walrecord`] unchanged.
238 0 : #[derive(Serialize, Deserialize)]
239 : pub enum ReploriginRecord {
    /// Carries an [`XlReploriginSet`].
240 : Set(XlReploriginSet),
    /// Carries an [`XlReploriginDrop`].
241 : Drop(XlReploriginDrop),
242 : }
|