Line data Source code
1 : //!
2 : //! Functions for parsing WAL records.
3 : //!
4 :
5 : use anyhow::Result;
6 : use bytes::{Buf, Bytes};
7 : use postgres_ffi::dispatch_pgversion;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::BLCKSZ;
10 : use postgres_ffi::{BlockNumber, TimestampTz};
11 : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
12 : use postgres_ffi::{RepOriginId, XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
13 : use serde::{Deserialize, Serialize};
14 : use tracing::*;
15 : use utils::{bin_ser::DeserializeError, lsn::Lsn};
16 :
17 : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
18 : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
19 1866 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
20 : pub enum NeonWalRecord {
21 : /// Native PostgreSQL WAL record
22 : Postgres { will_init: bool, rec: Bytes },
23 :
24 : /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
25 : ClearVisibilityMapFlags {
26 : new_heap_blkno: Option<u32>,
27 : old_heap_blkno: Option<u32>,
28 : flags: u8,
29 : },
30 : /// Mark transaction IDs as committed on a CLOG page
31 : ClogSetCommitted {
32 : xids: Vec<TransactionId>,
33 : timestamp: TimestampTz,
34 : },
35 : /// Mark transaction IDs as aborted on a CLOG page
36 : ClogSetAborted { xids: Vec<TransactionId> },
37 : /// Extend multixact offsets SLRU
38 : MultixactOffsetCreate {
39 : mid: MultiXactId,
40 : moff: MultiXactOffset,
41 : },
42 : /// Extend multixact members SLRU.
43 : MultixactMembersCreate {
44 : moff: MultiXactOffset,
45 : members: Vec<MultiXactMember>,
46 : },
47 : /// Update the map of AUX files, either writing or dropping an entry
48 : AuxFile {
49 : file_path: String,
50 : content: Option<Bytes>,
51 : },
52 :
53 : /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
54 : #[cfg(test)]
55 : Test {
56 : /// Append a string to the image.
57 : append: String,
58 : /// Clear the image before appending.
59 : clear: bool,
60 : /// Treat this record as an init record. `clear` should be set to true if this field is set
61 : /// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
62 : /// its references in `timeline.rs`.
63 : will_init: bool,
64 : },
65 : }
66 :
67 : impl NeonWalRecord {
68 : /// Does replaying this WAL record initialize the page from scratch, or does
69 : /// it need to be applied over the previous image of the page?
70 146646 : pub fn will_init(&self) -> bool {
71 146646 : // If you change this function, you'll also need to change ValueBytes::will_init
72 146646 : match self {
73 145626 : NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
74 : #[cfg(test)]
75 1000 : NeonWalRecord::Test { will_init, .. } => *will_init,
76 : // None of the special neon record types currently initialize the page
77 20 : _ => false,
78 : }
79 146646 : }
80 :
81 : #[cfg(test)]
82 160 : pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
83 160 : Self::Test {
84 160 : append: s.as_ref().to_string(),
85 160 : clear: false,
86 160 : will_init: false,
87 160 : }
88 160 : }
89 :
90 : #[cfg(test)]
91 2 : pub(crate) fn wal_clear() -> Self {
92 2 : Self::Test {
93 2 : append: "".to_string(),
94 2 : clear: true,
95 2 : will_init: false,
96 2 : }
97 2 : }
98 :
99 : #[cfg(test)]
100 10 : pub(crate) fn wal_init() -> Self {
101 10 : Self::Test {
102 10 : append: "".to_string(),
103 10 : clear: true,
104 10 : will_init: true,
105 10 : }
106 10 : }
107 : }
108 :
/// DecodedBkpBlock represents per-page data contained in a WAL record.
#[derive(Default)]
pub struct DecodedBkpBlock {
    // (The C original also has an `in_use: bool` flag; it is not carried here.)

    // --- Identify the block this refers to ---
    pub rnode_spcnode: u32,
    pub rnode_dbnode: u32,
    pub rnode_relnode: u32,
    /// Note that we have a few special forknum values for non-rel files.
    pub forknum: u8,
    pub blkno: u32,

    /// Copy of the fork_flags field from the XLogRecordBlockHeader.
    pub flags: u8,

    // --- Information on full-page image, if any ---
    /// Has image, even for consistency checking.
    pub has_image: bool,
    /// Has image that should be restored.
    pub apply_image: bool,
    /// Record doesn't need previous page version to apply.
    pub will_init: bool,
    // (The C original also has a `char *bkp_image` pointer; it is not carried here.)
    pub hole_offset: u16,
    pub hole_length: u16,
    pub bimg_offset: u32,
    pub bimg_len: u16,
    pub bimg_info: u8,

    // --- Buffer holding the rmgr-specific data associated with this block ---
    has_data: bool,
    data_len: u16,
}

