Line data Source code
1 : //! This module contains logic for decoding and interpreting
2 : //! raw bytes which represent a raw Postgres WAL record.
3 :
4 : use crate::models::*;
5 : use crate::serialized_batch::SerializedValueBatch;
6 : use bytes::{Buf, Bytes};
7 : use pageserver_api::reltag::{RelTag, SlruKind};
8 : use pageserver_api::shard::ShardIdentity;
9 : use postgres_ffi::pg_constants;
10 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
11 : use postgres_ffi::walrecord::*;
12 : use utils::lsn::Lsn;
13 :
14 : impl InterpretedWalRecord {
15 : /// Decode and interpreted raw bytes which represent one Postgres WAL record.
16 : /// Data blocks which do not match the provided shard identity are filtered out.
17 : /// Shard 0 is a special case since it tracks all relation sizes. We only give it
18 : /// the keys that are being written as that is enough for updating relation sizes.
19 145852 : pub fn from_bytes_filtered(
20 145852 : buf: Bytes,
21 145852 : shard: &ShardIdentity,
22 145852 : next_record_lsn: Lsn,
23 145852 : pg_version: u32,
24 145852 : ) -> anyhow::Result<InterpretedWalRecord> {
25 145852 : let mut decoded = DecodedWALRecord::default();
26 145852 : decode_wal_record(buf, &mut decoded, pg_version)?;
27 145852 : let xid = decoded.xl_xid;
28 :
29 145852 : let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
30 0 : FlushUncommittedRecords::Yes
31 : } else {
32 145852 : FlushUncommittedRecords::No
33 : };
34 :
35 145852 : let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?;
36 145852 : let batch = SerializedValueBatch::from_decoded_filtered(
37 145852 : decoded,
38 145852 : shard,
39 145852 : next_record_lsn,
40 145852 : pg_version,
41 145852 : )?;
42 :
43 145852 : Ok(InterpretedWalRecord {
44 145852 : metadata_record,
45 145852 : batch,
46 145852 : next_record_lsn,
47 145852 : flush_uncommitted,
48 145852 : xid,
49 145852 : })
50 145852 : }
51 : }
52 :
53 : impl MetadataRecord {
54 145852 : fn from_decoded(
55 145852 : decoded: &DecodedWALRecord,
56 145852 : next_record_lsn: Lsn,
57 145852 : pg_version: u32,
58 145852 : ) -> anyhow::Result<Option<MetadataRecord>> {
59 145852 : // Note: this doesn't actually copy the bytes since
60 145852 : // the [`Bytes`] type implements it via a level of indirection.
61 145852 : let mut buf = decoded.record.clone();
62 145852 : buf.advance(decoded.main_data_offset);
63 145852 :
64 145852 : match decoded.xl_rmid {
65 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
66 145474 : Self::decode_heapam_record(&mut buf, decoded, pg_version)
67 : }
68 0 : pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version),
69 : // Handle other special record types
70 16 : pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded),
71 0 : pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version),
72 : pg_constants::RM_TBLSPC_ID => {
73 0 : tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
74 0 : Ok(None)
75 : }
76 0 : pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
77 : pg_constants::RM_XACT_ID => {
78 24 : Self::decode_xact_record(&mut buf, decoded, next_record_lsn)
79 : }
80 : pg_constants::RM_MULTIXACT_ID => {
81 0 : Self::decode_multixact_record(&mut buf, decoded, pg_version)
82 : }
83 0 : pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded),
84 : // This is an odd duck. It needs to go to all shards.
85 : // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
86 : // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
87 : // the pageserver and decode it there.
88 : //
89 : // Alternatively, one can make the checkpoint part of the subscription protocol
90 : // to the pageserver. This should work fine, but can be done at a later point.
91 : pg_constants::RM_XLOG_ID => {
92 30 : Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)
93 : }
94 : pg_constants::RM_LOGICALMSG_ID => {
95 0 : Self::decode_logical_message_record(&mut buf, decoded)
96 : }
97 16 : pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded),
98 0 : pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded),
99 292 : _unexpected => {
100 292 : // TODO: consider failing here instead of blindly doing something without
101 292 : // understanding the protocol
102 292 : Ok(None)
103 : }
104 : }
105 145852 : }
106 :
107 145474 : fn decode_heapam_record(
108 145474 : buf: &mut Bytes,
109 145474 : decoded: &DecodedWALRecord,
110 145474 : pg_version: u32,
111 145474 : ) -> anyhow::Result<Option<MetadataRecord>> {
112 145474 : // Handle VM bit updates that are implicitly part of heap records.
113 145474 :
114 145474 : // First, look at the record to determine which VM bits need
115 145474 : // to be cleared. If either of these variables is set, we
116 145474 : // need to clear the corresponding bits in the visibility map.
117 145474 : let mut new_heap_blkno: Option<u32> = None;
118 145474 : let mut old_heap_blkno: Option<u32> = None;
119 145474 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
120 145474 :
121 145474 : match pg_version {
122 : 14 => {
123 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
124 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
125 0 :
126 0 : if info == pg_constants::XLOG_HEAP_INSERT {
127 0 : let xlrec = v14::XlHeapInsert::decode(buf);
128 0 : assert_eq!(0, buf.remaining());
129 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
130 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
131 0 : }
132 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
133 0 : let xlrec = v14::XlHeapDelete::decode(buf);
134 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
135 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
136 0 : }
137 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
138 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
139 : {
140 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
141 0 : // the size of tuple data is inferred from the size of the record.
142 0 : // we can't validate the remaining number of bytes without parsing
143 0 : // the tuple data.
144 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
145 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
146 0 : }
147 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
148 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
149 0 : // non-HOT update where the new tuple goes to different page than
150 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
151 0 : // set.
152 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
153 0 : }
154 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
155 0 : let xlrec = v14::XlHeapLock::decode(buf);
156 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
157 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
158 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
159 0 : }
160 0 : }
161 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
162 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
163 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
164 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
165 :
166 0 : let offset_array_len =
167 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
168 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
169 0 : 0
170 : } else {
171 0 : size_of::<u16>() * xlrec.ntuples as usize
172 : };
173 0 : assert_eq!(offset_array_len, buf.remaining());
174 :
175 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
176 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
177 0 : }
178 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
179 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
180 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
181 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
182 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
183 0 : }
184 0 : }
185 : } else {
186 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
187 : }
188 : }
189 : 15 => {
190 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
191 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
192 145286 :
193 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
194 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
195 145276 : assert_eq!(0, buf.remaining());
196 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
197 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
198 145272 : }
199 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
200 0 : let xlrec = v15::XlHeapDelete::decode(buf);
201 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
202 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
203 0 : }
204 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
205 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
206 : {
207 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
208 8 : // the size of tuple data is inferred from the size of the record.
209 8 : // we can't validate the remaining number of bytes without parsing
210 8 : // the tuple data.
211 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
212 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
213 8 : }
214 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
215 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
216 0 : // non-HOT update where the new tuple goes to different page than
217 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
218 0 : // set.
219 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
220 8 : }
221 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
222 0 : let xlrec = v15::XlHeapLock::decode(buf);
223 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
224 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
225 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
226 0 : }
227 2 : }
228 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
229 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
230 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
231 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
232 :
233 42 : let offset_array_len =
234 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
235 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
236 2 : 0
237 : } else {
238 40 : size_of::<u16>() * xlrec.ntuples as usize
239 : };
240 42 : assert_eq!(offset_array_len, buf.remaining());
241 :
242 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
243 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
244 34 : }
245 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
246 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
247 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
248 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
249 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
250 0 : }
251 146 : }
252 : } else {
253 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
254 : }
255 : }
256 : 16 => {
257 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
258 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
259 0 :
260 0 : if info == pg_constants::XLOG_HEAP_INSERT {
261 0 : let xlrec = v16::XlHeapInsert::decode(buf);
262 0 : assert_eq!(0, buf.remaining());
263 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
264 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
265 0 : }
266 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
267 0 : let xlrec = v16::XlHeapDelete::decode(buf);
268 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
269 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
270 0 : }
271 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
272 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
273 : {
274 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
275 0 : // the size of tuple data is inferred from the size of the record.
276 0 : // we can't validate the remaining number of bytes without parsing
277 0 : // the tuple data.
278 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
279 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
280 0 : }
281 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
282 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
283 0 : // non-HOT update where the new tuple goes to different page than
284 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
285 0 : // set.
286 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
287 0 : }
288 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
289 0 : let xlrec = v16::XlHeapLock::decode(buf);
290 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
291 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
292 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
293 0 : }
294 0 : }
295 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
296 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
297 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
298 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
299 :
300 0 : let offset_array_len =
301 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
302 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
303 0 : 0
304 : } else {
305 0 : size_of::<u16>() * xlrec.ntuples as usize
306 : };
307 0 : assert_eq!(offset_array_len, buf.remaining());
308 :
309 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
310 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
311 0 : }
312 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
313 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
314 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
315 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
316 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
317 0 : }
318 0 : }
319 : } else {
320 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
321 : }
322 : }
323 : 17 => {
324 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
325 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
326 0 :
327 0 : if info == pg_constants::XLOG_HEAP_INSERT {
328 0 : let xlrec = v17::XlHeapInsert::decode(buf);
329 0 : assert_eq!(0, buf.remaining());
330 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
331 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
332 0 : }
333 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
334 0 : let xlrec = v17::XlHeapDelete::decode(buf);
335 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
336 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
337 0 : }
338 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
339 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
340 : {
341 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
342 0 : // the size of tuple data is inferred from the size of the record.
343 0 : // we can't validate the remaining number of bytes without parsing
344 0 : // the tuple data.
345 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
346 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
347 0 : }
348 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
349 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
350 0 : // non-HOT update where the new tuple goes to different page than
351 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
352 0 : // set.
353 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
354 0 : }
355 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
356 0 : let xlrec = v17::XlHeapLock::decode(buf);
357 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
358 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
359 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
360 0 : }
361 0 : }
362 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
363 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
364 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
365 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
366 :
367 0 : let offset_array_len =
368 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
369 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
370 0 : 0
371 : } else {
372 0 : size_of::<u16>() * xlrec.ntuples as usize
373 : };
374 0 : assert_eq!(offset_array_len, buf.remaining());
375 :
376 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
377 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
378 0 : }
379 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
380 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
381 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
382 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
383 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
384 0 : }
385 0 : }
386 : } else {
387 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
388 : }
389 : }
390 0 : _ => {}
391 : }
392 :
393 145474 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
394 12 : let vm_rel = RelTag {
395 12 : forknum: VISIBILITYMAP_FORKNUM,
396 12 : spcnode: decoded.blocks[0].rnode_spcnode,
397 12 : dbnode: decoded.blocks[0].rnode_dbnode,
398 12 : relnode: decoded.blocks[0].rnode_relnode,
399 12 : };
400 12 :
401 12 : Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
402 12 : ClearVmBits {
403 12 : new_heap_blkno,
404 12 : old_heap_blkno,
405 12 : vm_rel,
406 12 : flags,
407 12 : },
408 12 : ))))
409 : } else {
410 145462 : Ok(None)
411 : }
412 145474 : }
413 :
414 0 : fn decode_neonmgr_record(
415 0 : buf: &mut Bytes,
416 0 : decoded: &DecodedWALRecord,
417 0 : pg_version: u32,
418 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
419 0 : // Handle VM bit updates that are implicitly part of heap records.
420 0 :
421 0 : // First, look at the record to determine which VM bits need
422 0 : // to be cleared. If either of these variables is set, we
423 0 : // need to clear the corresponding bits in the visibility map.
424 0 : let mut new_heap_blkno: Option<u32> = None;
425 0 : let mut old_heap_blkno: Option<u32> = None;
426 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
427 0 :
428 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
429 :
430 0 : match pg_version {
431 : 16 | 17 => {
432 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
433 0 :
434 0 : match info {
435 : pg_constants::XLOG_NEON_HEAP_INSERT => {
436 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
437 0 : assert_eq!(0, buf.remaining());
438 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
439 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
440 0 : }
441 : }
442 : pg_constants::XLOG_NEON_HEAP_DELETE => {
443 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
444 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
445 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
446 0 : }
447 : }
448 : pg_constants::XLOG_NEON_HEAP_UPDATE
449 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
450 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
451 0 : // the size of tuple data is inferred from the size of the record.
452 0 : // we can't validate the remaining number of bytes without parsing
453 0 : // the tuple data.
454 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
455 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
456 0 : }
457 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
458 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
459 0 : // non-HOT update where the new tuple goes to different page than
460 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
461 0 : // set.
462 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
463 0 : }
464 : }
465 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
466 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
467 :
468 0 : let offset_array_len =
469 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
470 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
471 0 : 0
472 : } else {
473 0 : size_of::<u16>() * xlrec.ntuples as usize
474 : };
475 0 : assert_eq!(offset_array_len, buf.remaining());
476 :
477 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
478 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
479 0 : }
480 : }
481 : pg_constants::XLOG_NEON_HEAP_LOCK => {
482 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
483 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
484 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
485 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
486 0 : }
487 : }
488 0 : info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
489 : }
490 : }
491 0 : _ => anyhow::bail!(
492 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
493 0 : pg_version
494 0 : ),
495 : }
496 :
497 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
498 0 : let vm_rel = RelTag {
499 0 : forknum: VISIBILITYMAP_FORKNUM,
500 0 : spcnode: decoded.blocks[0].rnode_spcnode,
501 0 : dbnode: decoded.blocks[0].rnode_dbnode,
502 0 : relnode: decoded.blocks[0].rnode_relnode,
503 0 : };
504 0 :
505 0 : Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
506 0 : ClearVmBits {
507 0 : new_heap_blkno,
508 0 : old_heap_blkno,
509 0 : vm_rel,
510 0 : flags,
511 0 : },
512 0 : ))))
513 : } else {
514 0 : Ok(None)
515 : }
516 0 : }
517 :
518 16 : fn decode_smgr_record(
519 16 : buf: &mut Bytes,
520 16 : decoded: &DecodedWALRecord,
521 16 : ) -> anyhow::Result<Option<MetadataRecord>> {
522 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
523 16 : if info == pg_constants::XLOG_SMGR_CREATE {
524 16 : let create = XlSmgrCreate::decode(buf);
525 16 : let rel = RelTag {
526 16 : spcnode: create.rnode.spcnode,
527 16 : dbnode: create.rnode.dbnode,
528 16 : relnode: create.rnode.relnode,
529 16 : forknum: create.forknum,
530 16 : };
531 16 :
532 16 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
533 16 : rel,
534 16 : }))));
535 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
536 0 : let truncate = XlSmgrTruncate::decode(buf);
537 0 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
538 0 : }
539 0 :
540 0 : Ok(None)
541 16 : }
542 :
543 0 : fn decode_dbase_record(
544 0 : buf: &mut Bytes,
545 0 : decoded: &DecodedWALRecord,
546 0 : pg_version: u32,
547 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
548 0 : // TODO: Refactor this to avoid the duplication between postgres versions.
549 0 :
550 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
551 0 : tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
552 :
553 0 : if pg_version == 14 {
554 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
555 0 : let createdb = XlCreateDatabase::decode(buf);
556 0 : tracing::debug!("XLOG_DBASE_CREATE v14");
557 :
558 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
559 0 : db_id: createdb.db_id,
560 0 : tablespace_id: createdb.tablespace_id,
561 0 : src_db_id: createdb.src_db_id,
562 0 : src_tablespace_id: createdb.src_tablespace_id,
563 0 : }));
564 0 :
565 0 : return Ok(Some(record));
566 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
567 0 : let dropdb = XlDropDatabase::decode(buf);
568 0 :
569 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
570 0 : db_id: dropdb.db_id,
571 0 : tablespace_ids: dropdb.tablespace_ids,
572 0 : }));
573 0 :
574 0 : return Ok(Some(record));
575 0 : }
576 0 : } else if pg_version == 15 {
577 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
578 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
579 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
580 : // The XLOG record was renamed between v14 and v15,
581 : // but the record format is the same.
582 : // So we can reuse XlCreateDatabase here.
583 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
584 :
585 0 : let createdb = XlCreateDatabase::decode(buf);
586 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
587 0 : db_id: createdb.db_id,
588 0 : tablespace_id: createdb.tablespace_id,
589 0 : src_db_id: createdb.src_db_id,
590 0 : src_tablespace_id: createdb.src_tablespace_id,
591 0 : }));
592 0 :
593 0 : return Ok(Some(record));
594 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
595 0 : let dropdb = XlDropDatabase::decode(buf);
596 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
597 0 : db_id: dropdb.db_id,
598 0 : tablespace_ids: dropdb.tablespace_ids,
599 0 : }));
600 0 :
601 0 : return Ok(Some(record));
602 0 : }
603 0 : } else if pg_version == 16 {
604 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
605 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
606 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
607 : // The XLOG record was renamed between v14 and v15,
608 : // but the record format is the same.
609 : // So we can reuse XlCreateDatabase here.
610 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
611 :
612 0 : let createdb = XlCreateDatabase::decode(buf);
613 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
614 0 : db_id: createdb.db_id,
615 0 : tablespace_id: createdb.tablespace_id,
616 0 : src_db_id: createdb.src_db_id,
617 0 : src_tablespace_id: createdb.src_tablespace_id,
618 0 : }));
619 0 :
620 0 : return Ok(Some(record));
621 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
622 0 : let dropdb = XlDropDatabase::decode(buf);
623 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
624 0 : db_id: dropdb.db_id,
625 0 : tablespace_ids: dropdb.tablespace_ids,
626 0 : }));
627 0 :
628 0 : return Ok(Some(record));
629 0 : }
630 0 : } else if pg_version == 17 {
631 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
632 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
633 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
634 : // The XLOG record was renamed between v14 and v15,
635 : // but the record format is the same.
636 : // So we can reuse XlCreateDatabase here.
637 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
638 :
639 0 : let createdb = XlCreateDatabase::decode(buf);
640 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
641 0 : db_id: createdb.db_id,
642 0 : tablespace_id: createdb.tablespace_id,
643 0 : src_db_id: createdb.src_db_id,
644 0 : src_tablespace_id: createdb.src_tablespace_id,
645 0 : }));
646 0 :
647 0 : return Ok(Some(record));
648 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
649 0 : let dropdb = XlDropDatabase::decode(buf);
650 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
651 0 : db_id: dropdb.db_id,
652 0 : tablespace_ids: dropdb.tablespace_ids,
653 0 : }));
654 0 :
655 0 : return Ok(Some(record));
656 0 : }
657 0 : }
658 :
659 0 : Ok(None)
660 0 : }
661 :
662 0 : fn decode_clog_record(
663 0 : buf: &mut Bytes,
664 0 : decoded: &DecodedWALRecord,
665 0 : pg_version: u32,
666 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
667 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
668 0 :
669 0 : if info == pg_constants::CLOG_ZEROPAGE {
670 0 : let pageno = if pg_version < 17 {
671 0 : buf.get_u32_le()
672 : } else {
673 0 : buf.get_u64_le() as u32
674 : };
675 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
676 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
677 0 :
678 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
679 0 : ClogZeroPage { segno, rpageno },
680 0 : ))))
681 : } else {
682 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
683 0 : let xlrec = XlClogTruncate::decode(buf, pg_version);
684 0 :
685 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
686 0 : ClogTruncate {
687 0 : pageno: xlrec.pageno,
688 0 : oldest_xid: xlrec.oldest_xid,
689 0 : oldest_xid_db: xlrec.oldest_xid_db,
690 0 : },
691 0 : ))))
692 : }
693 0 : }
694 :
695 24 : fn decode_xact_record(
696 24 : buf: &mut Bytes,
697 24 : decoded: &DecodedWALRecord,
698 24 : lsn: Lsn,
699 24 : ) -> anyhow::Result<Option<MetadataRecord>> {
700 24 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
701 24 : let origin_id = decoded.origin_id;
702 24 : let xl_xid = decoded.xl_xid;
703 24 :
704 24 : if info == pg_constants::XLOG_XACT_COMMIT {
705 8 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
706 8 : return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
707 8 : parsed,
708 8 : origin_id,
709 8 : xl_xid,
710 8 : lsn,
711 8 : }))));
712 16 : } else if info == pg_constants::XLOG_XACT_ABORT {
713 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
714 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
715 0 : parsed,
716 0 : origin_id,
717 0 : xl_xid,
718 0 : lsn,
719 0 : }))));
720 16 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
721 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
722 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
723 0 : XactCommon {
724 0 : parsed,
725 0 : origin_id,
726 0 : xl_xid,
727 0 : lsn,
728 0 : },
729 0 : ))));
730 16 : } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
731 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
732 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
733 0 : XactCommon {
734 0 : parsed,
735 0 : origin_id,
736 0 : xl_xid,
737 0 : lsn,
738 0 : },
739 0 : ))));
740 16 : } else if info == pg_constants::XLOG_XACT_PREPARE {
741 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
742 0 : XactPrepare {
743 0 : xl_xid: decoded.xl_xid,
744 0 : data: Bytes::copy_from_slice(&buf[..]),
745 0 : },
746 0 : ))));
747 16 : }
748 16 :
749 16 : Ok(None)
750 24 : }
751 :
752 0 : fn decode_multixact_record(
753 0 : buf: &mut Bytes,
754 0 : decoded: &DecodedWALRecord,
755 0 : pg_version: u32,
756 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
757 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
758 0 :
759 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
760 0 : || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
761 : {
762 0 : let pageno = if pg_version < 17 {
763 0 : buf.get_u32_le()
764 : } else {
765 0 : buf.get_u64_le() as u32
766 : };
767 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
768 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
769 :
770 0 : let slru_kind = match info {
771 0 : pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
772 0 : pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
773 0 : _ => unreachable!(),
774 : };
775 :
776 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
777 0 : MultiXactZeroPage {
778 0 : slru_kind,
779 0 : segno,
780 0 : rpageno,
781 0 : },
782 0 : ))));
783 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
784 0 : let xlrec = XlMultiXactCreate::decode(buf);
785 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
786 0 : xlrec,
787 0 : ))));
788 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
789 0 : let xlrec = XlMultiXactTruncate::decode(buf);
790 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
791 0 : xlrec,
792 0 : ))));
793 0 : }
794 0 :
795 0 : Ok(None)
796 0 : }
797 :
798 0 : fn decode_relmap_record(
799 0 : buf: &mut Bytes,
800 0 : decoded: &DecodedWALRecord,
801 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
802 0 : let update = XlRelmapUpdate::decode(buf);
803 0 :
804 0 : let mut buf = decoded.record.clone();
805 0 : buf.advance(decoded.main_data_offset);
806 0 : // skip xl_relmap_update
807 0 : buf.advance(12);
808 0 :
809 0 : Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
810 0 : RelmapUpdate {
811 0 : update,
812 0 : buf: Bytes::copy_from_slice(&buf[..]),
813 0 : },
814 0 : ))))
815 0 : }
816 :
817 30 : fn decode_xlog_record(
818 30 : buf: &mut Bytes,
819 30 : decoded: &DecodedWALRecord,
820 30 : lsn: Lsn,
821 30 : ) -> anyhow::Result<Option<MetadataRecord>> {
822 30 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
823 30 : Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
824 30 : info,
825 30 : lsn,
826 30 : buf: buf.clone(),
827 30 : }))))
828 30 : }
829 :
830 0 : fn decode_logical_message_record(
831 0 : buf: &mut Bytes,
832 0 : decoded: &DecodedWALRecord,
833 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
834 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
835 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
836 0 : let xlrec = XlLogicalMessage::decode(buf);
837 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
838 :
839 : #[cfg(feature = "testing")]
840 0 : if prefix == "neon-test" {
841 0 : return Ok(Some(MetadataRecord::LogicalMessage(
842 0 : LogicalMessageRecord::Failpoint,
843 0 : )));
844 0 : }
845 :
846 0 : if let Some(path) = prefix.strip_prefix("neon-file:") {
847 0 : let buf_size = xlrec.prefix_size + xlrec.message_size;
848 0 : let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
849 0 : return Ok(Some(MetadataRecord::LogicalMessage(
850 0 : LogicalMessageRecord::Put(PutLogicalMessage {
851 0 : path: path.to_string(),
852 0 : buf,
853 0 : }),
854 0 : )));
855 0 : }
856 0 : }
857 :
858 0 : Ok(None)
859 0 : }
860 :
861 16 : fn decode_standby_record(
862 16 : buf: &mut Bytes,
863 16 : decoded: &DecodedWALRecord,
864 16 : ) -> anyhow::Result<Option<MetadataRecord>> {
865 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
866 16 : if info == pg_constants::XLOG_RUNNING_XACTS {
867 0 : let xlrec = XlRunningXacts::decode(buf);
868 0 : return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
869 0 : StandbyRunningXacts {
870 0 : oldest_running_xid: xlrec.oldest_running_xid,
871 0 : },
872 0 : ))));
873 16 : }
874 16 :
875 16 : Ok(None)
876 16 : }
877 :
878 0 : fn decode_replorigin_record(
879 0 : buf: &mut Bytes,
880 0 : decoded: &DecodedWALRecord,
881 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
882 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
883 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
884 0 : let xlrec = XlReploriginSet::decode(buf);
885 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
886 0 : xlrec,
887 0 : ))));
888 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
889 0 : let xlrec = XlReploriginDrop::decode(buf);
890 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
891 0 : xlrec,
892 0 : ))));
893 0 : }
894 0 :
895 0 : Ok(None)
896 0 : }
897 : }
|