Line data Source code
1 : //! This module houses types which represent decoded PG WAL records
2 : //! ready for the pageserver to interpret. They are derived from the original
3 : //! WAL records, so that each struct corresponds closely to one WAL record of
4 : //! a specific kind. They contain the same information as the original WAL records,
5 : //! but the values are already serialized in a [`SerializedValueBatch`], which
6 : //! is the format that the pageserver is expecting them in.
7 : //!
8 : //! The ingestion code uses these structs to help with parsing the WAL records,
9 : //! and it splits them into a stream of modifications to the key-value pairs that
10 : //! are ultimately stored in delta layers. See also the split-out counterparts in
11 : //! [`postgres_ffi::walrecord`].
12 : //!
13 : //! The pipeline which processes WAL records is not super obvious, so let's follow
14 : //! the flow of an example XACT_COMMIT Postgres record:
15 : //!
16 : //! (Postgres XACT_COMMIT record)
17 : //! |
18 : //! |--> pageserver::walingest::WalIngest::decode_xact_record
19 : //! |
20 : //! |--> ([`XactRecord::Commit`])
21 : //! |
22 : //! |--> pageserver::walingest::WalIngest::ingest_xact_record
23 : //! |
24 : //! |--> (NeonWalRecord::ClogSetCommitted)
25 : //! |
26 : //! |--> write to KV store within the pageserver
27 :
28 : use bytes::Bytes;
29 : use pageserver_api::reltag::{RelTag, SlruKind};
30 : use postgres_ffi::walrecord::{
31 : XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
32 : XlSmgrTruncate, XlXactParsedRecord,
33 : };
34 : use postgres_ffi::{Oid, TransactionId};
35 : use serde::{Deserialize, Serialize};
36 : use utils::lsn::Lsn;
37 :
38 : use crate::serialized_batch::SerializedValueBatch;
39 :
40 : // Code generated by protobuf.
41 : pub mod proto {
42 : // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
43 : // we don't use these types for anything but broker data transmission,
44 : // so it's ok to ignore this one.
45 : #![allow(clippy::derive_partial_eq_without_eq)]
46 : // The generated ValueMeta has a `len` method generate for its `len` field.
47 : #![allow(clippy::len_without_is_empty)]
48 : include!(concat!(env!("OUT_DIR"), concat!("/interpreted_wal.rs")));
49 : }
50 :
51 0 : #[derive(Copy, Clone, Serialize, Deserialize)]
52 : pub enum FlushUncommittedRecords {
53 : Yes,
54 : No,
55 : }
56 :
57 : /// A batch of interpreted WAL records
58 0 : #[derive(Serialize, Deserialize)]
59 : pub struct InterpretedWalRecords {
60 : pub records: Vec<InterpretedWalRecord>,
61 : // Start LSN of the next record after the batch.
62 : // Note that said record may not belong to the current shard.
63 : pub next_record_lsn: Lsn,
64 : // Inclusive start LSN of the PG WAL from which the interpreted
65 : // WAL records were extracted. Note that this is not necessarily the
66 : // start LSN of the first interpreted record in the batch.
67 : pub raw_wal_start_lsn: Option<Lsn>,
68 : }
69 :
70 : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
71 0 : #[derive(Serialize, Deserialize, Clone)]
72 : pub struct InterpretedWalRecord {
73 : /// Optional metadata record - may cause writes to metadata keys
74 : /// in the storage engine
75 : pub metadata_record: Option<MetadataRecord>,
76 : /// A pre-serialized batch along with the required metadata for ingestion
77 : /// by the pageserver
78 : pub batch: SerializedValueBatch,
79 : /// Byte offset within WAL for the start of the next PG WAL record.
80 : /// Usually this is the end LSN of the current record, but in case of
81 : /// XLOG SWITCH records it will be within the next segment.
82 : pub next_record_lsn: Lsn,
83 : /// Whether to flush all uncommitted modifications to the storage engine
84 : /// before ingesting this record. This is currently only used for legacy PG
85 : /// database creations which read pages from a template database. Such WAL
86 : /// records require reading data blocks while ingesting, hence the need to flush.
87 : pub flush_uncommitted: FlushUncommittedRecords,
88 : /// Transaction id of the original PG WAL record
89 : pub xid: TransactionId,
90 : }
91 :
92 : impl InterpretedWalRecord {
93 : /// Checks if the WAL record is empty
94 : ///
95 : /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
96 : /// pageserver.
97 200 : pub fn is_empty(&self) -> bool {
98 200 : self.batch.is_empty()
99 200 : && self.metadata_record.is_none()
100 200 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
101 200 : }
102 :
103 : /// Checks if the WAL record is observed (i.e. contains only metadata
104 : /// for observed values)
105 0 : pub fn is_observed(&self) -> bool {
106 0 : self.batch.is_observed()
107 0 : && self.metadata_record.is_none()
108 0 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
109 0 : }
110 : }
111 :
112 : /// The interpreted part of the Postgres WAL record which requires metadata
113 : /// writes to the underlying storage engine.
114 0 : #[derive(Clone, Serialize, Deserialize)]
115 : pub enum MetadataRecord {
116 : Heapam(HeapamRecord),
117 : Neonrmgr(NeonrmgrRecord),
118 : Smgr(SmgrRecord),
119 : Dbase(DbaseRecord),
120 : Clog(ClogRecord),
121 : Xact(XactRecord),
122 : MultiXact(MultiXactRecord),
123 : Relmap(RelmapRecord),
124 : Xlog(XlogRecord),
125 : LogicalMessage(LogicalMessageRecord),
126 : Standby(StandbyRecord),
127 : Replorigin(ReploriginRecord),
128 : }
129 :
130 0 : #[derive(Clone, Serialize, Deserialize)]
131 : pub enum HeapamRecord {
132 : ClearVmBits(ClearVmBits),
133 : }
134 :
135 0 : #[derive(Clone, Serialize, Deserialize)]
136 : pub struct ClearVmBits {
137 : pub new_heap_blkno: Option<u32>,
138 : pub old_heap_blkno: Option<u32>,
139 : pub vm_rel: RelTag,
140 : pub flags: u8,
141 : }
142 :
143 0 : #[derive(Clone, Serialize, Deserialize)]
144 : pub enum NeonrmgrRecord {
145 : ClearVmBits(ClearVmBits),
146 : }
147 :
148 0 : #[derive(Clone, Serialize, Deserialize)]
149 : pub enum SmgrRecord {
150 : Create(SmgrCreate),
151 : Truncate(XlSmgrTruncate),
152 : }
153 :
154 0 : #[derive(Clone, Serialize, Deserialize)]
155 : pub struct SmgrCreate {
156 : pub rel: RelTag,
157 : }
158 :
159 0 : #[derive(Clone, Serialize, Deserialize)]
160 : pub enum DbaseRecord {
161 : Create(DbaseCreate),
162 : Drop(DbaseDrop),
163 : }
164 :
165 0 : #[derive(Clone, Serialize, Deserialize)]
166 : pub struct DbaseCreate {
167 : pub db_id: Oid,
168 : pub tablespace_id: Oid,
169 : pub src_db_id: Oid,
170 : pub src_tablespace_id: Oid,
171 : }
172 :
173 0 : #[derive(Clone, Serialize, Deserialize)]
174 : pub struct DbaseDrop {
175 : pub db_id: Oid,
176 : pub tablespace_ids: Vec<Oid>,
177 : }
178 :
179 0 : #[derive(Clone, Serialize, Deserialize)]
180 : pub enum ClogRecord {
181 : ZeroPage(ClogZeroPage),
182 : Truncate(ClogTruncate),
183 : }
184 :
185 0 : #[derive(Clone, Serialize, Deserialize)]
186 : pub struct ClogZeroPage {
187 : pub segno: u32,
188 : pub rpageno: u32,
189 : }
190 :
191 0 : #[derive(Clone, Serialize, Deserialize)]
192 : pub struct ClogTruncate {
193 : pub pageno: u32,
194 : pub oldest_xid: TransactionId,
195 : pub oldest_xid_db: Oid,
196 : }
197 :
198 0 : #[derive(Clone, Serialize, Deserialize)]
199 : pub enum XactRecord {
200 : Commit(XactCommon),
201 : Abort(XactCommon),
202 : CommitPrepared(XactCommon),
203 : AbortPrepared(XactCommon),
204 : Prepare(XactPrepare),
205 : }
206 :
207 0 : #[derive(Clone, Serialize, Deserialize)]
208 : pub struct XactCommon {
209 : pub parsed: XlXactParsedRecord,
210 : pub origin_id: u16,
211 : // Fields below are only used for logging
212 : pub xl_xid: TransactionId,
213 : pub lsn: Lsn,
214 : }
215 :
216 0 : #[derive(Clone, Serialize, Deserialize)]
217 : pub struct XactPrepare {
218 : pub xl_xid: TransactionId,
219 : pub data: Bytes,
220 : }
221 :
222 0 : #[derive(Clone, Serialize, Deserialize)]
223 : pub enum MultiXactRecord {
224 : ZeroPage(MultiXactZeroPage),
225 : Create(XlMultiXactCreate),
226 : Truncate(XlMultiXactTruncate),
227 : }
228 :
229 0 : #[derive(Clone, Serialize, Deserialize)]
230 : pub struct MultiXactZeroPage {
231 : pub slru_kind: SlruKind,
232 : pub segno: u32,
233 : pub rpageno: u32,
234 : }
235 :
236 0 : #[derive(Clone, Serialize, Deserialize)]
237 : pub enum RelmapRecord {
238 : Update(RelmapUpdate),
239 : }
240 :
241 0 : #[derive(Clone, Serialize, Deserialize)]
242 : pub struct RelmapUpdate {
243 : pub update: XlRelmapUpdate,
244 : pub buf: Bytes,
245 : }
246 :
247 0 : #[derive(Clone, Serialize, Deserialize)]
248 : pub enum XlogRecord {
249 : Raw(RawXlogRecord),
250 : }
251 :
252 0 : #[derive(Clone, Serialize, Deserialize)]
253 : pub struct RawXlogRecord {
254 : pub info: u8,
255 : pub lsn: Lsn,
256 : pub buf: Bytes,
257 : }
258 :
259 0 : #[derive(Clone, Serialize, Deserialize)]
260 : pub enum LogicalMessageRecord {
261 : Put(PutLogicalMessage),
262 : #[cfg(feature = "testing")]
263 : Failpoint,
264 : }
265 :
266 0 : #[derive(Clone, Serialize, Deserialize)]
267 : pub struct PutLogicalMessage {
268 : pub path: String,
269 : pub buf: Bytes,
270 : }
271 :
272 0 : #[derive(Clone, Serialize, Deserialize)]
273 : pub enum StandbyRecord {
274 : RunningXacts(StandbyRunningXacts),
275 : }
276 :
277 0 : #[derive(Clone, Serialize, Deserialize)]
278 : pub struct StandbyRunningXacts {
279 : pub oldest_running_xid: TransactionId,
280 : }
281 :
282 0 : #[derive(Clone, Serialize, Deserialize)]
283 : pub enum ReploriginRecord {
284 : Set(XlReploriginSet),
285 : Drop(XlReploriginDrop),
286 : }
|