Line data Source code
1 : //! This module houses types which represent decoded PG WAL records
2 : //! ready for the pageserver to interpret. They are derived from the original
3 : //! WAL records, so that each struct corresponds closely to one WAL record of
4 : //! a specific kind. They contain the same information as the original WAL records,
5 : //! but the values are already serialized in a [`SerializedValueBatch`], which
6 : //! is the format that the pageserver is expecting them in.
7 : //!
8 : //! The ingestion code uses these structs to help with parsing the WAL records,
9 : //! and it splits them into a stream of modifications to the key-value pairs that
10 : //! are ultimately stored in delta layers. See also the split-out counterparts in
11 : //! [`postgres_ffi::walrecord`].
12 : //!
13 : //! The pipeline which processes WAL records is not super obvious, so let's follow
14 : //! the flow of an example XACT_COMMIT Postgres record:
15 : //!
16 : //! (Postgres XACT_COMMIT record)
17 : //! |
18 : //! |--> pageserver::walingest::WalIngest::decode_xact_record
19 : //! |
20 : //! |--> ([`XactRecord::Commit`])
21 : //! |
22 : //! |--> pageserver::walingest::WalIngest::ingest_xact_record
23 : //! |
24 : //! |--> (NeonWalRecord::ClogSetCommitted)
25 : //! |
26 : //! |--> write to KV store within the pageserver
27 :
28 : use bytes::Bytes;
29 : use pageserver_api::reltag::{RelTag, SlruKind};
30 : use postgres_ffi::walrecord::{
31 : XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
32 : XlSmgrTruncate, XlXactParsedRecord,
33 : };
34 : use postgres_ffi::{Oid, TransactionId};
35 : use serde::{Deserialize, Serialize};
36 : use utils::lsn::Lsn;
37 :
38 : use crate::serialized_batch::SerializedValueBatch;
39 :
40 : // Code generated by protobuf.
41 : pub mod proto {
42 : // Tonic emits derives of the form `#[derive(Clone, PartialEq, ::prost::Message)]`.
43 : // We don't use these types for anything but broker data transmission,
44 : // so it's ok to ignore this lint.
45 : #![allow(clippy::derive_partial_eq_without_eq)]
46 : // The generated ValueMeta has a `len` method generated for its `len` field.
// There is no matching `is_empty`, hence the lint is silenced here.
47 : #![allow(clippy::len_without_is_empty)]
// Textually include `interpreted_wal.rs` from the build's `OUT_DIR`
// (presumably written there by the crate's build script -- confirm).
48 : include!(concat!(env!("OUT_DIR"), concat!("/interpreted_wal.rs")));
49 : }
50 :
/// Two-state flag stored in [`InterpretedWalRecord::flush_uncommitted`]:
/// whether all uncommitted modifications should be flushed to the storage
/// engine before the record is ingested.
51 0 : #[derive(Copy, Clone, Serialize, Deserialize)]
52 : pub enum FlushUncommittedRecords {
/// Flush uncommitted modifications before ingesting the record.
53 : Yes,
/// No flush is required.
54 : No,
55 : }
56 :
57 : /// A batch of interpreted WAL records
58 0 : #[derive(Serialize, Deserialize)]
59 : pub struct InterpretedWalRecords {
/// The interpreted records that make up this batch.
60 : pub records: Vec<InterpretedWalRecord>,
61 : // Start LSN of the next record after the batch, if any.
62 : // Note that said record may not belong to the current shard.
63 : pub next_record_lsn: Option<Lsn>,
64 : }
65 :
66 : /// An interpreted Postgres WAL record, ready to be handled by the pageserver.
///
/// Combines an optional [`MetadataRecord`] with a pre-serialized
/// [`SerializedValueBatch`] and the bookkeeping needed to ingest them.
67 0 : #[derive(Serialize, Deserialize, Clone)]
68 : pub struct InterpretedWalRecord {
69 : /// Optional metadata record - may cause writes to metadata keys
70 : /// in the storage engine
71 : pub metadata_record: Option<MetadataRecord>,
72 : /// A pre-serialized batch along with the required metadata for ingestion
73 : /// by the pageserver
74 : pub batch: SerializedValueBatch,
75 : /// Byte offset within WAL for the start of the next PG WAL record.
76 : /// Usually this is the end LSN of the current record, but in case of
77 : /// XLOG SWITCH records it will be within the next segment.
78 : pub next_record_lsn: Lsn,
79 : /// Whether to flush all uncommitted modifications to the storage engine
80 : /// before ingesting this record. This is currently only used for legacy PG
81 : /// database creations which read pages from a template database. Such WAL
82 : /// records require reading data blocks while ingesting, hence the need to flush.
83 : pub flush_uncommitted: FlushUncommittedRecords,
84 : /// Transaction id of the original PG WAL record
85 : pub xid: TransactionId,
86 : }
87 :
88 : impl InterpretedWalRecord {
89 : /// Checks if the WAL record is empty
90 : ///
91 : /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
92 : /// pageserver.
93 795 : pub fn is_empty(&self) -> bool {
94 795 : self.batch.is_empty()
95 795 : && self.metadata_record.is_none()
96 199 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
97 795 : }
98 :
99 : /// Checks if the WAL record is observed (i.e. contains only metadata
100 : /// for observed values)
101 0 : pub fn is_observed(&self) -> bool {
102 0 : self.batch.is_observed()
103 0 : && self.metadata_record.is_none()
104 0 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
105 0 : }
106 : }
107 :
108 : /// The interpreted part of the Postgres WAL record which requires metadata
109 : /// writes to the underlying storage engine.
///
/// Each variant wraps the record type of the matching name defined in this
/// module.
110 0 : #[derive(Clone, Serialize, Deserialize)]
111 : pub enum MetadataRecord {
112 : Heapam(HeapamRecord),
113 : Neonrmgr(NeonrmgrRecord),
114 : Smgr(SmgrRecord),
115 : Dbase(DbaseRecord),
116 : Clog(ClogRecord),
117 : Xact(XactRecord),
118 : MultiXact(MultiXactRecord),
119 : Relmap(RelmapRecord),
120 : Xlog(XlogRecord),
121 : LogicalMessage(LogicalMessageRecord),
122 : Standby(StandbyRecord),
123 : Replorigin(ReploriginRecord),
124 : }
125 :
/// Payload of [`MetadataRecord::Heapam`].
// NOTE(review): presumably mirrors the Postgres heap access method resource
// manager -- confirm against the decoder.
126 0 : #[derive(Clone, Serialize, Deserialize)]
127 : pub enum HeapamRecord {
/// Carries a [`ClearVmBits`] request.
128 : ClearVmBits(ClearVmBits),
129 : }
130 :
/// Payload shared by [`HeapamRecord::ClearVmBits`] and
/// [`NeonrmgrRecord::ClearVmBits`].
// NOTE(review): the field notes below are inferred from names only; verify
// against the ingestion code.
131 0 : #[derive(Clone, Serialize, Deserialize)]
132 : pub struct ClearVmBits {
// Presumably the heap block number on the "new" side of the change, if any.
133 : pub new_heap_blkno: Option<u32>,
// Presumably the heap block number on the "old" side of the change, if any.
134 : pub old_heap_blkno: Option<u32>,
// Relation tag; by its name, the visibility map relation -- TODO confirm.
135 : pub vm_rel: RelTag,
// Bit flags; presumably selects which visibility map bits to clear.
136 : pub flags: u8,
137 : }
138 :
/// Payload of [`MetadataRecord::Neonrmgr`].
139 0 : #[derive(Clone, Serialize, Deserialize)]
140 : pub enum NeonrmgrRecord {
/// Carries a [`ClearVmBits`] request, the same payload type used by
/// [`HeapamRecord::ClearVmBits`].
141 : ClearVmBits(ClearVmBits),
142 : }
143 :
/// Payload of [`MetadataRecord::Smgr`].
144 0 : #[derive(Clone, Serialize, Deserialize)]
145 : pub enum SmgrRecord {
/// Creation request described by [`SmgrCreate`].
146 : Create(SmgrCreate),
/// Truncation, carrying the `XlSmgrTruncate` struct from
/// `postgres_ffi::walrecord` unchanged.
147 : Truncate(XlSmgrTruncate),
148 : }
149 :
/// Payload of [`SmgrRecord::Create`].
150 0 : #[derive(Clone, Serialize, Deserialize)]
151 : pub struct SmgrCreate {
// Relation tag the create applies to (presumably the relation being
// created -- confirm).
152 : pub rel: RelTag,
153 : }
154 :
/// Payload of [`MetadataRecord::Dbase`].
155 0 : #[derive(Clone, Serialize, Deserialize)]
156 : pub enum DbaseRecord {
/// Database creation described by [`DbaseCreate`].
157 : Create(DbaseCreate),
/// Database drop described by [`DbaseDrop`].
158 : Drop(DbaseDrop),
159 : }
160 :
/// Payload of [`DbaseRecord::Create`].
// NOTE(review): by their names, `db_id`/`tablespace_id` identify the new
// database and `src_*` the database it is created from -- confirm.
161 0 : #[derive(Clone, Serialize, Deserialize)]
162 : pub struct DbaseCreate {
163 : pub db_id: Oid,
164 : pub tablespace_id: Oid,
165 : pub src_db_id: Oid,
166 : pub src_tablespace_id: Oid,
167 : }
168 :
/// Payload of [`DbaseRecord::Drop`].
169 0 : #[derive(Clone, Serialize, Deserialize)]
170 : pub struct DbaseDrop {
// Oid of the database the drop refers to.
171 : pub db_id: Oid,
// A drop may name several tablespace Oids, hence a `Vec`.
172 : pub tablespace_ids: Vec<Oid>,
173 : }
174 :
/// Payload of [`MetadataRecord::Clog`].
175 0 : #[derive(Clone, Serialize, Deserialize)]
176 : pub enum ClogRecord {
/// Zero-page request described by [`ClogZeroPage`].
177 : ZeroPage(ClogZeroPage),
/// Truncation described by [`ClogTruncate`].
178 : Truncate(ClogTruncate),
179 : }
180 :
/// Payload of [`ClogRecord::ZeroPage`].
// NOTE(review): presumably `segno` is a segment number and `rpageno` the page
// number relative to that segment -- confirm with the producer.
181 0 : #[derive(Clone, Serialize, Deserialize)]
182 : pub struct ClogZeroPage {
183 : pub segno: u32,
184 : pub rpageno: u32,
185 : }
186 :
/// Payload of [`ClogRecord::Truncate`].
// NOTE(review): field meanings below are read off the names; verify.
187 0 : #[derive(Clone, Serialize, Deserialize)]
188 : pub struct ClogTruncate {
// Page number associated with the truncation (presumably the cutoff page).
189 : pub pageno: u32,
// Transaction id named "oldest" by the record.
190 : pub oldest_xid: TransactionId,
// Oid of the database associated with `oldest_xid`, going by the name.
191 : pub oldest_xid_db: Oid,
192 : }
193 :
/// Payload of [`MetadataRecord::Xact`].
///
/// The commit/abort variants (plain and prepared) all share the
/// [`XactCommon`] payload; `Prepare` has its own [`XactPrepare`] payload.
194 0 : #[derive(Clone, Serialize, Deserialize)]
195 : pub enum XactRecord {
196 : Commit(XactCommon),
197 : Abort(XactCommon),
198 : CommitPrepared(XactCommon),
199 : AbortPrepared(XactCommon),
200 : Prepare(XactPrepare),
201 : }
202 :
/// Payload shared by the `Commit`, `Abort`, `CommitPrepared` and
/// `AbortPrepared` variants of [`XactRecord`].
203 0 : #[derive(Clone, Serialize, Deserialize)]
204 : pub struct XactCommon {
// The `XlXactParsedRecord` from `postgres_ffi::walrecord`, carried as-is.
205 : pub parsed: XlXactParsedRecord,
// NOTE(review): presumably a replication origin id -- confirm.
206 : pub origin_id: u16,
207 : // Fields below are only used for logging
208 : pub xl_xid: TransactionId,
209 : pub lsn: Lsn,
210 : }
211 :
/// Payload of [`XactRecord::Prepare`].
212 0 : #[derive(Clone, Serialize, Deserialize)]
213 : pub struct XactPrepare {
// Transaction id the prepare refers to.
214 : pub xl_xid: TransactionId,
// Opaque bytes carried with the prepare (presumably the raw record
// contents -- TODO confirm).
215 : pub data: Bytes,
216 : }
217 :
/// Payload of [`MetadataRecord::MultiXact`].
218 0 : #[derive(Clone, Serialize, Deserialize)]
219 : pub enum MultiXactRecord {
/// Zero-page request described by [`MultiXactZeroPage`].
220 : ZeroPage(MultiXactZeroPage),
/// Carries the `XlMultiXactCreate` struct from `postgres_ffi::walrecord`.
221 : Create(XlMultiXactCreate),
/// Carries the `XlMultiXactTruncate` struct from `postgres_ffi::walrecord`.
222 : Truncate(XlMultiXactTruncate),
223 : }
224 :
/// Payload of [`MultiXactRecord::ZeroPage`].
// NOTE(review): presumably `segno`/`rpageno` locate the page as segment
// number plus page-within-segment, as in `ClogZeroPage` -- confirm.
225 0 : #[derive(Clone, Serialize, Deserialize)]
226 : pub struct MultiXactZeroPage {
// Which SLRU the page belongs to.
227 : pub slru_kind: SlruKind,
228 : pub segno: u32,
229 : pub rpageno: u32,
230 : }
231 :
/// Payload of [`MetadataRecord::Relmap`].
232 0 : #[derive(Clone, Serialize, Deserialize)]
233 : pub enum RelmapRecord {
/// Update described by [`RelmapUpdate`].
234 : Update(RelmapUpdate),
235 : }
236 :
/// Payload of [`RelmapRecord::Update`].
237 0 : #[derive(Clone, Serialize, Deserialize)]
238 : pub struct RelmapUpdate {
// The `XlRelmapUpdate` struct from `postgres_ffi::walrecord`, carried as-is.
239 : pub update: XlRelmapUpdate,
// Bytes accompanying the update (presumably the relmap contents -- confirm).
240 : pub buf: Bytes,
241 : }
242 :
/// Payload of [`MetadataRecord::Xlog`].
243 0 : #[derive(Clone, Serialize, Deserialize)]
244 : pub enum XlogRecord {
/// The only variant: a [`RawXlogRecord`].
245 : Raw(RawXlogRecord),
246 : }
247 :
/// Payload of [`XlogRecord::Raw`].
// NOTE(review): presumably `info` is the record's info byte, `lsn` the LSN it
// was read at, and `buf` the undecoded record body -- confirm.
248 0 : #[derive(Clone, Serialize, Deserialize)]
249 : pub struct RawXlogRecord {
250 : pub info: u8,
251 : pub lsn: Lsn,
252 : pub buf: Bytes,
253 : }
254 :
/// Payload of [`MetadataRecord::LogicalMessage`].
255 0 : #[derive(Clone, Serialize, Deserialize)]
256 : pub enum LogicalMessageRecord {
/// Put request described by [`PutLogicalMessage`].
257 : Put(PutLogicalMessage),
// This variant only exists when the crate is built with the `testing`
// feature.
258 : #[cfg(feature = "testing")]
259 : Failpoint,
260 : }
261 :
/// Payload of [`LogicalMessageRecord::Put`]: a path together with a byte
/// buffer.
// NOTE(review): presumably `buf` is stored under `path` -- confirm with the
// ingestion code.
262 0 : #[derive(Clone, Serialize, Deserialize)]
263 : pub struct PutLogicalMessage {
264 : pub path: String,
265 : pub buf: Bytes,
266 : }
267 :
/// Payload of [`MetadataRecord::Standby`].
268 0 : #[derive(Clone, Serialize, Deserialize)]
269 : pub enum StandbyRecord {
/// Running-transactions information described by [`StandbyRunningXacts`].
270 : RunningXacts(StandbyRunningXacts),
271 : }
272 :
/// Payload of [`StandbyRecord::RunningXacts`].
273 0 : #[derive(Clone, Serialize, Deserialize)]
274 : pub struct StandbyRunningXacts {
// Transaction id reported as the oldest running one.
275 : pub oldest_running_xid: TransactionId,
276 : }
277 :
/// Payload of [`MetadataRecord::Replorigin`].
///
/// Both variants carry the corresponding struct from
/// `postgres_ffi::walrecord` unchanged.
278 0 : #[derive(Clone, Serialize, Deserialize)]
279 : pub enum ReploriginRecord {
280 : Set(XlReploriginSet),
281 : Drop(XlReploriginDrop),
282 : }
|