Line data Source code
1 : //! This module contains logic for decoding and interpreting
2 : //! raw bytes which represent a raw Postgres WAL record.
3 :
4 : use std::collections::HashMap;
5 :
6 : use bytes::{Buf, Bytes};
7 : use pageserver_api::key::rel_block_to_key;
8 : use pageserver_api::reltag::{RelTag, SlruKind};
9 : use pageserver_api::shard::ShardIdentity;
10 : use postgres_ffi::walrecord::*;
11 : use postgres_ffi::{PgMajorVersion, pg_constants};
12 : use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
13 : use utils::lsn::Lsn;
14 :
15 : use crate::models::*;
16 : use crate::serialized_batch::SerializedValueBatch;
17 :
18 : impl InterpretedWalRecord {
19 : /// Decode and interpreted raw bytes which represent one Postgres WAL record.
20 : /// Data blocks which do not match any of the provided shard identities are filtered out.
21 : /// Shard 0 is a special case since it tracks all relation sizes. We only give it
22 : /// the keys that are being written as that is enough for updating relation sizes.
23 73532 : pub fn from_bytes_filtered(
24 73532 : buf: Bytes,
25 73532 : shards: &[ShardIdentity],
26 73532 : next_record_lsn: Lsn,
27 73532 : pg_version: PgMajorVersion,
28 73532 : ) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
29 73532 : let mut decoded = DecodedWALRecord::default();
30 73532 : decode_wal_record(buf, &mut decoded, pg_version)?;
31 73532 : let xid = decoded.xl_xid;
32 :
33 73532 : let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
34 0 : FlushUncommittedRecords::Yes
35 : } else {
36 73532 : FlushUncommittedRecords::No
37 : };
38 :
39 73532 : let mut shard_records: HashMap<ShardIdentity, InterpretedWalRecord> =
40 73532 : HashMap::with_capacity(shards.len());
41 147264 : for shard in shards {
42 73732 : shard_records.insert(
43 73732 : *shard,
44 73732 : InterpretedWalRecord {
45 73732 : metadata_record: None,
46 73732 : batch: SerializedValueBatch::default(),
47 73732 : next_record_lsn,
48 73732 : flush_uncommitted,
49 73732 : xid,
50 73732 : },
51 73732 : );
52 73732 : }
53 :
54 73532 : MetadataRecord::from_decoded_filtered(
55 73532 : &decoded,
56 73532 : &mut shard_records,
57 73532 : next_record_lsn,
58 73532 : pg_version,
59 0 : )?;
60 73532 : SerializedValueBatch::from_decoded_filtered(
61 73532 : decoded,
62 73532 : &mut shard_records,
63 73532 : next_record_lsn,
64 73532 : pg_version,
65 0 : )?;
66 :
67 73532 : Ok(shard_records)
68 73532 : }
69 : }
70 :
71 : impl MetadataRecord {
72 : /// Populates the given `shard_records` with metadata records from this WAL record, if any,
73 : /// discarding those belonging to other shards.
74 : ///
75 : /// Only metadata records relevant for the given shards is emitted. Currently, most metadata
76 : /// records are broadcast to all shards for simplicity, but this should be improved.
77 73532 : fn from_decoded_filtered(
78 73532 : decoded: &DecodedWALRecord,
79 73532 : shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
80 73532 : next_record_lsn: Lsn,
81 73532 : pg_version: PgMajorVersion,
82 73532 : ) -> anyhow::Result<()> {
83 : // Note: this doesn't actually copy the bytes since
84 : // the [`Bytes`] type implements it via a level of indirection.
85 73532 : let mut buf = decoded.record.clone();
86 73532 : buf.advance(decoded.main_data_offset);
87 :
88 : // First, generate metadata records from the decoded WAL record.
89 73532 : let metadata_record = match decoded.xl_rmid {
90 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
91 72737 : Self::decode_heapam_record(&mut buf, decoded, pg_version)?
92 : }
93 0 : pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?,
94 : // Handle other special record types
95 8 : pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?,
96 0 : pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?,
97 : pg_constants::RM_TBLSPC_ID => {
98 0 : tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
99 0 : None
100 : }
101 0 : pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?,
102 : pg_constants::RM_XACT_ID => {
103 12 : Self::decode_xact_record(&mut buf, decoded, next_record_lsn)?
104 : }
105 : pg_constants::RM_MULTIXACT_ID => {
106 0 : Self::decode_multixact_record(&mut buf, decoded, pg_version)?
107 : }
108 0 : pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?,
109 : // This is an odd duck. It needs to go to all shards.
110 : // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
111 : // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
112 : // the pageserver and decode it there.
113 : //
114 : // Alternatively, one can make the checkpoint part of the subscription protocol
115 : // to the pageserver. This should work fine, but can be done at a later point.
116 : pg_constants::RM_XLOG_ID => {
117 15 : Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
118 : }
119 : pg_constants::RM_LOGICALMSG_ID => {
120 606 : Self::decode_logical_message_record(&mut buf, decoded)?
121 : }
122 8 : pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
123 0 : pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
124 146 : _unexpected => {
125 : // TODO: consider failing here instead of blindly doing something without
126 : // understanding the protocol
127 146 : None
128 : }
129 : };
130 :
131 : // Next, filter the metadata record by shard.
132 73655 : for (shard, record) in shard_records.iter_mut() {
133 720 : match metadata_record {
134 : Some(
135 6 : MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref clear_vm_bits))
136 0 : | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref clear_vm_bits)),
137 : ) => {
138 : // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
139 : // of the main relation. These are sharded and managed just like regular relation pages.
140 : // See: https://github.com/neondatabase/neon/issues/9855
141 6 : let is_local_vm_page = |heap_blk| {
142 6 : let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
143 6 : shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
144 6 : };
145 : // Send the old and new VM page updates to their respective shards.
146 6 : let updated_old_heap_blkno = clear_vm_bits
147 6 : .old_heap_blkno
148 6 : .filter(|&blkno| is_local_vm_page(blkno));
149 6 : let updated_new_heap_blkno = clear_vm_bits
150 6 : .new_heap_blkno
151 6 : .filter(|&blkno| is_local_vm_page(blkno));
152 : // If neither VM page belongs to this shard, discard the record.
153 6 : if updated_old_heap_blkno.is_some() || updated_new_heap_blkno.is_some() {
154 : // Clone the record and update it for the current shard.
155 6 : let mut for_shard = metadata_record.clone();
156 6 : match for_shard {
157 : Some(
158 : MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
159 6 : ref mut clear_vm_bits,
160 : ))
161 : | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
162 0 : ref mut clear_vm_bits,
163 : )),
164 6 : ) => {
165 6 : clear_vm_bits.old_heap_blkno = updated_old_heap_blkno;
166 6 : clear_vm_bits.new_heap_blkno = updated_new_heap_blkno;
167 6 : record.metadata_record = for_shard;
168 6 : }
169 : _ => {
170 0 : unreachable!("for_shard is a clone of what we checked above")
171 : }
172 : }
173 0 : }
174 : }
175 : Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
176 : // Filter LogicalMessage records (AUX files) to only be stored on shard zero
177 720 : if shard.is_shard_zero() {
178 597 : record.metadata_record = metadata_record;
179 : // No other shards should receive this record, so we stop traversing shards early.
180 597 : break;
181 123 : }
182 : }
183 72929 : _ => {
184 72929 : // All other metadata records are sent to all shards.
185 72929 : record.metadata_record = metadata_record.clone();
186 72929 : }
187 : }
188 : }
189 :
190 73532 : Ok(())
191 73532 : }
192 :
193 72737 : fn decode_heapam_record(
194 72737 : buf: &mut Bytes,
195 72737 : decoded: &DecodedWALRecord,
196 72737 : pg_version: PgMajorVersion,
197 72737 : ) -> anyhow::Result<Option<MetadataRecord>> {
198 : // Handle VM bit updates that are implicitly part of heap records.
199 :
200 : // First, look at the record to determine which VM bits need
201 : // to be cleared. If either of these variables is set, we
202 : // need to clear the corresponding bits in the visibility map.
203 72737 : let mut new_heap_blkno: Option<u32> = None;
204 72737 : let mut old_heap_blkno: Option<u32> = None;
205 72737 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
206 :
207 72737 : match pg_version {
208 : PgMajorVersion::PG14 => {
209 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
210 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
211 :
212 0 : if info == pg_constants::XLOG_HEAP_INSERT {
213 0 : let xlrec = v14::XlHeapInsert::decode(buf);
214 0 : assert_eq!(0, buf.remaining());
215 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
216 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
217 0 : }
218 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
219 0 : let xlrec = v14::XlHeapDelete::decode(buf);
220 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
221 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
222 0 : }
223 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
224 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
225 : {
226 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
227 : // the size of tuple data is inferred from the size of the record.
228 : // we can't validate the remaining number of bytes without parsing
229 : // the tuple data.
230 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
231 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
232 0 : }
233 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
234 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
235 0 : // non-HOT update where the new tuple goes to different page than
236 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
237 0 : // set.
238 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
239 0 : }
240 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
241 0 : let xlrec = v14::XlHeapLock::decode(buf);
242 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
243 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
244 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
245 0 : }
246 0 : }
247 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
248 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
249 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
250 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
251 :
252 0 : let offset_array_len =
253 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
254 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
255 0 : 0
256 : } else {
257 0 : size_of::<u16>() * xlrec.ntuples as usize
258 : };
259 0 : assert_eq!(offset_array_len, buf.remaining());
260 :
261 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
262 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
263 0 : }
264 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
265 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
266 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
267 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
268 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
269 0 : }
270 0 : }
271 : } else {
272 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
273 : }
274 : }
275 : PgMajorVersion::PG15 => {
276 72737 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
277 72643 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
278 :
279 72643 : if info == pg_constants::XLOG_HEAP_INSERT {
280 72638 : let xlrec = v15::XlHeapInsert::decode(buf);
281 72638 : assert_eq!(0, buf.remaining());
282 72638 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
283 2 : new_heap_blkno = Some(decoded.blocks[0].blkno);
284 72636 : }
285 5 : } else if info == pg_constants::XLOG_HEAP_DELETE {
286 0 : let xlrec = v15::XlHeapDelete::decode(buf);
287 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
288 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
289 0 : }
290 5 : } else if info == pg_constants::XLOG_HEAP_UPDATE
291 1 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
292 : {
293 4 : let xlrec = v15::XlHeapUpdate::decode(buf);
294 : // the size of tuple data is inferred from the size of the record.
295 : // we can't validate the remaining number of bytes without parsing
296 : // the tuple data.
297 4 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
298 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
299 4 : }
300 4 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
301 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
302 0 : // non-HOT update where the new tuple goes to different page than
303 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
304 0 : // set.
305 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
306 4 : }
307 1 : } else if info == pg_constants::XLOG_HEAP_LOCK {
308 0 : let xlrec = v15::XlHeapLock::decode(buf);
309 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
310 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
311 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
312 0 : }
313 1 : }
314 94 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
315 94 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
316 94 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
317 21 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
318 :
319 21 : let offset_array_len =
320 21 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
321 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
322 1 : 0
323 : } else {
324 20 : size_of::<u16>() * xlrec.ntuples as usize
325 : };
326 21 : assert_eq!(offset_array_len, buf.remaining());
327 :
328 21 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
329 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
330 17 : }
331 73 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
332 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
333 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
334 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
335 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
336 0 : }
337 73 : }
338 : } else {
339 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
340 : }
341 : }
342 : PgMajorVersion::PG16 => {
343 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
344 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
345 :
346 0 : if info == pg_constants::XLOG_HEAP_INSERT {
347 0 : let xlrec = v16::XlHeapInsert::decode(buf);
348 0 : assert_eq!(0, buf.remaining());
349 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
350 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
351 0 : }
352 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
353 0 : let xlrec = v16::XlHeapDelete::decode(buf);
354 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
355 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
356 0 : }
357 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
358 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
359 : {
360 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
361 : // the size of tuple data is inferred from the size of the record.
362 : // we can't validate the remaining number of bytes without parsing
363 : // the tuple data.
364 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
365 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
366 0 : }
367 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
368 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
369 0 : // non-HOT update where the new tuple goes to different page than
370 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
371 0 : // set.
372 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
373 0 : }
374 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
375 0 : let xlrec = v16::XlHeapLock::decode(buf);
376 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
377 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
378 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
379 0 : }
380 0 : }
381 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
382 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
383 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
384 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
385 :
386 0 : let offset_array_len =
387 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
388 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
389 0 : 0
390 : } else {
391 0 : size_of::<u16>() * xlrec.ntuples as usize
392 : };
393 0 : assert_eq!(offset_array_len, buf.remaining());
394 :
395 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
396 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
397 0 : }
398 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
399 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
400 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
401 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
402 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
403 0 : }
404 0 : }
405 : } else {
406 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
407 : }
408 : }
409 : PgMajorVersion::PG17 => {
410 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
411 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
412 :
413 0 : if info == pg_constants::XLOG_HEAP_INSERT {
414 0 : let xlrec = v17::XlHeapInsert::decode(buf);
415 0 : assert_eq!(0, buf.remaining());
416 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
417 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
418 0 : }
419 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
420 0 : let xlrec = v17::XlHeapDelete::decode(buf);
421 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
422 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
423 0 : }
424 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
425 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
426 : {
427 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
428 : // the size of tuple data is inferred from the size of the record.
429 : // we can't validate the remaining number of bytes without parsing
430 : // the tuple data.
431 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
432 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
433 0 : }
434 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
435 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
436 0 : // non-HOT update where the new tuple goes to different page than
437 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
438 0 : // set.
439 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
440 0 : }
441 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
442 0 : let xlrec = v17::XlHeapLock::decode(buf);
443 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
444 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
445 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
446 0 : }
447 0 : }
448 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
449 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
450 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
451 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
452 :
453 0 : let offset_array_len =
454 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
455 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
456 0 : 0
457 : } else {
458 0 : size_of::<u16>() * xlrec.ntuples as usize
459 : };
460 0 : assert_eq!(offset_array_len, buf.remaining());
461 :
462 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
463 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
464 0 : }
465 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
466 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
467 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
468 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
469 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
470 0 : }
471 0 : }
472 : } else {
473 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
474 : }
475 : }
476 : }
477 :
478 72737 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
479 6 : let vm_rel = RelTag {
480 6 : forknum: VISIBILITYMAP_FORKNUM,
481 6 : spcnode: decoded.blocks[0].rnode_spcnode,
482 6 : dbnode: decoded.blocks[0].rnode_dbnode,
483 6 : relnode: decoded.blocks[0].rnode_relnode,
484 6 : };
485 :
486 6 : Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
487 6 : ClearVmBits {
488 6 : new_heap_blkno,
489 6 : old_heap_blkno,
490 6 : vm_rel,
491 6 : flags,
492 6 : },
493 6 : ))))
494 : } else {
495 72731 : Ok(None)
496 : }
497 72737 : }
498 :
499 0 : fn decode_neonmgr_record(
500 0 : buf: &mut Bytes,
501 0 : decoded: &DecodedWALRecord,
502 0 : pg_version: PgMajorVersion,
503 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
504 : // Handle VM bit updates that are implicitly part of heap records.
505 :
506 : // First, look at the record to determine which VM bits need
507 : // to be cleared. If either of these variables is set, we
508 : // need to clear the corresponding bits in the visibility map.
509 0 : let mut new_heap_blkno: Option<u32> = None;
510 0 : let mut old_heap_blkno: Option<u32> = None;
511 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
512 :
513 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
514 :
515 0 : match pg_version {
516 : PgMajorVersion::PG16 | PgMajorVersion::PG17 => {
517 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
518 :
519 0 : match info {
520 : pg_constants::XLOG_NEON_HEAP_INSERT => {
521 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
522 0 : assert_eq!(0, buf.remaining());
523 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
524 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
525 0 : }
526 : }
527 : pg_constants::XLOG_NEON_HEAP_DELETE => {
528 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
529 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
530 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
531 0 : }
532 : }
533 : pg_constants::XLOG_NEON_HEAP_UPDATE
534 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
535 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
536 : // the size of tuple data is inferred from the size of the record.
537 : // we can't validate the remaining number of bytes without parsing
538 : // the tuple data.
539 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
540 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
541 0 : }
542 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
543 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
544 0 : // non-HOT update where the new tuple goes to different page than
545 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
546 0 : // set.
547 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
548 0 : }
549 : }
550 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
551 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
552 :
553 0 : let offset_array_len =
554 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
555 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
556 0 : 0
557 : } else {
558 0 : size_of::<u16>() * xlrec.ntuples as usize
559 : };
560 0 : assert_eq!(offset_array_len, buf.remaining());
561 :
562 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
563 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
564 0 : }
565 : }
566 : pg_constants::XLOG_NEON_HEAP_LOCK => {
567 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
568 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
569 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
570 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
571 0 : }
572 : }
573 0 : info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
574 : }
575 : }
576 0 : PgMajorVersion::PG15 | PgMajorVersion::PG14 => anyhow::bail!(
577 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
578 : pg_version
579 : ),
580 : }
581 :
582 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
583 0 : let vm_rel = RelTag {
584 0 : forknum: VISIBILITYMAP_FORKNUM,
585 0 : spcnode: decoded.blocks[0].rnode_spcnode,
586 0 : dbnode: decoded.blocks[0].rnode_dbnode,
587 0 : relnode: decoded.blocks[0].rnode_relnode,
588 0 : };
589 :
590 0 : Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
591 0 : ClearVmBits {
592 0 : new_heap_blkno,
593 0 : old_heap_blkno,
594 0 : vm_rel,
595 0 : flags,
596 0 : },
597 0 : ))))
598 : } else {
599 0 : Ok(None)
600 : }
601 0 : }
602 :
603 8 : fn decode_smgr_record(
604 8 : buf: &mut Bytes,
605 8 : decoded: &DecodedWALRecord,
606 8 : ) -> anyhow::Result<Option<MetadataRecord>> {
607 8 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
608 8 : if info == pg_constants::XLOG_SMGR_CREATE {
609 8 : let create = XlSmgrCreate::decode(buf);
610 8 : let rel = RelTag {
611 8 : spcnode: create.rnode.spcnode,
612 8 : dbnode: create.rnode.dbnode,
613 8 : relnode: create.rnode.relnode,
614 8 : forknum: create.forknum,
615 8 : };
616 :
617 8 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
618 8 : rel,
619 8 : }))));
620 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
621 0 : let truncate = XlSmgrTruncate::decode(buf);
622 0 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
623 0 : }
624 :
625 0 : Ok(None)
626 8 : }
627 :
628 0 : fn decode_dbase_record(
629 0 : buf: &mut Bytes,
630 0 : decoded: &DecodedWALRecord,
631 0 : pg_version: PgMajorVersion,
632 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
633 : // TODO: Refactor this to avoid the duplication between postgres versions.
634 :
635 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
636 0 : tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
637 :
638 0 : match pg_version {
639 : PgMajorVersion::PG14 => {
640 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
641 0 : let createdb = XlCreateDatabase::decode(buf);
642 0 : tracing::debug!("XLOG_DBASE_CREATE v14");
643 :
644 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
645 0 : db_id: createdb.db_id,
646 0 : tablespace_id: createdb.tablespace_id,
647 0 : src_db_id: createdb.src_db_id,
648 0 : src_tablespace_id: createdb.src_tablespace_id,
649 0 : }));
650 :
651 0 : return Ok(Some(record));
652 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
653 0 : let dropdb = XlDropDatabase::decode(buf);
654 :
655 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
656 0 : db_id: dropdb.db_id,
657 0 : tablespace_ids: dropdb.tablespace_ids,
658 0 : }));
659 :
660 0 : return Ok(Some(record));
661 0 : }
662 : }
663 : PgMajorVersion::PG15 => {
664 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
665 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
666 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
667 : // The XLOG record was renamed between v14 and v15,
668 : // but the record format is the same.
669 : // So we can reuse XlCreateDatabase here.
670 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
671 :
672 0 : let createdb = XlCreateDatabase::decode(buf);
673 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
674 0 : db_id: createdb.db_id,
675 0 : tablespace_id: createdb.tablespace_id,
676 0 : src_db_id: createdb.src_db_id,
677 0 : src_tablespace_id: createdb.src_tablespace_id,
678 0 : }));
679 :
680 0 : return Ok(Some(record));
681 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
682 0 : let dropdb = XlDropDatabase::decode(buf);
683 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
684 0 : db_id: dropdb.db_id,
685 0 : tablespace_ids: dropdb.tablespace_ids,
686 0 : }));
687 :
688 0 : return Ok(Some(record));
689 0 : }
690 : }
691 : PgMajorVersion::PG16 => {
692 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
693 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
694 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
695 : // The XLOG record was renamed between v14 and v15,
696 : // but the record format is the same.
697 : // So we can reuse XlCreateDatabase here.
698 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
699 :
700 0 : let createdb = XlCreateDatabase::decode(buf);
701 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
702 0 : db_id: createdb.db_id,
703 0 : tablespace_id: createdb.tablespace_id,
704 0 : src_db_id: createdb.src_db_id,
705 0 : src_tablespace_id: createdb.src_tablespace_id,
706 0 : }));
707 :
708 0 : return Ok(Some(record));
709 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
710 0 : let dropdb = XlDropDatabase::decode(buf);
711 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
712 0 : db_id: dropdb.db_id,
713 0 : tablespace_ids: dropdb.tablespace_ids,
714 0 : }));
715 :
716 0 : return Ok(Some(record));
717 0 : }
718 : }
719 : PgMajorVersion::PG17 => {
720 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
721 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
722 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
723 : // The XLOG record was renamed between v14 and v15,
724 : // but the record format is the same.
725 : // So we can reuse XlCreateDatabase here.
726 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
727 :
728 0 : let createdb = XlCreateDatabase::decode(buf);
729 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
730 0 : db_id: createdb.db_id,
731 0 : tablespace_id: createdb.tablespace_id,
732 0 : src_db_id: createdb.src_db_id,
733 0 : src_tablespace_id: createdb.src_tablespace_id,
734 0 : }));
735 :
736 0 : return Ok(Some(record));
737 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
738 0 : let dropdb = XlDropDatabase::decode(buf);
739 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
740 0 : db_id: dropdb.db_id,
741 0 : tablespace_ids: dropdb.tablespace_ids,
742 0 : }));
743 :
744 0 : return Ok(Some(record));
745 0 : }
746 : }
747 : }
748 :
749 0 : Ok(None)
750 0 : }
751 :
752 0 : fn decode_clog_record(
753 0 : buf: &mut Bytes,
754 0 : decoded: &DecodedWALRecord,
755 0 : pg_version: PgMajorVersion,
756 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
757 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
758 :
759 0 : if info == pg_constants::CLOG_ZEROPAGE {
760 0 : let pageno = if pg_version < PgMajorVersion::PG17 {
761 0 : buf.get_u32_le()
762 : } else {
763 0 : buf.get_u64_le() as u32
764 : };
765 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
766 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
767 :
768 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
769 0 : ClogZeroPage { segno, rpageno },
770 0 : ))))
771 : } else {
772 0 : assert_eq!(info, pg_constants::CLOG_TRUNCATE);
773 0 : let xlrec = XlClogTruncate::decode(buf, pg_version);
774 :
775 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
776 0 : ClogTruncate {
777 0 : pageno: xlrec.pageno,
778 0 : oldest_xid: xlrec.oldest_xid,
779 0 : oldest_xid_db: xlrec.oldest_xid_db,
780 0 : },
781 0 : ))))
782 : }
783 0 : }
784 :
785 12 : fn decode_xact_record(
786 12 : buf: &mut Bytes,
787 12 : decoded: &DecodedWALRecord,
788 12 : lsn: Lsn,
789 12 : ) -> anyhow::Result<Option<MetadataRecord>> {
790 12 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
791 12 : let origin_id = decoded.origin_id;
792 12 : let xl_xid = decoded.xl_xid;
793 :
794 12 : if info == pg_constants::XLOG_XACT_COMMIT {
795 4 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
796 4 : return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
797 4 : parsed,
798 4 : origin_id,
799 4 : xl_xid,
800 4 : lsn,
801 4 : }))));
802 8 : } else if info == pg_constants::XLOG_XACT_ABORT {
803 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
804 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
805 0 : parsed,
806 0 : origin_id,
807 0 : xl_xid,
808 0 : lsn,
809 0 : }))));
810 8 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
811 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
812 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
813 0 : XactCommon {
814 0 : parsed,
815 0 : origin_id,
816 0 : xl_xid,
817 0 : lsn,
818 0 : },
819 0 : ))));
820 8 : } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
821 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
822 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
823 0 : XactCommon {
824 0 : parsed,
825 0 : origin_id,
826 0 : xl_xid,
827 0 : lsn,
828 0 : },
829 0 : ))));
830 8 : } else if info == pg_constants::XLOG_XACT_PREPARE {
831 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
832 0 : XactPrepare {
833 0 : xl_xid: decoded.xl_xid,
834 0 : data: Bytes::copy_from_slice(&buf[..]),
835 0 : },
836 0 : ))));
837 8 : }
838 :
839 8 : Ok(None)
840 12 : }
841 :
842 0 : fn decode_multixact_record(
843 0 : buf: &mut Bytes,
844 0 : decoded: &DecodedWALRecord,
845 0 : pg_version: PgMajorVersion,
846 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
847 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
848 :
849 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
850 0 : || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
851 : {
852 0 : let pageno = if pg_version < PgMajorVersion::PG17 {
853 0 : buf.get_u32_le()
854 : } else {
855 0 : buf.get_u64_le() as u32
856 : };
857 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
858 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
859 :
860 0 : let slru_kind = match info {
861 0 : pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
862 0 : pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
863 0 : _ => unreachable!(),
864 : };
865 :
866 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
867 0 : MultiXactZeroPage {
868 0 : slru_kind,
869 0 : segno,
870 0 : rpageno,
871 0 : },
872 0 : ))));
873 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
874 0 : let xlrec = XlMultiXactCreate::decode(buf);
875 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
876 0 : xlrec,
877 0 : ))));
878 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
879 0 : let xlrec = XlMultiXactTruncate::decode(buf);
880 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
881 0 : xlrec,
882 0 : ))));
883 0 : }
884 :
885 0 : Ok(None)
886 0 : }
887 :
888 0 : fn decode_relmap_record(
889 0 : buf: &mut Bytes,
890 0 : decoded: &DecodedWALRecord,
891 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
892 0 : let update = XlRelmapUpdate::decode(buf);
893 :
894 0 : let mut buf = decoded.record.clone();
895 0 : buf.advance(decoded.main_data_offset);
896 : // skip xl_relmap_update
897 0 : buf.advance(12);
898 :
899 0 : Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
900 0 : RelmapUpdate {
901 0 : update,
902 0 : buf: Bytes::copy_from_slice(&buf[..]),
903 0 : },
904 0 : ))))
905 0 : }
906 :
907 15 : fn decode_xlog_record(
908 15 : buf: &mut Bytes,
909 15 : decoded: &DecodedWALRecord,
910 15 : lsn: Lsn,
911 15 : ) -> anyhow::Result<Option<MetadataRecord>> {
912 15 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
913 15 : Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
914 15 : info,
915 15 : lsn,
916 15 : buf: buf.clone(),
917 15 : }))))
918 15 : }
919 :
920 606 : fn decode_logical_message_record(
921 606 : buf: &mut Bytes,
922 606 : decoded: &DecodedWALRecord,
923 606 : ) -> anyhow::Result<Option<MetadataRecord>> {
924 606 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
925 606 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
926 606 : let xlrec = XlLogicalMessage::decode(buf);
927 606 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
928 :
929 : #[cfg(feature = "testing")]
930 606 : if prefix == "neon-test" {
931 0 : return Ok(Some(MetadataRecord::LogicalMessage(
932 0 : LogicalMessageRecord::Failpoint,
933 0 : )));
934 606 : }
935 :
936 606 : if let Some(path) = prefix.strip_prefix("neon-file:") {
937 597 : let buf_size = xlrec.prefix_size + xlrec.message_size;
938 597 : let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
939 597 : return Ok(Some(MetadataRecord::LogicalMessage(
940 597 : LogicalMessageRecord::Put(PutLogicalMessage {
941 597 : path: path.to_string(),
942 597 : buf,
943 597 : }),
944 597 : )));
945 9 : }
946 0 : }
947 :
948 9 : Ok(None)
949 606 : }
950 :
951 8 : fn decode_standby_record(
952 8 : buf: &mut Bytes,
953 8 : decoded: &DecodedWALRecord,
954 8 : ) -> anyhow::Result<Option<MetadataRecord>> {
955 8 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
956 8 : if info == pg_constants::XLOG_RUNNING_XACTS {
957 0 : let xlrec = XlRunningXacts::decode(buf);
958 0 : return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
959 0 : StandbyRunningXacts {
960 0 : oldest_running_xid: xlrec.oldest_running_xid,
961 0 : },
962 0 : ))));
963 8 : }
964 :
965 8 : Ok(None)
966 8 : }
967 :
968 0 : fn decode_replorigin_record(
969 0 : buf: &mut Bytes,
970 0 : decoded: &DecodedWALRecord,
971 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
972 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
973 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
974 0 : let xlrec = XlReploriginSet::decode(buf);
975 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
976 0 : xlrec,
977 0 : ))));
978 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
979 0 : let xlrec = XlReploriginDrop::decode(buf);
980 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
981 0 : xlrec,
982 0 : ))));
983 0 : }
984 :
985 0 : Ok(None)
986 0 : }
987 : }
|