Line data Source code
1 : //! This module contains logic for decoding and interpreting
2 : //! raw bytes which represent a raw Postgres WAL record.
3 :
4 : use crate::models::*;
5 : use crate::serialized_batch::SerializedValueBatch;
6 : use bytes::{Buf, Bytes};
7 : use pageserver_api::key::rel_block_to_key;
8 : use pageserver_api::reltag::{RelTag, SlruKind};
9 : use pageserver_api::shard::ShardIdentity;
10 : use postgres_ffi::pg_constants;
11 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
12 : use postgres_ffi::walrecord::*;
13 : use utils::lsn::Lsn;
14 :
15 : impl InterpretedWalRecord {
16 : /// Decode and interpreted raw bytes which represent one Postgres WAL record.
17 : /// Data blocks which do not match the provided shard identity are filtered out.
18 : /// Shard 0 is a special case since it tracks all relation sizes. We only give it
19 : /// the keys that are being written as that is enough for updating relation sizes.
20 145852 : pub fn from_bytes_filtered(
21 145852 : buf: Bytes,
22 145852 : shard: &ShardIdentity,
23 145852 : next_record_lsn: Lsn,
24 145852 : pg_version: u32,
25 145852 : ) -> anyhow::Result<InterpretedWalRecord> {
26 145852 : let mut decoded = DecodedWALRecord::default();
27 145852 : decode_wal_record(buf, &mut decoded, pg_version)?;
28 145852 : let xid = decoded.xl_xid;
29 :
30 145852 : let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
31 0 : FlushUncommittedRecords::Yes
32 : } else {
33 145852 : FlushUncommittedRecords::No
34 : };
35 :
36 145852 : let metadata_record =
37 145852 : MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
38 145852 : let batch = SerializedValueBatch::from_decoded_filtered(
39 145852 : decoded,
40 145852 : shard,
41 145852 : next_record_lsn,
42 145852 : pg_version,
43 145852 : )?;
44 :
45 145852 : Ok(InterpretedWalRecord {
46 145852 : metadata_record,
47 145852 : batch,
48 145852 : next_record_lsn,
49 145852 : flush_uncommitted,
50 145852 : xid,
51 145852 : })
52 145852 : }
53 : }
54 :
55 : impl MetadataRecord {
56 : /// Builds a metadata record for this WAL record, if any.
57 : ///
58 : /// Only metadata records relevant for the given shard are emitted. Currently, most metadata
59 : /// records are broadcast to all shards for simplicity, but this should be improved.
60 145852 : fn from_decoded_filtered(
61 145852 : decoded: &DecodedWALRecord,
62 145852 : shard: &ShardIdentity,
63 145852 : next_record_lsn: Lsn,
64 145852 : pg_version: u32,
65 145852 : ) -> anyhow::Result<Option<MetadataRecord>> {
66 145852 : // Note: this doesn't actually copy the bytes since
67 145852 : // the [`Bytes`] type implements it via a level of indirection.
68 145852 : let mut buf = decoded.record.clone();
69 145852 : buf.advance(decoded.main_data_offset);
70 :
71 : // First, generate metadata records from the decoded WAL record.
72 145852 : let mut metadata_record = match decoded.xl_rmid {
73 : pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
74 145474 : Self::decode_heapam_record(&mut buf, decoded, pg_version)?
75 : }
76 0 : pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?,
77 : // Handle other special record types
78 16 : pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?,
79 0 : pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?,
80 : pg_constants::RM_TBLSPC_ID => {
81 0 : tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
82 0 : None
83 : }
84 0 : pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?,
85 : pg_constants::RM_XACT_ID => {
86 24 : Self::decode_xact_record(&mut buf, decoded, next_record_lsn)?
87 : }
88 : pg_constants::RM_MULTIXACT_ID => {
89 0 : Self::decode_multixact_record(&mut buf, decoded, pg_version)?
90 : }
91 0 : pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?,
92 : // This is an odd duck. It needs to go to all shards.
93 : // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
94 : // in WalIngest::new), we have to send the whole DecodedWalRecord::record to
95 : // the pageserver and decode it there.
96 : //
97 : // Alternatively, one can make the checkpoint part of the subscription protocol
98 : // to the pageserver. This should work fine, but can be done at a later point.
99 : pg_constants::RM_XLOG_ID => {
100 30 : Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
101 : }
102 : pg_constants::RM_LOGICALMSG_ID => {
103 0 : Self::decode_logical_message_record(&mut buf, decoded)?
104 : }
105 16 : pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
106 0 : pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
107 292 : _unexpected => {
108 292 : // TODO: consider failing here instead of blindly doing something without
109 292 : // understanding the protocol
110 292 : None
111 : }
112 : };
113 :
114 : // Next, filter the metadata record by shard.
115 0 : match metadata_record {
116 : Some(
117 12 : MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
118 0 : | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
119 : ) => {
120 : // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
121 : // of the main relation. These are sharded and managed just like regular relation pages.
122 : // See: https://github.com/neondatabase/neon/issues/9855
123 12 : let is_local_vm_page = |heap_blk| {
124 12 : let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
125 12 : shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
126 12 : };
127 : // Send the old and new VM page updates to their respective shards.
128 12 : clear_vm_bits.old_heap_blkno = clear_vm_bits
129 12 : .old_heap_blkno
130 12 : .filter(|&blkno| is_local_vm_page(blkno));
131 12 : clear_vm_bits.new_heap_blkno = clear_vm_bits
132 12 : .new_heap_blkno
133 12 : .filter(|&blkno| is_local_vm_page(blkno));
134 12 : // If neither VM page belongs to this shard, discard the record.
135 12 : if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
136 : {
137 0 : metadata_record = None
138 12 : }
139 : }
140 : Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
141 : // Filter LogicalMessage records (AUX files) to only be stored on shard zero
142 0 : if !shard.is_shard_zero() {
143 0 : metadata_record = None;
144 0 : }
145 : }
146 145840 : _ => {}
147 : }
148 :
149 145852 : Ok(metadata_record)
150 145852 : }
151 :
152 145474 : fn decode_heapam_record(
153 145474 : buf: &mut Bytes,
154 145474 : decoded: &DecodedWALRecord,
155 145474 : pg_version: u32,
156 145474 : ) -> anyhow::Result<Option<MetadataRecord>> {
157 145474 : // Handle VM bit updates that are implicitly part of heap records.
158 145474 :
159 145474 : // First, look at the record to determine which VM bits need
160 145474 : // to be cleared. If either of these variables is set, we
161 145474 : // need to clear the corresponding bits in the visibility map.
162 145474 : let mut new_heap_blkno: Option<u32> = None;
163 145474 : let mut old_heap_blkno: Option<u32> = None;
164 145474 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
165 145474 :
166 145474 : match pg_version {
167 : 14 => {
168 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
169 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
170 0 :
171 0 : if info == pg_constants::XLOG_HEAP_INSERT {
172 0 : let xlrec = v14::XlHeapInsert::decode(buf);
173 0 : assert_eq!(0, buf.remaining());
174 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
175 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
176 0 : }
177 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
178 0 : let xlrec = v14::XlHeapDelete::decode(buf);
179 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
180 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
181 0 : }
182 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
183 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
184 : {
185 0 : let xlrec = v14::XlHeapUpdate::decode(buf);
186 0 : // the size of tuple data is inferred from the size of the record.
187 0 : // we can't validate the remaining number of bytes without parsing
188 0 : // the tuple data.
189 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
190 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
191 0 : }
192 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
193 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
194 0 : // non-HOT update where the new tuple goes to different page than
195 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
196 0 : // set.
197 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
198 0 : }
199 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
200 0 : let xlrec = v14::XlHeapLock::decode(buf);
201 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
202 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
203 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
204 0 : }
205 0 : }
206 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
207 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
208 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
209 0 : let xlrec = v14::XlHeapMultiInsert::decode(buf);
210 :
211 0 : let offset_array_len =
212 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
213 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
214 0 : 0
215 : } else {
216 0 : size_of::<u16>() * xlrec.ntuples as usize
217 : };
218 0 : assert_eq!(offset_array_len, buf.remaining());
219 :
220 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
221 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
222 0 : }
223 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
224 0 : let xlrec = v14::XlHeapLockUpdated::decode(buf);
225 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
226 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
227 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
228 0 : }
229 0 : }
230 : } else {
231 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
232 : }
233 : }
234 : 15 => {
235 145474 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
236 145286 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
237 145286 :
238 145286 : if info == pg_constants::XLOG_HEAP_INSERT {
239 145276 : let xlrec = v15::XlHeapInsert::decode(buf);
240 145276 : assert_eq!(0, buf.remaining());
241 145276 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
242 4 : new_heap_blkno = Some(decoded.blocks[0].blkno);
243 145272 : }
244 10 : } else if info == pg_constants::XLOG_HEAP_DELETE {
245 0 : let xlrec = v15::XlHeapDelete::decode(buf);
246 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
247 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
248 0 : }
249 10 : } else if info == pg_constants::XLOG_HEAP_UPDATE
250 2 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
251 : {
252 8 : let xlrec = v15::XlHeapUpdate::decode(buf);
253 8 : // the size of tuple data is inferred from the size of the record.
254 8 : // we can't validate the remaining number of bytes without parsing
255 8 : // the tuple data.
256 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
257 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
258 8 : }
259 8 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
260 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
261 0 : // non-HOT update where the new tuple goes to different page than
262 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
263 0 : // set.
264 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
265 8 : }
266 2 : } else if info == pg_constants::XLOG_HEAP_LOCK {
267 0 : let xlrec = v15::XlHeapLock::decode(buf);
268 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
269 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
270 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
271 0 : }
272 2 : }
273 188 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
274 188 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
275 188 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
276 42 : let xlrec = v15::XlHeapMultiInsert::decode(buf);
277 :
278 42 : let offset_array_len =
279 42 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
280 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
281 2 : 0
282 : } else {
283 40 : size_of::<u16>() * xlrec.ntuples as usize
284 : };
285 42 : assert_eq!(offset_array_len, buf.remaining());
286 :
287 42 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
288 8 : new_heap_blkno = Some(decoded.blocks[0].blkno);
289 34 : }
290 146 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
291 0 : let xlrec = v15::XlHeapLockUpdated::decode(buf);
292 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
293 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
294 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
295 0 : }
296 146 : }
297 : } else {
298 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
299 : }
300 : }
301 : 16 => {
302 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
303 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
304 0 :
305 0 : if info == pg_constants::XLOG_HEAP_INSERT {
306 0 : let xlrec = v16::XlHeapInsert::decode(buf);
307 0 : assert_eq!(0, buf.remaining());
308 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
309 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
310 0 : }
311 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
312 0 : let xlrec = v16::XlHeapDelete::decode(buf);
313 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
314 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
315 0 : }
316 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
317 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
318 : {
319 0 : let xlrec = v16::XlHeapUpdate::decode(buf);
320 0 : // the size of tuple data is inferred from the size of the record.
321 0 : // we can't validate the remaining number of bytes without parsing
322 0 : // the tuple data.
323 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
324 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
325 0 : }
326 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
327 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
328 0 : // non-HOT update where the new tuple goes to different page than
329 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
330 0 : // set.
331 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
332 0 : }
333 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
334 0 : let xlrec = v16::XlHeapLock::decode(buf);
335 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
336 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
337 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
338 0 : }
339 0 : }
340 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
341 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
342 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
343 0 : let xlrec = v16::XlHeapMultiInsert::decode(buf);
344 :
345 0 : let offset_array_len =
346 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
347 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
348 0 : 0
349 : } else {
350 0 : size_of::<u16>() * xlrec.ntuples as usize
351 : };
352 0 : assert_eq!(offset_array_len, buf.remaining());
353 :
354 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
355 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
356 0 : }
357 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
358 0 : let xlrec = v16::XlHeapLockUpdated::decode(buf);
359 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
360 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
361 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
362 0 : }
363 0 : }
364 : } else {
365 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
366 : }
367 : }
368 : 17 => {
369 0 : if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
370 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
371 0 :
372 0 : if info == pg_constants::XLOG_HEAP_INSERT {
373 0 : let xlrec = v17::XlHeapInsert::decode(buf);
374 0 : assert_eq!(0, buf.remaining());
375 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
376 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
377 0 : }
378 0 : } else if info == pg_constants::XLOG_HEAP_DELETE {
379 0 : let xlrec = v17::XlHeapDelete::decode(buf);
380 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
381 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
382 0 : }
383 0 : } else if info == pg_constants::XLOG_HEAP_UPDATE
384 0 : || info == pg_constants::XLOG_HEAP_HOT_UPDATE
385 : {
386 0 : let xlrec = v17::XlHeapUpdate::decode(buf);
387 0 : // the size of tuple data is inferred from the size of the record.
388 0 : // we can't validate the remaining number of bytes without parsing
389 0 : // the tuple data.
390 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
391 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
392 0 : }
393 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
394 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
395 0 : // non-HOT update where the new tuple goes to different page than
396 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
397 0 : // set.
398 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
399 0 : }
400 0 : } else if info == pg_constants::XLOG_HEAP_LOCK {
401 0 : let xlrec = v17::XlHeapLock::decode(buf);
402 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
403 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
404 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
405 0 : }
406 0 : }
407 0 : } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
408 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
409 0 : if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
410 0 : let xlrec = v17::XlHeapMultiInsert::decode(buf);
411 :
412 0 : let offset_array_len =
413 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
414 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
415 0 : 0
416 : } else {
417 0 : size_of::<u16>() * xlrec.ntuples as usize
418 : };
419 0 : assert_eq!(offset_array_len, buf.remaining());
420 :
421 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
422 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
423 0 : }
424 0 : } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
425 0 : let xlrec = v17::XlHeapLockUpdated::decode(buf);
426 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
427 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
428 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
429 0 : }
430 0 : }
431 : } else {
432 0 : anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
433 : }
434 : }
435 0 : _ => {}
436 : }
437 :
438 145474 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
439 12 : let vm_rel = RelTag {
440 12 : forknum: VISIBILITYMAP_FORKNUM,
441 12 : spcnode: decoded.blocks[0].rnode_spcnode,
442 12 : dbnode: decoded.blocks[0].rnode_dbnode,
443 12 : relnode: decoded.blocks[0].rnode_relnode,
444 12 : };
445 12 :
446 12 : Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
447 12 : ClearVmBits {
448 12 : new_heap_blkno,
449 12 : old_heap_blkno,
450 12 : vm_rel,
451 12 : flags,
452 12 : },
453 12 : ))))
454 : } else {
455 145462 : Ok(None)
456 : }
457 145474 : }
458 :
459 0 : fn decode_neonmgr_record(
460 0 : buf: &mut Bytes,
461 0 : decoded: &DecodedWALRecord,
462 0 : pg_version: u32,
463 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
464 0 : // Handle VM bit updates that are implicitly part of heap records.
465 0 :
466 0 : // First, look at the record to determine which VM bits need
467 0 : // to be cleared. If either of these variables is set, we
468 0 : // need to clear the corresponding bits in the visibility map.
469 0 : let mut new_heap_blkno: Option<u32> = None;
470 0 : let mut old_heap_blkno: Option<u32> = None;
471 0 : let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
472 0 :
473 0 : assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
474 :
475 0 : match pg_version {
476 : 16 | 17 => {
477 0 : let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
478 0 :
479 0 : match info {
480 : pg_constants::XLOG_NEON_HEAP_INSERT => {
481 0 : let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
482 0 : assert_eq!(0, buf.remaining());
483 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
484 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
485 0 : }
486 : }
487 : pg_constants::XLOG_NEON_HEAP_DELETE => {
488 0 : let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
489 0 : if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
490 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
491 0 : }
492 : }
493 : pg_constants::XLOG_NEON_HEAP_UPDATE
494 : | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
495 0 : let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
496 0 : // the size of tuple data is inferred from the size of the record.
497 0 : // we can't validate the remaining number of bytes without parsing
498 0 : // the tuple data.
499 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
500 0 : old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
501 0 : }
502 0 : if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
503 0 : // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
504 0 : // non-HOT update where the new tuple goes to different page than
505 0 : // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
506 0 : // set.
507 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
508 0 : }
509 : }
510 : pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
511 0 : let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
512 :
513 0 : let offset_array_len =
514 0 : if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
515 : // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
516 0 : 0
517 : } else {
518 0 : size_of::<u16>() * xlrec.ntuples as usize
519 : };
520 0 : assert_eq!(offset_array_len, buf.remaining());
521 :
522 0 : if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
523 0 : new_heap_blkno = Some(decoded.blocks[0].blkno);
524 0 : }
525 : }
526 : pg_constants::XLOG_NEON_HEAP_LOCK => {
527 0 : let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
528 0 : if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
529 0 : old_heap_blkno = Some(decoded.blocks[0].blkno);
530 0 : flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
531 0 : }
532 : }
533 0 : info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
534 : }
535 : }
536 0 : _ => anyhow::bail!(
537 0 : "Neon RMGR has no known compatibility with PostgreSQL version {}",
538 0 : pg_version
539 0 : ),
540 : }
541 :
542 0 : if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
543 0 : let vm_rel = RelTag {
544 0 : forknum: VISIBILITYMAP_FORKNUM,
545 0 : spcnode: decoded.blocks[0].rnode_spcnode,
546 0 : dbnode: decoded.blocks[0].rnode_dbnode,
547 0 : relnode: decoded.blocks[0].rnode_relnode,
548 0 : };
549 0 :
550 0 : Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
551 0 : ClearVmBits {
552 0 : new_heap_blkno,
553 0 : old_heap_blkno,
554 0 : vm_rel,
555 0 : flags,
556 0 : },
557 0 : ))))
558 : } else {
559 0 : Ok(None)
560 : }
561 0 : }
562 :
563 16 : fn decode_smgr_record(
564 16 : buf: &mut Bytes,
565 16 : decoded: &DecodedWALRecord,
566 16 : ) -> anyhow::Result<Option<MetadataRecord>> {
567 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
568 16 : if info == pg_constants::XLOG_SMGR_CREATE {
569 16 : let create = XlSmgrCreate::decode(buf);
570 16 : let rel = RelTag {
571 16 : spcnode: create.rnode.spcnode,
572 16 : dbnode: create.rnode.dbnode,
573 16 : relnode: create.rnode.relnode,
574 16 : forknum: create.forknum,
575 16 : };
576 16 :
577 16 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
578 16 : rel,
579 16 : }))));
580 0 : } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
581 0 : let truncate = XlSmgrTruncate::decode(buf);
582 0 : return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
583 0 : }
584 0 :
585 0 : Ok(None)
586 16 : }
587 :
588 0 : fn decode_dbase_record(
589 0 : buf: &mut Bytes,
590 0 : decoded: &DecodedWALRecord,
591 0 : pg_version: u32,
592 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
593 0 : // TODO: Refactor this to avoid the duplication between postgres versions.
594 0 :
595 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
596 0 : tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
597 :
598 0 : if pg_version == 14 {
599 0 : if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
600 0 : let createdb = XlCreateDatabase::decode(buf);
601 0 : tracing::debug!("XLOG_DBASE_CREATE v14");
602 :
603 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
604 0 : db_id: createdb.db_id,
605 0 : tablespace_id: createdb.tablespace_id,
606 0 : src_db_id: createdb.src_db_id,
607 0 : src_tablespace_id: createdb.src_tablespace_id,
608 0 : }));
609 0 :
610 0 : return Ok(Some(record));
611 0 : } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
612 0 : let dropdb = XlDropDatabase::decode(buf);
613 0 :
614 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
615 0 : db_id: dropdb.db_id,
616 0 : tablespace_ids: dropdb.tablespace_ids,
617 0 : }));
618 0 :
619 0 : return Ok(Some(record));
620 0 : }
621 0 : } else if pg_version == 15 {
622 0 : if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
623 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
624 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
625 : // The XLOG record was renamed between v14 and v15,
626 : // but the record format is the same.
627 : // So we can reuse XlCreateDatabase here.
628 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
629 :
630 0 : let createdb = XlCreateDatabase::decode(buf);
631 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
632 0 : db_id: createdb.db_id,
633 0 : tablespace_id: createdb.tablespace_id,
634 0 : src_db_id: createdb.src_db_id,
635 0 : src_tablespace_id: createdb.src_tablespace_id,
636 0 : }));
637 0 :
638 0 : return Ok(Some(record));
639 0 : } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
640 0 : let dropdb = XlDropDatabase::decode(buf);
641 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
642 0 : db_id: dropdb.db_id,
643 0 : tablespace_ids: dropdb.tablespace_ids,
644 0 : }));
645 0 :
646 0 : return Ok(Some(record));
647 0 : }
648 0 : } else if pg_version == 16 {
649 0 : if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
650 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
651 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
652 : // The XLOG record was renamed between v14 and v15,
653 : // but the record format is the same.
654 : // So we can reuse XlCreateDatabase here.
655 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
656 :
657 0 : let createdb = XlCreateDatabase::decode(buf);
658 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
659 0 : db_id: createdb.db_id,
660 0 : tablespace_id: createdb.tablespace_id,
661 0 : src_db_id: createdb.src_db_id,
662 0 : src_tablespace_id: createdb.src_tablespace_id,
663 0 : }));
664 0 :
665 0 : return Ok(Some(record));
666 0 : } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
667 0 : let dropdb = XlDropDatabase::decode(buf);
668 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
669 0 : db_id: dropdb.db_id,
670 0 : tablespace_ids: dropdb.tablespace_ids,
671 0 : }));
672 0 :
673 0 : return Ok(Some(record));
674 0 : }
675 0 : } else if pg_version == 17 {
676 0 : if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
677 0 : tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
678 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
679 : // The XLOG record was renamed between v14 and v15,
680 : // but the record format is the same.
681 : // So we can reuse XlCreateDatabase here.
682 0 : tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
683 :
684 0 : let createdb = XlCreateDatabase::decode(buf);
685 0 : let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
686 0 : db_id: createdb.db_id,
687 0 : tablespace_id: createdb.tablespace_id,
688 0 : src_db_id: createdb.src_db_id,
689 0 : src_tablespace_id: createdb.src_tablespace_id,
690 0 : }));
691 0 :
692 0 : return Ok(Some(record));
693 0 : } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
694 0 : let dropdb = XlDropDatabase::decode(buf);
695 0 : let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
696 0 : db_id: dropdb.db_id,
697 0 : tablespace_ids: dropdb.tablespace_ids,
698 0 : }));
699 0 :
700 0 : return Ok(Some(record));
701 0 : }
702 0 : }
703 :
704 0 : Ok(None)
705 0 : }
706 :
707 0 : fn decode_clog_record(
708 0 : buf: &mut Bytes,
709 0 : decoded: &DecodedWALRecord,
710 0 : pg_version: u32,
711 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
712 0 : let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
713 0 :
714 0 : if info == pg_constants::CLOG_ZEROPAGE {
715 0 : let pageno = if pg_version < 17 {
716 0 : buf.get_u32_le()
717 : } else {
718 0 : buf.get_u64_le() as u32
719 : };
720 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
721 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
722 0 :
723 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
724 0 : ClogZeroPage { segno, rpageno },
725 0 : ))))
726 : } else {
727 0 : assert!(info == pg_constants::CLOG_TRUNCATE);
728 0 : let xlrec = XlClogTruncate::decode(buf, pg_version);
729 0 :
730 0 : Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
731 0 : ClogTruncate {
732 0 : pageno: xlrec.pageno,
733 0 : oldest_xid: xlrec.oldest_xid,
734 0 : oldest_xid_db: xlrec.oldest_xid_db,
735 0 : },
736 0 : ))))
737 : }
738 0 : }
739 :
740 24 : fn decode_xact_record(
741 24 : buf: &mut Bytes,
742 24 : decoded: &DecodedWALRecord,
743 24 : lsn: Lsn,
744 24 : ) -> anyhow::Result<Option<MetadataRecord>> {
745 24 : let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
746 24 : let origin_id = decoded.origin_id;
747 24 : let xl_xid = decoded.xl_xid;
748 24 :
749 24 : if info == pg_constants::XLOG_XACT_COMMIT {
750 8 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
751 8 : return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
752 8 : parsed,
753 8 : origin_id,
754 8 : xl_xid,
755 8 : lsn,
756 8 : }))));
757 16 : } else if info == pg_constants::XLOG_XACT_ABORT {
758 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
759 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
760 0 : parsed,
761 0 : origin_id,
762 0 : xl_xid,
763 0 : lsn,
764 0 : }))));
765 16 : } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
766 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
767 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
768 0 : XactCommon {
769 0 : parsed,
770 0 : origin_id,
771 0 : xl_xid,
772 0 : lsn,
773 0 : },
774 0 : ))));
775 16 : } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
776 0 : let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
777 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
778 0 : XactCommon {
779 0 : parsed,
780 0 : origin_id,
781 0 : xl_xid,
782 0 : lsn,
783 0 : },
784 0 : ))));
785 16 : } else if info == pg_constants::XLOG_XACT_PREPARE {
786 0 : return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
787 0 : XactPrepare {
788 0 : xl_xid: decoded.xl_xid,
789 0 : data: Bytes::copy_from_slice(&buf[..]),
790 0 : },
791 0 : ))));
792 16 : }
793 16 :
794 16 : Ok(None)
795 24 : }
796 :
797 0 : fn decode_multixact_record(
798 0 : buf: &mut Bytes,
799 0 : decoded: &DecodedWALRecord,
800 0 : pg_version: u32,
801 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
802 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
803 0 :
804 0 : if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
805 0 : || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
806 : {
807 0 : let pageno = if pg_version < 17 {
808 0 : buf.get_u32_le()
809 : } else {
810 0 : buf.get_u64_le() as u32
811 : };
812 0 : let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
813 0 : let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
814 :
815 0 : let slru_kind = match info {
816 0 : pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
817 0 : pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
818 0 : _ => unreachable!(),
819 : };
820 :
821 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
822 0 : MultiXactZeroPage {
823 0 : slru_kind,
824 0 : segno,
825 0 : rpageno,
826 0 : },
827 0 : ))));
828 0 : } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
829 0 : let xlrec = XlMultiXactCreate::decode(buf);
830 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
831 0 : xlrec,
832 0 : ))));
833 0 : } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
834 0 : let xlrec = XlMultiXactTruncate::decode(buf);
835 0 : return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
836 0 : xlrec,
837 0 : ))));
838 0 : }
839 0 :
840 0 : Ok(None)
841 0 : }
842 :
843 0 : fn decode_relmap_record(
844 0 : buf: &mut Bytes,
845 0 : decoded: &DecodedWALRecord,
846 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
847 0 : let update = XlRelmapUpdate::decode(buf);
848 0 :
849 0 : let mut buf = decoded.record.clone();
850 0 : buf.advance(decoded.main_data_offset);
851 0 : // skip xl_relmap_update
852 0 : buf.advance(12);
853 0 :
854 0 : Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
855 0 : RelmapUpdate {
856 0 : update,
857 0 : buf: Bytes::copy_from_slice(&buf[..]),
858 0 : },
859 0 : ))))
860 0 : }
861 :
862 30 : fn decode_xlog_record(
863 30 : buf: &mut Bytes,
864 30 : decoded: &DecodedWALRecord,
865 30 : lsn: Lsn,
866 30 : ) -> anyhow::Result<Option<MetadataRecord>> {
867 30 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
868 30 : Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
869 30 : info,
870 30 : lsn,
871 30 : buf: buf.clone(),
872 30 : }))))
873 30 : }
874 :
875 0 : fn decode_logical_message_record(
876 0 : buf: &mut Bytes,
877 0 : decoded: &DecodedWALRecord,
878 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
879 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
880 0 : if info == pg_constants::XLOG_LOGICAL_MESSAGE {
881 0 : let xlrec = XlLogicalMessage::decode(buf);
882 0 : let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
883 :
884 : #[cfg(feature = "testing")]
885 0 : if prefix == "neon-test" {
886 0 : return Ok(Some(MetadataRecord::LogicalMessage(
887 0 : LogicalMessageRecord::Failpoint,
888 0 : )));
889 0 : }
890 :
891 0 : if let Some(path) = prefix.strip_prefix("neon-file:") {
892 0 : let buf_size = xlrec.prefix_size + xlrec.message_size;
893 0 : let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
894 0 : return Ok(Some(MetadataRecord::LogicalMessage(
895 0 : LogicalMessageRecord::Put(PutLogicalMessage {
896 0 : path: path.to_string(),
897 0 : buf,
898 0 : }),
899 0 : )));
900 0 : }
901 0 : }
902 :
903 0 : Ok(None)
904 0 : }
905 :
906 16 : fn decode_standby_record(
907 16 : buf: &mut Bytes,
908 16 : decoded: &DecodedWALRecord,
909 16 : ) -> anyhow::Result<Option<MetadataRecord>> {
910 16 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
911 16 : if info == pg_constants::XLOG_RUNNING_XACTS {
912 0 : let xlrec = XlRunningXacts::decode(buf);
913 0 : return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
914 0 : StandbyRunningXacts {
915 0 : oldest_running_xid: xlrec.oldest_running_xid,
916 0 : },
917 0 : ))));
918 16 : }
919 16 :
920 16 : Ok(None)
921 16 : }
922 :
923 0 : fn decode_replorigin_record(
924 0 : buf: &mut Bytes,
925 0 : decoded: &DecodedWALRecord,
926 0 : ) -> anyhow::Result<Option<MetadataRecord>> {
927 0 : let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
928 0 : if info == pg_constants::XLOG_REPLORIGIN_SET {
929 0 : let xlrec = XlReploriginSet::decode(buf);
930 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
931 0 : xlrec,
932 0 : ))));
933 0 : } else if info == pg_constants::XLOG_REPLORIGIN_DROP {
934 0 : let xlrec = XlReploriginDrop::decode(buf);
935 0 : return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
936 0 : xlrec,
937 0 : ))));
938 0 : }
939 0 :
940 0 : Ok(None)
941 0 : }
942 : }
|