Line data Source code
1 : //!
2 : //! Functions for parsing WAL records.
3 : //!
4 :
5 : use anyhow::Result;
6 : use bytes::{Buf, Bytes};
7 : use postgres_ffi::dispatch_pgversion;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::BLCKSZ;
10 : use postgres_ffi::{BlockNumber, TimestampTz};
11 : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
12 : use postgres_ffi::{RepOriginId, XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
13 : use serde::{Deserialize, Serialize};
14 : use tracing::*;
15 : use utils::{bin_ser::DeserializeError, lsn::Lsn};
16 :
17 : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
18 : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
19 5454 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
20 : pub enum NeonWalRecord {
21 : /// Native PostgreSQL WAL record
22 : Postgres { will_init: bool, rec: Bytes },
23 :
24 : /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
25 : ClearVisibilityMapFlags {
26 : new_heap_blkno: Option<u32>,
27 : old_heap_blkno: Option<u32>,
28 : flags: u8,
29 : },
30 : /// Mark transaction IDs as committed on a CLOG page
31 : ClogSetCommitted {
32 : xids: Vec<TransactionId>,
33 : timestamp: TimestampTz,
34 : },
35 : /// Mark transaction IDs as aborted on a CLOG page
36 : ClogSetAborted { xids: Vec<TransactionId> },
37 : /// Extend multixact offsets SLRU
38 : MultixactOffsetCreate {
39 : mid: MultiXactId,
40 : moff: MultiXactOffset,
41 : },
42 : /// Extend multixact members SLRU.
43 : MultixactMembersCreate {
44 : moff: MultiXactOffset,
45 : members: Vec<MultiXactMember>,
46 : },
47 : /// Update the map of AUX files, either writing or dropping an entry
48 : AuxFile {
49 : file_path: String,
50 : content: Option<Bytes>,
51 : },
52 :
53 : /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
54 : #[cfg(test)]
55 : Test {
56 : /// Append a string to the image.
57 : append: String,
58 : /// Clear the image before appending.
59 : clear: bool,
60 : /// Treat this record as an init record. `clear` should be set to true if this field is set
61 : /// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
62 : /// its references in `timeline.rs`.
63 : will_init: bool,
64 : },
65 : }
66 :
67 : impl NeonWalRecord {
68 : /// Does replaying this WAL record initialize the page from scratch, or does
69 : /// it need to be applied over the previous image of the page?
70 439824 : pub fn will_init(&self) -> bool {
71 439824 : // If you change this function, you'll also need to change ValueBytes::will_init
72 439824 : match self {
73 436878 : NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
74 : #[cfg(test)]
75 2856 : NeonWalRecord::Test { will_init, .. } => *will_init,
76 : // None of the special neon record types currently initialize the page
77 90 : _ => false,
78 : }
79 439824 : }
80 :
81 : #[cfg(test)]
82 432 : pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
83 432 : Self::Test {
84 432 : append: s.as_ref().to_string(),
85 432 : clear: false,
86 432 : will_init: false,
87 432 : }
88 432 : }
89 :
90 : #[cfg(test)]
91 6 : pub(crate) fn wal_clear() -> Self {
92 6 : Self::Test {
93 6 : append: "".to_string(),
94 6 : clear: true,
95 6 : will_init: false,
96 6 : }
97 6 : }
98 :
99 : #[cfg(test)]
100 18 : pub(crate) fn wal_init() -> Self {
101 18 : Self::Test {
102 18 : append: "".to_string(),
103 18 : clear: true,
104 18 : will_init: true,
105 18 : }
106 18 : }
107 : }
108 :
/// Per-page data contained in a WAL record.
#[derive(Default)]
pub struct DecodedBkpBlock {
    /* Is this block ref in use? */
    //in_use: bool,

    // --- Identity of the block this entry refers to ---
    pub rnode_spcnode: u32,
    pub rnode_dbnode: u32,
    pub rnode_relnode: u32,
    /// Fork number. Note that a few special forknum values are used for
    /// non-rel files.
    pub forknum: u8,
    pub blkno: u32,

    /// Copy of the fork_flags field from the XLogRecordBlockHeader.
    pub flags: u8,

    // --- Information on the full-page image, if any ---
    /// Has image, even for consistency checking.
    pub has_image: bool,
    /// Has image that should be restored.
    pub apply_image: bool,
    /// Record doesn't need the previous page version to apply.
    pub will_init: bool,
    //char *bkp_image;
    pub hole_offset: u16,
    pub hole_length: u16,
    pub bimg_offset: u32,
    pub bimg_len: u16,
    pub bimg_info: u8,

    // --- Buffer holding the rmgr-specific data associated with this block ---
    has_data: bool,
    data_len: u16,
}

