Line data Source code
1 : //! This module contains logic for decoding and interpreting
2 : //! raw bytes which represent a raw Postgres WAL record.
3 :
4 : use std::collections::HashMap;
5 :
6 : use crate::models::*;
7 : use crate::serialized_batch::SerializedValueBatch;
8 : use bytes::{Buf, Bytes};
9 : use pageserver_api::key::rel_block_to_key;
10 : use pageserver_api::reltag::{RelTag, SlruKind};
11 : use pageserver_api::shard::ShardIdentity;
12 : use postgres_ffi::pg_constants;
13 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
14 : use postgres_ffi::walrecord::*;
15 : use utils::lsn::Lsn;
16 :
17 : impl InterpretedWalRecord {
18 : /// Decode and interpreted raw bytes which represent one Postgres WAL record.
19 : /// Data blocks which do not match any of the provided shard identities are filtered out.
20 : /// Shard 0 is a special case since it tracks all relation sizes. We only give it
21 : /// the keys that are being written as that is enough for updating relation sizes.
22 292300 : pub fn from_bytes_filtered(
23 292300 : buf: Bytes,
24 292300 : shards: &[ShardIdentity],
25 292300 : next_record_lsn: Lsn,
26 292300 : pg_version: u32,
27 292300 : ) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
28 292300 : let mut decoded = DecodedWALRecord::default();
29 292300 : decode_wal_record(buf, &mut decoded, pg_version)?;
30 292300 : let xid = decoded.xl_xid;
31 :
32 292300 : let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
33 0 : FlushUncommittedRecords::Yes
34 : } else {
35 292300 : FlushUncommittedRecords::No
36 : };
37 :
38 292300 : let mut shard_records: HashMap<ShardIdentity, InterpretedWalRecord> =
39 292300 : HashMap::with_capacity(shards.len());
40 584799 : for shard in shards {
41 292499 : shard_records.insert(
42 292499 : *shard,
43 292499 : InterpretedWalRecord {
44 292499 : metadata_record: None,
45 292499 : batch: SerializedValueBatch::default(),
46 292499 : next_record_lsn,
47 292499 : flush_uncommitted,
48 292499 : xid,
49 292499 : },
50 292499 : );
51 292499 : }
52 :
53 292300 : MetadataRecord::from_decoded_filtered(
54 292300 : &decoded,
55 292300 : &mut shard_records,
56 292300 : next_record_lsn,
57 292300 : pg_version,
58 292300 : )?;
59 292300 : SerializedValueBatch::from_decoded_filtered(
60 292300 : decoded,
61 292300 : &mut shard_records,
62 292300 : next_record_lsn,
63 292300 : pg_version,
64 292300 : )?;
65 :
66 292300 : Ok(shard_records)
67 292300 : }
68 : }
69 :
70 : impl MetadataRecord {
71 : /// Populates the given `shard_records` with metadata records from this WAL record, if any,
72 : /// discarding those belonging to other shards.
73 : ///
74 : /// Only metadata records relevant for the given shards is emitted. Currently, most metadata
75 : /// records are broadcast to all shards for simplicity, but this should be improved.
76 292300 : fn from_decoded_filtered(
77 292300 : decoded: &DecodedWALRecord,
78 292300 : shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
79 292300 : next_record_lsn: Lsn,
80 292300 : pg_version: u32,
81 292300 : ) -> anyhow::Result<()> {
82 292300 : // Note: this doesn't actually copy the bytes since
83 292300 : // the [`Bytes`] type implements it via a level of indirection.
84 292300 : let mut buf = decoded.record.clone();
85 292300 : buf.advance(decoded.main_data_offset);
86 :
87 : // First, generate metadata records from the decoded WAL record.
88 292300 : let metadata_record = match decoded.xl_rmid {
89 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
90 290948 : Self::decode_heapam_record(&mut buf, decoded, pg_version)?
91 : }
92 0 : pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?,
93 : // Handle other special record types
94 32 : pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?,
95 0 : pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?,
96 : pg_constants::RM_TBLSPC_ID => {
97 0 : tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
98 0 : None
99 : }
100 0 : pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?,
101 : pg_constants::RM_XACT_ID => {
102 48 : Self::decode_xact_record(&mut buf, decoded, next_record_lsn)?
103 : }
104 : pg_constants::RM_MULTIXACT_ID => {
105 0 : Self::decode_multixact_record(&mut buf, decoded, pg_version)?
106 : }
107 0 : pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?,
108 : // This is an odd duck. It needs to go to all shards.
109 : // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
110 : // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
111 : // the pageserver and decode it there.
112 : //
113 : // Alternatively, one can make the checkpoint part of the subscription protocol
114 : // to the pageserver. This should work fine, but can be done at a later point.
115 : pg_constants::RM_XLOG_ID => {
116 60 : Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
117 : }
118 : pg_constants::RM_LOGICALMSG_ID => {
119 596 : Self::decode_logical_message_record(&mut buf, decoded)?
120 : }
121 32 : pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
122 0 : pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
123 584 : _unexpected => {
124 584 : // TODO: consider failing here instead of blindly doing something without
125 584 : // understanding the protocol
126 584 : None
127 : }
128 : };
129 :
130 : // Next, filter the metadata record by shard.
131 292384 : for (shard, record) in shard_records.iter_mut() {
132 680 : match metadata_record {
133 : Some(
134 24 : MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref clear_vm_bits))
135 0 : | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref clear_vm_bits)),
136 : ) => {
137 : // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
138 : // of the main relation. These are sharded and managed just like regular relation pages.
139 : // See: https://github.com/neondatabase/neon/issues/9855
140 24 : let is_local_vm_page = |heap_blk| {
141 24 : let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
142 24 : shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
143 24 : };
144 : // Send the old and new VM page updates to their respective shards.
145 24 : let updated_old_heap_blkno = clear_vm_bits
146 24 : .old_heap_blkno
147 24 : .filter(|&blkno| is_local_vm_page(blkno));
148 24 : let updated_new_heap_blkno = clear_vm_bits
149 24 : .new_heap_blkno
150 24 : .filter(|&blkno| is_local_vm_page(blkno));
151 24 : // If neither VM page belongs to this shard, discard the record.
152 24 : if updated_old_heap_blkno.is_some() || updated_new_heap_blkno.is_some() {
153 : // Clone the record and update it for the current shard.
154 24 : let mut for_shard = metadata_record.clone();
155 24 : match for_shard {
156 : Some(
157 : MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
158 24 : ref mut clear_vm_bits,
159 : ))
160 : | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
161 0 : ref mut clear_vm_bits,
162 : )),
163 24 : ) => {
164 24 : clear_vm_bits.old_heap_blkno = updated_old_heap_blkno;
165 24 : clear_vm_bits.new_heap_blkno = updated_new_heap_blkno;
166 24 : record.metadata_record = for_shard;
167 24 : }
168 : _ => {
169 0 : unreachable!("for_shard is a clone of what we checked above")
170 : }
171 : }
172 0 : }
173 : }
174 : Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
175 : // Filter LogicalMessage records (AUX files) to only be stored on shard zero
176 680 : if shard.is_shard_zero() {
177 596 : record.metadata_record = metadata_record;
178 596 : // No other shards should receive this record, so we stop traversing shards early.
179 596 : break;
180 84 : }
181 : }
182 291680 : _ => {
183 291680 : // All other metadata records are sent to all shards.
184 291680 : record.metadata_record = metadata_record.clone();
185 291680 : }
186 : }
187 : }
188 :
189 292300 : Ok(())
190 292300 : }
191 :
192 290948 : fn decode_heapam_record(
193 290948 : buf: &mut Bytes,
194 290948 : decoded: &DecodedWALRecord,
195 290948 : pg_version: u32,
196 290948 : ) -> anyhow::Result<Option<MetadataRecord>> {
197 290948 : // Handle VM bit updates that are implicitly part of heap records.
198 290948 :
199 290948 : // First, look at the record to determine which VM bits need
200 290948 : // to be cleared. If either of these variables is set, we
201 290948 : // need to clear the corresponding bits in the visibility map.
202 290948 : let mut new_heap_blkno: Option<u32> = None;
203 290948 : let mut old_heap_blkno: Option<u32> = None;
204 290948 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
205 290948 :
206 290948 : match pg_version {
207 : 14 => {
208 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
209 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
210 0 :
211 0 : if info == pg_constants::XLOG_HEAP_INSERT {
212 0 : let xlrec = v14::XlHeapInsert::decode(buf);
213 0 : assert_eq!(0, buf.remaining());
214 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
215 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
216 0 : }
217 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
218 0 : let xlrec = v14::XlHeapDelete::decode(buf);
219 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
220 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
221 0 : }
222 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
223 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
224 : {
225 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
226 0 : // the size of tuple data is inferred from the size of the record.
227 0 : // we can't validate the remaining number of bytes without parsing
228 0 : // the tuple data.
229 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
230 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
231 0 : }
232 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
233 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
234 0 : // non-HOT update where the new tuple goes to different page than
235 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
236 0 : // set.
237 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
238 0 : }
239 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
240 0 : let xlrec = v14::XlHeapLock::decode(buf);
241 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
242 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
243 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
244 0 : }
245 0 : }
246 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
247 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
248 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
249 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
250 :
251 0 : let offset_array_len =
252 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
253 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
254 0 : 0
255 : } else {
256 0 : size_of::<u16>() * xlrec.ntuples as usize
257 : };
258 0 : assert_eq!(offset_array_len, buf.remaining());
259 :
260 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
261 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
262 0 : }
263 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
264 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
265 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
266 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
267 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
268 0 : }
269 0 : }
270 : } else {
271 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
272 : }
273 : }
274 : 15 => {
275 290948 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
276 290572 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
277 290572 :
278 290572 : if info == pg_constants::XLOG_HEAP_INSERT {
279 290552 : let xlrec = v15::XlHeapInsert::decode(buf);
280 290552 : assert_eq!(0, buf.remaining());
281 290552 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
282 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
283 290544 : }
284 20 : } else if info == pg_constants::XLOG_HEAP_DELETE {
285 0 : let xlrec = v15::XlHeapDelete::decode(buf);
286 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
287 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
288 0 : }
289 20 : } else if info == pg_constants::XLOG_HEAP_UPDATE
290 4 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
291 : {
292 16 : let xlrec = v15::XlHeapUpdate::decode(buf);
293 16 : // the size of tuple data is inferred from the size of the record.
294 16 : // we can't validate the remaining number of bytes without parsing
295 16 : // the tuple data.
296 16 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
297 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
298 16 : }
299 16 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
300 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
301 0 : // non-HOT update where the new tuple goes to different page than
302 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
303 0 : // set.
304 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
305 16 : }
306 4 : } else if info == pg_constants::XLOG_HEAP_LOCK {
307 0 : let xlrec = v15::XlHeapLock::decode(buf);
308 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
309 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
310 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
311 0 : }
312 4 : }
313 376 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
314 376 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
315 376 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
316 84 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
317 :
318 84 : let offset_array_len =
319 84 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
320 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
321 4 : 0
322 : } else {
323 80 : size_of::<u16>() * xlrec.ntuples as usize
324 : };
325 84 : assert_eq!(offset_array_len, buf.remaining());
326 :
327 84 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
328 16 : new_heap_blkno = Some(decoded.blocks[0].blkno);
329 68 : }
330 292 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
331 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
332 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
333 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
334 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
335 0 : }
336 292 : }
337 : } else {
338 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
339 : }
340 : }
341 : 16 => {
342 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
343 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
344 0 :
345 0 : if info == pg_constants::XLOG_HEAP_INSERT {
346 0 : let xlrec = v16::XlHeapInsert::decode(buf);
347 0 : assert_eq!(0, buf.remaining());
348 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
349 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
350 0 : }
351 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
352 0 : let xlrec = v16::XlHeapDelete::decode(buf);
353 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
354 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
355 0 : }
356 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
357 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
358 : {
359 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
360 0 : // the size of tuple data is inferred from the size of the record.
361 0 : // we can't validate the remaining number of bytes without parsing
362 0 : // the tuple data.
363 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
364 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
365 0 : }
366 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
367 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
368 0 : // non-HOT update where the new tuple goes to different page than
369 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
370 0 : // set.
371 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
372 0 : }
373 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
374 0 : let xlrec = v16::XlHeapLock::decode(buf);
375 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
376 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
377 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
378 0 : }
379 0 : }
380 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
381 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
382 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
383 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
384 :
385 0 : let offset_array_len =
386 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
387 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
388 0 : 0
389 : } else {
390 0 : size_of::<u16>() * xlrec.ntuples as usize
391 : };
392 0 : assert_eq!(offset_array_len, buf.remaining());
393 :
394 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
395 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
396 0 : }
397 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
398 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
399 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
400 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
401 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
402 0 : }
403 0 : }
404 : } else {
405 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
406 : }
407 : }
408 : 17 => {
409 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
410 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
411 0 :
412 0 : if info == pg_constants::XLOG_HEAP_INSERT {
413 0 : let xlrec = v17::XlHeapInsert::decode(buf);
414 0 : assert_eq!(0, buf.remaining());
415 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
416 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
417 0 : }
418 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
419 0 : let xlrec = v17::XlHeapDelete::decode(buf);
420 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
421 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
422 0 : }
423 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
424 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
425 : {
426 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
427 0 : // the size of tuple data is inferred from the size of the record.
428 0 : // we can't validate the remaining number of bytes without parsing
429 0 : // the tuple data.
430 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
431 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
432 0 : }
433 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
434 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
435 0 : // non-HOT update where the new tuple goes to different page than
436 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
437 0 : // set.
438 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
439 0 : }
440 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
441 0 : let xlrec = v17::XlHeapLock::decode(buf);
442 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
443 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
444 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
445 0 : }
446 0 : }
447 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
448 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
449 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
450 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
451 :
452 0 : let offset_array_len =
453 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
454 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
455 0 : 0
456 : } else {
457 0 : size_of::<u16>() * xlrec.ntuples as usize
458 : };
459 0 : assert_eq!(offset_array_len, buf.remaining());
460 :
461 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
462 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
463 0 : }
464 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
465 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
466 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
467 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
468 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
469 0 : }
470 0 : }
471 : } else {
472 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
473 : }
474 : }
475 0 : _ => {}
476 : }
477 :
478 290948 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
479 24 : let vm_rel = RelTag {
480 24 : forknum: VISIBILITYMAP_FORKNUM,
481 24 : spcnode: decoded.blocks[0].rnode_spcnode,
482 24 : dbnode: decoded.blocks[0].rnode_dbnode,
483 24 : relnode: decoded.blocks[0].rnode_relnode,
484 24 : };
485 24 :
486 24 : Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
487 24 : ClearVmBits {
488 24 : new_heap_blkno,
489 24 : old_heap_blkno,
490 24 : vm_rel,
491 24 : flags,
492 24 : },
493 24 : ))))
494 : } else {
495 290924 : Ok(None)
496 : }
497 290948 : }
498 :
499 0 : fn decode_neonmgr_record(
500 0 : buf: &mut Bytes,
501 0 : decoded: &DecodedWALRecord,
502 0 : pg_version: u32,
503 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
504 0 : // Handle VM bit updates that are implicitly part of heap records.
505 0 :
506 0 : // First, look at the record to determine which VM bits need
507 0 : // to be cleared. If either of these variables is set, we
508 0 : // need to clear the corresponding bits in the visibility map.
509 0 : let mut new_heap_blkno: Option<u32> = None;
510 0 : let mut old_heap_blkno: Option<u32> = None;
511 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
512 0 :
513 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
514 :
515 0 : match pg_version {
516 : 16 | 17 => {
517 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
518 0 :
519 0 : match info {
520 : pg_constants::XLOG_NEON_HEAP_INSERT => {
521 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
522 0 : assert_eq!(0, buf.remaining());
523 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
524 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
525 0 : }
526 : }
527 : pg_constants::XLOG_NEON_HEAP_DELETE => {
528 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
529 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
530 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
531 0 : }
532 : }
533 : pg_constants::XLOG_NEON_HEAP_UPDATE
534 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
535 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
536 0 : // the size of tuple data is inferred from the size of the record.
537 0 : // we can't validate the remaining number of bytes without parsing
538 0 : // the tuple data.
539 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
540 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
541 0 : }
542 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
543 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
544 0 : // non-HOT update where the new tuple goes to different page than
545 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
546 0 : // set.
547 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
548 0 : }
549 : }
550 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
551 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
552 :
553 0 : let offset_array_len =
554 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
555 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
556 0 : 0
557 : } else {
558 0 : size_of::<u16>() * xlrec.ntuples as usize
559 : };
560 0 : assert_eq!(offset_array_len, buf.remaining());
561 :
562 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
563 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
564 0 : }
565 : }
566 : pg_constants::XLOG_NEON_HEAP_LOCK => {
567 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
568 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
569 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
570 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
571 0 : }
572 : }
573 0 : info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
574 : }
575 : }
576 0 : _ => anyhow::bail!(
577 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
578 0 : pg_version
579 0 : ),
580 : }
581 :
582 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
583 0 : let vm_rel = RelTag {
584 0 : forknum: VISIBILITYMAP_FORKNUM,
585 0 : spcnode: decoded.blocks[0].rnode_spcnode,
586 0 : dbnode: decoded.blocks[0].rnode_dbnode,
587 0 : relnode: decoded.blocks[0].rnode_relnode,
588 0 : };
589 0 :
590 0 : Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
591 0 : ClearVmBits {
592 0 : new_heap_blkno,
593 0 : old_heap_blkno,
594 0 : vm_rel,
595 0 : flags,
596 0 : },
597 0 : ))))
598 : } else {
599 0 : Ok(None)
600 : }
601 0 : }
602 :
603 32 : fn decode_smgr_record(
604 32 : buf: &mut Bytes,
605 32 : decoded: &DecodedWALRecord,
606 32 : ) -> anyhow::Result<Option<MetadataRecord>> {
607 32 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
608 32 : if info == pg_constants::XLOG_SMGR_CREATE {
609 32 : let create = XlSmgrCreate::decode(buf);
610 32 : let rel = RelTag {
611 32 : spcnode: create.rnode.spcnode,
612 32 : dbnode: create.rnode.dbnode,
613 32 : relnode: create.rnode.relnode,
614 32 : forknum: create.forknum,
615 32 : };
616 32 :
617 32 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
618 32 : rel,
619 32 : }))));
620 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
621 0 : let truncate = XlSmgrTruncate::decode(buf);
622 0 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
623 0 : }
624 0 :
625 0 : Ok(None)
626 32 : }
627 :
628 0 : fn decode_dbase_record(
629 0 : buf: &mut Bytes,
630 0 : decoded: &DecodedWALRecord,
631 0 : pg_version: u32,
632 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
633 0 : // TODO: Refactor this to avoid the duplication between postgres versions.
634 0 :
635 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
636 0 : tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
637 :
638 0 : if pg_version == 14 {
639 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
640 0 : let createdb = XlCreateDatabase::decode(buf);
641 0 : tracing::debug!("XLOG_DBASE_CREATE v14");
642 :
643 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
644 0 : db_id: createdb.db_id,
645 0 : tablespace_id: createdb.tablespace_id,
646 0 : src_db_id: createdb.src_db_id,
647 0 : src_tablespace_id: createdb.src_tablespace_id,
648 0 : }));
649 0 :
650 0 : return Ok(Some(record));
651 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
652 0 : let dropdb = XlDropDatabase::decode(buf);
653 0 :
654 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
655 0 : db_id: dropdb.db_id,
656 0 : tablespace_ids: dropdb.tablespace_ids,
657 0 : }));
658 0 :
659 0 : return Ok(Some(record));
660 0 : }
661 0 : } else if pg_version == 15 {
662 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
663 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
664 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
665 : // The XLOG record was renamed between v14 and v15,
666 : // but the record format is the same.
667 : // So we can reuse XlCreateDatabase here.
668 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
669 :
670 0 : let createdb = XlCreateDatabase::decode(buf);
671 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
672 0 : db_id: createdb.db_id,
673 0 : tablespace_id: createdb.tablespace_id,
674 0 : src_db_id: createdb.src_db_id,
675 0 : src_tablespace_id: createdb.src_tablespace_id,
676 0 : }));
677 0 :
678 0 : return Ok(Some(record));
679 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
680 0 : let dropdb = XlDropDatabase::decode(buf);
681 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
682 0 : db_id: dropdb.db_id,
683 0 : tablespace_ids: dropdb.tablespace_ids,
684 0 : }));
685 0 :
686 0 : return Ok(Some(record));
687 0 : }
688 0 : } else if pg_version == 16 {
689 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
690 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
691 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
692 : // The XLOG record was renamed between v14 and v15,
693 : // but the record format is the same.
694 : // So we can reuse XlCreateDatabase here.
695 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
696 :
697 0 : let createdb = XlCreateDatabase::decode(buf);
698 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
699 0 : db_id: createdb.db_id,
700 0 : tablespace_id: createdb.tablespace_id,
701 0 : src_db_id: createdb.src_db_id,
702 0 : src_tablespace_id: createdb.src_tablespace_id,
703 0 : }));
704 0 :
705 0 : return Ok(Some(record));
706 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
707 0 : let dropdb = XlDropDatabase::decode(buf);
708 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
709 0 : db_id: dropdb.db_id,
710 0 : tablespace_ids: dropdb.tablespace_ids,
711 0 : }));
712 0 :
713 0 : return Ok(Some(record));
714 0 : }
715 0 : } else if pg_version == 17 {
716 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
717 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
718 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
719 : // The XLOG record was renamed between v14 and v15,
720 : // but the record format is the same.
721 : // So we can reuse XlCreateDatabase here.
722 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
723 :
724 0 : let createdb = XlCreateDatabase::decode(buf);
725 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
726 0 : db_id: createdb.db_id,
727 0 : tablespace_id: createdb.tablespace_id,
728 0 : src_db_id: createdb.src_db_id,
729 0 : src_tablespace_id: createdb.src_tablespace_id,
730 0 : }));
731 0 :
732 0 : return Ok(Some(record));
733 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
734 0 : let dropdb = XlDropDatabase::decode(buf);
735 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
736 0 : db_id: dropdb.db_id,
737 0 : tablespace_ids: dropdb.tablespace_ids,
738 0 : }));
739 0 :
740 0 : return Ok(Some(record));
741 0 : }
742 0 : }
743 :
744 0 : Ok(None)
745 0 : }
746 :
747 0 : fn decode_clog_record(
748 0 : buf: &mut Bytes,
749 0 : decoded: &DecodedWALRecord,
750 0 : pg_version: u32,
751 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
752 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
753 0 :
754 0 : if info == pg_constants::CLOG_ZEROPAGE {
755 0 : let pageno = if pg_version < 17 {
756 0 : buf.get_u32_le()
757 : } else {
758 0 : buf.get_u64_le() as u32
759 : };
760 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
761 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
762 0 :
763 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
764 0 : ClogZeroPage { segno, rpageno },
765 0 : ))))
766 : } else {
767 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
768 0 : let xlrec = XlClogTruncate::decode(buf, pg_version);
769 0 :
770 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
771 0 : ClogTruncate {
772 0 : pageno: xlrec.pageno,
773 0 : oldest_xid: xlrec.oldest_xid,
774 0 : oldest_xid_db: xlrec.oldest_xid_db,
775 0 : },
776 0 : ))))
777 : }
778 0 : }
779 :
780 48 : fn decode_xact_record(
781 48 : buf: &mut Bytes,
782 48 : decoded: &DecodedWALRecord,
783 48 : lsn: Lsn,
784 48 : ) -> anyhow::Result<Option<MetadataRecord>> {
785 48 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
786 48 : let origin_id = decoded.origin_id;
787 48 : let xl_xid = decoded.xl_xid;
788 48 :
789 48 : if info == pg_constants::XLOG_XACT_COMMIT {
790 16 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
791 16 : return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
792 16 : parsed,
793 16 : origin_id,
794 16 : xl_xid,
795 16 : lsn,
796 16 : }))));
797 32 : } else if info == pg_constants::XLOG_XACT_ABORT {
798 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
799 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
800 0 : parsed,
801 0 : origin_id,
802 0 : xl_xid,
803 0 : lsn,
804 0 : }))));
805 32 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
806 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
807 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
808 0 : XactCommon {
809 0 : parsed,
810 0 : origin_id,
811 0 : xl_xid,
812 0 : lsn,
813 0 : },
814 0 : ))));
815 32 : } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
816 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
817 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
818 0 : XactCommon {
819 0 : parsed,
820 0 : origin_id,
821 0 : xl_xid,
822 0 : lsn,
823 0 : },
824 0 : ))));
825 32 : } else if info == pg_constants::XLOG_XACT_PREPARE {
826 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
827 0 : XactPrepare {
828 0 : xl_xid: decoded.xl_xid,
829 0 : data: Bytes::copy_from_slice(&buf[..]),
830 0 : },
831 0 : ))));
832 32 : }
833 32 :
834 32 : Ok(None)
835 48 : }
836 :
837 0 : fn decode_multixact_record(
838 0 : buf: &mut Bytes,
839 0 : decoded: &DecodedWALRecord,
840 0 : pg_version: u32,
841 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
842 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
843 0 :
844 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
845 0 : || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
846 : {
847 0 : let pageno = if pg_version < 17 {
848 0 : buf.get_u32_le()
849 : } else {
850 0 : buf.get_u64_le() as u32
851 : };
852 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
853 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
854 :
855 0 : let slru_kind = match info {
856 0 : pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
857 0 : pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
858 0 : _ => unreachable!(),
859 : };
860 :
861 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
862 0 : MultiXactZeroPage {
863 0 : slru_kind,
864 0 : segno,
865 0 : rpageno,
866 0 : },
867 0 : ))));
868 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
869 0 : let xlrec = XlMultiXactCreate::decode(buf);
870 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
871 0 : xlrec,
872 0 : ))));
873 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
874 0 : let xlrec = XlMultiXactTruncate::decode(buf);
875 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
876 0 : xlrec,
877 0 : ))));
878 0 : }
879 0 :
880 0 : Ok(None)
881 0 : }
882 :
883 0 : fn decode_relmap_record(
884 0 : buf: &mut Bytes,
885 0 : decoded: &DecodedWALRecord,
886 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
887 0 : let update = XlRelmapUpdate::decode(buf);
888 0 :
889 0 : let mut buf = decoded.record.clone();
890 0 : buf.advance(decoded.main_data_offset);
891 0 : // skip xl_relmap_update
892 0 : buf.advance(12);
893 0 :
894 0 : Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
895 0 : RelmapUpdate {
896 0 : update,
897 0 : buf: Bytes::copy_from_slice(&buf[..]),
898 0 : },
899 0 : ))))
900 0 : }
901 :
902 60 : fn decode_xlog_record(
903 60 : buf: &mut Bytes,
904 60 : decoded: &DecodedWALRecord,
905 60 : lsn: Lsn,
906 60 : ) -> anyhow::Result<Option<MetadataRecord>> {
907 60 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
908 60 : Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
909 60 : info,
910 60 : lsn,
911 60 : buf: buf.clone(),
912 60 : }))))
913 60 : }
914 :
915 596 : fn decode_logical_message_record(
916 596 : buf: &mut Bytes,
917 596 : decoded: &DecodedWALRecord,
918 596 : ) -> anyhow::Result<Option<MetadataRecord>> {
919 596 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
920 596 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
921 596 : let xlrec = XlLogicalMessage::decode(buf);
922 596 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
923 :
924 : #[cfg(feature = "testing")]
925 596 : if prefix == "neon-test" {
926 0 : return Ok(Some(MetadataRecord::LogicalMessage(
927 0 : LogicalMessageRecord::Failpoint,
928 0 : )));
929 596 : }
930 :
931 596 : if let Some(path) = prefix.strip_prefix("neon-file:") {
932 596 : let buf_size = xlrec.prefix_size + xlrec.message_size;
933 596 : let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
934 596 : return Ok(Some(MetadataRecord::LogicalMessage(
935 596 : LogicalMessageRecord::Put(PutLogicalMessage {
936 596 : path: path.to_string(),
937 596 : buf,
938 596 : }),
939 596 : )));
940 0 : }
941 0 : }
942 :
943 0 : Ok(None)
944 596 : }
945 :
946 32 : fn decode_standby_record(
947 32 : buf: &mut Bytes,
948 32 : decoded: &DecodedWALRecord,
949 32 : ) -> anyhow::Result<Option<MetadataRecord>> {
950 32 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
951 32 : if info == pg_constants::XLOG_RUNNING_XACTS {
952 0 : let xlrec = XlRunningXacts::decode(buf);
953 0 : return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
954 0 : StandbyRunningXacts {
955 0 : oldest_running_xid: xlrec.oldest_running_xid,
956 0 : },
957 0 : ))));
958 32 : }
959 32 :
960 32 : Ok(None)
961 32 : }
962 :
963 0 : fn decode_replorigin_record(
964 0 : buf: &mut Bytes,
965 0 : decoded: &DecodedWALRecord,
966 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
967 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
968 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
969 0 : let xlrec = XlReploriginSet::decode(buf);
970 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
971 0 : xlrec,
972 0 : ))));
973 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
974 0 : let xlrec = XlReploriginDrop::decode(buf);
975 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
976 0 : xlrec,
977 0 : ))));
978 0 : }
979 0 :
980 0 : Ok(None)
981 0 : }
982 : }
|