Line data Source code
1 : //! This module houses types which represent decoded PG WAL records
2 : //! ready for the pageserver to interpret. They are derived from the original
3 : //! WAL records, so that each struct corresponds closely to one WAL record of
4 : //! a specific kind. They contain the same information as the original WAL records,
5 : //! but the values are already serialized in a [`SerializedValueBatch`], which
6 : //! is the format that the pageserver is expecting them in.
7 : //!
8 : //! The ingestion code uses these structs to help with parsing the WAL records,
9 : //! and it splits them into a stream of modifications to the key-value pairs that
10 : //! are ultimately stored in delta layers. See also the split-out counterparts in
11 : //! [`postgres_ffi::walrecord`].
12 : //!
13 : //! The pipeline which processes WAL records is not super obvious, so let's follow
14 : //! the flow of an example XACT_COMMIT Postgres record:
15 : //!
16 : //! (Postgres XACT_COMMIT record)
17 : //! |
18 : //! |--> pageserver::walingest::WalIngest::decode_xact_record
19 : //! |
20 : //! |--> ([`XactRecord::Commit`])
21 : //! |
22 : //! |--> pageserver::walingest::WalIngest::ingest_xact_record
23 : //! |
24 : //! |--> (NeonWalRecord::ClogSetCommitted)
25 : //! |
26 : //! |--> write to KV store within the pageserver
27 :
28 : use bytes::Bytes;
29 : use pageserver_api::reltag::{RelTag, SlruKind};
30 : use postgres_ffi::walrecord::{
31 : XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
32 : XlSmgrTruncate, XlXactParsedRecord,
33 : };
34 : use postgres_ffi::{Oid, TransactionId};
35 : use serde::{Deserialize, Serialize};
36 : use utils::lsn::Lsn;
37 :
38 : use crate::serialized_batch::SerializedValueBatch;
39 :
40 0 : #[derive(Serialize, Deserialize)]
41 : pub enum FlushUncommittedRecords {
42 : Yes,
43 : No,
44 : }
45 :
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
///
/// Data writes are carried pre-serialized in `batch`; writes to metadata keys,
/// if any, are described by `metadata_record`.
///
/// NOTE: the serde derives serialize fields in declaration order. With a
/// position-based serde format, reordering the fields changes the encoding.
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecord {
    /// Optional metadata record - may cause writes to metadata keys
    /// in the storage engine
    pub metadata_record: Option<MetadataRecord>,
    /// A pre-serialized batch along with the required metadata for ingestion
    /// by the pageserver
    pub batch: SerializedValueBatch,
    /// Byte offset within WAL for the start of the next PG WAL record.
    /// Usually this is the end LSN of the current record, but in case of
    /// XLOG SWITCH records it will be within the next segment.
    pub next_record_lsn: Lsn,
    /// Whether to flush all uncommitted modifications to the storage engine
    /// before ingesting this record. This is currently only used for legacy PG
    /// database creations which read pages from a template database. Such WAL
    /// records require reading data blocks while ingesting, hence the need to flush.
    pub flush_uncommitted: FlushUncommittedRecords,
    /// Transaction id of the original PG WAL record
    pub xid: TransactionId,
}
67 :
68 : impl InterpretedWalRecord {
69 : /// Checks if the WAL record is empty
70 : ///
71 : /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
72 : /// pageserver.
73 0 : pub fn is_empty(&self) -> bool {
74 0 : self.batch.is_empty()
75 0 : && self.metadata_record.is_none()
76 0 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
77 0 : }
78 : }
79 :
/// The interpreted part of the Postgres WAL record which requires metadata
/// writes to the underlying storage engine.
///
/// Each variant wraps the record type for one family of WAL records. Variant
/// names presumably mirror the Postgres resource manager that emitted the
/// record (`Neonrmgr` being Neon's own); confirm against the decoder.
///
/// NOTE: the serde derives encode enum variants by declaration index in
/// index-based formats, so reordering variants changes the encoding.
#[derive(Serialize, Deserialize)]
pub enum MetadataRecord {
    Heapam(HeapamRecord),
    Neonrmgr(NeonrmgrRecord),
    Smgr(SmgrRecord),
    Dbase(DbaseRecord),
    Clog(ClogRecord),
    Xact(XactRecord),
    MultiXact(MultiXactRecord),
    Relmap(RelmapRecord),
    Xlog(XlogRecord),
    LogicalMessage(LogicalMessageRecord),
    Standby(StandbyRecord),
    Replorigin(ReploriginRecord),
}
97 :
/// Metadata-relevant content of heap access method WAL records.
///
/// Currently the only interpreted effect is clearing visibility-map bits.
#[derive(Serialize, Deserialize)]
pub enum HeapamRecord {
    /// Bits to clear in a relation's visibility map.
    ClearVmBits(ClearVmBits),
}
102 :
/// Describes bits to clear in the visibility map relation `vm_rel`.
///
/// Shared by [`HeapamRecord::ClearVmBits`] and [`NeonrmgrRecord::ClearVmBits`].
#[derive(Serialize, Deserialize)]
pub struct ClearVmBits {
    /// A heap block whose visibility-map bits are affected, if any.
    /// NOTE(review): presumably the block holding the new tuple version — confirm.
    pub new_heap_blkno: Option<u32>,
    /// A second heap block whose visibility-map bits are affected, if any.
    /// NOTE(review): presumably the block holding the old tuple version
    /// (e.g. for updates) — confirm.
    pub old_heap_blkno: Option<u32>,
    /// Relation tag identifying the visibility map to modify.
    pub vm_rel: RelTag,
    /// Bit mask of the bits to clear.
    /// NOTE(review): presumably Postgres `VISIBILITYMAP_*` flags — confirm.
    pub flags: u8,
}
110 :
/// Metadata-relevant content of WAL records from Neon's own resource manager.
///
/// Like [`HeapamRecord`], the only interpreted effect is clearing
/// visibility-map bits.
#[derive(Serialize, Deserialize)]
pub enum NeonrmgrRecord {
    /// Bits to clear in a relation's visibility map.
    ClearVmBits(ClearVmBits),
}
115 :
/// Storage manager WAL records: relation creation and truncation.
#[derive(Serialize, Deserialize)]
pub enum SmgrRecord {
    /// Creation of the relation described by [`SmgrCreate`].
    Create(SmgrCreate),
    /// Truncation, carried as the decoded `postgres_ffi` record unchanged.
    Truncate(XlSmgrTruncate),
}
121 :
/// Payload of [`SmgrRecord::Create`].
#[derive(Serialize, Deserialize)]
pub struct SmgrCreate {
    /// The relation being created.
    pub rel: RelTag,
}
126 :
/// Database-level WAL records: creation and removal of a database.
#[derive(Serialize, Deserialize)]
pub enum DbaseRecord {
    /// Creation of a database from a source database.
    Create(DbaseCreate),
    /// Removal of a database.
    Drop(DbaseDrop),
}
132 :
/// Payload of [`DbaseRecord::Create`]: a new database created from a source
/// database.
#[derive(Serialize, Deserialize)]
pub struct DbaseCreate {
    /// Oid of the database being created.
    pub db_id: Oid,
    /// Oid of the tablespace the new database is created in.
    pub tablespace_id: Oid,
    /// Oid of the source database whose contents are copied.
    pub src_db_id: Oid,
    /// Oid of the tablespace holding the source database.
    pub src_tablespace_id: Oid,
}
140 :
/// Payload of [`DbaseRecord::Drop`].
#[derive(Serialize, Deserialize)]
pub struct DbaseDrop {
    /// Oid of the database being dropped.
    pub db_id: Oid,
    /// Oids of the tablespaces the dropped database has data in.
    /// NOTE(review): interpretation inferred from the field name — confirm
    /// against the decoder.
    pub tablespace_ids: Vec<Oid>,
}
146 :
/// Commit log (CLOG) WAL records that affect metadata.
#[derive(Serialize, Deserialize)]
pub enum ClogRecord {
    /// Zero-initialize a CLOG page.
    ZeroPage(ClogZeroPage),
    /// Truncate the CLOG.
    Truncate(ClogTruncate),
}
152 :
/// Payload of [`ClogRecord::ZeroPage`]: identifies the CLOG page to zero.
#[derive(Serialize, Deserialize)]
pub struct ClogZeroPage {
    /// Segment number containing the page.
    pub segno: u32,
    /// Page number within the segment.
    /// NOTE(review): presumably "relative page number" — confirm.
    pub rpageno: u32,
}
158 :
/// Payload of [`ClogRecord::Truncate`].
#[derive(Serialize, Deserialize)]
pub struct ClogTruncate {
    /// CLOG page number associated with the truncation.
    /// NOTE(review): presumably the cutoff page (pages before it are removed) — confirm.
    pub pageno: u32,
    /// Oldest transaction id recorded with the truncation.
    pub oldest_xid: TransactionId,
    /// Oid of the database associated with `oldest_xid`.
    pub oldest_xid_db: Oid,
}
165 :
/// Transaction (xact) WAL records.
///
/// Commit and abort records, prepared or not, share the [`XactCommon`]
/// payload; `PREPARE` has its own [`XactPrepare`] payload.
#[derive(Serialize, Deserialize)]
pub enum XactRecord {
    Commit(XactCommon),
    Abort(XactCommon),
    CommitPrepared(XactCommon),
    AbortPrepared(XactCommon),
    Prepare(XactPrepare),
}
174 :
/// Payload shared by the commit/abort variants of [`XactRecord`].
#[derive(Serialize, Deserialize)]
pub struct XactCommon {
    /// The decoded transaction record from `postgres_ffi`.
    pub parsed: XlXactParsedRecord,
    /// Replication origin id associated with the record.
    pub origin_id: u16,
    // Fields below are only used for logging
    /// Transaction id from the original WAL record.
    pub xl_xid: TransactionId,
    /// LSN of the original WAL record.
    pub lsn: Lsn,
}
183 :
/// Payload of [`XactRecord::Prepare`].
#[derive(Serialize, Deserialize)]
pub struct XactPrepare {
    /// Transaction id from the original WAL record.
    pub xl_xid: TransactionId,
    /// Raw payload of the prepare record.
    /// NOTE(review): assumed to be the undecoded record body — confirm.
    pub data: Bytes,
}
189 :
/// MultiXact WAL records that affect metadata.
#[derive(Serialize, Deserialize)]
pub enum MultiXactRecord {
    /// Zero-initialize a page of one of the MultiXact SLRUs.
    ZeroPage(MultiXactZeroPage),
    /// MultiXact creation, carried as the decoded `postgres_ffi` record.
    Create(XlMultiXactCreate),
    /// MultiXact truncation, carried as the decoded `postgres_ffi` record.
    Truncate(XlMultiXactTruncate),
}
196 :
/// Payload of [`MultiXactRecord::ZeroPage`]: identifies the SLRU page to zero.
#[derive(Serialize, Deserialize)]
pub struct MultiXactZeroPage {
    /// Which SLRU the page belongs to.
    /// NOTE(review): presumably the MultiXact offsets or members SLRU — confirm.
    pub slru_kind: SlruKind,
    /// Segment number containing the page.
    pub segno: u32,
    /// Page number within the segment.
    pub rpageno: u32,
}
203 :
/// Relation mapper WAL records.
#[derive(Serialize, Deserialize)]
pub enum RelmapRecord {
    /// An update of a relation map file.
    Update(RelmapUpdate),
}
208 :
/// Payload of [`RelmapRecord::Update`].
#[derive(Serialize, Deserialize)]
pub struct RelmapUpdate {
    /// The decoded update record header from `postgres_ffi`.
    pub update: XlRelmapUpdate,
    /// Raw bytes accompanying the update.
    /// NOTE(review): presumably the new relation map contents — confirm.
    pub buf: Bytes,
}
214 :
/// XLOG resource manager WAL records.
#[derive(Serialize, Deserialize)]
pub enum XlogRecord {
    /// The record passed through without further interpretation.
    Raw(RawXlogRecord),
}
219 :
/// Payload of [`XlogRecord::Raw`]: an XLOG record that is not decoded further.
#[derive(Serialize, Deserialize)]
pub struct RawXlogRecord {
    /// Info byte of the original record.
    /// NOTE(review): presumably `xl_info` from the record header — confirm.
    pub info: u8,
    /// LSN of the original record.
    pub lsn: Lsn,
    /// Raw record payload.
    pub buf: Bytes,
}
226 :
/// Logical message WAL records that the pageserver acts upon.
#[derive(Serialize, Deserialize)]
pub enum LogicalMessageRecord {
    /// Store `buf` under `path` (see [`PutLogicalMessage`]).
    Put(PutLogicalMessage),
    /// Test-only variant, compiled in only with the `testing` feature.
    /// It is the last variant, so its presence does not shift the
    /// serde variant index of `Put`.
    #[cfg(feature = "testing")]
    Failpoint,
}
233 :
/// Payload of [`LogicalMessageRecord::Put`].
#[derive(Serialize, Deserialize)]
pub struct PutLogicalMessage {
    /// Path (key) the message content is associated with.
    pub path: String,
    /// Message content.
    pub buf: Bytes,
}
239 :
/// Standby resource manager WAL records.
#[derive(Serialize, Deserialize)]
pub enum StandbyRecord {
    /// A running-transactions snapshot.
    RunningXacts(StandbyRunningXacts),
}
244 :
/// Payload of [`StandbyRecord::RunningXacts`].
///
/// Only the oldest running transaction id is kept from the original record.
#[derive(Serialize, Deserialize)]
pub struct StandbyRunningXacts {
    /// Oldest transaction id still running when the record was written.
    pub oldest_running_xid: TransactionId,
}
249 :
/// Replication origin WAL records, carried as the decoded `postgres_ffi`
/// records unchanged.
#[derive(Serialize, Deserialize)]
pub enum ReploriginRecord {
    /// A replication origin is set or advanced.
    Set(XlReploriginSet),
    /// A replication origin is dropped.
    Drop(XlReploriginDrop),
}
|