Line data Source code
1 : //! This module houses types which represent decoded PG WAL records
2 : //! ready for the pageserver to interpret. They are derived from the original
3 : //! WAL records, so that each struct corresponds closely to one WAL record of
4 : //! a specific kind. They contain the same information as the original WAL records,
5 : //! but the values are already serialized in a [`SerializedValueBatch`], which
6 : //! is the format that the pageserver is expecting them in.
7 : //!
8 : //! The ingestion code uses these structs to help with parsing the WAL records,
9 : //! and it splits them into a stream of modifications to the key-value pairs that
10 : //! are ultimately stored in delta layers. See also the split-out counterparts in
11 : //! [`postgres_ffi::walrecord`].
12 : //!
13 : //! The pipeline which processes WAL records is not super obvious, so let's follow
14 : //! the flow of an example XACT_COMMIT Postgres record:
15 : //!
16 : //! (Postgres XACT_COMMIT record)
17 : //! |
18 : //! |--> pageserver::walingest::WalIngest::decode_xact_record
19 : //! |
20 : //! |--> ([`XactRecord::Commit`])
21 : //! |
22 : //! |--> pageserver::walingest::WalIngest::ingest_xact_record
23 : //! |
24 : //! |--> (NeonWalRecord::ClogSetCommitted)
25 : //! |
26 : //! |--> write to KV store within the pageserver
27 :
28 : use bytes::Bytes;
29 : use pageserver_api::reltag::{RelTag, SlruKind};
30 : use postgres_ffi::walrecord::{
31 : XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
32 : XlSmgrTruncate, XlXactParsedRecord,
33 : };
34 : use postgres_ffi::{Oid, TransactionId};
35 : use serde::{Deserialize, Serialize};
36 : use utils::lsn::Lsn;
37 :
38 : use crate::serialized_batch::SerializedValueBatch;
39 :
40 : // Code generated by protobuf; pulled in below via `tonic::include_proto!`.
41 : pub mod proto {
42 : // Tonic derives these types as `#[derive(Clone, PartialEq, ::prost::Message)]`;
43 : // we don't use these types for anything but broker data transmission,
44 : // so it's ok to ignore this lint.
45 : #![allow(clippy::derive_partial_eq_without_eq)]
46 : // The generated ValueMeta has a `len` method generated for its `len` field.
47 : #![allow(clippy::len_without_is_empty)]
48 : tonic::include_proto!("interpreted_wal");
49 : }
50 :
/// Two-state flag stored in [`InterpretedWalRecord::flush_uncommitted`]:
/// whether all uncommitted modifications must be flushed to the storage
/// engine before the record is ingested.
51 0 : #[derive(Serialize, Deserialize)]
52 : pub enum FlushUncommittedRecords {
/// A flush is required before ingesting the record.
53 : Yes,
/// No flush is required.
54 : No,
55 : }
56 :
57 : /// A batch of interpreted WAL records
58 0 : #[derive(Serialize, Deserialize)]
59 : pub struct InterpretedWalRecords {
/// The interpreted records that make up this batch.
60 : pub records: Vec<InterpretedWalRecord>,
61 : // Start LSN of the next record after the batch.
62 : // Note that said record may belong to the current shard.
// NOTE(review): as a caveat, "may belong" reads oddly; confirm whether
// "may not belong to the current shard" was intended.
63 : pub next_record_lsn: Option<Lsn>,
64 : }
65 :
66 : /// An interpreted Postgres WAL record, ready to be handled by the pageserver
///
/// Use [`InterpretedWalRecord::is_empty`] to check whether a record carries
/// anything that has to be sent to the pageserver.
67 0 : #[derive(Serialize, Deserialize)]
68 : pub struct InterpretedWalRecord {
69 : /// Optional metadata record - may cause writes to metadata keys
70 : /// in the storage engine
71 : pub metadata_record: Option<MetadataRecord>,
72 : /// A pre-serialized batch along with the required metadata for ingestion
73 : /// by the pageserver
74 : pub batch: SerializedValueBatch,
75 : /// Byte offset within WAL for the start of the next PG WAL record.
76 : /// Usually this is the end LSN of the current record, but in case of
77 : /// XLOG SWITCH records it will be within the next segment.
78 : pub next_record_lsn: Lsn,
79 : /// Whether to flush all uncommitted modifications to the storage engine
80 : /// before ingesting this record. This is currently only used for legacy PG
81 : /// database creations which read pages from a template database. Such WAL
82 : /// records require reading data blocks while ingesting, hence the need to flush.
83 : pub flush_uncommitted: FlushUncommittedRecords,
84 : /// Transaction id of the original PG WAL record
85 : pub xid: TransactionId,
86 : }
87 :
88 : impl InterpretedWalRecord {
89 : /// Checks if the WAL record is empty
90 : ///
91 : /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
92 : /// pageserver.
/// A record whose `flush_uncommitted` is [`FlushUncommittedRecords::Yes`] is
/// never considered empty, even when it carries neither batch data nor a
/// metadata record.
///
/// Returns `true` only when all three conditions below hold.
93 0 : pub fn is_empty(&self) -> bool {
// No serialized values to ingest ...
94 0 : self.batch.is_empty()
// ... no metadata record ...
95 0 : && self.metadata_record.is_none()
// ... and no flush requested by this record.
96 0 : && matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
97 0 : }
98 : }
99 :
100 : /// The interpreted part of the Postgres WAL record which requires metadata
101 : /// writes to the underlying storage engine.
///
/// Each variant wraps a dedicated record type defined in this module.
/// The declaration order of the variants is part of the derived serde
/// representation, so keep it stable.
102 0 : #[derive(Serialize, Deserialize)]
103 : pub enum MetadataRecord {
104 : Heapam(HeapamRecord),
105 : Neonrmgr(NeonrmgrRecord),
106 : Smgr(SmgrRecord),
107 : Dbase(DbaseRecord),
108 : Clog(ClogRecord),
109 : Xact(XactRecord),
110 : MultiXact(MultiXactRecord),
111 : Relmap(RelmapRecord),
112 : Xlog(XlogRecord),
113 : LogicalMessage(LogicalMessageRecord),
114 : Standby(StandbyRecord),
115 : Replorigin(ReploriginRecord),
116 : }
117 :
/// Payload of [`MetadataRecord::Heapam`].
118 0 : #[derive(Serialize, Deserialize)]
119 : pub enum HeapamRecord {
/// Carries a [`ClearVmBits`] request.
120 : ClearVmBits(ClearVmBits),
121 : }
122 :
/// Payload shared by [`HeapamRecord::ClearVmBits`] and
/// [`NeonrmgrRecord::ClearVmBits`].
// NOTE(review): presumably `vm_rel` names the visibility-map relation and
// `flags` selects which bits to clear for the given heap blocks - confirm
// against the ingest code.
123 0 : #[derive(Serialize, Deserialize)]
124 : pub struct ClearVmBits {
/// Optional "new" heap block number.
125 : pub new_heap_blkno: Option<u32>,
/// Optional "old" heap block number.
126 : pub old_heap_blkno: Option<u32>,
/// Tag of the relation this request applies to.
127 : pub vm_rel: RelTag,
/// Raw flag bits.
128 : pub flags: u8,
129 : }
130 :
/// Payload of [`MetadataRecord::Neonrmgr`].
131 0 : #[derive(Serialize, Deserialize)]
132 : pub enum NeonrmgrRecord {
/// Carries a [`ClearVmBits`] request, the same payload type used by
/// [`HeapamRecord::ClearVmBits`].
133 : ClearVmBits(ClearVmBits),
134 : }
135 :
/// Payload of [`MetadataRecord::Smgr`].
136 0 : #[derive(Serialize, Deserialize)]
137 : pub enum SmgrRecord {
/// Create record; payload type is defined in this module.
138 : Create(SmgrCreate),
/// Truncate record; reuses the `postgres_ffi::walrecord` type directly.
139 : Truncate(XlSmgrTruncate),
140 : }
141 :
/// Payload of [`SmgrRecord::Create`].
142 0 : #[derive(Serialize, Deserialize)]
143 : pub struct SmgrCreate {
/// Tag of the relation the create record refers to.
144 : pub rel: RelTag,
145 : }
146 :
/// Payload of [`MetadataRecord::Dbase`].
147 0 : #[derive(Serialize, Deserialize)]
148 : pub enum DbaseRecord {
/// Database create record; see [`DbaseCreate`].
149 : Create(DbaseCreate),
/// Database drop record; see [`DbaseDrop`].
150 : Drop(DbaseDrop),
151 : }
152 :
/// Payload of [`DbaseRecord::Create`]: four Oids describing the operation.
// NOTE(review): the `src_*` fields presumably identify the database and
// tablespace that are copied from - confirm against the decoder.
153 0 : #[derive(Serialize, Deserialize)]
154 : pub struct DbaseCreate {
/// Oid of the database.
155 : pub db_id: Oid,
/// Oid of the tablespace.
156 : pub tablespace_id: Oid,
/// Oid of the source database.
157 : pub src_db_id: Oid,
/// Oid of the source tablespace.
158 : pub src_tablespace_id: Oid,
159 : }
160 :
/// Payload of [`DbaseRecord::Drop`].
161 0 : #[derive(Serialize, Deserialize)]
162 : pub struct DbaseDrop {
/// Oid of the database.
163 : pub db_id: Oid,
/// Oids of the tablespaces named by the drop record (zero or more).
164 : pub tablespace_ids: Vec<Oid>,
165 : }
166 :
/// Payload of [`MetadataRecord::Clog`].
167 0 : #[derive(Serialize, Deserialize)]
168 : pub enum ClogRecord {
/// Zero-page record; see [`ClogZeroPage`].
169 : ZeroPage(ClogZeroPage),
/// Truncate record; see [`ClogTruncate`].
170 : Truncate(ClogTruncate),
171 : }
172 :
/// Payload of [`ClogRecord::ZeroPage`].
// NOTE(review): presumably `rpageno` is the page number relative to the
// start of segment `segno` - confirm against the decoder.
173 0 : #[derive(Serialize, Deserialize)]
174 : pub struct ClogZeroPage {
/// Segment number.
175 : pub segno: u32,
/// Page number paired with `segno`.
176 : pub rpageno: u32,
177 : }
178 :
/// Payload of [`ClogRecord::Truncate`].
179 0 : #[derive(Serialize, Deserialize)]
180 : pub struct ClogTruncate {
/// Page number carried by the truncate record.
181 : pub pageno: u32,
/// Oldest transaction id carried by the truncate record.
182 : pub oldest_xid: TransactionId,
/// Oid of the database associated with `oldest_xid`.
183 : pub oldest_xid_db: Oid,
184 : }
185 :
/// Payload of [`MetadataRecord::Xact`].
///
/// The commit/abort variants (plain and prepared) all share the
/// [`XactCommon`] payload; only `Prepare` has its own payload type.
186 0 : #[derive(Serialize, Deserialize)]
187 : pub enum XactRecord {
188 : Commit(XactCommon),
189 : Abort(XactCommon),
190 : CommitPrepared(XactCommon),
191 : AbortPrepared(XactCommon),
192 : Prepare(XactPrepare),
193 : }
194 :
/// Payload shared by the `Commit`, `Abort`, `CommitPrepared` and
/// `AbortPrepared` variants of [`XactRecord`].
195 0 : #[derive(Serialize, Deserialize)]
196 : pub struct XactCommon {
/// The parsed transaction record, as defined in `postgres_ffi::walrecord`.
197 : pub parsed: XlXactParsedRecord,
// NOTE(review): presumably the replication origin id of the source
// record - confirm against the decoder.
198 : pub origin_id: u16,
199 : // Fields below are only used for logging
200 : pub xl_xid: TransactionId,
201 : pub lsn: Lsn,
202 : }
203 :
/// Payload of [`XactRecord::Prepare`].
204 0 : #[derive(Serialize, Deserialize)]
205 : pub struct XactPrepare {
/// Transaction id carried by the prepare record.
206 : pub xl_xid: TransactionId,
/// Opaque byte payload accompanying the prepare record.
207 : pub data: Bytes,
208 : }
209 :
/// Payload of [`MetadataRecord::MultiXact`].
210 0 : #[derive(Serialize, Deserialize)]
211 : pub enum MultiXactRecord {
/// Zero-page record; payload type is defined in this module.
212 : ZeroPage(MultiXactZeroPage),
/// Create record; reuses the `postgres_ffi::walrecord` type directly.
213 : Create(XlMultiXactCreate),
/// Truncate record; reuses the `postgres_ffi::walrecord` type directly.
214 : Truncate(XlMultiXactTruncate),
215 : }
216 :
/// Payload of [`MultiXactRecord::ZeroPage`].
// NOTE(review): presumably `rpageno` is the page number relative to the
// start of segment `segno` - confirm against the decoder.
217 0 : #[derive(Serialize, Deserialize)]
218 : pub struct MultiXactZeroPage {
/// Which SLRU the page belongs to.
219 : pub slru_kind: SlruKind,
/// Segment number.
220 : pub segno: u32,
/// Page number paired with `segno`.
221 : pub rpageno: u32,
222 : }
223 :
/// Payload of [`MetadataRecord::Relmap`].
224 0 : #[derive(Serialize, Deserialize)]
225 : pub enum RelmapRecord {
/// Update record; see [`RelmapUpdate`].
226 : Update(RelmapUpdate),
227 : }
228 :
/// Payload of [`RelmapRecord::Update`].
229 0 : #[derive(Serialize, Deserialize)]
230 : pub struct RelmapUpdate {
/// The update header, as defined in `postgres_ffi::walrecord`.
231 : pub update: XlRelmapUpdate,
/// Byte payload accompanying the update.
232 : pub buf: Bytes,
233 : }
234 :
/// Payload of [`MetadataRecord::Xlog`].
235 0 : #[derive(Serialize, Deserialize)]
236 : pub enum XlogRecord {
/// The record in raw form; see [`RawXlogRecord`].
237 : Raw(RawXlogRecord),
238 : }
239 :
/// Payload of [`XlogRecord::Raw`].
240 0 : #[derive(Serialize, Deserialize)]
241 : pub struct RawXlogRecord {
/// Raw info byte.
// NOTE(review): presumably the `xl_info` byte of the source record - confirm.
242 : pub info: u8,
/// LSN associated with the record.
243 : pub lsn: Lsn,
/// Byte payload of the record.
244 : pub buf: Bytes,
245 : }
246 :
/// Payload of [`MetadataRecord::LogicalMessage`].
247 0 : #[derive(Serialize, Deserialize)]
248 : pub enum LogicalMessageRecord {
/// Put record; see [`PutLogicalMessage`].
249 : Put(PutLogicalMessage),
/// Only compiled when the `testing` feature is enabled. Builds with and
/// without the feature therefore disagree on the set of variants.
250 : #[cfg(feature = "testing")]
251 : Failpoint,
252 : }
253 :
/// Payload of [`LogicalMessageRecord::Put`].
254 0 : #[derive(Serialize, Deserialize)]
255 : pub struct PutLogicalMessage {
/// Path string associated with the message.
256 : pub path: String,
/// Byte payload of the message.
257 : pub buf: Bytes,
258 : }
259 :
/// Payload of [`MetadataRecord::Standby`].
260 0 : #[derive(Serialize, Deserialize)]
261 : pub enum StandbyRecord {
/// Running-transactions record; see [`StandbyRunningXacts`].
262 : RunningXacts(StandbyRunningXacts),
263 : }
264 :
/// Payload of [`StandbyRecord::RunningXacts`].
265 0 : #[derive(Serialize, Deserialize)]
266 : pub struct StandbyRunningXacts {
/// Oldest running transaction id carried by the record.
267 : pub oldest_running_xid: TransactionId,
268 : }
269 :
/// Payload of [`MetadataRecord::Replorigin`].
///
/// Both variants reuse the `postgres_ffi::walrecord` types directly.
270 0 : #[derive(Serialize, Deserialize)]
271 : pub enum ReploriginRecord {
272 : Set(XlReploriginSet),
273 : Drop(XlReploriginDrop),
274 : }
|