Line data Source code
1 : //! This module contains logic for decoding and interpreting
2 : //! raw bytes which represent a raw Postgres WAL record.
3 :
4 : use crate::models::*;
5 : use crate::serialized_batch::SerializedValueBatch;
6 : use bytes::{Buf, Bytes};
7 : use pageserver_api::reltag::{RelTag, SlruKind};
8 : use pageserver_api::shard::ShardIdentity;
9 : use postgres_ffi::pg_constants;
10 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
11 : use postgres_ffi::walrecord::*;
12 : use utils::lsn::Lsn;
13 :
14 : impl InterpretedWalRecord {
15 : /// Decode and interpreted raw bytes which represent one Postgres WAL record.
16 : /// Data blocks which do not match the provided shard identity are filtered out.
17 : /// Shard 0 is a special case since it tracks all relation sizes. We only give it
18 : /// the keys that are being written as that is enough for updating relation sizes.
19 145852 : pub fn from_bytes_filtered(
20 145852 : buf: Bytes,
21 145852 : shard: &ShardIdentity,
22 145852 : record_end_lsn: Lsn,
23 145852 : pg_version: u32,
24 145852 : ) -> anyhow::Result<InterpretedWalRecord> {
25 145852 : let mut decoded = DecodedWALRecord::default();
26 145852 : decode_wal_record(buf, &mut decoded, pg_version)?;
27 145852 : let xid = decoded.xl_xid;
28 :
29 145852 : let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
30 0 : FlushUncommittedRecords::Yes
31 : } else {
32 145852 : FlushUncommittedRecords::No
33 : };
34 :
35 145852 : let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?;
36 145852 : let batch = SerializedValueBatch::from_decoded_filtered(
37 145852 : decoded,
38 145852 : shard,
39 145852 : record_end_lsn,
40 145852 : pg_version,
41 145852 : )?;
42 :
43 145852 : Ok(InterpretedWalRecord {
44 145852 : metadata_record,
45 145852 : batch,
46 145852 : end_lsn: record_end_lsn,
47 145852 : flush_uncommitted,
48 145852 : xid,
49 145852 : })
50 145852 : }
51 : }
52 :
53 : impl MetadataRecord {
54 145852 : fn from_decoded(
55 145852 : decoded: &DecodedWALRecord,
56 145852 : record_end_lsn: Lsn,
57 145852 : pg_version: u32,
58 145852 : ) -> anyhow::Result<Option<MetadataRecord>> {
59 145852 : // Note: this doesn't actually copy the bytes since
60 145852 : // the [`Bytes`] type implements it via a level of indirection.
61 145852 : let mut buf = decoded.record.clone();
62 145852 : buf.advance(decoded.main_data_offset);
63 145852 :
64 145852 : match decoded.xl_rmid {
65 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
66 145474 : Self::decode_heapam_record(&mut buf, decoded, pg_version)
67 : }
68 0 : pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version),
69 : // Handle other special record types
70 16 : pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded),
71 0 : pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version),
72 : pg_constants::RM_TBLSPC_ID => {
73 0 : tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
74 0 : Ok(None)
75 : }
76 0 : pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
77 24 : pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn),
78 : pg_constants::RM_MULTIXACT_ID => {
79 0 : Self::decode_multixact_record(&mut buf, decoded, pg_version)
80 : }
81 0 : pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded),
82 : // This is an odd duck. It needs to go to all shards.
83 : // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
84 : // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
85 : // the pageserver and decode it there.
86 : //
87 : // Alternatively, one can make the checkpoint part of the subscription protocol
88 : // to the pageserver. This should work fine, but can be done at a later point.
89 30 : pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn),
90 : pg_constants::RM_LOGICALMSG_ID => {
91 0 : Self::decode_logical_message_record(&mut buf, decoded)
92 : }
93 16 : pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded),
94 0 : pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded),
95 292 : _unexpected => {
96 292 : // TODO: consider failing here instead of blindly doing something without
97 292 : // understanding the protocol
98 292 : Ok(None)
99 : }
100 : }
101 145852 : }
102 :
103 145474 : fn decode_heapam_record(
104 145474 : buf: &mut Bytes,
105 145474 : decoded: &DecodedWALRecord,
106 145474 : pg_version: u32,
107 145474 : ) -> anyhow::Result<Option<MetadataRecord>> {
108 145474 : // Handle VM bit updates that are implicitly part of heap records.
109 145474 :
110 145474 : // First, look at the record to determine which VM bits need
111 145474 : // to be cleared. If either of these variables is set, we
112 145474 : // need to clear the corresponding bits in the visibility map.
113 145474 : let mut new_heap_blkno: Option<u32> = None;
114 145474 : let mut old_heap_blkno: Option<u32> = None;
115 145474 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
116 145474 :
117 145474 : match pg_version {
118 : 14 => {
119 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
120 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
121 0 :
122 0 : if info == pg_constants::XLOG_HEAP_INSERT {
123 0 : let xlrec = v14::XlHeapInsert::decode(buf);
124 0 : assert_eq!(0, buf.remaining());
125 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
126 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
127 0 : }
128 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
129 0 : let xlrec = v14::XlHeapDelete::decode(buf);
130 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
131 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
132 0 : }
133 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
134 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
135 : {
136 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
137 0 : // the size of tuple data is inferred from the size of the record.
138 0 : // we can't validate the remaining number of bytes without parsing
139 0 : // the tuple data.
140 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
141 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
142 0 : }
143 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
144 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
145 0 : // non-HOT update where the new tuple goes to different page than
146 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
147 0 : // set.
148 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
149 0 : }
150 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
151 0 : let xlrec = v14::XlHeapLock::decode(buf);
152 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
153 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
154 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
155 0 : }
156 0 : }
157 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
158 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
159 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
160 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
161 :
162 0 : let offset_array_len =
163 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
164 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
165 0 : 0
166 : } else {
167 0 : size_of::<u16>() * xlrec.ntuples as usize
168 : };
169 0 : assert_eq!(offset_array_len, buf.remaining());
170 :
171 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
172 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
173 0 : }
174 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
175 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
176 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
177 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
178 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
179 0 : }
180 0 : }
181 : } else {
182 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
183 : }
184 : }
185 : 15 => {
186 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
187 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
188 145286 :
189 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
190 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
191 145276 : assert_eq!(0, buf.remaining());
192 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
193 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
194 145272 : }
195 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
196 0 : let xlrec = v15::XlHeapDelete::decode(buf);
197 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
198 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
199 0 : }
200 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
201 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
202 : {
203 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
204 8 : // the size of tuple data is inferred from the size of the record.
205 8 : // we can't validate the remaining number of bytes without parsing
206 8 : // the tuple data.
207 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
208 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
209 8 : }
210 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
211 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
212 0 : // non-HOT update where the new tuple goes to different page than
213 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
214 0 : // set.
215 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
216 8 : }
217 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
218 0 : let xlrec = v15::XlHeapLock::decode(buf);
219 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
220 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
221 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
222 0 : }
223 2 : }
224 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
225 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
226 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
227 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
228 :
229 42 : let offset_array_len =
230 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
231 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
232 2 : 0
233 : } else {
234 40 : size_of::<u16>() * xlrec.ntuples as usize
235 : };
236 42 : assert_eq!(offset_array_len, buf.remaining());
237 :
238 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
239 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
240 34 : }
241 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
242 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
243 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
244 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
245 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
246 0 : }
247 146 : }
248 : } else {
249 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
250 : }
251 : }
252 : 16 => {
253 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
254 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
255 0 :
256 0 : if info == pg_constants::XLOG_HEAP_INSERT {
257 0 : let xlrec = v16::XlHeapInsert::decode(buf);
258 0 : assert_eq!(0, buf.remaining());
259 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
260 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
261 0 : }
262 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
263 0 : let xlrec = v16::XlHeapDelete::decode(buf);
264 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
265 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
266 0 : }
267 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
268 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
269 : {
270 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
271 0 : // the size of tuple data is inferred from the size of the record.
272 0 : // we can't validate the remaining number of bytes without parsing
273 0 : // the tuple data.
274 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
275 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
276 0 : }
277 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
278 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
279 0 : // non-HOT update where the new tuple goes to different page than
280 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
281 0 : // set.
282 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
283 0 : }
284 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
285 0 : let xlrec = v16::XlHeapLock::decode(buf);
286 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
287 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
288 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
289 0 : }
290 0 : }
291 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
292 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
293 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
294 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
295 :
296 0 : let offset_array_len =
297 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
298 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
299 0 : 0
300 : } else {
301 0 : size_of::<u16>() * xlrec.ntuples as usize
302 : };
303 0 : assert_eq!(offset_array_len, buf.remaining());
304 :
305 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
306 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
307 0 : }
308 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
309 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
310 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
311 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
312 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
313 0 : }
314 0 : }
315 : } else {
316 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
317 : }
318 : }
319 : 17 => {
320 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
321 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
322 0 :
323 0 : if info == pg_constants::XLOG_HEAP_INSERT {
324 0 : let xlrec = v17::XlHeapInsert::decode(buf);
325 0 : assert_eq!(0, buf.remaining());
326 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
327 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
328 0 : }
329 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
330 0 : let xlrec = v17::XlHeapDelete::decode(buf);
331 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
332 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
333 0 : }
334 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
335 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
336 : {
337 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
338 0 : // the size of tuple data is inferred from the size of the record.
339 0 : // we can't validate the remaining number of bytes without parsing
340 0 : // the tuple data.
341 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
342 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
343 0 : }
344 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
345 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
346 0 : // non-HOT update where the new tuple goes to different page than
347 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
348 0 : // set.
349 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
350 0 : }
351 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
352 0 : let xlrec = v17::XlHeapLock::decode(buf);
353 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
354 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
355 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
356 0 : }
357 0 : }
358 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
359 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
360 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
361 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
362 :
363 0 : let offset_array_len =
364 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
365 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
366 0 : 0
367 : } else {
368 0 : size_of::<u16>() * xlrec.ntuples as usize
369 : };
370 0 : assert_eq!(offset_array_len, buf.remaining());
371 :
372 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
373 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
374 0 : }
375 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
376 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
377 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
378 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
379 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
380 0 : }
381 0 : }
382 : } else {
383 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
384 : }
385 : }
386 0 : _ => {}
387 : }
388 :
389 145474 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
390 12 : let vm_rel = RelTag {
391 12 : forknum: VISIBILITYMAP_FORKNUM,
392 12 : spcnode: decoded.blocks[0].rnode_spcnode,
393 12 : dbnode: decoded.blocks[0].rnode_dbnode,
394 12 : relnode: decoded.blocks[0].rnode_relnode,
395 12 : };
396 12 :
397 12 : Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
398 12 : ClearVmBits {
399 12 : new_heap_blkno,
400 12 : old_heap_blkno,
401 12 : vm_rel,
402 12 : flags,
403 12 : },
404 12 : ))))
405 : } else {
406 145462 : Ok(None)
407 : }
408 145474 : }
409 :
410 0 : fn decode_neonmgr_record(
411 0 : buf: &mut Bytes,
412 0 : decoded: &DecodedWALRecord,
413 0 : pg_version: u32,
414 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
415 0 : // Handle VM bit updates that are implicitly part of heap records.
416 0 :
417 0 : // First, look at the record to determine which VM bits need
418 0 : // to be cleared. If either of these variables is set, we
419 0 : // need to clear the corresponding bits in the visibility map.
420 0 : let mut new_heap_blkno: Option<u32> = None;
421 0 : let mut old_heap_blkno: Option<u32> = None;
422 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
423 0 :
424 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
425 :
426 0 : match pg_version {
427 : 16 | 17 => {
428 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
429 0 :
430 0 : match info {
431 : pg_constants::XLOG_NEON_HEAP_INSERT => {
432 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
433 0 : assert_eq!(0, buf.remaining());
434 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
435 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
436 0 : }
437 : }
438 : pg_constants::XLOG_NEON_HEAP_DELETE => {
439 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
440 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
441 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
442 0 : }
443 : }
444 : pg_constants::XLOG_NEON_HEAP_UPDATE
445 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
446 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
447 0 : // the size of tuple data is inferred from the size of the record.
448 0 : // we can't validate the remaining number of bytes without parsing
449 0 : // the tuple data.
450 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
451 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
452 0 : }
453 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
454 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
455 0 : // non-HOT update where the new tuple goes to different page than
456 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
457 0 : // set.
458 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
459 0 : }
460 : }
461 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
462 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
463 :
464 0 : let offset_array_len =
465 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
466 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
467 0 : 0
468 : } else {
469 0 : size_of::<u16>() * xlrec.ntuples as usize
470 : };
471 0 : assert_eq!(offset_array_len, buf.remaining());
472 :
473 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
474 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
475 0 : }
476 : }
477 : pg_constants::XLOG_NEON_HEAP_LOCK => {
478 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
479 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
480 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
481 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
482 0 : }
483 : }
484 0 : info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
485 : }
486 : }
487 0 : _ => anyhow::bail!(
488 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
489 0 : pg_version
490 0 : ),
491 : }
492 :
493 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
494 0 : let vm_rel = RelTag {
495 0 : forknum: VISIBILITYMAP_FORKNUM,
496 0 : spcnode: decoded.blocks[0].rnode_spcnode,
497 0 : dbnode: decoded.blocks[0].rnode_dbnode,
498 0 : relnode: decoded.blocks[0].rnode_relnode,
499 0 : };
500 0 :
501 0 : Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
502 0 : ClearVmBits {
503 0 : new_heap_blkno,
504 0 : old_heap_blkno,
505 0 : vm_rel,
506 0 : flags,
507 0 : },
508 0 : ))))
509 : } else {
510 0 : Ok(None)
511 : }
512 0 : }
513 :
514 16 : fn decode_smgr_record(
515 16 : buf: &mut Bytes,
516 16 : decoded: &DecodedWALRecord,
517 16 : ) -> anyhow::Result<Option<MetadataRecord>> {
518 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
519 16 : if info == pg_constants::XLOG_SMGR_CREATE {
520 16 : let create = XlSmgrCreate::decode(buf);
521 16 : let rel = RelTag {
522 16 : spcnode: create.rnode.spcnode,
523 16 : dbnode: create.rnode.dbnode,
524 16 : relnode: create.rnode.relnode,
525 16 : forknum: create.forknum,
526 16 : };
527 16 :
528 16 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
529 16 : rel,
530 16 : }))));
531 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
532 0 : let truncate = XlSmgrTruncate::decode(buf);
533 0 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
534 0 : }
535 0 :
536 0 : Ok(None)
537 16 : }
538 :
539 0 : fn decode_dbase_record(
540 0 : buf: &mut Bytes,
541 0 : decoded: &DecodedWALRecord,
542 0 : pg_version: u32,
543 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
544 0 : // TODO: Refactor this to avoid the duplication between postgres versions.
545 0 :
546 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
547 0 : tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
548 :
549 0 : if pg_version == 14 {
550 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
551 0 : let createdb = XlCreateDatabase::decode(buf);
552 0 : tracing::debug!("XLOG_DBASE_CREATE v14");
553 :
554 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
555 0 : db_id: createdb.db_id,
556 0 : tablespace_id: createdb.tablespace_id,
557 0 : src_db_id: createdb.src_db_id,
558 0 : src_tablespace_id: createdb.src_tablespace_id,
559 0 : }));
560 0 :
561 0 : return Ok(Some(record));
562 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
563 0 : let dropdb = XlDropDatabase::decode(buf);
564 0 :
565 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
566 0 : db_id: dropdb.db_id,
567 0 : tablespace_ids: dropdb.tablespace_ids,
568 0 : }));
569 0 :
570 0 : return Ok(Some(record));
571 0 : }
572 0 : } else if pg_version == 15 {
573 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
574 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
575 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
576 : // The XLOG record was renamed between v14 and v15,
577 : // but the record format is the same.
578 : // So we can reuse XlCreateDatabase here.
579 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
580 :
581 0 : let createdb = XlCreateDatabase::decode(buf);
582 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
583 0 : db_id: createdb.db_id,
584 0 : tablespace_id: createdb.tablespace_id,
585 0 : src_db_id: createdb.src_db_id,
586 0 : src_tablespace_id: createdb.src_tablespace_id,
587 0 : }));
588 0 :
589 0 : return Ok(Some(record));
590 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
591 0 : let dropdb = XlDropDatabase::decode(buf);
592 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
593 0 : db_id: dropdb.db_id,
594 0 : tablespace_ids: dropdb.tablespace_ids,
595 0 : }));
596 0 :
597 0 : return Ok(Some(record));
598 0 : }
599 0 : } else if pg_version == 16 {
600 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
601 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
602 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
603 : // The XLOG record was renamed between v14 and v15,
604 : // but the record format is the same.
605 : // So we can reuse XlCreateDatabase here.
606 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
607 :
608 0 : let createdb = XlCreateDatabase::decode(buf);
609 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
610 0 : db_id: createdb.db_id,
611 0 : tablespace_id: createdb.tablespace_id,
612 0 : src_db_id: createdb.src_db_id,
613 0 : src_tablespace_id: createdb.src_tablespace_id,
614 0 : }));
615 0 :
616 0 : return Ok(Some(record));
617 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
618 0 : let dropdb = XlDropDatabase::decode(buf);
619 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
620 0 : db_id: dropdb.db_id,
621 0 : tablespace_ids: dropdb.tablespace_ids,
622 0 : }));
623 0 :
624 0 : return Ok(Some(record));
625 0 : }
626 0 : } else if pg_version == 17 {
627 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
628 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
629 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
630 : // The XLOG record was renamed between v14 and v15,
631 : // but the record format is the same.
632 : // So we can reuse XlCreateDatabase here.
633 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
634 :
635 0 : let createdb = XlCreateDatabase::decode(buf);
636 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
637 0 : db_id: createdb.db_id,
638 0 : tablespace_id: createdb.tablespace_id,
639 0 : src_db_id: createdb.src_db_id,
640 0 : src_tablespace_id: createdb.src_tablespace_id,
641 0 : }));
642 0 :
643 0 : return Ok(Some(record));
644 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
645 0 : let dropdb = XlDropDatabase::decode(buf);
646 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
647 0 : db_id: dropdb.db_id,
648 0 : tablespace_ids: dropdb.tablespace_ids,
649 0 : }));
650 0 :
651 0 : return Ok(Some(record));
652 0 : }
653 0 : }
654 :
655 0 : Ok(None)
656 0 : }
657 :
658 0 : fn decode_clog_record(
659 0 : buf: &mut Bytes,
660 0 : decoded: &DecodedWALRecord,
661 0 : pg_version: u32,
662 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
663 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
664 0 :
665 0 : if info == pg_constants::CLOG_ZEROPAGE {
666 0 : let pageno = if pg_version < 17 {
667 0 : buf.get_u32_le()
668 : } else {
669 0 : buf.get_u64_le() as u32
670 : };
671 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
672 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
673 0 :
674 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
675 0 : ClogZeroPage { segno, rpageno },
676 0 : ))))
677 : } else {
678 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
679 0 : let xlrec = XlClogTruncate::decode(buf, pg_version);
680 0 :
681 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
682 0 : ClogTruncate {
683 0 : pageno: xlrec.pageno,
684 0 : oldest_xid: xlrec.oldest_xid,
685 0 : oldest_xid_db: xlrec.oldest_xid_db,
686 0 : },
687 0 : ))))
688 : }
689 0 : }
690 :
691 24 : fn decode_xact_record(
692 24 : buf: &mut Bytes,
693 24 : decoded: &DecodedWALRecord,
694 24 : lsn: Lsn,
695 24 : ) -> anyhow::Result<Option<MetadataRecord>> {
696 24 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
697 24 : let origin_id = decoded.origin_id;
698 24 : let xl_xid = decoded.xl_xid;
699 24 :
700 24 : if info == pg_constants::XLOG_XACT_COMMIT {
701 8 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
702 8 : return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
703 8 : parsed,
704 8 : origin_id,
705 8 : xl_xid,
706 8 : lsn,
707 8 : }))));
708 16 : } else if info == pg_constants::XLOG_XACT_ABORT {
709 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
710 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
711 0 : parsed,
712 0 : origin_id,
713 0 : xl_xid,
714 0 : lsn,
715 0 : }))));
716 16 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
717 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
718 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
719 0 : XactCommon {
720 0 : parsed,
721 0 : origin_id,
722 0 : xl_xid,
723 0 : lsn,
724 0 : },
725 0 : ))));
726 16 : } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
727 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
728 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
729 0 : XactCommon {
730 0 : parsed,
731 0 : origin_id,
732 0 : xl_xid,
733 0 : lsn,
734 0 : },
735 0 : ))));
736 16 : } else if info == pg_constants::XLOG_XACT_PREPARE {
737 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
738 0 : XactPrepare {
739 0 : xl_xid: decoded.xl_xid,
740 0 : data: Bytes::copy_from_slice(&buf[..]),
741 0 : },
742 0 : ))));
743 16 : }
744 16 :
745 16 : Ok(None)
746 24 : }
747 :
748 0 : fn decode_multixact_record(
749 0 : buf: &mut Bytes,
750 0 : decoded: &DecodedWALRecord,
751 0 : pg_version: u32,
752 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
753 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
754 0 :
755 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
756 0 : || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
757 : {
758 0 : let pageno = if pg_version < 17 {
759 0 : buf.get_u32_le()
760 : } else {
761 0 : buf.get_u64_le() as u32
762 : };
763 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
764 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
765 :
766 0 : let slru_kind = match info {
767 0 : pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
768 0 : pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
769 0 : _ => unreachable!(),
770 : };
771 :
772 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
773 0 : MultiXactZeroPage {
774 0 : slru_kind,
775 0 : segno,
776 0 : rpageno,
777 0 : },
778 0 : ))));
779 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
780 0 : let xlrec = XlMultiXactCreate::decode(buf);
781 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
782 0 : xlrec,
783 0 : ))));
784 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
785 0 : let xlrec = XlMultiXactTruncate::decode(buf);
786 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
787 0 : xlrec,
788 0 : ))));
789 0 : }
790 0 :
791 0 : Ok(None)
792 0 : }
793 :
794 0 : fn decode_relmap_record(
795 0 : buf: &mut Bytes,
796 0 : decoded: &DecodedWALRecord,
797 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
798 0 : let update = XlRelmapUpdate::decode(buf);
799 0 :
800 0 : let mut buf = decoded.record.clone();
801 0 : buf.advance(decoded.main_data_offset);
802 0 : // skip xl_relmap_update
803 0 : buf.advance(12);
804 0 :
805 0 : Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
806 0 : RelmapUpdate {
807 0 : update,
808 0 : buf: Bytes::copy_from_slice(&buf[..]),
809 0 : },
810 0 : ))))
811 0 : }
812 :
813 30 : fn decode_xlog_record(
814 30 : buf: &mut Bytes,
815 30 : decoded: &DecodedWALRecord,
816 30 : lsn: Lsn,
817 30 : ) -> anyhow::Result<Option<MetadataRecord>> {
818 30 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
819 30 : Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
820 30 : info,
821 30 : lsn,
822 30 : buf: buf.clone(),
823 30 : }))))
824 30 : }
825 :
826 0 : fn decode_logical_message_record(
827 0 : buf: &mut Bytes,
828 0 : decoded: &DecodedWALRecord,
829 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
830 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
831 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
832 0 : let xlrec = XlLogicalMessage::decode(buf);
833 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
834 :
835 : #[cfg(feature = "testing")]
836 0 : if prefix == "neon-test" {
837 0 : return Ok(Some(MetadataRecord::LogicalMessage(
838 0 : LogicalMessageRecord::Failpoint,
839 0 : )));
840 0 : }
841 :
842 0 : if let Some(path) = prefix.strip_prefix("neon-file:") {
843 0 : let buf_size = xlrec.prefix_size + xlrec.message_size;
844 0 : let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
845 0 : return Ok(Some(MetadataRecord::LogicalMessage(
846 0 : LogicalMessageRecord::Put(PutLogicalMessage {
847 0 : path: path.to_string(),
848 0 : buf,
849 0 : }),
850 0 : )));
851 0 : }
852 0 : }
853 :
854 0 : Ok(None)
855 0 : }
856 :
857 16 : fn decode_standby_record(
858 16 : buf: &mut Bytes,
859 16 : decoded: &DecodedWALRecord,
860 16 : ) -> anyhow::Result<Option<MetadataRecord>> {
861 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
862 16 : if info == pg_constants::XLOG_RUNNING_XACTS {
863 0 : let xlrec = XlRunningXacts::decode(buf);
864 0 : return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
865 0 : StandbyRunningXacts {
866 0 : oldest_running_xid: xlrec.oldest_running_xid,
867 0 : },
868 0 : ))));
869 16 : }
870 16 :
871 16 : Ok(None)
872 16 : }
873 :
874 0 : fn decode_replorigin_record(
875 0 : buf: &mut Bytes,
876 0 : decoded: &DecodedWALRecord,
877 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
878 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
879 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
880 0 : let xlrec = XlReploriginSet::decode(buf);
881 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
882 0 : xlrec,
883 0 : ))));
884 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
885 0 : let xlrec = XlReploriginDrop::decode(buf);
886 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
887 0 : xlrec,
888 0 : ))));
889 0 : }
890 0 :
891 0 : Ok(None)
892 0 : }
893 : }
|