impl DecodedBkpBlock {
    /// Creates a block descriptor with every field at its `Default` value
    /// (zeroes and `false`).
    pub fn new() -> Self {
        Self::default()
    }
}
150 :
151 : #[derive(Default)]
152 : pub struct DecodedWALRecord {
153 : pub xl_xid: TransactionId,
154 : pub xl_info: u8,
155 : pub xl_rmid: u8,
156 : pub record: Bytes, // raw XLogRecord
157 :
158 : pub blocks: Vec<DecodedBkpBlock>,
159 : pub main_data_offset: usize,
160 : pub origin_id: u16,
161 : }
162 :
163 : impl DecodedWALRecord {
164 : /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
165 : /// by reading other existing relations' data blocks. This is more complex to apply than new-style database
166 : /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
167 437556 : pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
168 437556 : if self.xl_rmid == pg_constants::RM_DBASE_ID {
169 0 : let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
170 0 : match pg_version {
171 : 14 => {
172 : // Postgres 14 database creations are always the legacy kind
173 0 : info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
174 : }
175 0 : 15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
176 0 : 16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
177 : _ => {
178 0 : panic!("Unsupported postgres version {pg_version}")
179 : }
180 : }
181 : } else {
182 437556 : false
183 : }
184 437556 : }
185 : }
186 :
187 : #[repr(C)]
188 : #[derive(Debug, Clone, Copy)]
189 : pub struct RelFileNode {
190 : pub spcnode: Oid, /* tablespace */
191 : pub dbnode: Oid, /* database */
192 : pub relnode: Oid, /* relation */
193 : }
194 :
195 : #[repr(C)]
196 : #[derive(Debug)]
197 : pub struct XlRelmapUpdate {
198 : pub dbid: Oid, /* database ID, or 0 for shared map */
199 : pub tsid: Oid, /* database's tablespace, or pg_global */
200 : pub nbytes: i32, /* size of relmap data */
201 : }
202 :
203 : impl XlRelmapUpdate {
204 0 : pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
205 0 : XlRelmapUpdate {
206 0 : dbid: buf.get_u32_le(),
207 0 : tsid: buf.get_u32_le(),
208 0 : nbytes: buf.get_i32_le(),
209 0 : }
210 0 : }
211 : }
212 :
213 : pub mod v14 {
214 : use bytes::{Buf, Bytes};
215 : use postgres_ffi::{OffsetNumber, TransactionId};
216 :
217 : #[repr(C)]
218 : #[derive(Debug)]
219 : pub struct XlHeapInsert {
220 : pub offnum: OffsetNumber,
221 : pub flags: u8,
222 : }
223 :
224 : impl XlHeapInsert {
225 435828 : pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
226 435828 : XlHeapInsert {
227 435828 : offnum: buf.get_u16_le(),
228 435828 : flags: buf.get_u8(),
229 435828 : }
230 435828 : }
231 : }
232 :
233 : #[repr(C)]
234 : #[derive(Debug)]
235 : pub struct XlHeapMultiInsert {
236 : pub flags: u8,
237 : pub _padding: u8,
238 : pub ntuples: u16,
239 : }
240 :
241 : impl XlHeapMultiInsert {
242 126 : pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
243 126 : XlHeapMultiInsert {
244 126 : flags: buf.get_u8(),
245 126 : _padding: buf.get_u8(),
246 126 : ntuples: buf.get_u16_le(),
247 126 : }
248 126 : }
249 : }
250 :
251 : #[repr(C)]
252 : #[derive(Debug)]
253 : pub struct XlHeapDelete {
254 : pub xmax: TransactionId,
255 : pub offnum: OffsetNumber,
256 : pub _padding: u16,
257 : pub t_cid: u32,
258 : pub infobits_set: u8,
259 : pub flags: u8,
260 : }
261 :
262 : impl XlHeapDelete {
263 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
264 0 : XlHeapDelete {
265 0 : xmax: buf.get_u32_le(),
266 0 : offnum: buf.get_u16_le(),
267 0 : _padding: buf.get_u16_le(),
268 0 : t_cid: buf.get_u32_le(),
269 0 : infobits_set: buf.get_u8(),
270 0 : flags: buf.get_u8(),
271 0 : }
272 0 : }
273 : }
274 :
275 : #[repr(C)]
276 : #[derive(Debug)]
277 : pub struct XlHeapUpdate {
278 : pub old_xmax: TransactionId,
279 : pub old_offnum: OffsetNumber,
280 : pub old_infobits_set: u8,
281 : pub flags: u8,
282 : pub t_cid: u32,
283 : pub new_xmax: TransactionId,
284 : pub new_offnum: OffsetNumber,
285 : }
286 :
287 : impl XlHeapUpdate {
288 24 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
289 24 : XlHeapUpdate {
290 24 : old_xmax: buf.get_u32_le(),
291 24 : old_offnum: buf.get_u16_le(),
292 24 : old_infobits_set: buf.get_u8(),
293 24 : flags: buf.get_u8(),
294 24 : t_cid: buf.get_u32_le(),
295 24 : new_xmax: buf.get_u32_le(),
296 24 : new_offnum: buf.get_u16_le(),
297 24 : }
298 24 : }
299 : }
300 :
301 : #[repr(C)]
302 : #[derive(Debug)]
303 : pub struct XlHeapLock {
304 : pub locking_xid: TransactionId,
305 : pub offnum: OffsetNumber,
306 : pub _padding: u16,
307 : pub t_cid: u32,
308 : pub infobits_set: u8,
309 : pub flags: u8,
310 : }
311 :
312 : impl XlHeapLock {
313 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
314 0 : XlHeapLock {
315 0 : locking_xid: buf.get_u32_le(),
316 0 : offnum: buf.get_u16_le(),
317 0 : _padding: buf.get_u16_le(),
318 0 : t_cid: buf.get_u32_le(),
319 0 : infobits_set: buf.get_u8(),
320 0 : flags: buf.get_u8(),
321 0 : }
322 0 : }
323 : }
324 :
325 : #[repr(C)]
326 : #[derive(Debug)]
327 : pub struct XlHeapLockUpdated {
328 : pub xmax: TransactionId,
329 : pub offnum: OffsetNumber,
330 : pub infobits_set: u8,
331 : pub flags: u8,
332 : }
333 :
334 : impl XlHeapLockUpdated {
335 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
336 0 : XlHeapLockUpdated {
337 0 : xmax: buf.get_u32_le(),
338 0 : offnum: buf.get_u16_le(),
339 0 : infobits_set: buf.get_u8(),
340 0 : flags: buf.get_u8(),
341 0 : }
342 0 : }
343 : }
344 : }
345 :
346 : pub mod v15 {
347 : pub use super::v14::{
348 : XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
349 : };
350 : }
351 :
352 : pub mod v16 {
353 : pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
354 : use bytes::{Buf, Bytes};
355 : use postgres_ffi::{OffsetNumber, TransactionId};
356 :
357 : pub struct XlHeapDelete {
358 : pub xmax: TransactionId,
359 : pub offnum: OffsetNumber,
360 : pub infobits_set: u8,
361 : pub flags: u8,
362 : }
363 :
364 : impl XlHeapDelete {
365 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
366 0 : XlHeapDelete {
367 0 : xmax: buf.get_u32_le(),
368 0 : offnum: buf.get_u16_le(),
369 0 : infobits_set: buf.get_u8(),
370 0 : flags: buf.get_u8(),
371 0 : }
372 0 : }
373 : }
374 :
375 : #[repr(C)]
376 : #[derive(Debug)]
377 : pub struct XlHeapUpdate {
378 : pub old_xmax: TransactionId,
379 : pub old_offnum: OffsetNumber,
380 : pub old_infobits_set: u8,
381 : pub flags: u8,
382 : pub new_xmax: TransactionId,
383 : pub new_offnum: OffsetNumber,
384 : }
385 :
386 : impl XlHeapUpdate {
387 0 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
388 0 : XlHeapUpdate {
389 0 : old_xmax: buf.get_u32_le(),
390 0 : old_offnum: buf.get_u16_le(),
391 0 : old_infobits_set: buf.get_u8(),
392 0 : flags: buf.get_u8(),
393 0 : new_xmax: buf.get_u32_le(),
394 0 : new_offnum: buf.get_u16_le(),
395 0 : }
396 0 : }
397 : }
398 :
399 : #[repr(C)]
400 : #[derive(Debug)]
401 : pub struct XlHeapLock {
402 : pub locking_xid: TransactionId,
403 : pub offnum: OffsetNumber,
404 : pub infobits_set: u8,
405 : pub flags: u8,
406 : }
407 :
408 : impl XlHeapLock {
409 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
410 0 : XlHeapLock {
411 0 : locking_xid: buf.get_u32_le(),
412 0 : offnum: buf.get_u16_le(),
413 0 : infobits_set: buf.get_u8(),
414 0 : flags: buf.get_u8(),
415 0 : }
416 0 : }
417 : }
418 :
419 : /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
420 : pub mod rm_neon {
421 : use bytes::{Buf, Bytes};
422 : use postgres_ffi::{OffsetNumber, TransactionId};
423 :
424 : #[repr(C)]
425 : #[derive(Debug)]
426 : pub struct XlNeonHeapInsert {
427 : pub offnum: OffsetNumber,
428 : pub flags: u8,
429 : }
430 :
431 : impl XlNeonHeapInsert {
432 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
433 0 : XlNeonHeapInsert {
434 0 : offnum: buf.get_u16_le(),
435 0 : flags: buf.get_u8(),
436 0 : }
437 0 : }
438 : }
439 :
440 : #[repr(C)]
441 : #[derive(Debug)]
442 : pub struct XlNeonHeapMultiInsert {
443 : pub flags: u8,
444 : pub _padding: u8,
445 : pub ntuples: u16,
446 : pub t_cid: u32,
447 : }
448 :
449 : impl XlNeonHeapMultiInsert {
450 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
451 0 : XlNeonHeapMultiInsert {
452 0 : flags: buf.get_u8(),
453 0 : _padding: buf.get_u8(),
454 0 : ntuples: buf.get_u16_le(),
455 0 : t_cid: buf.get_u32_le(),
456 0 : }
457 0 : }
458 : }
459 :
460 : #[repr(C)]
461 : #[derive(Debug)]
462 : pub struct XlNeonHeapDelete {
463 : pub xmax: TransactionId,
464 : pub offnum: OffsetNumber,
465 : pub infobits_set: u8,
466 : pub flags: u8,
467 : pub t_cid: u32,
468 : }
469 :
470 : impl XlNeonHeapDelete {
471 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
472 0 : XlNeonHeapDelete {
473 0 : xmax: buf.get_u32_le(),
474 0 : offnum: buf.get_u16_le(),
475 0 : infobits_set: buf.get_u8(),
476 0 : flags: buf.get_u8(),
477 0 : t_cid: buf.get_u32_le(),
478 0 : }
479 0 : }
480 : }
481 :
482 : #[repr(C)]
483 : #[derive(Debug)]
484 : pub struct XlNeonHeapUpdate {
485 : pub old_xmax: TransactionId,
486 : pub old_offnum: OffsetNumber,
487 : pub old_infobits_set: u8,
488 : pub flags: u8,
489 : pub t_cid: u32,
490 : pub new_xmax: TransactionId,
491 : pub new_offnum: OffsetNumber,
492 : }
493 :
494 : impl XlNeonHeapUpdate {
495 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
496 0 : XlNeonHeapUpdate {
497 0 : old_xmax: buf.get_u32_le(),
498 0 : old_offnum: buf.get_u16_le(),
499 0 : old_infobits_set: buf.get_u8(),
500 0 : flags: buf.get_u8(),
501 0 : t_cid: buf.get_u32(),
502 0 : new_xmax: buf.get_u32_le(),
503 0 : new_offnum: buf.get_u16_le(),
504 0 : }
505 0 : }
506 : }
507 :
508 : #[repr(C)]
509 : #[derive(Debug)]
510 : pub struct XlNeonHeapLock {
511 : pub locking_xid: TransactionId,
512 : pub t_cid: u32,
513 : pub offnum: OffsetNumber,
514 : pub infobits_set: u8,
515 : pub flags: u8,
516 : }
517 :
518 : impl XlNeonHeapLock {
519 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
520 0 : XlNeonHeapLock {
521 0 : locking_xid: buf.get_u32_le(),
522 0 : t_cid: buf.get_u32_le(),
523 0 : offnum: buf.get_u16_le(),
524 0 : infobits_set: buf.get_u8(),
525 0 : flags: buf.get_u8(),
526 0 : }
527 0 : }
528 : }
529 : }
530 : }
531 :
532 : #[repr(C)]
533 : #[derive(Debug)]
534 : pub struct XlSmgrCreate {
535 : pub rnode: RelFileNode,
536 : // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
537 : // well-defined size?
538 : pub forknum: u8,
539 : }
540 :
541 : impl XlSmgrCreate {
542 48 : pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
543 48 : XlSmgrCreate {
544 48 : rnode: RelFileNode {
545 48 : spcnode: buf.get_u32_le(), /* tablespace */
546 48 : dbnode: buf.get_u32_le(), /* database */
547 48 : relnode: buf.get_u32_le(), /* relation */
548 48 : },
549 48 : forknum: buf.get_u32_le() as u8,
550 48 : }
551 48 : }
552 : }
553 :
554 : #[repr(C)]
555 : #[derive(Debug)]
556 : pub struct XlSmgrTruncate {
557 : pub blkno: BlockNumber,
558 : pub rnode: RelFileNode,
559 : pub flags: u32,
560 : }
561 :
562 : impl XlSmgrTruncate {
563 0 : pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
564 0 : XlSmgrTruncate {
565 0 : blkno: buf.get_u32_le(),
566 0 : rnode: RelFileNode {
567 0 : spcnode: buf.get_u32_le(), /* tablespace */
568 0 : dbnode: buf.get_u32_le(), /* database */
569 0 : relnode: buf.get_u32_le(), /* relation */
570 0 : },
571 0 : flags: buf.get_u32_le(),
572 0 : }
573 0 : }
574 : }
575 :
576 : #[repr(C)]
577 : #[derive(Debug)]
578 : pub struct XlCreateDatabase {
579 : pub db_id: Oid,
580 : pub tablespace_id: Oid,
581 : pub src_db_id: Oid,
582 : pub src_tablespace_id: Oid,
583 : }
584 :
585 : impl XlCreateDatabase {
586 0 : pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
587 0 : XlCreateDatabase {
588 0 : db_id: buf.get_u32_le(),
589 0 : tablespace_id: buf.get_u32_le(),
590 0 : src_db_id: buf.get_u32_le(),
591 0 : src_tablespace_id: buf.get_u32_le(),
592 0 : }
593 0 : }
594 : }
595 :
596 : #[repr(C)]
597 : #[derive(Debug)]
598 : pub struct XlDropDatabase {
599 : pub db_id: Oid,
600 : pub n_tablespaces: Oid, /* number of tablespace IDs */
601 : pub tablespace_ids: Vec<Oid>,
602 : }
603 :
604 : impl XlDropDatabase {
605 0 : pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
606 0 : let mut rec = XlDropDatabase {
607 0 : db_id: buf.get_u32_le(),
608 0 : n_tablespaces: buf.get_u32_le(),
609 0 : tablespace_ids: Vec::<Oid>::new(),
610 0 : };
611 :
612 0 : for _i in 0..rec.n_tablespaces {
613 0 : let id = buf.get_u32_le();
614 0 : rec.tablespace_ids.push(id);
615 0 : }
616 :
617 0 : rec
618 0 : }
619 : }
620 :
621 : ///
622 : /// Note: Parsing some fields is missing, because they're not needed.
623 : ///
624 : /// This is similar to the xl_xact_parsed_commit and
625 : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
626 : /// struct for commits and aborts.
627 : ///
628 : #[derive(Debug)]
629 : pub struct XlXactParsedRecord {
630 : pub xid: TransactionId,
631 : pub info: u8,
632 : pub xact_time: TimestampTz,
633 : pub xinfo: u32,
634 :
635 : pub db_id: Oid,
636 : /* MyDatabaseId */
637 : pub ts_id: Oid,
638 : /* MyDatabaseTableSpace */
639 : pub subxacts: Vec<TransactionId>,
640 :
641 : pub xnodes: Vec<RelFileNode>,
642 : pub origin_lsn: Lsn,
643 : }
644 :
645 : impl XlXactParsedRecord {
646 : /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
647 : /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
648 : /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
649 24 : pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
650 24 : let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
651 24 : // The record starts with time of commit/abort
652 24 : let xact_time = buf.get_i64_le();
653 24 : let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
654 24 : buf.get_u32_le()
655 : } else {
656 0 : 0
657 : };
658 : let db_id;
659 : let ts_id;
660 24 : if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
661 24 : db_id = buf.get_u32_le();
662 24 : ts_id = buf.get_u32_le();
663 24 : } else {
664 0 : db_id = 0;
665 0 : ts_id = 0;
666 0 : }
667 24 : let mut subxacts = Vec::<TransactionId>::new();
668 24 : if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
669 0 : let nsubxacts = buf.get_i32_le();
670 0 : for _i in 0..nsubxacts {
671 0 : let subxact = buf.get_u32_le();
672 0 : subxacts.push(subxact);
673 0 : }
674 24 : }
675 24 : let mut xnodes = Vec::<RelFileNode>::new();
676 24 : if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
677 0 : let nrels = buf.get_i32_le();
678 0 : for _i in 0..nrels {
679 0 : let spcnode = buf.get_u32_le();
680 0 : let dbnode = buf.get_u32_le();
681 0 : let relnode = buf.get_u32_le();
682 0 : trace!(
683 0 : "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
684 : spcnode,
685 : dbnode,
686 : relnode
687 : );
688 0 : xnodes.push(RelFileNode {
689 0 : spcnode,
690 0 : dbnode,
691 0 : relnode,
692 0 : });
693 : }
694 24 : }
695 :
696 24 : if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
697 0 : let nitems = buf.get_i32_le();
698 0 : debug!(
699 0 : "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
700 : nitems
701 : );
702 0 : let sizeof_xl_xact_stats_item = 12;
703 0 : buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
704 24 : }
705 :
706 24 : if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
707 24 : let nmsgs = buf.get_i32_le();
708 24 : let sizeof_shared_invalidation_message = 16;
709 24 : buf.advance(
710 24 : (nmsgs * sizeof_shared_invalidation_message)
711 24 : .try_into()
712 24 : .unwrap(),
713 24 : );
714 24 : }
715 :
716 24 : if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
717 0 : xid = buf.get_u32_le();
718 0 : debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
719 24 : }
720 :
721 24 : let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 {
722 0 : Lsn(buf.get_u64_le())
723 : } else {
724 24 : Lsn::INVALID
725 : };
726 24 : XlXactParsedRecord {
727 24 : xid,
728 24 : info,
729 24 : xact_time,
730 24 : xinfo,
731 24 : db_id,
732 24 : ts_id,
733 24 : subxacts,
734 24 : xnodes,
735 24 : origin_lsn,
736 24 : }
737 24 : }
738 : }
739 :
740 : #[repr(C)]
741 : #[derive(Debug)]
742 : pub struct XlClogTruncate {
743 : pub pageno: u32,
744 : pub oldest_xid: TransactionId,
745 : pub oldest_xid_db: Oid,
746 : }
747 :
748 : impl XlClogTruncate {
749 0 : pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
750 0 : XlClogTruncate {
751 0 : pageno: buf.get_u32_le(),
752 0 : oldest_xid: buf.get_u32_le(),
753 0 : oldest_xid_db: buf.get_u32_le(),
754 0 : }
755 0 : }
756 : }
757 :
758 : #[repr(C)]
759 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
760 : pub struct MultiXactMember {
761 : pub xid: TransactionId,
762 : pub status: MultiXactStatus,
763 : }
764 :
765 : impl MultiXactMember {
766 0 : pub fn decode(buf: &mut Bytes) -> MultiXactMember {
767 0 : MultiXactMember {
768 0 : xid: buf.get_u32_le(),
769 0 : status: buf.get_u32_le(),
770 0 : }
771 0 : }
772 : }
773 :
774 : #[repr(C)]
775 : #[derive(Debug)]
776 : pub struct XlMultiXactCreate {
777 : pub mid: MultiXactId,
778 : /* new MultiXact's ID */
779 : pub moff: MultiXactOffset,
780 : /* its starting offset in members file */
781 : pub nmembers: u32,
782 : /* number of member XIDs */
783 : pub members: Vec<MultiXactMember>,
784 : }
785 :
786 : impl XlMultiXactCreate {
787 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
788 0 : let mid = buf.get_u32_le();
789 0 : let moff = buf.get_u32_le();
790 0 : let nmembers = buf.get_u32_le();
791 0 : let mut members = Vec::new();
792 0 : for _ in 0..nmembers {
793 0 : members.push(MultiXactMember::decode(buf));
794 0 : }
795 0 : XlMultiXactCreate {
796 0 : mid,
797 0 : moff,
798 0 : nmembers,
799 0 : members,
800 0 : }
801 0 : }
802 : }
803 :
804 : #[repr(C)]
805 : #[derive(Debug)]
806 : pub struct XlMultiXactTruncate {
807 : pub oldest_multi_db: Oid,
808 : /* to-be-truncated range of multixact offsets */
809 : pub start_trunc_off: MultiXactId,
810 : /* just for completeness' sake */
811 : pub end_trunc_off: MultiXactId,
812 :
813 : /* to-be-truncated range of multixact members */
814 : pub start_trunc_memb: MultiXactOffset,
815 : pub end_trunc_memb: MultiXactOffset,
816 : }
817 :
818 : impl XlMultiXactTruncate {
819 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
820 0 : XlMultiXactTruncate {
821 0 : oldest_multi_db: buf.get_u32_le(),
822 0 : start_trunc_off: buf.get_u32_le(),
823 0 : end_trunc_off: buf.get_u32_le(),
824 0 : start_trunc_memb: buf.get_u32_le(),
825 0 : end_trunc_memb: buf.get_u32_le(),
826 0 : }
827 0 : }
828 : }
829 :
830 : #[repr(C)]
831 : #[derive(Debug)]
832 : pub struct XlLogicalMessage {
833 : pub db_id: Oid,
834 : pub transactional: bool,
835 : pub prefix_size: usize,
836 : pub message_size: usize,
837 : }
838 :
839 : impl XlLogicalMessage {
840 0 : pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
841 0 : XlLogicalMessage {
842 0 : db_id: buf.get_u32_le(),
843 0 : transactional: buf.get_u32_le() != 0, // 4-bytes alignment
844 0 : prefix_size: buf.get_u64_le() as usize,
845 0 : message_size: buf.get_u64_le() as usize,
846 0 : }
847 0 : }
848 : }
849 :
850 : #[repr(C)]
851 : #[derive(Debug)]
852 : pub struct XlRunningXacts {
853 : pub xcnt: u32,
854 : pub subxcnt: u32,
855 : pub subxid_overflow: bool,
856 : pub next_xid: TransactionId,
857 : pub oldest_running_xid: TransactionId,
858 : pub latest_completed_xid: TransactionId,
859 : pub xids: Vec<TransactionId>,
860 : }
861 :
862 : impl XlRunningXacts {
863 0 : pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
864 0 : let xcnt = buf.get_u32_le();
865 0 : let subxcnt = buf.get_u32_le();
866 0 : let subxid_overflow = buf.get_u32_le() != 0;
867 0 : let next_xid = buf.get_u32_le();
868 0 : let oldest_running_xid = buf.get_u32_le();
869 0 : let latest_completed_xid = buf.get_u32_le();
870 0 : let mut xids = Vec::new();
871 0 : for _ in 0..(xcnt + subxcnt) {
872 0 : xids.push(buf.get_u32_le());
873 0 : }
874 0 : XlRunningXacts {
875 0 : xcnt,
876 0 : subxcnt,
877 0 : subxid_overflow,
878 0 : next_xid,
879 0 : oldest_running_xid,
880 0 : latest_completed_xid,
881 0 : xids,
882 0 : }
883 0 : }
884 : }
885 :
886 : #[repr(C)]
887 : #[derive(Debug)]
888 : pub struct XlReploriginDrop {
889 : pub node_id: RepOriginId,
890 : }
891 :
892 : impl XlReploriginDrop {
893 0 : pub fn decode(buf: &mut Bytes) -> XlReploriginDrop {
894 0 : XlReploriginDrop {
895 0 : node_id: buf.get_u16_le(),
896 0 : }
897 0 : }
898 : }
899 :
900 : #[repr(C)]
901 : #[derive(Debug)]
902 : pub struct XlReploriginSet {
903 : pub remote_lsn: Lsn,
904 : pub node_id: RepOriginId,
905 : }
906 :
907 : impl XlReploriginSet {
908 0 : pub fn decode(buf: &mut Bytes) -> XlReploriginSet {
909 0 : XlReploriginSet {
910 0 : remote_lsn: Lsn(buf.get_u64_le()),
911 0 : node_id: buf.get_u16_le(),
912 0 : }
913 0 : }
914 : }
915 :
916 : /// Main routine to decode a WAL record and figure out which blocks are modified
917 : //
918 : // See xlogrecord.h for details
919 : // The overall layout of an XLOG record is:
920 : // Fixed-size header (XLogRecord struct)
921 : // XLogRecordBlockHeader struct
922 : // If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
923 : // If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
924 : // XLogRecordBlockCompressHeader struct follows.
925 : // If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
926 : // BlockNumber follows
927 : // XLogRecordBlockHeader struct
928 : // ...
929 : // XLogRecordDataHeader[Short|Long] struct
930 : // block data
931 : // block data
932 : // ...
933 : // main data
934 : //
935 : //
936 : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
937 : // It would be more natural for this function to return a DecodedWALRecord as return value,
938 : // but reusing the caller-supplied struct avoids an allocation.
939 : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
940 : //
941 437556 : pub fn decode_wal_record(
942 437556 : record: Bytes,
943 437556 : decoded: &mut DecodedWALRecord,
944 437556 : pg_version: u32,
945 437556 : ) -> Result<()> {
946 437556 : let mut rnode_spcnode: u32 = 0;
947 437556 : let mut rnode_dbnode: u32 = 0;
948 437556 : let mut rnode_relnode: u32 = 0;
949 437556 : let mut got_rnode = false;
950 437556 : let mut origin_id: u16 = 0;
951 437556 :
952 437556 : let mut buf = record.clone();
953 :
954 : // 1. Parse XLogRecord struct
955 :
956 : // FIXME: assume little-endian here
957 437556 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
958 :
959 437556 : trace!(
960 0 : "decode_wal_record xl_rmid = {} xl_info = {}",
961 : xlogrec.xl_rmid,
962 : xlogrec.xl_info
963 : );
964 :
965 437556 : let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
966 437556 :
967 437556 : if buf.remaining() != remaining {
968 0 : //TODO error
969 437556 : }
970 :
971 437556 : let mut max_block_id = 0;
972 437556 : let mut blocks_total_len: u32 = 0;
973 437556 : let mut main_data_len = 0;
974 437556 : let mut datatotal: u32 = 0;
975 437556 : decoded.blocks.clear();
976 :
977 : // 2. Decode the headers.
978 : // XLogRecordBlockHeaders if any,
979 : // XLogRecordDataHeader[Short|Long]
980 1311966 : while buf.remaining() > datatotal as usize {
981 874410 : let block_id = buf.get_u8();
982 874410 :
983 874410 : match block_id {
984 437436 : pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
985 437436 : /* XLogRecordDataHeaderShort */
986 437436 : main_data_len = buf.get_u8() as u32;
987 437436 : datatotal += main_data_len;
988 437436 : }
989 :
990 48 : pg_constants::XLR_BLOCK_ID_DATA_LONG => {
991 48 : /* XLogRecordDataHeaderLong */
992 48 : main_data_len = buf.get_u32_le();
993 48 : datatotal += main_data_len;
994 48 : }
995 :
996 0 : pg_constants::XLR_BLOCK_ID_ORIGIN => {
997 0 : // RepOriginId is uint16
998 0 : origin_id = buf.get_u16_le();
999 0 : }
1000 :
1001 0 : pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
1002 0 : // TransactionId is uint32
1003 0 : buf.advance(4);
1004 0 : }
1005 :
1006 436926 : 0..=pg_constants::XLR_MAX_BLOCK_ID => {
1007 : /* XLogRecordBlockHeader */
1008 436926 : let mut blk = DecodedBkpBlock::new();
1009 436926 :
1010 436926 : if block_id <= max_block_id {
1011 436926 : // TODO
1012 436926 : //report_invalid_record(state,
1013 436926 : // "out-of-order block_id %u at %X/%X",
1014 436926 : // block_id,
1015 436926 : // (uint32) (state->ReadRecPtr >> 32),
1016 436926 : // (uint32) state->ReadRecPtr);
1017 436926 : // goto err;
1018 436926 : }
1019 436926 : max_block_id = block_id;
1020 436926 :
1021 436926 : let fork_flags: u8 = buf.get_u8();
1022 436926 : blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
1023 436926 : blk.flags = fork_flags;
1024 436926 : blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
1025 436926 : blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
1026 436926 : blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
1027 436926 : blk.data_len = buf.get_u16_le();
1028 436926 :
1029 436926 : /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
1030 436926 :
1031 436926 : datatotal += blk.data_len as u32;
1032 436926 : blocks_total_len += blk.data_len as u32;
1033 436926 :
1034 436926 : if blk.has_image {
1035 180 : blk.bimg_len = buf.get_u16_le();
1036 180 : blk.hole_offset = buf.get_u16_le();
1037 180 : blk.bimg_info = buf.get_u8();
1038 :
1039 0 : blk.apply_image = dispatch_pgversion!(
1040 180 : pg_version,
1041 0 : (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
1042 : );
1043 :
1044 180 : let blk_img_is_compressed =
1045 180 : postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version);
1046 180 :
1047 180 : if blk_img_is_compressed {
1048 0 : debug!("compressed block image , pg_version = {}", pg_version);
1049 180 : }
1050 :
1051 180 : if blk_img_is_compressed {
1052 0 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
1053 0 : blk.hole_length = buf.get_u16_le();
1054 0 : } else {
1055 0 : blk.hole_length = 0;
1056 0 : }
1057 180 : } else {
1058 180 : blk.hole_length = BLCKSZ - blk.bimg_len;
1059 180 : }
1060 180 : datatotal += blk.bimg_len as u32;
1061 180 : blocks_total_len += blk.bimg_len as u32;
1062 180 :
1063 180 : /*
1064 180 : * cross-check that hole_offset > 0, hole_length > 0 and
1065 180 : * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1066 180 : */
1067 180 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
1068 108 : && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
1069 0 : {
1070 0 : // TODO
1071 0 : /*
1072 0 : report_invalid_record(state,
1073 0 : "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1074 0 : (unsigned int) blk->hole_offset,
1075 0 : (unsigned int) blk->hole_length,
1076 0 : (unsigned int) blk->bimg_len,
1077 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1078 0 : goto err;
1079 0 : */
1080 180 : }
1081 :
1082 : /*
1083 : * cross-check that hole_offset == 0 and hole_length == 0 if
1084 : * the HAS_HOLE flag is not set.
1085 : */
1086 180 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
1087 72 : && (blk.hole_offset != 0 || blk.hole_length != 0)
1088 0 : {
1089 0 : // TODO
1090 0 : /*
1091 0 : report_invalid_record(state,
1092 0 : "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1093 0 : (unsigned int) blk->hole_offset,
1094 0 : (unsigned int) blk->hole_length,
1095 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1096 0 : goto err;
1097 0 : */
1098 180 : }
1099 :
1100 : /*
1101 : * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1102 : * flag is set.
1103 : */
1104 180 : if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
1105 72 : // TODO
1106 72 : /*
1107 72 : report_invalid_record(state,
1108 72 : "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1109 72 : (unsigned int) blk->bimg_len,
1110 72 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1111 72 : goto err;
1112 72 : */
1113 108 : }
1114 :
1115 : /*
1116 : * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1117 : * IS_COMPRESSED flag is set.
1118 : */
1119 180 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
1120 72 : && !blk_img_is_compressed
1121 72 : && blk.bimg_len != BLCKSZ
1122 0 : {
1123 0 : // TODO
1124 0 : /*
1125 0 : report_invalid_record(state,
1126 0 : "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1127 0 : (unsigned int) blk->data_len,
1128 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1129 0 : goto err;
1130 0 : */
1131 180 : }
1132 436746 : }
1133 436926 : if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
1134 436926 : rnode_spcnode = buf.get_u32_le();
1135 436926 : rnode_dbnode = buf.get_u32_le();
1136 436926 : rnode_relnode = buf.get_u32_le();
1137 436926 : got_rnode = true;
1138 436926 : } else if !got_rnode {
1139 0 : // TODO
1140 0 : /*
1141 0 : report_invalid_record(state,
1142 0 : "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1143 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1144 0 : goto err; */
1145 0 : }
1146 :
1147 436926 : blk.rnode_spcnode = rnode_spcnode;
1148 436926 : blk.rnode_dbnode = rnode_dbnode;
1149 436926 : blk.rnode_relnode = rnode_relnode;
1150 436926 :
1151 436926 : blk.blkno = buf.get_u32_le();
1152 436926 : trace!(
1153 0 : "this record affects {}/{}/{} blk {}",
1154 : rnode_spcnode,
1155 : rnode_dbnode,
1156 : rnode_relnode,
1157 : blk.blkno
1158 : );
1159 :
1160 436926 : decoded.blocks.push(blk);
1161 : }
1162 :
1163 0 : _ => {
1164 0 : // TODO: invalid block_id
1165 0 : }
1166 : }
1167 : }
1168 :
1169 : // 3. Decode blocks.
1170 437556 : let mut ptr = record.len() - buf.remaining();
1171 437556 : for blk in decoded.blocks.iter_mut() {
1172 436926 : if blk.has_image {
1173 180 : blk.bimg_offset = ptr as u32;
1174 180 : ptr += blk.bimg_len as usize;
1175 436746 : }
1176 436926 : if blk.has_data {
1177 436746 : ptr += blk.data_len as usize;
1178 436746 : }
1179 : }
1180 : // We don't need them, so just skip blocks_total_len bytes
1181 437556 : buf.advance(blocks_total_len as usize);
1182 437556 : assert_eq!(ptr, record.len() - buf.remaining());
1183 :
1184 437556 : let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
1185 437556 :
1186 437556 : // 4. Decode main_data
1187 437556 : if main_data_len > 0 {
1188 437484 : assert_eq!(buf.remaining(), main_data_len as usize);
1189 72 : }
1190 :
1191 437556 : decoded.xl_xid = xlogrec.xl_xid;
1192 437556 : decoded.xl_info = xlogrec.xl_info;
1193 437556 : decoded.xl_rmid = xlogrec.xl_rmid;
1194 437556 : decoded.record = record;
1195 437556 : decoded.origin_id = origin_id;
1196 437556 : decoded.main_data_offset = main_data_offset;
1197 437556 :
1198 437556 : Ok(())
1199 437556 : }
1200 :
1201 : ///
1202 : /// Build a human-readable string to describe a WAL record
1203 : ///
1204 : /// For debugging purposes
1205 0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
1206 0 : match rec {
1207 0 : NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
1208 0 : "will_init: {}, {}",
1209 0 : will_init,
1210 0 : describe_postgres_wal_record(rec)?
1211 : )),
1212 0 : _ => Ok(format!("{:?}", rec)),
1213 : }
1214 0 : }
1215 :
1216 0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
1217 0 : // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
1218 0 : // Maybe use the postgres wal redo process, the same used for replaying WAL records?
1219 0 : // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
1220 0 : // without worrying about security?
1221 0 : //
1222 0 : // But for now, we have a hand-written code for a few common WAL record types here.
1223 0 :
1224 0 : let mut buf = record.clone();
1225 :
1226 : // 1. Parse XLogRecord struct
1227 :
1228 : // FIXME: assume little-endian here
1229 0 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
1230 :
1231 : let unknown_str: String;
1232 :
1233 0 : let result: &str = match xlogrec.xl_rmid {
1234 : pg_constants::RM_HEAP2_ID => {
1235 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1236 0 : match info {
1237 0 : pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
1238 0 : pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
1239 : _ => {
1240 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1241 0 : &unknown_str
1242 : }
1243 : }
1244 : }
1245 : pg_constants::RM_HEAP_ID => {
1246 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1247 0 : match info {
1248 0 : pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
1249 0 : pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
1250 0 : pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
1251 0 : pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
1252 : _ => {
1253 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1254 0 : &unknown_str
1255 : }
1256 : }
1257 : }
1258 : pg_constants::RM_XLOG_ID => {
1259 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1260 0 : match info {
1261 0 : pg_constants::XLOG_FPI => "XLOG FPI",
1262 0 : pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
1263 : _ => {
1264 0 : unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
1265 0 : &unknown_str
1266 : }
1267 : }
1268 : }
1269 0 : rmid => {
1270 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1271 0 :
1272 0 : unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
1273 0 : &unknown_str
1274 : }
1275 : };
1276 :
1277 0 : Ok(String::from(result))
1278 0 : }
|