impl DecodedBkpBlock {
    /// Creates a block descriptor with every field set to its `Default`
    /// value (zero / `false`).
    pub fn new() -> DecodedBkpBlock {
        Self::default()
    }
}
150 :
151 : #[derive(Default)]
152 : pub struct DecodedWALRecord {
153 : pub xl_xid: TransactionId,
154 : pub xl_info: u8,
155 : pub xl_rmid: u8,
156 : pub record: Bytes, // raw XLogRecord
157 :
158 : pub blocks: Vec<DecodedBkpBlock>,
159 : pub main_data_offset: usize,
160 : pub origin_id: u16,
161 : }
162 :
163 : impl DecodedWALRecord {
164 : /// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
165 : /// by reading other existing relations' data blocks. This is more complex to apply than new-style database
166 : /// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
167 145852 : pub(crate) fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
168 145852 : if self.xl_rmid == pg_constants::RM_DBASE_ID {
169 0 : let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
170 0 : match pg_version {
171 : 14 => {
172 : // Postgres 14 database creations are always the legacy kind
173 0 : info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
174 : }
175 0 : 15 => info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
176 0 : 16 => info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
177 0 : 17 => info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY,
178 : _ => {
179 0 : panic!("Unsupported postgres version {pg_version}")
180 : }
181 : }
182 : } else {
183 145852 : false
184 : }
185 145852 : }
186 : }
187 :
/// Triple of OIDs (tablespace, database, relation) naming a relation.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {
    pub spcnode: Oid, /* tablespace */
    pub dbnode: Oid,  /* database */
    pub relnode: Oid, /* relation */
}
195 :
196 : #[repr(C)]
197 : #[derive(Debug)]
198 : pub struct XlRelmapUpdate {
199 : pub dbid: Oid, /* database ID, or 0 for shared map */
200 : pub tsid: Oid, /* database's tablespace, or pg_global */
201 : pub nbytes: i32, /* size of relmap data */
202 : }
203 :
204 : impl XlRelmapUpdate {
205 0 : pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
206 0 : XlRelmapUpdate {
207 0 : dbid: buf.get_u32_le(),
208 0 : tsid: buf.get_u32_le(),
209 0 : nbytes: buf.get_i32_le(),
210 0 : }
211 0 : }
212 : }
213 :
214 : pub mod v14 {
215 : use bytes::{Buf, Bytes};
216 : use postgres_ffi::{OffsetNumber, TransactionId};
217 :
218 : #[repr(C)]
219 : #[derive(Debug)]
220 : pub struct XlHeapInsert {
221 : pub offnum: OffsetNumber,
222 : pub flags: u8,
223 : }
224 :
225 : impl XlHeapInsert {
226 145276 : pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
227 145276 : XlHeapInsert {
228 145276 : offnum: buf.get_u16_le(),
229 145276 : flags: buf.get_u8(),
230 145276 : }
231 145276 : }
232 : }
233 :
234 : #[repr(C)]
235 : #[derive(Debug)]
236 : pub struct XlHeapMultiInsert {
237 : pub flags: u8,
238 : pub _padding: u8,
239 : pub ntuples: u16,
240 : }
241 :
242 : impl XlHeapMultiInsert {
243 42 : pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
244 42 : XlHeapMultiInsert {
245 42 : flags: buf.get_u8(),
246 42 : _padding: buf.get_u8(),
247 42 : ntuples: buf.get_u16_le(),
248 42 : }
249 42 : }
250 : }
251 :
252 : #[repr(C)]
253 : #[derive(Debug)]
254 : pub struct XlHeapDelete {
255 : pub xmax: TransactionId,
256 : pub offnum: OffsetNumber,
257 : pub _padding: u16,
258 : pub t_cid: u32,
259 : pub infobits_set: u8,
260 : pub flags: u8,
261 : }
262 :
263 : impl XlHeapDelete {
264 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
265 0 : XlHeapDelete {
266 0 : xmax: buf.get_u32_le(),
267 0 : offnum: buf.get_u16_le(),
268 0 : _padding: buf.get_u16_le(),
269 0 : t_cid: buf.get_u32_le(),
270 0 : infobits_set: buf.get_u8(),
271 0 : flags: buf.get_u8(),
272 0 : }
273 0 : }
274 : }
275 :
276 : #[repr(C)]
277 : #[derive(Debug)]
278 : pub struct XlHeapUpdate {
279 : pub old_xmax: TransactionId,
280 : pub old_offnum: OffsetNumber,
281 : pub old_infobits_set: u8,
282 : pub flags: u8,
283 : pub t_cid: u32,
284 : pub new_xmax: TransactionId,
285 : pub new_offnum: OffsetNumber,
286 : }
287 :
288 : impl XlHeapUpdate {
289 8 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
290 8 : XlHeapUpdate {
291 8 : old_xmax: buf.get_u32_le(),
292 8 : old_offnum: buf.get_u16_le(),
293 8 : old_infobits_set: buf.get_u8(),
294 8 : flags: buf.get_u8(),
295 8 : t_cid: buf.get_u32_le(),
296 8 : new_xmax: buf.get_u32_le(),
297 8 : new_offnum: buf.get_u16_le(),
298 8 : }
299 8 : }
300 : }
301 :
302 : #[repr(C)]
303 : #[derive(Debug)]
304 : pub struct XlHeapLock {
305 : pub locking_xid: TransactionId,
306 : pub offnum: OffsetNumber,
307 : pub _padding: u16,
308 : pub t_cid: u32,
309 : pub infobits_set: u8,
310 : pub flags: u8,
311 : }
312 :
313 : impl XlHeapLock {
314 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
315 0 : XlHeapLock {
316 0 : locking_xid: buf.get_u32_le(),
317 0 : offnum: buf.get_u16_le(),
318 0 : _padding: buf.get_u16_le(),
319 0 : t_cid: buf.get_u32_le(),
320 0 : infobits_set: buf.get_u8(),
321 0 : flags: buf.get_u8(),
322 0 : }
323 0 : }
324 : }
325 :
326 : #[repr(C)]
327 : #[derive(Debug)]
328 : pub struct XlHeapLockUpdated {
329 : pub xmax: TransactionId,
330 : pub offnum: OffsetNumber,
331 : pub infobits_set: u8,
332 : pub flags: u8,
333 : }
334 :
335 : impl XlHeapLockUpdated {
336 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
337 0 : XlHeapLockUpdated {
338 0 : xmax: buf.get_u32_le(),
339 0 : offnum: buf.get_u16_le(),
340 0 : infobits_set: buf.get_u8(),
341 0 : flags: buf.get_u8(),
342 0 : }
343 0 : }
344 : }
345 :
346 : #[repr(C)]
347 : #[derive(Debug)]
348 : pub struct XlParameterChange {
349 : pub max_connections: i32,
350 : pub max_worker_processes: i32,
351 : pub max_wal_senders: i32,
352 : pub max_prepared_xacts: i32,
353 : pub max_locks_per_xact: i32,
354 : pub wal_level: i32,
355 : pub wal_log_hints: bool,
356 : pub track_commit_timestamp: bool,
357 : pub _padding: [u8; 2],
358 : }
359 :
360 : impl XlParameterChange {
361 0 : pub fn decode(buf: &mut Bytes) -> XlParameterChange {
362 0 : XlParameterChange {
363 0 : max_connections: buf.get_i32_le(),
364 0 : max_worker_processes: buf.get_i32_le(),
365 0 : max_wal_senders: buf.get_i32_le(),
366 0 : max_prepared_xacts: buf.get_i32_le(),
367 0 : max_locks_per_xact: buf.get_i32_le(),
368 0 : wal_level: buf.get_i32_le(),
369 0 : wal_log_hints: buf.get_u8() != 0,
370 0 : track_commit_timestamp: buf.get_u8() != 0,
371 0 : _padding: [buf.get_u8(), buf.get_u8()],
372 0 : }
373 0 : }
374 : }
375 : }
376 :
/// Record layouts used for PostgreSQL 15. Nothing is defined here: every
/// type is re-exported unchanged from [`v14`].
pub mod v15 {
    pub use super::v14::{
        XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
        XlParameterChange,
    };
}
383 :
384 : pub mod v16 {
385 : pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
386 : use bytes::{Buf, Bytes};
387 : use postgres_ffi::{OffsetNumber, TransactionId};
388 :
389 : pub struct XlHeapDelete {
390 : pub xmax: TransactionId,
391 : pub offnum: OffsetNumber,
392 : pub infobits_set: u8,
393 : pub flags: u8,
394 : }
395 :
396 : impl XlHeapDelete {
397 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
398 0 : XlHeapDelete {
399 0 : xmax: buf.get_u32_le(),
400 0 : offnum: buf.get_u16_le(),
401 0 : infobits_set: buf.get_u8(),
402 0 : flags: buf.get_u8(),
403 0 : }
404 0 : }
405 : }
406 :
407 : #[repr(C)]
408 : #[derive(Debug)]
409 : pub struct XlHeapUpdate {
410 : pub old_xmax: TransactionId,
411 : pub old_offnum: OffsetNumber,
412 : pub old_infobits_set: u8,
413 : pub flags: u8,
414 : pub new_xmax: TransactionId,
415 : pub new_offnum: OffsetNumber,
416 : }
417 :
418 : impl XlHeapUpdate {
419 0 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
420 0 : XlHeapUpdate {
421 0 : old_xmax: buf.get_u32_le(),
422 0 : old_offnum: buf.get_u16_le(),
423 0 : old_infobits_set: buf.get_u8(),
424 0 : flags: buf.get_u8(),
425 0 : new_xmax: buf.get_u32_le(),
426 0 : new_offnum: buf.get_u16_le(),
427 0 : }
428 0 : }
429 : }
430 :
431 : #[repr(C)]
432 : #[derive(Debug)]
433 : pub struct XlHeapLock {
434 : pub locking_xid: TransactionId,
435 : pub offnum: OffsetNumber,
436 : pub infobits_set: u8,
437 : pub flags: u8,
438 : }
439 :
440 : impl XlHeapLock {
441 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
442 0 : XlHeapLock {
443 0 : locking_xid: buf.get_u32_le(),
444 0 : offnum: buf.get_u16_le(),
445 0 : infobits_set: buf.get_u8(),
446 0 : flags: buf.get_u8(),
447 0 : }
448 0 : }
449 : }
450 :
451 : /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
452 : pub mod rm_neon {
453 : use bytes::{Buf, Bytes};
454 : use postgres_ffi::{OffsetNumber, TransactionId};
455 :
456 : #[repr(C)]
457 : #[derive(Debug)]
458 : pub struct XlNeonHeapInsert {
459 : pub offnum: OffsetNumber,
460 : pub flags: u8,
461 : }
462 :
463 : impl XlNeonHeapInsert {
464 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
465 0 : XlNeonHeapInsert {
466 0 : offnum: buf.get_u16_le(),
467 0 : flags: buf.get_u8(),
468 0 : }
469 0 : }
470 : }
471 :
472 : #[repr(C)]
473 : #[derive(Debug)]
474 : pub struct XlNeonHeapMultiInsert {
475 : pub flags: u8,
476 : pub _padding: u8,
477 : pub ntuples: u16,
478 : pub t_cid: u32,
479 : }
480 :
481 : impl XlNeonHeapMultiInsert {
482 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
483 0 : XlNeonHeapMultiInsert {
484 0 : flags: buf.get_u8(),
485 0 : _padding: buf.get_u8(),
486 0 : ntuples: buf.get_u16_le(),
487 0 : t_cid: buf.get_u32_le(),
488 0 : }
489 0 : }
490 : }
491 :
492 : #[repr(C)]
493 : #[derive(Debug)]
494 : pub struct XlNeonHeapDelete {
495 : pub xmax: TransactionId,
496 : pub offnum: OffsetNumber,
497 : pub infobits_set: u8,
498 : pub flags: u8,
499 : pub t_cid: u32,
500 : }
501 :
502 : impl XlNeonHeapDelete {
503 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
504 0 : XlNeonHeapDelete {
505 0 : xmax: buf.get_u32_le(),
506 0 : offnum: buf.get_u16_le(),
507 0 : infobits_set: buf.get_u8(),
508 0 : flags: buf.get_u8(),
509 0 : t_cid: buf.get_u32_le(),
510 0 : }
511 0 : }
512 : }
513 :
514 : #[repr(C)]
515 : #[derive(Debug)]
516 : pub struct XlNeonHeapUpdate {
517 : pub old_xmax: TransactionId,
518 : pub old_offnum: OffsetNumber,
519 : pub old_infobits_set: u8,
520 : pub flags: u8,
521 : pub t_cid: u32,
522 : pub new_xmax: TransactionId,
523 : pub new_offnum: OffsetNumber,
524 : }
525 :
526 : impl XlNeonHeapUpdate {
527 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
528 0 : XlNeonHeapUpdate {
529 0 : old_xmax: buf.get_u32_le(),
530 0 : old_offnum: buf.get_u16_le(),
531 0 : old_infobits_set: buf.get_u8(),
532 0 : flags: buf.get_u8(),
533 0 : t_cid: buf.get_u32(),
534 0 : new_xmax: buf.get_u32_le(),
535 0 : new_offnum: buf.get_u16_le(),
536 0 : }
537 0 : }
538 : }
539 :
540 : #[repr(C)]
541 : #[derive(Debug)]
542 : pub struct XlNeonHeapLock {
543 : pub locking_xid: TransactionId,
544 : pub t_cid: u32,
545 : pub offnum: OffsetNumber,
546 : pub infobits_set: u8,
547 : pub flags: u8,
548 : }
549 :
550 : impl XlNeonHeapLock {
551 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
552 0 : XlNeonHeapLock {
553 0 : locking_xid: buf.get_u32_le(),
554 0 : t_cid: buf.get_u32_le(),
555 0 : offnum: buf.get_u16_le(),
556 0 : infobits_set: buf.get_u8(),
557 0 : flags: buf.get_u8(),
558 0 : }
559 0 : }
560 : }
561 : }
562 : }
563 :
564 : pub mod v17 {
565 : pub use super::v14::XlHeapLockUpdated;
566 : use bytes::{Buf, Bytes};
567 : pub use postgres_ffi::{TimeLineID, TimestampTz};
568 :
569 : pub use super::v16::rm_neon;
570 : pub use super::v16::{
571 : XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
572 : };
573 :
574 : #[repr(C)]
575 : #[derive(Debug)]
576 : pub struct XlEndOfRecovery {
577 : pub end_time: TimestampTz,
578 : pub this_time_line_id: TimeLineID,
579 : pub prev_time_line_id: TimeLineID,
580 : pub wal_level: i32,
581 : }
582 :
583 : impl XlEndOfRecovery {
584 0 : pub fn decode(buf: &mut Bytes) -> XlEndOfRecovery {
585 0 : XlEndOfRecovery {
586 0 : end_time: buf.get_i64_le(),
587 0 : this_time_line_id: buf.get_u32_le(),
588 0 : prev_time_line_id: buf.get_u32_le(),
589 0 : wal_level: buf.get_i32_le(),
590 0 : }
591 0 : }
592 : }
593 : }
594 :
595 : #[repr(C)]
596 : #[derive(Debug)]
597 : pub struct XlSmgrCreate {
598 : pub rnode: RelFileNode,
599 : // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
600 : // well-defined size?
601 : pub forknum: u8,
602 : }
603 :
604 : impl XlSmgrCreate {
605 16 : pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
606 16 : XlSmgrCreate {
607 16 : rnode: RelFileNode {
608 16 : spcnode: buf.get_u32_le(), /* tablespace */
609 16 : dbnode: buf.get_u32_le(), /* database */
610 16 : relnode: buf.get_u32_le(), /* relation */
611 16 : },
612 16 : forknum: buf.get_u32_le() as u8,
613 16 : }
614 16 : }
615 : }
616 :
617 : #[repr(C)]
618 : #[derive(Debug)]
619 : pub struct XlSmgrTruncate {
620 : pub blkno: BlockNumber,
621 : pub rnode: RelFileNode,
622 : pub flags: u32,
623 : }
624 :
625 : impl XlSmgrTruncate {
626 0 : pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
627 0 : XlSmgrTruncate {
628 0 : blkno: buf.get_u32_le(),
629 0 : rnode: RelFileNode {
630 0 : spcnode: buf.get_u32_le(), /* tablespace */
631 0 : dbnode: buf.get_u32_le(), /* database */
632 0 : relnode: buf.get_u32_le(), /* relation */
633 0 : },
634 0 : flags: buf.get_u32_le(),
635 0 : }
636 0 : }
637 : }
638 :
639 : #[repr(C)]
640 : #[derive(Debug)]
641 : pub struct XlCreateDatabase {
642 : pub db_id: Oid,
643 : pub tablespace_id: Oid,
644 : pub src_db_id: Oid,
645 : pub src_tablespace_id: Oid,
646 : }
647 :
648 : impl XlCreateDatabase {
649 0 : pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
650 0 : XlCreateDatabase {
651 0 : db_id: buf.get_u32_le(),
652 0 : tablespace_id: buf.get_u32_le(),
653 0 : src_db_id: buf.get_u32_le(),
654 0 : src_tablespace_id: buf.get_u32_le(),
655 0 : }
656 0 : }
657 : }
658 :
659 : #[repr(C)]
660 : #[derive(Debug)]
661 : pub struct XlDropDatabase {
662 : pub db_id: Oid,
663 : pub n_tablespaces: Oid, /* number of tablespace IDs */
664 : pub tablespace_ids: Vec<Oid>,
665 : }
666 :
667 : impl XlDropDatabase {
668 0 : pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
669 0 : let mut rec = XlDropDatabase {
670 0 : db_id: buf.get_u32_le(),
671 0 : n_tablespaces: buf.get_u32_le(),
672 0 : tablespace_ids: Vec::<Oid>::new(),
673 0 : };
674 :
675 0 : for _i in 0..rec.n_tablespaces {
676 0 : let id = buf.get_u32_le();
677 0 : rec.tablespace_ids.push(id);
678 0 : }
679 :
680 0 : rec
681 0 : }
682 : }
683 :
684 : ///
685 : /// Note: Parsing some fields is missing, because they're not needed.
686 : ///
687 : /// This is similar to the xl_xact_parsed_commit and
688 : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
689 : /// struct for commits and aborts.
690 : ///
691 : #[derive(Debug)]
692 : pub struct XlXactParsedRecord {
693 : pub xid: TransactionId,
694 : pub info: u8,
695 : pub xact_time: TimestampTz,
696 : pub xinfo: u32,
697 :
698 : pub db_id: Oid,
699 : /* MyDatabaseId */
700 : pub ts_id: Oid,
701 : /* MyDatabaseTableSpace */
702 : pub subxacts: Vec<TransactionId>,
703 :
704 : pub xnodes: Vec<RelFileNode>,
705 : pub origin_lsn: Lsn,
706 : }
707 :
708 : impl XlXactParsedRecord {
709 : /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
710 : /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
711 : /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
712 8 : pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
713 8 : let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
714 8 : // The record starts with time of commit/abort
715 8 : let xact_time = buf.get_i64_le();
716 8 : let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
717 8 : buf.get_u32_le()
718 : } else {
719 0 : 0
720 : };
721 : let db_id;
722 : let ts_id;
723 8 : if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
724 8 : db_id = buf.get_u32_le();
725 8 : ts_id = buf.get_u32_le();
726 8 : } else {
727 0 : db_id = 0;
728 0 : ts_id = 0;
729 0 : }
730 8 : let mut subxacts = Vec::<TransactionId>::new();
731 8 : if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
732 0 : let nsubxacts = buf.get_i32_le();
733 0 : for _i in 0..nsubxacts {
734 0 : let subxact = buf.get_u32_le();
735 0 : subxacts.push(subxact);
736 0 : }
737 8 : }
738 8 : let mut xnodes = Vec::<RelFileNode>::new();
739 8 : if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
740 0 : let nrels = buf.get_i32_le();
741 0 : for _i in 0..nrels {
742 0 : let spcnode = buf.get_u32_le();
743 0 : let dbnode = buf.get_u32_le();
744 0 : let relnode = buf.get_u32_le();
745 0 : trace!(
746 0 : "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
747 : spcnode,
748 : dbnode,
749 : relnode
750 : );
751 0 : xnodes.push(RelFileNode {
752 0 : spcnode,
753 0 : dbnode,
754 0 : relnode,
755 0 : });
756 : }
757 8 : }
758 :
759 8 : if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
760 0 : let nitems = buf.get_i32_le();
761 0 : debug!(
762 0 : "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
763 : nitems
764 : );
765 0 : let sizeof_xl_xact_stats_item = 12;
766 0 : buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
767 8 : }
768 :
769 8 : if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
770 8 : let nmsgs = buf.get_i32_le();
771 8 : let sizeof_shared_invalidation_message = 16;
772 8 : buf.advance(
773 8 : (nmsgs * sizeof_shared_invalidation_message)
774 8 : .try_into()
775 8 : .unwrap(),
776 8 : );
777 8 : }
778 :
779 8 : if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
780 0 : xid = buf.get_u32_le();
781 0 : debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
782 8 : }
783 :
784 8 : let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 {
785 0 : Lsn(buf.get_u64_le())
786 : } else {
787 8 : Lsn::INVALID
788 : };
789 8 : XlXactParsedRecord {
790 8 : xid,
791 8 : info,
792 8 : xact_time,
793 8 : xinfo,
794 8 : db_id,
795 8 : ts_id,
796 8 : subxacts,
797 8 : xnodes,
798 8 : origin_lsn,
799 8 : }
800 8 : }
801 : }
802 :
803 : #[repr(C)]
804 : #[derive(Debug)]
805 : pub struct XlClogTruncate {
806 : pub pageno: u32,
807 : pub oldest_xid: TransactionId,
808 : pub oldest_xid_db: Oid,
809 : }
810 :
811 : impl XlClogTruncate {
812 0 : pub fn decode(buf: &mut Bytes, pg_version: u32) -> XlClogTruncate {
813 0 : XlClogTruncate {
814 0 : pageno: if pg_version < 17 {
815 0 : buf.get_u32_le()
816 : } else {
817 0 : buf.get_u64_le() as u32
818 : },
819 0 : oldest_xid: buf.get_u32_le(),
820 0 : oldest_xid_db: buf.get_u32_le(),
821 0 : }
822 0 : }
823 : }
824 :
825 : #[repr(C)]
826 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
827 : pub struct MultiXactMember {
828 : pub xid: TransactionId,
829 : pub status: MultiXactStatus,
830 : }
831 :
832 : impl MultiXactMember {
833 0 : pub fn decode(buf: &mut Bytes) -> MultiXactMember {
834 0 : MultiXactMember {
835 0 : xid: buf.get_u32_le(),
836 0 : status: buf.get_u32_le(),
837 0 : }
838 0 : }
839 : }
840 :
841 : #[repr(C)]
842 : #[derive(Debug)]
843 : pub struct XlMultiXactCreate {
844 : pub mid: MultiXactId,
845 : /* new MultiXact's ID */
846 : pub moff: MultiXactOffset,
847 : /* its starting offset in members file */
848 : pub nmembers: u32,
849 : /* number of member XIDs */
850 : pub members: Vec<MultiXactMember>,
851 : }
852 :
853 : impl XlMultiXactCreate {
854 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
855 0 : let mid = buf.get_u32_le();
856 0 : let moff = buf.get_u32_le();
857 0 : let nmembers = buf.get_u32_le();
858 0 : let mut members = Vec::new();
859 0 : for _ in 0..nmembers {
860 0 : members.push(MultiXactMember::decode(buf));
861 0 : }
862 0 : XlMultiXactCreate {
863 0 : mid,
864 0 : moff,
865 0 : nmembers,
866 0 : members,
867 0 : }
868 0 : }
869 : }
870 :
871 : #[repr(C)]
872 : #[derive(Debug)]
873 : pub struct XlMultiXactTruncate {
874 : pub oldest_multi_db: Oid,
875 : /* to-be-truncated range of multixact offsets */
876 : pub start_trunc_off: MultiXactId,
877 : /* just for completeness' sake */
878 : pub end_trunc_off: MultiXactId,
879 :
880 : /* to-be-truncated range of multixact members */
881 : pub start_trunc_memb: MultiXactOffset,
882 : pub end_trunc_memb: MultiXactOffset,
883 : }
884 :
885 : impl XlMultiXactTruncate {
886 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
887 0 : XlMultiXactTruncate {
888 0 : oldest_multi_db: buf.get_u32_le(),
889 0 : start_trunc_off: buf.get_u32_le(),
890 0 : end_trunc_off: buf.get_u32_le(),
891 0 : start_trunc_memb: buf.get_u32_le(),
892 0 : end_trunc_memb: buf.get_u32_le(),
893 0 : }
894 0 : }
895 : }
896 :
897 : #[repr(C)]
898 : #[derive(Debug)]
899 : pub struct XlLogicalMessage {
900 : pub db_id: Oid,
901 : pub transactional: bool,
902 : pub prefix_size: usize,
903 : pub message_size: usize,
904 : }
905 :
906 : impl XlLogicalMessage {
907 0 : pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
908 0 : XlLogicalMessage {
909 0 : db_id: buf.get_u32_le(),
910 0 : transactional: buf.get_u32_le() != 0, // 4-bytes alignment
911 0 : prefix_size: buf.get_u64_le() as usize,
912 0 : message_size: buf.get_u64_le() as usize,
913 0 : }
914 0 : }
915 : }
916 :
917 : #[repr(C)]
918 : #[derive(Debug)]
919 : pub struct XlRunningXacts {
920 : pub xcnt: u32,
921 : pub subxcnt: u32,
922 : pub subxid_overflow: bool,
923 : pub next_xid: TransactionId,
924 : pub oldest_running_xid: TransactionId,
925 : pub latest_completed_xid: TransactionId,
926 : pub xids: Vec<TransactionId>,
927 : }
928 :
929 : impl XlRunningXacts {
930 0 : pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
931 0 : let xcnt = buf.get_u32_le();
932 0 : let subxcnt = buf.get_u32_le();
933 0 : let subxid_overflow = buf.get_u32_le() != 0;
934 0 : let next_xid = buf.get_u32_le();
935 0 : let oldest_running_xid = buf.get_u32_le();
936 0 : let latest_completed_xid = buf.get_u32_le();
937 0 : let mut xids = Vec::new();
938 0 : for _ in 0..(xcnt + subxcnt) {
939 0 : xids.push(buf.get_u32_le());
940 0 : }
941 0 : XlRunningXacts {
942 0 : xcnt,
943 0 : subxcnt,
944 0 : subxid_overflow,
945 0 : next_xid,
946 0 : oldest_running_xid,
947 0 : latest_completed_xid,
948 0 : xids,
949 0 : }
950 0 : }
951 : }
952 :
953 : #[repr(C)]
954 : #[derive(Debug)]
955 : pub struct XlReploriginDrop {
956 : pub node_id: RepOriginId,
957 : }
958 :
959 : impl XlReploriginDrop {
960 0 : pub fn decode(buf: &mut Bytes) -> XlReploriginDrop {
961 0 : XlReploriginDrop {
962 0 : node_id: buf.get_u16_le(),
963 0 : }
964 0 : }
965 : }
966 :
967 : #[repr(C)]
968 : #[derive(Debug)]
969 : pub struct XlReploriginSet {
970 : pub remote_lsn: Lsn,
971 : pub node_id: RepOriginId,
972 : }
973 :
974 : impl XlReploriginSet {
975 0 : pub fn decode(buf: &mut Bytes) -> XlReploriginSet {
976 0 : XlReploriginSet {
977 0 : remote_lsn: Lsn(buf.get_u64_le()),
978 0 : node_id: buf.get_u16_le(),
979 0 : }
980 0 : }
981 : }
982 :
983 : /// Main routine to decode a WAL record and figure out which blocks are modified
984 : //
985 : // See xlogrecord.h for details
986 : // The overall layout of an XLOG record is:
987 : // Fixed-size header (XLogRecord struct)
988 : // XLogRecordBlockHeader struct
989 : // If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
990 : // If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
991 : // XLogRecordBlockCompressHeader struct follows.
992 : // If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
993 : // BlockNumber follows
994 : // XLogRecordBlockHeader struct
995 : // ...
996 : // XLogRecordDataHeader[Short|Long] struct
997 : // block data
998 : // block data
999 : // ...
1000 : // main data
1001 : //
1002 : //
1003 : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
1004 : // It would be more natural for this function to return a DecodedWALRecord as return value,
1005 : // but reusing the caller-supplied struct avoids an allocation.
1006 : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
1007 : //
1008 145852 : pub fn decode_wal_record(
1009 145852 : record: Bytes,
1010 145852 : decoded: &mut DecodedWALRecord,
1011 145852 : pg_version: u32,
1012 145852 : ) -> Result<()> {
1013 145852 : let mut rnode_spcnode: u32 = 0;
1014 145852 : let mut rnode_dbnode: u32 = 0;
1015 145852 : let mut rnode_relnode: u32 = 0;
1016 145852 : let mut got_rnode = false;
1017 145852 : let mut origin_id: u16 = 0;
1018 145852 :
1019 145852 : let mut buf = record.clone();
1020 :
1021 : // 1. Parse XLogRecord struct
1022 :
1023 : // FIXME: assume little-endian here
1024 145852 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
1025 :
1026 145852 : trace!(
1027 0 : "decode_wal_record xl_rmid = {} xl_info = {}",
1028 : xlogrec.xl_rmid,
1029 : xlogrec.xl_info
1030 : );
1031 :
1032 145852 : let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
1033 145852 :
1034 145852 : if buf.remaining() != remaining {
1035 0 : //TODO error
1036 145852 : }
1037 :
1038 145852 : let mut max_block_id = 0;
1039 145852 : let mut blocks_total_len: u32 = 0;
1040 145852 : let mut main_data_len = 0;
1041 145852 : let mut datatotal: u32 = 0;
1042 145852 : decoded.blocks.clear();
1043 :
1044 : // 2. Decode the headers.
1045 : // XLogRecordBlockHeaders if any,
1046 : // XLogRecordDataHeader[Short|Long]
1047 437322 : while buf.remaining() > datatotal as usize {
1048 291470 : let block_id = buf.get_u8();
1049 291470 :
1050 291470 : match block_id {
1051 145812 : pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
1052 145812 : /* XLogRecordDataHeaderShort */
1053 145812 : main_data_len = buf.get_u8() as u32;
1054 145812 : datatotal += main_data_len;
1055 145812 : }
1056 :
1057 16 : pg_constants::XLR_BLOCK_ID_DATA_LONG => {
1058 16 : /* XLogRecordDataHeaderLong */
1059 16 : main_data_len = buf.get_u32_le();
1060 16 : datatotal += main_data_len;
1061 16 : }
1062 :
1063 0 : pg_constants::XLR_BLOCK_ID_ORIGIN => {
1064 0 : // RepOriginId is uint16
1065 0 : origin_id = buf.get_u16_le();
1066 0 : }
1067 :
1068 0 : pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
1069 0 : // TransactionId is uint32
1070 0 : buf.advance(4);
1071 0 : }
1072 :
1073 145642 : 0..=pg_constants::XLR_MAX_BLOCK_ID => {
1074 : /* XLogRecordBlockHeader */
1075 145642 : let mut blk = DecodedBkpBlock::new();
1076 145642 :
1077 145642 : if block_id <= max_block_id {
1078 145642 : // TODO
1079 145642 : //report_invalid_record(state,
1080 145642 : // "out-of-order block_id %u at %X/%X",
1081 145642 : // block_id,
1082 145642 : // (uint32) (state->ReadRecPtr >> 32),
1083 145642 : // (uint32) state->ReadRecPtr);
1084 145642 : // goto err;
1085 145642 : }
1086 145642 : max_block_id = block_id;
1087 145642 :
1088 145642 : let fork_flags: u8 = buf.get_u8();
1089 145642 : blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
1090 145642 : blk.flags = fork_flags;
1091 145642 : blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
1092 145642 : blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
1093 145642 : blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
1094 145642 : blk.data_len = buf.get_u16_le();
1095 145642 :
1096 145642 : /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
1097 145642 :
1098 145642 : datatotal += blk.data_len as u32;
1099 145642 : blocks_total_len += blk.data_len as u32;
1100 145642 :
1101 145642 : if blk.has_image {
1102 60 : blk.bimg_len = buf.get_u16_le();
1103 60 : blk.hole_offset = buf.get_u16_le();
1104 60 : blk.bimg_info = buf.get_u8();
1105 :
1106 0 : blk.apply_image = dispatch_pgversion!(
1107 60 : pg_version,
1108 0 : (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
1109 : );
1110 :
1111 60 : let blk_img_is_compressed =
1112 60 : postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version);
1113 60 :
1114 60 : if blk_img_is_compressed {
1115 0 : debug!("compressed block image , pg_version = {}", pg_version);
1116 60 : }
1117 :
1118 60 : if blk_img_is_compressed {
1119 0 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
1120 0 : blk.hole_length = buf.get_u16_le();
1121 0 : } else {
1122 0 : blk.hole_length = 0;
1123 0 : }
1124 60 : } else {
1125 60 : blk.hole_length = BLCKSZ - blk.bimg_len;
1126 60 : }
1127 60 : datatotal += blk.bimg_len as u32;
1128 60 : blocks_total_len += blk.bimg_len as u32;
1129 60 :
1130 60 : /*
1131 60 : * cross-check that hole_offset > 0, hole_length > 0 and
1132 60 : * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1133 60 : */
1134 60 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
1135 36 : && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
1136 0 : {
1137 0 : // TODO
1138 0 : /*
1139 0 : report_invalid_record(state,
1140 0 : "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1141 0 : (unsigned int) blk->hole_offset,
1142 0 : (unsigned int) blk->hole_length,
1143 0 : (unsigned int) blk->bimg_len,
1144 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1145 0 : goto err;
1146 0 : */
1147 60 : }
1148 :
1149 : /*
1150 : * cross-check that hole_offset == 0 and hole_length == 0 if
1151 : * the HAS_HOLE flag is not set.
1152 : */
1153 60 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
1154 24 : && (blk.hole_offset != 0 || blk.hole_length != 0)
1155 0 : {
1156 0 : // TODO
1157 0 : /*
1158 0 : report_invalid_record(state,
1159 0 : "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1160 0 : (unsigned int) blk->hole_offset,
1161 0 : (unsigned int) blk->hole_length,
1162 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1163 0 : goto err;
1164 0 : */
1165 60 : }
1166 :
1167 : /*
1168 : * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1169 : * flag is set.
1170 : */
1171 60 : if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
1172 24 : // TODO
1173 24 : /*
1174 24 : report_invalid_record(state,
1175 24 : "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1176 24 : (unsigned int) blk->bimg_len,
1177 24 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1178 24 : goto err;
1179 24 : */
1180 36 : }
1181 :
1182 : /*
1183 : * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1184 : * IS_COMPRESSED flag is set.
1185 : */
1186 60 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
1187 24 : && !blk_img_is_compressed
1188 24 : && blk.bimg_len != BLCKSZ
1189 0 : {
1190 0 : // TODO
1191 0 : /*
1192 0 : report_invalid_record(state,
1193 0 : "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1194 0 : (unsigned int) blk->data_len,
1195 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1196 0 : goto err;
1197 0 : */
1198 60 : }
1199 145582 : }
1200 145642 : if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
1201 145642 : rnode_spcnode = buf.get_u32_le();
1202 145642 : rnode_dbnode = buf.get_u32_le();
1203 145642 : rnode_relnode = buf.get_u32_le();
1204 145642 : got_rnode = true;
1205 145642 : } else if !got_rnode {
1206 0 : // TODO
1207 0 : /*
1208 0 : report_invalid_record(state,
1209 0 : "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1210 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1211 0 : goto err; */
1212 0 : }
1213 :
1214 145642 : blk.rnode_spcnode = rnode_spcnode;
1215 145642 : blk.rnode_dbnode = rnode_dbnode;
1216 145642 : blk.rnode_relnode = rnode_relnode;
1217 145642 :
1218 145642 : blk.blkno = buf.get_u32_le();
1219 145642 : trace!(
1220 0 : "this record affects {}/{}/{} blk {}",
1221 : rnode_spcnode,
1222 : rnode_dbnode,
1223 : rnode_relnode,
1224 : blk.blkno
1225 : );
1226 :
1227 145642 : decoded.blocks.push(blk);
1228 : }
1229 :
1230 0 : _ => {
1231 0 : // TODO: invalid block_id
1232 0 : }
1233 : }
1234 : }
1235 :
1236 : // 3. Decode blocks.
1237 145852 : let mut ptr = record.len() - buf.remaining();
1238 145852 : for blk in decoded.blocks.iter_mut() {
1239 145642 : if blk.has_image {
1240 60 : blk.bimg_offset = ptr as u32;
1241 60 : ptr += blk.bimg_len as usize;
1242 145582 : }
1243 145642 : if blk.has_data {
1244 145582 : ptr += blk.data_len as usize;
1245 145582 : }
1246 : }
1247 : // We don't need them, so just skip blocks_total_len bytes
1248 145852 : buf.advance(blocks_total_len as usize);
1249 145852 : assert_eq!(ptr, record.len() - buf.remaining());
1250 :
1251 145852 : let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
1252 145852 :
1253 145852 : // 4. Decode main_data
1254 145852 : if main_data_len > 0 {
1255 145828 : assert_eq!(buf.remaining(), main_data_len as usize);
1256 24 : }
1257 :
1258 145852 : decoded.xl_xid = xlogrec.xl_xid;
1259 145852 : decoded.xl_info = xlogrec.xl_info;
1260 145852 : decoded.xl_rmid = xlogrec.xl_rmid;
1261 145852 : decoded.record = record;
1262 145852 : decoded.origin_id = origin_id;
1263 145852 : decoded.main_data_offset = main_data_offset;
1264 145852 :
1265 145852 : Ok(())
1266 145852 : }
1267 :
1268 : ///
1269 : /// Build a human-readable string to describe a WAL record
1270 : ///
1271 : /// For debugging purposes
1272 0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
1273 0 : match rec {
1274 0 : NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
1275 0 : "will_init: {}, {}",
1276 0 : will_init,
1277 0 : describe_postgres_wal_record(rec)?
1278 : )),
1279 0 : _ => Ok(format!("{:?}", rec)),
1280 : }
1281 0 : }
1282 :
1283 0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
1284 0 : // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
1285 0 : // Maybe use the postgres wal redo process, the same used for replaying WAL records?
1286 0 : // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
1287 0 : // without worrying about security?
1288 0 : //
1289 0 : // But for now, we have a hand-written code for a few common WAL record types here.
1290 0 :
1291 0 : let mut buf = record.clone();
1292 :
1293 : // 1. Parse XLogRecord struct
1294 :
1295 : // FIXME: assume little-endian here
1296 0 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
1297 :
1298 : let unknown_str: String;
1299 :
1300 0 : let result: &str = match xlogrec.xl_rmid {
1301 : pg_constants::RM_HEAP2_ID => {
1302 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1303 0 : match info {
1304 0 : pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
1305 0 : pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
1306 : _ => {
1307 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1308 0 : &unknown_str
1309 : }
1310 : }
1311 : }
1312 : pg_constants::RM_HEAP_ID => {
1313 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1314 0 : match info {
1315 0 : pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
1316 0 : pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
1317 0 : pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
1318 0 : pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
1319 : _ => {
1320 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1321 0 : &unknown_str
1322 : }
1323 : }
1324 : }
1325 : pg_constants::RM_XLOG_ID => {
1326 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1327 0 : match info {
1328 0 : pg_constants::XLOG_FPI => "XLOG FPI",
1329 0 : pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
1330 : _ => {
1331 0 : unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
1332 0 : &unknown_str
1333 : }
1334 : }
1335 : }
1336 0 : rmid => {
1337 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1338 0 :
1339 0 : unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
1340 0 : &unknown_str
1341 : }
1342 : };
1343 :
1344 0 : Ok(String::from(result))
1345 0 : }
|