Line data Source code
1 : //! This module houses types which represent decoded PG WAL records
2 : //! ready for the pageserver to interpret. They are derived from the original
3 : //! WAL records, so that each struct corresponds closely to one WAL record of
4 : //! a specific kind. They contain the same information as the original WAL records,
5 : //! but the values are already serialized in a [`SerializedValueBatch`], which
6 : //! is the format that the pageserver is expecting them in.
7 : //!
8 : //! The ingestion code uses these structs to help with parsing the WAL records,
9 : //! and it splits them into a stream of modifications to the key-value pairs that
10 : //! are ultimately stored in delta layers. See also the split-out counterparts in
11 : //! [`postgres_ffi::walrecord`].
12 : //!
13 : //! The pipeline which processes WAL records is not super obvious, so let's follow
14 : //! the flow of an example XACT_COMMIT Postgres record:
15 : //!
16 : //! (Postgres XACT_COMMIT record)
17 : //! |
18 : //! |--> pageserver::walingest::WalIngest::decode_xact_record
19 : //! |
20 : //! |--> ([`XactRecord::Commit`])
21 : //! |
22 : //! |--> pageserver::walingest::WalIngest::ingest_xact_record
23 : //! |
24 : //! |--> (NeonWalRecord::ClogSetCommitted)
25 : //! |
26 : //! |--> write to KV store within the pageserver
27 :
28 : pub mod record;
29 : pub mod value;
30 :
31 : use bytes::Bytes;
32 : use pageserver_api::reltag::{RelTag, SlruKind};
33 : use postgres_ffi::walrecord::{
34 : XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
35 : XlSmgrTruncate, XlXactParsedRecord,
36 : };
37 : use postgres_ffi::{Oid, TransactionId};
38 : use serde::{Deserialize, Serialize};
39 : use utils::lsn::Lsn;
40 :
41 : use crate::serialized_batch::SerializedValueBatch;
42 :
43 : // Code generated by protobuf; the generated file is included from the build's `OUT_DIR`.
44 : pub mod proto {
45 : // Tonic derives these types as `#[derive(Clone, PartialEq, ::prost::Message)]`.
46 : // We don't use these types for anything but broker data transmission,
47 : // so it's ok to ignore this lint.
48 : #![allow(clippy::derive_partial_eq_without_eq)]
49 : // The generated ValueMeta has a `len` method generated for its `len` field.
50 : #![allow(clippy::len_without_is_empty)]
51 : include!(concat!(env!("OUT_DIR"), concat!("/interpreted_wal.rs")));
52 : }
53 :
/// Whether all uncommitted modifications must be flushed to the storage engine
/// before a record is ingested.
///
/// Carried by [`InterpretedWalRecord::flush_uncommitted`]; a record whose flag
/// is `Yes` is never reported as empty or observed by
/// [`InterpretedWalRecord::is_empty`] / [`InterpretedWalRecord::is_observed`].
54 0 : #[derive(Copy, Clone, Serialize, Deserialize)]
55 : pub enum FlushUncommittedRecords {
56 : Yes,
57 : No,
58 : }
59 :
60 : /// A batch of interpreted WAL records, plus the LSN bookkeeping that goes with the batch
61 0 : #[derive(Serialize, Deserialize)]
62 : pub struct InterpretedWalRecords {
/// The interpreted records contained in this batch.
63 : pub records: Vec<InterpretedWalRecord>,
64 : /// Start LSN of the next record after the batch.
65 : /// Note that said record may not belong to the current shard.
66 : pub next_record_lsn: Lsn,
67 : /// Inclusive start LSN of the PG WAL from which the interpreted
68 : /// WAL records were extracted. Note that this is not necessarily the
69 : /// start LSN of the first interpreted record in the batch.
70 : pub raw_wal_start_lsn: Option<Lsn>,
71 : }
72 :
73 : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
///
/// Pairs an optional [`MetadataRecord`] with a pre-serialized
/// [`SerializedValueBatch`]. See [`InterpretedWalRecord::is_empty`] for the
/// condition under which a record does not have to be sent at all.
74 0 : #[derive(Serialize, Deserialize, Clone)]
75 : pub struct InterpretedWalRecord {
76 : /// Optional metadata record - may cause writes to metadata keys
77 : /// in the storage engine
78 : pub metadata_record: Option<MetadataRecord>,
79 : /// A pre-serialized batch along with the required metadata for ingestion
80 : /// by the pageserver
81 : pub batch: SerializedValueBatch,
82 : /// Byte offset within WAL for the start of the next PG WAL record.
83 : /// Usually this is the end LSN of the current record, but in case of
84 : /// XLOG SWITCH records it will be within the next segment.
85 : pub next_record_lsn: Lsn,
86 : /// Whether to flush all uncommitted modifications to the storage engine
87 : /// before ingesting this record. This is currently only used for legacy PG
88 : /// database creations which read pages from a template database. Such WAL
89 : /// records require reading data blocks while ingesting, hence the need to flush.
90 : pub flush_uncommitted: FlushUncommittedRecords,
91 : /// Transaction id of the original PG WAL record
92 : pub xid: TransactionId,
93 : }
94 :
95 : impl InterpretedWalRecord {
96 : /// Checks if the WAL record is empty
97 : ///
98 : /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
99 : /// pageserver.
///
/// Concretely, all three must hold: the batch reports `is_empty()`, there is
/// no metadata record, and `flush_uncommitted` is `No`. A record that only
/// requests a flush is therefore not empty.
100 200 : pub fn is_empty(&self) -> bool {
101 200 : self.batch.is_empty()
102 200 : && self.metadata_record.is_none()
103 200 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
104 200 : }
105 :
106 : /// Checks if the WAL record is observed (i.e. contains only metadata
107 : /// for observed values)
///
/// Same shape as [`Self::is_empty`]: the batch must report `is_observed()`,
/// there must be no metadata record, and `flush_uncommitted` must be `No`.
108 0 : pub fn is_observed(&self) -> bool {
109 0 : self.batch.is_observed()
110 0 : && self.metadata_record.is_none()
111 0 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
112 0 : }
113 : }
114 :
115 : /// The interpreted part of the Postgres WAL record which requires metadata
116 : /// writes to the underlying storage engine.
///
/// Each variant wraps the correspondingly named `*Record` type declared in
/// this module. Stored in [`InterpretedWalRecord::metadata_record`].
117 0 : #[derive(Clone, Serialize, Deserialize)]
118 : pub enum MetadataRecord {
119 : Heapam(HeapamRecord),
120 : Neonrmgr(NeonrmgrRecord),
121 : Smgr(SmgrRecord),
122 : Dbase(DbaseRecord),
123 : Clog(ClogRecord),
124 : Xact(XactRecord),
125 : MultiXact(MultiXactRecord),
126 : Relmap(RelmapRecord),
127 : Xlog(XlogRecord),
128 : LogicalMessage(LogicalMessageRecord),
129 : Standby(StandbyRecord),
130 : Replorigin(ReploriginRecord),
131 : }
132 :
/// Metadata record wrapped by [`MetadataRecord::Heapam`].
///
/// The only variant carries a [`ClearVmBits`] payload, the same payload type
/// used by [`NeonrmgrRecord::ClearVmBits`].
133 0 : #[derive(Clone, Serialize, Deserialize)]
134 : pub enum HeapamRecord {
135 : ClearVmBits(ClearVmBits),
136 : }
137 :
/// Payload shared by [`HeapamRecord::ClearVmBits`] and
/// [`NeonrmgrRecord::ClearVmBits`].
///
/// NOTE(review): the field meanings below are inferred from the names only;
/// confirm against the code that produces and ingests this record.
138 0 : #[derive(Clone, Serialize, Deserialize)]
139 : pub struct ClearVmBits {
/// Presumably the heap block number of the new tuple version, if any.
140 : pub new_heap_blkno: Option<u32>,
/// Presumably the heap block number of the old tuple version, if any.
141 : pub old_heap_blkno: Option<u32>,
/// Relation tag; by its name, the visibility-map relation to update.
142 : pub vm_rel: RelTag,
/// Flag bits; presumably selects which visibility-map bits to clear.
143 : pub flags: u8,
144 : }
145 :
/// Metadata record wrapped by [`MetadataRecord::Neonrmgr`].
///
/// The only variant carries a [`ClearVmBits`] payload, the same payload type
/// used by [`HeapamRecord::ClearVmBits`].
146 0 : #[derive(Clone, Serialize, Deserialize)]
147 : pub enum NeonrmgrRecord {
148 : ClearVmBits(ClearVmBits),
149 : }
150 :
/// Metadata record wrapped by [`MetadataRecord::Smgr`].
///
/// `Create` uses the locally declared [`SmgrCreate`]; `Truncate` reuses
/// `XlSmgrTruncate` from `postgres_ffi::walrecord` as-is.
151 0 : #[derive(Clone, Serialize, Deserialize)]
152 : pub enum SmgrRecord {
153 : Create(SmgrCreate),
154 : Truncate(XlSmgrTruncate),
155 : }
156 :
/// Payload of [`SmgrRecord::Create`].
157 0 : #[derive(Clone, Serialize, Deserialize)]
158 : pub struct SmgrCreate {
/// Tag of the relation the record refers to (presumably the one being
/// created — confirm against the ingest code).
159 : pub rel: RelTag,
160 : }
161 :
/// Metadata record wrapped by [`MetadataRecord::Dbase`], with one variant
/// for database creation and one for database drop.
162 0 : #[derive(Clone, Serialize, Deserialize)]
163 : pub enum DbaseRecord {
164 : Create(DbaseCreate),
165 : Drop(DbaseDrop),
166 : }
167 :
/// Payload of [`DbaseRecord::Create`].
///
/// Holds two (database, tablespace) OID pairs: the target and the source.
/// NOTE(review): the source pair presumably identifies the template database
/// being copied — confirm against the ingest code.
168 0 : #[derive(Clone, Serialize, Deserialize)]
169 : pub struct DbaseCreate {
170 : pub db_id: Oid,
171 : pub tablespace_id: Oid,
172 : pub src_db_id: Oid,
173 : pub src_tablespace_id: Oid,
174 : }
175 :
/// Payload of [`DbaseRecord::Drop`]: one database OID together with a list of
/// tablespace OIDs.
176 0 : #[derive(Clone, Serialize, Deserialize)]
177 : pub struct DbaseDrop {
178 : pub db_id: Oid,
179 : pub tablespace_ids: Vec<Oid>,
180 : }
181 :
/// Metadata record wrapped by [`MetadataRecord::Clog`], with one variant for
/// zeroing a page and one for truncation.
182 0 : #[derive(Clone, Serialize, Deserialize)]
183 : pub enum ClogRecord {
184 : ZeroPage(ClogZeroPage),
185 : Truncate(ClogTruncate),
186 : }
187 :
/// Payload of [`ClogRecord::ZeroPage`].
///
/// NOTE(review): by their names, `segno` is a segment number and `rpageno` a
/// page number relative to that segment — confirm against the decoder.
188 0 : #[derive(Clone, Serialize, Deserialize)]
189 : pub struct ClogZeroPage {
190 : pub segno: u32,
191 : pub rpageno: u32,
192 : }
193 :
/// Payload of [`ClogRecord::Truncate`].
///
/// NOTE(review): by their names, the fields are the truncation page number,
/// the oldest transaction id, and the OID of the database that holds it —
/// confirm against the decoder.
194 0 : #[derive(Clone, Serialize, Deserialize)]
195 : pub struct ClogTruncate {
196 : pub pageno: u32,
197 : pub oldest_xid: TransactionId,
198 : pub oldest_xid_db: Oid,
199 : }
200 :
/// Metadata record wrapped by [`MetadataRecord::Xact`].
///
/// The commit and abort variants (plain and prepared) all share the
/// [`XactCommon`] payload; `Prepare` has its own [`XactPrepare`] payload.
201 0 : #[derive(Clone, Serialize, Deserialize)]
202 : pub enum XactRecord {
203 : Commit(XactCommon),
204 : Abort(XactCommon),
205 : CommitPrepared(XactCommon),
206 : AbortPrepared(XactCommon),
207 : Prepare(XactPrepare),
208 : }
209 :
/// Payload shared by the `Commit`, `Abort`, `CommitPrepared` and
/// `AbortPrepared` variants of [`XactRecord`].
210 0 : #[derive(Clone, Serialize, Deserialize)]
211 : pub struct XactCommon {
/// Parsed transaction record; the type comes from `postgres_ffi::walrecord`.
212 : pub parsed: XlXactParsedRecord,
/// Origin id (presumably the replication origin of the record — confirm).
213 : pub origin_id: u16,
214 : // Fields below are only used for logging
215 : pub xl_xid: TransactionId,
216 : pub lsn: Lsn,
217 : }
218 :
/// Payload of [`XactRecord::Prepare`]: a transaction id plus an opaque byte
/// buffer.
///
/// NOTE(review): `data` presumably holds the raw body of the prepare record —
/// confirm against the decoder.
219 0 : #[derive(Clone, Serialize, Deserialize)]
220 : pub struct XactPrepare {
221 : pub xl_xid: TransactionId,
222 : pub data: Bytes,
223 : }
224 :
/// Metadata record wrapped by [`MetadataRecord::MultiXact`].
///
/// `ZeroPage` uses the locally declared [`MultiXactZeroPage`]; `Create` and
/// `Truncate` reuse the `postgres_ffi::walrecord` types as-is.
225 0 : #[derive(Clone, Serialize, Deserialize)]
226 : pub enum MultiXactRecord {
227 : ZeroPage(MultiXactZeroPage),
228 : Create(XlMultiXactCreate),
229 : Truncate(XlMultiXactTruncate),
230 : }
231 :
/// Payload of [`MultiXactRecord::ZeroPage`].
///
/// Has the same `segno` / `rpageno` pair as [`ClogZeroPage`], plus the
/// [`SlruKind`] the page belongs to.
/// NOTE(review): exact meaning of `segno` / `rpageno` to be confirmed against
/// the decoder.
232 0 : #[derive(Clone, Serialize, Deserialize)]
233 : pub struct MultiXactZeroPage {
234 : pub slru_kind: SlruKind,
235 : pub segno: u32,
236 : pub rpageno: u32,
237 : }
238 :
/// Metadata record wrapped by [`MetadataRecord::Relmap`]; its only variant
/// carries a [`RelmapUpdate`].
239 0 : #[derive(Clone, Serialize, Deserialize)]
240 : pub enum RelmapRecord {
241 : Update(RelmapUpdate),
242 : }
243 :
/// Payload of [`RelmapRecord::Update`]: the `XlRelmapUpdate` header from
/// `postgres_ffi::walrecord` plus a byte buffer.
///
/// NOTE(review): `buf` presumably holds the relation-map contents that follow
/// the header — confirm against the decoder.
244 0 : #[derive(Clone, Serialize, Deserialize)]
245 : pub struct RelmapUpdate {
246 : pub update: XlRelmapUpdate,
247 : pub buf: Bytes,
248 : }
249 :
/// Metadata record wrapped by [`MetadataRecord::Xlog`]; its only variant
/// carries a [`RawXlogRecord`].
250 0 : #[derive(Clone, Serialize, Deserialize)]
251 : pub enum XlogRecord {
252 : Raw(RawXlogRecord),
253 : }
254 :
/// Payload of [`XlogRecord::Raw`]: an info byte, an LSN and a byte buffer.
///
/// NOTE(review): presumably `info` is the record's info byte and `buf` its
/// undecoded body; which LSN (start or end) `lsn` denotes is not visible
/// here — confirm against the decoder.
255 0 : #[derive(Clone, Serialize, Deserialize)]
256 : pub struct RawXlogRecord {
257 : pub info: u8,
258 : pub lsn: Lsn,
259 : pub buf: Bytes,
260 : }
261 :
/// Metadata record wrapped by [`MetadataRecord::LogicalMessage`].
///
/// `Put` carries a [`PutLogicalMessage`]; the `Failpoint` variant only exists
/// when the crate is built with the `testing` feature.
262 0 : #[derive(Clone, Serialize, Deserialize)]
263 : pub enum LogicalMessageRecord {
264 : Put(PutLogicalMessage),
265 : #[cfg(feature = "testing")]
266 : Failpoint,
267 : }
268 :
/// Payload of [`LogicalMessageRecord::Put`]: a string path plus a byte buffer.
///
/// NOTE(review): presumably `buf` is stored under `path` — confirm against
/// the ingest code.
269 0 : #[derive(Clone, Serialize, Deserialize)]
270 : pub struct PutLogicalMessage {
271 : pub path: String,
272 : pub buf: Bytes,
273 : }
274 :
/// Metadata record wrapped by [`MetadataRecord::Standby`]; its only variant
/// carries a [`StandbyRunningXacts`].
275 0 : #[derive(Clone, Serialize, Deserialize)]
276 : pub enum StandbyRecord {
277 : RunningXacts(StandbyRunningXacts),
278 : }
279 :
/// Payload of [`StandbyRecord::RunningXacts`].
280 0 : #[derive(Clone, Serialize, Deserialize)]
281 : pub struct StandbyRunningXacts {
/// Transaction id named as the oldest running one by the original record.
282 : pub oldest_running_xid: TransactionId,
283 : }
284 :
/// Metadata record wrapped by [`MetadataRecord::Replorigin`].
///
/// Both variants reuse the `postgres_ffi::walrecord` types
/// (`XlReploriginSet`, `XlReploriginDrop`) as-is.
285 0 : #[derive(Clone, Serialize, Deserialize)]
286 : pub enum ReploriginRecord {
287 : Set(XlReploriginSet),
288 : Drop(XlReploriginDrop),
289 : }
|