Line data Source code
1 : //! This module contains logic for decoding and interpreting
2 : //! raw bytes which represent a raw Postgres WAL record.
3 :
4 : use std::collections::HashMap;
5 :
6 : use bytes::{Buf, Bytes};
7 : use pageserver_api::key::rel_block_to_key;
8 : use pageserver_api::reltag::{RelTag, SlruKind};
9 : use pageserver_api::shard::ShardIdentity;
10 : use postgres_ffi::pg_constants;
11 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
12 : use postgres_ffi::walrecord::*;
13 : use utils::lsn::Lsn;
14 :
15 : use crate::models::*;
16 : use crate::serialized_batch::SerializedValueBatch;
17 :
18 : impl InterpretedWalRecord {
19 : /// Decode and interpreted raw bytes which represent one Postgres WAL record.
20 : /// Data blocks which do not match any of the provided shard identities are filtered out.
21 : /// Shard 0 is a special case since it tracks all relation sizes. We only give it
22 : /// the keys that are being written as that is enough for updating relation sizes.
23 292310 : pub fn from_bytes_filtered(
24 292310 : buf: Bytes,
25 292310 : shards: &[ShardIdentity],
26 292310 : next_record_lsn: Lsn,
27 292310 : pg_version: u32,
28 292310 : ) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
29 292310 : let mut decoded = DecodedWALRecord::default();
30 292310 : decode_wal_record(buf, &mut decoded, pg_version)?;
31 292310 : let xid = decoded.xl_xid;
32 :
33 292310 : let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
34 0 : FlushUncommittedRecords::Yes
35 : } else {
36 292310 : FlushUncommittedRecords::No
37 : };
38 :
39 292310 : let mut shard_records: HashMap<ShardIdentity, InterpretedWalRecord> =
40 292310 : HashMap::with_capacity(shards.len());
41 584820 : for shard in shards {
42 292510 : shard_records.insert(
43 292510 : *shard,
44 292510 : InterpretedWalRecord {
45 292510 : metadata_record: None,
46 292510 : batch: SerializedValueBatch::default(),
47 292510 : next_record_lsn,
48 292510 : flush_uncommitted,
49 292510 : xid,
50 292510 : },
51 292510 : );
52 292510 : }
53 :
54 292310 : MetadataRecord::from_decoded_filtered(
55 292310 : &decoded,
56 292310 : &mut shard_records,
57 292310 : next_record_lsn,
58 292310 : pg_version,
59 292310 : )?;
60 292310 : SerializedValueBatch::from_decoded_filtered(
61 292310 : decoded,
62 292310 : &mut shard_records,
63 292310 : next_record_lsn,
64 292310 : pg_version,
65 292310 : )?;
66 :
67 292310 : Ok(shard_records)
68 292310 : }
69 : }
70 :
71 : impl MetadataRecord {
72 : /// Populates the given `shard_records` with metadata records from this WAL record, if any,
73 : /// discarding those belonging to other shards.
74 : ///
75 : /// Only metadata records relevant for the given shards is emitted. Currently, most metadata
76 : /// records are broadcast to all shards for simplicity, but this should be improved.
77 292310 : fn from_decoded_filtered(
78 292310 : decoded: &DecodedWALRecord,
79 292310 : shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
80 292310 : next_record_lsn: Lsn,
81 292310 : pg_version: u32,
82 292310 : ) -> anyhow::Result<()> {
83 292310 : // Note: this doesn't actually copy the bytes since
84 292310 : // the [`Bytes`] type implements it via a level of indirection.
85 292310 : let mut buf = decoded.record.clone();
86 292310 : buf.advance(decoded.main_data_offset);
87 :
88 : // First, generate metadata records from the decoded WAL record.
89 292310 : let metadata_record = match decoded.xl_rmid {
90 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
91 290948 : Self::decode_heapam_record(&mut buf, decoded, pg_version)?
92 : }
93 0 : pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?,
94 : // Handle other special record types
95 32 : pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?,
96 0 : pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?,
97 : pg_constants::RM_TBLSPC_ID => {
98 0 : tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
99 0 : None
100 : }
101 0 : pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?,
102 : pg_constants::RM_XACT_ID => {
103 48 : Self::decode_xact_record(&mut buf, decoded, next_record_lsn)?
104 : }
105 : pg_constants::RM_MULTIXACT_ID => {
106 0 : Self::decode_multixact_record(&mut buf, decoded, pg_version)?
107 : }
108 0 : pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?,
109 : // This is an odd duck. It needs to go to all shards.
110 : // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
111 : // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
112 : // the pageserver and decode it there.
113 : //
114 : // Alternatively, one can make the checkpoint part of the subscription protocol
115 : // to the pageserver. This should work fine, but can be done at a later point.
116 : pg_constants::RM_XLOG_ID => {
117 60 : Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
118 : }
119 : pg_constants::RM_LOGICALMSG_ID => {
120 606 : Self::decode_logical_message_record(&mut buf, decoded)?
121 : }
122 32 : pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
123 0 : pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
124 584 : _unexpected => {
125 584 : // TODO: consider failing here instead of blindly doing something without
126 584 : // understanding the protocol
127 584 : None
128 : }
129 : };
130 :
131 : // Next, filter the metadata record by shard.
132 292431 : for (shard, record) in shard_records.iter_mut() {
133 718 : match metadata_record {
134 : Some(
135 24 : MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref clear_vm_bits))
136 0 : | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref clear_vm_bits)),
137 : ) => {
138 : // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
139 : // of the main relation. These are sharded and managed just like regular relation pages.
140 : // See: https://github.com/neondatabase/neon/issues/9855
141 24 : let is_local_vm_page = |heap_blk| {
142 24 : let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
143 24 : shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
144 24 : };
145 : // Send the old and new VM page updates to their respective shards.
146 24 : let updated_old_heap_blkno = clear_vm_bits
147 24 : .old_heap_blkno
148 24 : .filter(|&blkno| is_local_vm_page(blkno));
149 24 : let updated_new_heap_blkno = clear_vm_bits
150 24 : .new_heap_blkno
151 24 : .filter(|&blkno| is_local_vm_page(blkno));
152 24 : // If neither VM page belongs to this shard, discard the record.
153 24 : if updated_old_heap_blkno.is_some() || updated_new_heap_blkno.is_some() {
154 : // Clone the record and update it for the current shard.
155 24 : let mut for_shard = metadata_record.clone();
156 24 : match for_shard {
157 : Some(
158 : MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
159 24 : ref mut clear_vm_bits,
160 : ))
161 : | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
162 0 : ref mut clear_vm_bits,
163 : )),
164 24 : ) => {
165 24 : clear_vm_bits.old_heap_blkno = updated_old_heap_blkno;
166 24 : clear_vm_bits.new_heap_blkno = updated_new_heap_blkno;
167 24 : record.metadata_record = for_shard;
168 24 : }
169 : _ => {
170 0 : unreachable!("for_shard is a clone of what we checked above")
171 : }
172 : }
173 0 : }
174 : }
175 : Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
176 : // Filter LogicalMessage records (AUX files) to only be stored on shard zero
177 718 : if shard.is_shard_zero() {
178 597 : record.metadata_record = metadata_record;
179 597 : // No other shards should receive this record, so we stop traversing shards early.
180 597 : break;
181 121 : }
182 : }
183 291689 : _ => {
184 291689 : // All other metadata records are sent to all shards.
185 291689 : record.metadata_record = metadata_record.clone();
186 291689 : }
187 : }
188 : }
189 :
190 292310 : Ok(())
191 292310 : }
192 :
193 290948 : fn decode_heapam_record(
194 290948 : buf: &mut Bytes,
195 290948 : decoded: &DecodedWALRecord,
196 290948 : pg_version: u32,
197 290948 : ) -> anyhow::Result<Option<MetadataRecord>> {
198 290948 : // Handle VM bit updates that are implicitly part of heap records.
199 290948 :
200 290948 : // First, look at the record to determine which VM bits need
201 290948 : // to be cleared. If either of these variables is set, we
202 290948 : // need to clear the corresponding bits in the visibility map.
203 290948 : let mut new_heap_blkno: Option<u32> = None;
204 290948 : let mut old_heap_blkno: Option<u32> = None;
205 290948 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
206 290948 :
207 290948 : match pg_version {
208 : 14 => {
209 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
210 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
211 0 :
212 0 : if info == pg_constants::XLOG_HEAP_INSERT {
213 0 : let xlrec = v14::XlHeapInsert::decode(buf);
214 0 : assert_eq!(0, buf.remaining());
215 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
216 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
217 0 : }
218 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
219 0 : let xlrec = v14::XlHeapDelete::decode(buf);
220 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
221 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
222 0 : }
223 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
224 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
225 : {
226 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
227 0 : // the size of tuple data is inferred from the size of the record.
228 0 : // we can't validate the remaining number of bytes without parsing
229 0 : // the tuple data.
230 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
231 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
232 0 : }
233 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
234 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
235 0 : // non-HOT update where the new tuple goes to different page than
236 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
237 0 : // set.
238 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
239 0 : }
240 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
241 0 : let xlrec = v14::XlHeapLock::decode(buf);
242 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
243 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
244 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
245 0 : }
246 0 : }
247 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
248 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
249 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
250 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
251 :
252 0 : let offset_array_len =
253 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
254 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
255 0 : 0
256 : } else {
257 0 : size_of::<u16>() * xlrec.ntuples as usize
258 : };
259 0 : assert_eq!(offset_array_len, buf.remaining());
260 :
261 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
262 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
263 0 : }
264 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
265 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
266 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
267 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
268 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
269 0 : }
270 0 : }
271 : } else {
272 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
273 : }
274 : }
275 : 15 => {
276 290948 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
277 290572 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
278 290572 :
279 290572 : if info == pg_constants::XLOG_HEAP_INSERT {
280 290552 : let xlrec = v15::XlHeapInsert::decode(buf);
281 290552 : assert_eq!(0, buf.remaining());
282 290552 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
283 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
284 290544 : }
285 20 : } else if info == pg_constants::XLOG_HEAP_DELETE {
286 0 : let xlrec = v15::XlHeapDelete::decode(buf);
287 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
288 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
289 0 : }
290 20 : } else if info == pg_constants::XLOG_HEAP_UPDATE
291 4 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
292 : {
293 16 : let xlrec = v15::XlHeapUpdate::decode(buf);
294 16 : // the size of tuple data is inferred from the size of the record.
295 16 : // we can't validate the remaining number of bytes without parsing
296 16 : // the tuple data.
297 16 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
298 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
299 16 : }
300 16 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
301 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
302 0 : // non-HOT update where the new tuple goes to different page than
303 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
304 0 : // set.
305 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
306 16 : }
307 4 : } else if info == pg_constants::XLOG_HEAP_LOCK {
308 0 : let xlrec = v15::XlHeapLock::decode(buf);
309 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
310 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
311 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
312 0 : }
313 4 : }
314 376 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
315 376 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
316 376 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
317 84 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
318 :
319 84 : let offset_array_len =
320 84 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
321 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
322 4 : 0
323 : } else {
324 80 : size_of::<u16>() * xlrec.ntuples as usize
325 : };
326 84 : assert_eq!(offset_array_len, buf.remaining());
327 :
328 84 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
329 16 : new_heap_blkno = Some(decoded.blocks[0].blkno);
330 68 : }
331 292 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
332 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
333 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
334 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
335 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
336 0 : }
337 292 : }
338 : } else {
339 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
340 : }
341 : }
342 : 16 => {
343 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
344 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
345 0 :
346 0 : if info == pg_constants::XLOG_HEAP_INSERT {
347 0 : let xlrec = v16::XlHeapInsert::decode(buf);
348 0 : assert_eq!(0, buf.remaining());
349 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
350 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
351 0 : }
352 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
353 0 : let xlrec = v16::XlHeapDelete::decode(buf);
354 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
355 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
356 0 : }
357 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
358 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
359 : {
360 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
361 0 : // the size of tuple data is inferred from the size of the record.
362 0 : // we can't validate the remaining number of bytes without parsing
363 0 : // the tuple data.
364 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
365 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
366 0 : }
367 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
368 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
369 0 : // non-HOT update where the new tuple goes to different page than
370 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
371 0 : // set.
372 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
373 0 : }
374 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
375 0 : let xlrec = v16::XlHeapLock::decode(buf);
376 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
377 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
378 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
379 0 : }
380 0 : }
381 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
382 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
383 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
384 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
385 :
386 0 : let offset_array_len =
387 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
388 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
389 0 : 0
390 : } else {
391 0 : size_of::<u16>() * xlrec.ntuples as usize
392 : };
393 0 : assert_eq!(offset_array_len, buf.remaining());
394 :
395 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
396 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
397 0 : }
398 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
399 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
400 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
401 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
402 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
403 0 : }
404 0 : }
405 : } else {
406 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
407 : }
408 : }
409 : 17 => {
410 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
411 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
412 0 :
413 0 : if info == pg_constants::XLOG_HEAP_INSERT {
414 0 : let xlrec = v17::XlHeapInsert::decode(buf);
415 0 : assert_eq!(0, buf.remaining());
416 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
417 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
418 0 : }
419 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
420 0 : let xlrec = v17::XlHeapDelete::decode(buf);
421 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
422 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
423 0 : }
424 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
425 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
426 : {
427 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
428 0 : // the size of tuple data is inferred from the size of the record.
429 0 : // we can't validate the remaining number of bytes without parsing
430 0 : // the tuple data.
431 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
432 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
433 0 : }
434 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
435 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
436 0 : // non-HOT update where the new tuple goes to different page than
437 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
438 0 : // set.
439 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
440 0 : }
441 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
442 0 : let xlrec = v17::XlHeapLock::decode(buf);
443 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
444 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
445 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
446 0 : }
447 0 : }
448 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
449 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
450 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
451 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
452 :
453 0 : let offset_array_len =
454 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
455 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
456 0 : 0
457 : } else {
458 0 : size_of::<u16>() * xlrec.ntuples as usize
459 : };
460 0 : assert_eq!(offset_array_len, buf.remaining());
461 :
462 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
463 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
464 0 : }
465 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
466 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
467 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
468 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
469 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
470 0 : }
471 0 : }
472 : } else {
473 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
474 : }
475 : }
476 0 : _ => {}
477 : }
478 :
479 290948 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
480 24 : let vm_rel = RelTag {
481 24 : forknum: VISIBILITYMAP_FORKNUM,
482 24 : spcnode: decoded.blocks[0].rnode_spcnode,
483 24 : dbnode: decoded.blocks[0].rnode_dbnode,
484 24 : relnode: decoded.blocks[0].rnode_relnode,
485 24 : };
486 24 :
487 24 : Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
488 24 : ClearVmBits {
489 24 : new_heap_blkno,
490 24 : old_heap_blkno,
491 24 : vm_rel,
492 24 : flags,
493 24 : },
494 24 : ))))
495 : } else {
496 290924 : Ok(None)
497 : }
498 290948 : }
499 :
500 0 : fn decode_neonmgr_record(
501 0 : buf: &mut Bytes,
502 0 : decoded: &DecodedWALRecord,
503 0 : pg_version: u32,
504 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
505 0 : // Handle VM bit updates that are implicitly part of heap records.
506 0 :
507 0 : // First, look at the record to determine which VM bits need
508 0 : // to be cleared. If either of these variables is set, we
509 0 : // need to clear the corresponding bits in the visibility map.
510 0 : let mut new_heap_blkno: Option<u32> = None;
511 0 : let mut old_heap_blkno: Option<u32> = None;
512 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
513 0 :
514 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
515 :
516 0 : match pg_version {
517 : 16 | 17 => {
518 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
519 0 :
520 0 : match info {
521 : pg_constants::XLOG_NEON_HEAP_INSERT => {
522 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
523 0 : assert_eq!(0, buf.remaining());
524 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
525 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
526 0 : }
527 : }
528 : pg_constants::XLOG_NEON_HEAP_DELETE => {
529 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
530 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
531 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
532 0 : }
533 : }
534 : pg_constants::XLOG_NEON_HEAP_UPDATE
535 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
536 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
537 0 : // the size of tuple data is inferred from the size of the record.
538 0 : // we can't validate the remaining number of bytes without parsing
539 0 : // the tuple data.
540 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
541 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
542 0 : }
543 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
544 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
545 0 : // non-HOT update where the new tuple goes to different page than
546 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
547 0 : // set.
548 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
549 0 : }
550 : }
551 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
552 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
553 :
554 0 : let offset_array_len =
555 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
556 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
557 0 : 0
558 : } else {
559 0 : size_of::<u16>() * xlrec.ntuples as usize
560 : };
561 0 : assert_eq!(offset_array_len, buf.remaining());
562 :
563 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
564 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
565 0 : }
566 : }
567 : pg_constants::XLOG_NEON_HEAP_LOCK => {
568 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
569 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
570 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
571 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
572 0 : }
573 : }
574 0 : info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
575 : }
576 : }
577 0 : _ => anyhow::bail!(
578 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
579 0 : pg_version
580 0 : ),
581 : }
582 :
583 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
584 0 : let vm_rel = RelTag {
585 0 : forknum: VISIBILITYMAP_FORKNUM,
586 0 : spcnode: decoded.blocks[0].rnode_spcnode,
587 0 : dbnode: decoded.blocks[0].rnode_dbnode,
588 0 : relnode: decoded.blocks[0].rnode_relnode,
589 0 : };
590 0 :
591 0 : Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
592 0 : ClearVmBits {
593 0 : new_heap_blkno,
594 0 : old_heap_blkno,
595 0 : vm_rel,
596 0 : flags,
597 0 : },
598 0 : ))))
599 : } else {
600 0 : Ok(None)
601 : }
602 0 : }
603 :
604 32 : fn decode_smgr_record(
605 32 : buf: &mut Bytes,
606 32 : decoded: &DecodedWALRecord,
607 32 : ) -> anyhow::Result<Option<MetadataRecord>> {
608 32 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
609 32 : if info == pg_constants::XLOG_SMGR_CREATE {
610 32 : let create = XlSmgrCreate::decode(buf);
611 32 : let rel = RelTag {
612 32 : spcnode: create.rnode.spcnode,
613 32 : dbnode: create.rnode.dbnode,
614 32 : relnode: create.rnode.relnode,
615 32 : forknum: create.forknum,
616 32 : };
617 32 :
618 32 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
619 32 : rel,
620 32 : }))));
621 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
622 0 : let truncate = XlSmgrTruncate::decode(buf);
623 0 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
624 0 : }
625 0 :
626 0 : Ok(None)
627 32 : }
628 :
629 0 : fn decode_dbase_record(
630 0 : buf: &mut Bytes,
631 0 : decoded: &DecodedWALRecord,
632 0 : pg_version: u32,
633 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
634 0 : // TODO: Refactor this to avoid the duplication between postgres versions.
635 0 :
636 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
637 0 : tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
638 :
639 0 : if pg_version == 14 {
640 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
641 0 : let createdb = XlCreateDatabase::decode(buf);
642 0 : tracing::debug!("XLOG_DBASE_CREATE v14");
643 :
644 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
645 0 : db_id: createdb.db_id,
646 0 : tablespace_id: createdb.tablespace_id,
647 0 : src_db_id: createdb.src_db_id,
648 0 : src_tablespace_id: createdb.src_tablespace_id,
649 0 : }));
650 0 :
651 0 : return Ok(Some(record));
652 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
653 0 : let dropdb = XlDropDatabase::decode(buf);
654 0 :
655 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
656 0 : db_id: dropdb.db_id,
657 0 : tablespace_ids: dropdb.tablespace_ids,
658 0 : }));
659 0 :
660 0 : return Ok(Some(record));
661 0 : }
662 0 : } else if pg_version == 15 {
663 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
664 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
665 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
666 : // The XLOG record was renamed between v14 and v15,
667 : // but the record format is the same.
668 : // So we can reuse XlCreateDatabase here.
669 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
670 :
671 0 : let createdb = XlCreateDatabase::decode(buf);
672 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
673 0 : db_id: createdb.db_id,
674 0 : tablespace_id: createdb.tablespace_id,
675 0 : src_db_id: createdb.src_db_id,
676 0 : src_tablespace_id: createdb.src_tablespace_id,
677 0 : }));
678 0 :
679 0 : return Ok(Some(record));
680 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
681 0 : let dropdb = XlDropDatabase::decode(buf);
682 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
683 0 : db_id: dropdb.db_id,
684 0 : tablespace_ids: dropdb.tablespace_ids,
685 0 : }));
686 0 :
687 0 : return Ok(Some(record));
688 0 : }
689 0 : } else if pg_version == 16 {
690 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
691 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
692 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
693 : // The XLOG record was renamed between v14 and v15,
694 : // but the record format is the same.
695 : // So we can reuse XlCreateDatabase here.
696 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
697 :
698 0 : let createdb = XlCreateDatabase::decode(buf);
699 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
700 0 : db_id: createdb.db_id,
701 0 : tablespace_id: createdb.tablespace_id,
702 0 : src_db_id: createdb.src_db_id,
703 0 : src_tablespace_id: createdb.src_tablespace_id,
704 0 : }));
705 0 :
706 0 : return Ok(Some(record));
707 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
708 0 : let dropdb = XlDropDatabase::decode(buf);
709 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
710 0 : db_id: dropdb.db_id,
711 0 : tablespace_ids: dropdb.tablespace_ids,
712 0 : }));
713 0 :
714 0 : return Ok(Some(record));
715 0 : }
716 0 : } else if pg_version == 17 {
717 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
718 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
719 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
720 : // The XLOG record was renamed between v14 and v15,
721 : // but the record format is the same.
722 : // So we can reuse XlCreateDatabase here.
723 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
724 :
725 0 : let createdb = XlCreateDatabase::decode(buf);
726 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
727 0 : db_id: createdb.db_id,
728 0 : tablespace_id: createdb.tablespace_id,
729 0 : src_db_id: createdb.src_db_id,
730 0 : src_tablespace_id: createdb.src_tablespace_id,
731 0 : }));
732 0 :
733 0 : return Ok(Some(record));
734 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
735 0 : let dropdb = XlDropDatabase::decode(buf);
736 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
737 0 : db_id: dropdb.db_id,
738 0 : tablespace_ids: dropdb.tablespace_ids,
739 0 : }));
740 0 :
741 0 : return Ok(Some(record));
742 0 : }
743 0 : }
744 :
745 0 : Ok(None)
746 0 : }
747 :
748 0 : fn decode_clog_record(
749 0 : buf: &mut Bytes,
750 0 : decoded: &DecodedWALRecord,
751 0 : pg_version: u32,
752 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
753 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
754 0 :
755 0 : if info == pg_constants::CLOG_ZEROPAGE {
756 0 : let pageno = if pg_version < 17 {
757 0 : buf.get_u32_le()
758 : } else {
759 0 : buf.get_u64_le() as u32
760 : };
761 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
762 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
763 0 :
764 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
765 0 : ClogZeroPage { segno, rpageno },
766 0 : ))))
767 : } else {
768 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
769 0 : let xlrec = XlClogTruncate::decode(buf, pg_version);
770 0 :
771 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
772 0 : ClogTruncate {
773 0 : pageno: xlrec.pageno,
774 0 : oldest_xid: xlrec.oldest_xid,
775 0 : oldest_xid_db: xlrec.oldest_xid_db,
776 0 : },
777 0 : ))))
778 : }
779 0 : }
780 :
781 48 : fn decode_xact_record(
782 48 : buf: &mut Bytes,
783 48 : decoded: &DecodedWALRecord,
784 48 : lsn: Lsn,
785 48 : ) -> anyhow::Result<Option<MetadataRecord>> {
786 48 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
787 48 : let origin_id = decoded.origin_id;
788 48 : let xl_xid = decoded.xl_xid;
789 48 :
790 48 : if info == pg_constants::XLOG_XACT_COMMIT {
791 16 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
792 16 : return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
793 16 : parsed,
794 16 : origin_id,
795 16 : xl_xid,
796 16 : lsn,
797 16 : }))));
798 32 : } else if info == pg_constants::XLOG_XACT_ABORT {
799 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
800 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
801 0 : parsed,
802 0 : origin_id,
803 0 : xl_xid,
804 0 : lsn,
805 0 : }))));
806 32 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
807 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
808 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
809 0 : XactCommon {
810 0 : parsed,
811 0 : origin_id,
812 0 : xl_xid,
813 0 : lsn,
814 0 : },
815 0 : ))));
816 32 : } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
817 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
818 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
819 0 : XactCommon {
820 0 : parsed,
821 0 : origin_id,
822 0 : xl_xid,
823 0 : lsn,
824 0 : },
825 0 : ))));
826 32 : } else if info == pg_constants::XLOG_XACT_PREPARE {
827 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
828 0 : XactPrepare {
829 0 : xl_xid: decoded.xl_xid,
830 0 : data: Bytes::copy_from_slice(&buf[..]),
831 0 : },
832 0 : ))));
833 32 : }
834 32 :
835 32 : Ok(None)
836 48 : }
837 :
838 0 : fn decode_multixact_record(
839 0 : buf: &mut Bytes,
840 0 : decoded: &DecodedWALRecord,
841 0 : pg_version: u32,
842 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
843 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
844 0 :
845 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
846 0 : || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
847 : {
848 0 : let pageno = if pg_version < 17 {
849 0 : buf.get_u32_le()
850 : } else {
851 0 : buf.get_u64_le() as u32
852 : };
853 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
854 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
855 :
856 0 : let slru_kind = match info {
857 0 : pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
858 0 : pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
859 0 : _ => unreachable!(),
860 : };
861 :
862 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
863 0 : MultiXactZeroPage {
864 0 : slru_kind,
865 0 : segno,
866 0 : rpageno,
867 0 : },
868 0 : ))));
869 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
870 0 : let xlrec = XlMultiXactCreate::decode(buf);
871 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
872 0 : xlrec,
873 0 : ))));
874 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
875 0 : let xlrec = XlMultiXactTruncate::decode(buf);
876 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
877 0 : xlrec,
878 0 : ))));
879 0 : }
880 0 :
881 0 : Ok(None)
882 0 : }
883 :
884 0 : fn decode_relmap_record(
885 0 : buf: &mut Bytes,
886 0 : decoded: &DecodedWALRecord,
887 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
888 0 : let update = XlRelmapUpdate::decode(buf);
889 0 :
890 0 : let mut buf = decoded.record.clone();
891 0 : buf.advance(decoded.main_data_offset);
892 0 : // skip xl_relmap_update
893 0 : buf.advance(12);
894 0 :
895 0 : Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
896 0 : RelmapUpdate {
897 0 : update,
898 0 : buf: Bytes::copy_from_slice(&buf[..]),
899 0 : },
900 0 : ))))
901 0 : }
902 :
903 60 : fn decode_xlog_record(
904 60 : buf: &mut Bytes,
905 60 : decoded: &DecodedWALRecord,
906 60 : lsn: Lsn,
907 60 : ) -> anyhow::Result<Option<MetadataRecord>> {
908 60 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
909 60 : Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
910 60 : info,
911 60 : lsn,
912 60 : buf: buf.clone(),
913 60 : }))))
914 60 : }
915 :
916 606 : fn decode_logical_message_record(
917 606 : buf: &mut Bytes,
918 606 : decoded: &DecodedWALRecord,
919 606 : ) -> anyhow::Result<Option<MetadataRecord>> {
920 606 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
921 606 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
922 606 : let xlrec = XlLogicalMessage::decode(buf);
923 606 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
924 :
925 : #[cfg(feature = "testing")]
926 606 : if prefix == "neon-test" {
927 0 : return Ok(Some(MetadataRecord::LogicalMessage(
928 0 : LogicalMessageRecord::Failpoint,
929 0 : )));
930 606 : }
931 :
932 606 : if let Some(path) = prefix.strip_prefix("neon-file:") {
933 597 : let buf_size = xlrec.prefix_size + xlrec.message_size;
934 597 : let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
935 597 : return Ok(Some(MetadataRecord::LogicalMessage(
936 597 : LogicalMessageRecord::Put(PutLogicalMessage {
937 597 : path: path.to_string(),
938 597 : buf,
939 597 : }),
940 597 : )));
941 9 : }
942 0 : }
943 :
944 9 : Ok(None)
945 606 : }
946 :
947 32 : fn decode_standby_record(
948 32 : buf: &mut Bytes,
949 32 : decoded: &DecodedWALRecord,
950 32 : ) -> anyhow::Result<Option<MetadataRecord>> {
951 32 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
952 32 : if info == pg_constants::XLOG_RUNNING_XACTS {
953 0 : let xlrec = XlRunningXacts::decode(buf);
954 0 : return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
955 0 : StandbyRunningXacts {
956 0 : oldest_running_xid: xlrec.oldest_running_xid,
957 0 : },
958 0 : ))));
959 32 : }
960 32 :
961 32 : Ok(None)
962 32 : }
963 :
964 0 : fn decode_replorigin_record(
965 0 : buf: &mut Bytes,
966 0 : decoded: &DecodedWALRecord,
967 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
968 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
969 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
970 0 : let xlrec = XlReploriginSet::decode(buf);
971 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
972 0 : xlrec,
973 0 : ))));
974 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
975 0 : let xlrec = XlReploriginDrop::decode(buf);
976 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
977 0 : xlrec,
978 0 : ))));
979 0 : }
980 0 :
981 0 : Ok(None)
982 0 : }
983 : }
|