Line data Source code
1 : //!
2 : //! Functions for parsing WAL records.
3 : //!
4 :
5 : use anyhow::Result;
6 : use bytes::{Buf, Bytes};
7 : use postgres_ffi::dispatch_pgversion;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::BLCKSZ;
10 : use postgres_ffi::{BlockNumber, TimestampTz};
11 : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
12 : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
13 : use serde::{Deserialize, Serialize};
14 : use tracing::*;
15 : use utils::bin_ser::DeserializeError;
16 :
17 : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
18 : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
19 132 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
20 : pub enum NeonWalRecord {
21 : /// Native PostgreSQL WAL record
22 : Postgres { will_init: bool, rec: Bytes },
23 :
24 : /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
25 : ClearVisibilityMapFlags {
26 : new_heap_blkno: Option<u32>,
27 : old_heap_blkno: Option<u32>,
28 : flags: u8,
29 : },
30 : /// Mark transaction IDs as committed on a CLOG page
31 : ClogSetCommitted {
32 : xids: Vec<TransactionId>,
33 : timestamp: TimestampTz,
34 : },
35 : /// Mark transaction IDs as aborted on a CLOG page
36 : ClogSetAborted { xids: Vec<TransactionId> },
37 : /// Extend multixact offsets SLRU
38 : MultixactOffsetCreate {
39 : mid: MultiXactId,
40 : moff: MultiXactOffset,
41 : },
42 : /// Extend multixact members SLRU.
43 : MultixactMembersCreate {
44 : moff: MultiXactOffset,
45 : members: Vec<MultiXactMember>,
46 : },
47 : /// Update the map of AUX files, either writing or dropping an entry
48 : AuxFile {
49 : file_path: String,
50 : content: Option<Bytes>,
51 : },
52 : }
53 :
54 : impl NeonWalRecord {
55 : /// Does replaying this WAL record initialize the page from scratch, or does
56 : /// it need to be applied over the previous image of the page?
57 18 : pub fn will_init(&self) -> bool {
58 18 : // If you change this function, you'll also need to change ValueBytes::will_init
59 18 : match self {
60 8 : NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
61 :
62 : // None of the special neon record types currently initialize the page
63 10 : _ => false,
64 : }
65 18 : }
66 : }
67 :
/// DecodedBkpBlock represents per-page data contained in a WAL record.
///
/// All fields start out zeroed / `false` (see [`DecodedBkpBlock::new`]) and
/// are filled in while the record's block headers are decoded.
#[derive(Default)]
pub struct DecodedBkpBlock {
    // --- Identify the block this refers to ---
    pub rnode_spcnode: u32,
    pub rnode_dbnode: u32,
    pub rnode_relnode: u32,
    /// Fork number. Note that we have a few special forknum values for non-rel files.
    pub forknum: u8,
    pub blkno: u32,

    /// Copy of the fork_flags field from the XLogRecordBlockHeader
    pub flags: u8,

    // --- Information on full-page image, if any ---
    /// Has image, even for consistency checking
    pub has_image: bool,
    /// Has image that should be restored
    pub apply_image: bool,
    /// Record doesn't need previous page version to apply
    pub will_init: bool,
    pub hole_offset: u16,
    pub hole_length: u16,
    pub bimg_offset: u32,
    pub bimg_len: u16,
    pub bimg_info: u8,

    // --- Rmgr-specific data associated with this block ---
    has_data: bool,
    data_len: u16,
}

impl DecodedBkpBlock {
    /// Creates an empty block descriptor with every field at its default value.
    pub fn new() -> DecodedBkpBlock {
        Self::default()
    }
}
109 :
/// Output of `decode_wal_record`: selected header fields of one WAL record
/// plus the per-block information extracted from it.
#[derive(Default)]
pub struct DecodedWALRecord {
    /// `xl_xid` copied from the record's XLogRecord header
    pub xl_xid: TransactionId,
    /// `xl_info` copied from the record's XLogRecord header
    pub xl_info: u8,
    /// `xl_rmid` copied from the record's XLogRecord header
    pub xl_rmid: u8,
    pub record: Bytes, // raw XLogRecord

    /// One entry per block reference found in the record
    pub blocks: Vec<DecodedBkpBlock>,
    /// Offset of the main data within `record`
    /// (set by `decode_wal_record` to `xl_tot_len - main_data_len`)
    pub main_data_offset: usize,
}
120 :
/// Identifies a relation by tablespace, database and relation OIDs.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {
    pub spcnode: Oid, /* tablespace */
    pub dbnode: Oid,  /* database */
    pub relnode: Oid, /* relation */
}
128 :
129 : #[repr(C)]
130 : #[derive(Debug)]
131 : pub struct XlRelmapUpdate {
132 : pub dbid: Oid, /* database ID, or 0 for shared map */
133 : pub tsid: Oid, /* database's tablespace, or pg_global */
134 : pub nbytes: i32, /* size of relmap data */
135 : }
136 :
137 : impl XlRelmapUpdate {
138 0 : pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
139 0 : XlRelmapUpdate {
140 0 : dbid: buf.get_u32_le(),
141 0 : tsid: buf.get_u32_le(),
142 0 : nbytes: buf.get_i32_le(),
143 0 : }
144 0 : }
145 : }
146 :
147 : pub mod v14 {
148 : use bytes::{Buf, Bytes};
149 : use postgres_ffi::{OffsetNumber, TransactionId};
150 :
151 : #[repr(C)]
152 : #[derive(Debug)]
153 : pub struct XlHeapInsert {
154 : pub offnum: OffsetNumber,
155 : pub flags: u8,
156 : }
157 :
158 : impl XlHeapInsert {
159 145276 : pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
160 145276 : XlHeapInsert {
161 145276 : offnum: buf.get_u16_le(),
162 145276 : flags: buf.get_u8(),
163 145276 : }
164 145276 : }
165 : }
166 :
167 : #[repr(C)]
168 : #[derive(Debug)]
169 : pub struct XlHeapMultiInsert {
170 : pub flags: u8,
171 : pub _padding: u8,
172 : pub ntuples: u16,
173 : }
174 :
175 : impl XlHeapMultiInsert {
176 42 : pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
177 42 : XlHeapMultiInsert {
178 42 : flags: buf.get_u8(),
179 42 : _padding: buf.get_u8(),
180 42 : ntuples: buf.get_u16_le(),
181 42 : }
182 42 : }
183 : }
184 :
185 : #[repr(C)]
186 : #[derive(Debug)]
187 : pub struct XlHeapDelete {
188 : pub xmax: TransactionId,
189 : pub offnum: OffsetNumber,
190 : pub _padding: u16,
191 : pub t_cid: u32,
192 : pub infobits_set: u8,
193 : pub flags: u8,
194 : }
195 :
196 : impl XlHeapDelete {
197 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
198 0 : XlHeapDelete {
199 0 : xmax: buf.get_u32_le(),
200 0 : offnum: buf.get_u16_le(),
201 0 : _padding: buf.get_u16_le(),
202 0 : t_cid: buf.get_u32_le(),
203 0 : infobits_set: buf.get_u8(),
204 0 : flags: buf.get_u8(),
205 0 : }
206 0 : }
207 : }
208 :
209 : #[repr(C)]
210 : #[derive(Debug)]
211 : pub struct XlHeapUpdate {
212 : pub old_xmax: TransactionId,
213 : pub old_offnum: OffsetNumber,
214 : pub old_infobits_set: u8,
215 : pub flags: u8,
216 : pub t_cid: u32,
217 : pub new_xmax: TransactionId,
218 : pub new_offnum: OffsetNumber,
219 : }
220 :
221 : impl XlHeapUpdate {
222 8 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
223 8 : XlHeapUpdate {
224 8 : old_xmax: buf.get_u32_le(),
225 8 : old_offnum: buf.get_u16_le(),
226 8 : old_infobits_set: buf.get_u8(),
227 8 : flags: buf.get_u8(),
228 8 : t_cid: buf.get_u32_le(),
229 8 : new_xmax: buf.get_u32_le(),
230 8 : new_offnum: buf.get_u16_le(),
231 8 : }
232 8 : }
233 : }
234 :
235 : #[repr(C)]
236 : #[derive(Debug)]
237 : pub struct XlHeapLock {
238 : pub locking_xid: TransactionId,
239 : pub offnum: OffsetNumber,
240 : pub _padding: u16,
241 : pub t_cid: u32,
242 : pub infobits_set: u8,
243 : pub flags: u8,
244 : }
245 :
246 : impl XlHeapLock {
247 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
248 0 : XlHeapLock {
249 0 : locking_xid: buf.get_u32_le(),
250 0 : offnum: buf.get_u16_le(),
251 0 : _padding: buf.get_u16_le(),
252 0 : t_cid: buf.get_u32_le(),
253 0 : infobits_set: buf.get_u8(),
254 0 : flags: buf.get_u8(),
255 0 : }
256 0 : }
257 : }
258 :
259 : #[repr(C)]
260 : #[derive(Debug)]
261 : pub struct XlHeapLockUpdated {
262 : pub xmax: TransactionId,
263 : pub offnum: OffsetNumber,
264 : pub infobits_set: u8,
265 : pub flags: u8,
266 : }
267 :
268 : impl XlHeapLockUpdated {
269 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
270 0 : XlHeapLockUpdated {
271 0 : xmax: buf.get_u32_le(),
272 0 : offnum: buf.get_u16_le(),
273 0 : infobits_set: buf.get_u8(),
274 0 : flags: buf.get_u8(),
275 0 : }
276 0 : }
277 : }
278 : }
279 :
/// Heap WAL record layouts for the `v15` format: every type is re-exported
/// unchanged from `v14`.
pub mod v15 {
    pub use super::v14::{
        XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
    };
}
285 :
286 : pub mod v16 {
287 : pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
288 : use bytes::{Buf, Bytes};
289 : use postgres_ffi::{OffsetNumber, TransactionId};
290 :
291 : pub struct XlHeapDelete {
292 : pub xmax: TransactionId,
293 : pub offnum: OffsetNumber,
294 : pub infobits_set: u8,
295 : pub flags: u8,
296 : }
297 :
298 : impl XlHeapDelete {
299 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
300 0 : XlHeapDelete {
301 0 : xmax: buf.get_u32_le(),
302 0 : offnum: buf.get_u16_le(),
303 0 : infobits_set: buf.get_u8(),
304 0 : flags: buf.get_u8(),
305 0 : }
306 0 : }
307 : }
308 :
309 : #[repr(C)]
310 : #[derive(Debug)]
311 : pub struct XlHeapUpdate {
312 : pub old_xmax: TransactionId,
313 : pub old_offnum: OffsetNumber,
314 : pub old_infobits_set: u8,
315 : pub flags: u8,
316 : pub new_xmax: TransactionId,
317 : pub new_offnum: OffsetNumber,
318 : }
319 :
320 : impl XlHeapUpdate {
321 0 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
322 0 : XlHeapUpdate {
323 0 : old_xmax: buf.get_u32_le(),
324 0 : old_offnum: buf.get_u16_le(),
325 0 : old_infobits_set: buf.get_u8(),
326 0 : flags: buf.get_u8(),
327 0 : new_xmax: buf.get_u32_le(),
328 0 : new_offnum: buf.get_u16_le(),
329 0 : }
330 0 : }
331 : }
332 :
333 : #[repr(C)]
334 : #[derive(Debug)]
335 : pub struct XlHeapLock {
336 : pub locking_xid: TransactionId,
337 : pub offnum: OffsetNumber,
338 : pub infobits_set: u8,
339 : pub flags: u8,
340 : }
341 :
342 : impl XlHeapLock {
343 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
344 0 : XlHeapLock {
345 0 : locking_xid: buf.get_u32_le(),
346 0 : offnum: buf.get_u16_le(),
347 0 : infobits_set: buf.get_u8(),
348 0 : flags: buf.get_u8(),
349 0 : }
350 0 : }
351 : }
352 :
353 : /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
354 : pub mod rm_neon {
355 : use bytes::{Buf, Bytes};
356 : use postgres_ffi::{OffsetNumber, TransactionId};
357 :
358 : #[repr(C)]
359 : #[derive(Debug)]
360 : pub struct XlNeonHeapInsert {
361 : pub offnum: OffsetNumber,
362 : pub flags: u8,
363 : }
364 :
365 : impl XlNeonHeapInsert {
366 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
367 0 : XlNeonHeapInsert {
368 0 : offnum: buf.get_u16_le(),
369 0 : flags: buf.get_u8(),
370 0 : }
371 0 : }
372 : }
373 :
374 : #[repr(C)]
375 : #[derive(Debug)]
376 : pub struct XlNeonHeapMultiInsert {
377 : pub flags: u8,
378 : pub _padding: u8,
379 : pub ntuples: u16,
380 : pub t_cid: u32,
381 : }
382 :
383 : impl XlNeonHeapMultiInsert {
384 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
385 0 : XlNeonHeapMultiInsert {
386 0 : flags: buf.get_u8(),
387 0 : _padding: buf.get_u8(),
388 0 : ntuples: buf.get_u16_le(),
389 0 : t_cid: buf.get_u32_le(),
390 0 : }
391 0 : }
392 : }
393 :
394 : #[repr(C)]
395 : #[derive(Debug)]
396 : pub struct XlNeonHeapDelete {
397 : pub xmax: TransactionId,
398 : pub offnum: OffsetNumber,
399 : pub infobits_set: u8,
400 : pub flags: u8,
401 : pub t_cid: u32,
402 : }
403 :
404 : impl XlNeonHeapDelete {
405 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
406 0 : XlNeonHeapDelete {
407 0 : xmax: buf.get_u32_le(),
408 0 : offnum: buf.get_u16_le(),
409 0 : infobits_set: buf.get_u8(),
410 0 : flags: buf.get_u8(),
411 0 : t_cid: buf.get_u32_le(),
412 0 : }
413 0 : }
414 : }
415 :
416 : #[repr(C)]
417 : #[derive(Debug)]
418 : pub struct XlNeonHeapUpdate {
419 : pub old_xmax: TransactionId,
420 : pub old_offnum: OffsetNumber,
421 : pub old_infobits_set: u8,
422 : pub flags: u8,
423 : pub t_cid: u32,
424 : pub new_xmax: TransactionId,
425 : pub new_offnum: OffsetNumber,
426 : }
427 :
428 : impl XlNeonHeapUpdate {
429 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
430 0 : XlNeonHeapUpdate {
431 0 : old_xmax: buf.get_u32_le(),
432 0 : old_offnum: buf.get_u16_le(),
433 0 : old_infobits_set: buf.get_u8(),
434 0 : flags: buf.get_u8(),
435 0 : t_cid: buf.get_u32(),
436 0 : new_xmax: buf.get_u32_le(),
437 0 : new_offnum: buf.get_u16_le(),
438 0 : }
439 0 : }
440 : }
441 :
442 : #[repr(C)]
443 : #[derive(Debug)]
444 : pub struct XlNeonHeapLock {
445 : pub locking_xid: TransactionId,
446 : pub t_cid: u32,
447 : pub offnum: OffsetNumber,
448 : pub infobits_set: u8,
449 : pub flags: u8,
450 : }
451 :
452 : impl XlNeonHeapLock {
453 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
454 0 : XlNeonHeapLock {
455 0 : locking_xid: buf.get_u32_le(),
456 0 : t_cid: buf.get_u32_le(),
457 0 : offnum: buf.get_u16_le(),
458 0 : infobits_set: buf.get_u8(),
459 0 : flags: buf.get_u8(),
460 0 : }
461 0 : }
462 : }
463 : }
464 : }
465 :
466 : #[repr(C)]
467 : #[derive(Debug)]
468 : pub struct XlSmgrCreate {
469 : pub rnode: RelFileNode,
470 : // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
471 : // well-defined size?
472 : pub forknum: u8,
473 : }
474 :
475 : impl XlSmgrCreate {
476 16 : pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
477 16 : XlSmgrCreate {
478 16 : rnode: RelFileNode {
479 16 : spcnode: buf.get_u32_le(), /* tablespace */
480 16 : dbnode: buf.get_u32_le(), /* database */
481 16 : relnode: buf.get_u32_le(), /* relation */
482 16 : },
483 16 : forknum: buf.get_u32_le() as u8,
484 16 : }
485 16 : }
486 : }
487 :
488 : #[repr(C)]
489 : #[derive(Debug)]
490 : pub struct XlSmgrTruncate {
491 : pub blkno: BlockNumber,
492 : pub rnode: RelFileNode,
493 : pub flags: u32,
494 : }
495 :
496 : impl XlSmgrTruncate {
497 0 : pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
498 0 : XlSmgrTruncate {
499 0 : blkno: buf.get_u32_le(),
500 0 : rnode: RelFileNode {
501 0 : spcnode: buf.get_u32_le(), /* tablespace */
502 0 : dbnode: buf.get_u32_le(), /* database */
503 0 : relnode: buf.get_u32_le(), /* relation */
504 0 : },
505 0 : flags: buf.get_u32_le(),
506 0 : }
507 0 : }
508 : }
509 :
510 : #[repr(C)]
511 : #[derive(Debug)]
512 : pub struct XlCreateDatabase {
513 : pub db_id: Oid,
514 : pub tablespace_id: Oid,
515 : pub src_db_id: Oid,
516 : pub src_tablespace_id: Oid,
517 : }
518 :
519 : impl XlCreateDatabase {
520 0 : pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
521 0 : XlCreateDatabase {
522 0 : db_id: buf.get_u32_le(),
523 0 : tablespace_id: buf.get_u32_le(),
524 0 : src_db_id: buf.get_u32_le(),
525 0 : src_tablespace_id: buf.get_u32_le(),
526 0 : }
527 0 : }
528 : }
529 :
530 : #[repr(C)]
531 : #[derive(Debug)]
532 : pub struct XlDropDatabase {
533 : pub db_id: Oid,
534 : pub n_tablespaces: Oid, /* number of tablespace IDs */
535 : pub tablespace_ids: Vec<Oid>,
536 : }
537 :
538 : impl XlDropDatabase {
539 0 : pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
540 0 : let mut rec = XlDropDatabase {
541 0 : db_id: buf.get_u32_le(),
542 0 : n_tablespaces: buf.get_u32_le(),
543 0 : tablespace_ids: Vec::<Oid>::new(),
544 0 : };
545 :
546 0 : for _i in 0..rec.n_tablespaces {
547 0 : let id = buf.get_u32_le();
548 0 : rec.tablespace_ids.push(id);
549 0 : }
550 :
551 0 : rec
552 0 : }
553 : }
554 :
555 : ///
556 : /// Note: Parsing some fields is missing, because they're not needed.
557 : ///
558 : /// This is similar to the xl_xact_parsed_commit and
559 : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
560 : /// struct for commits and aborts.
561 : ///
562 : #[derive(Debug)]
563 : pub struct XlXactParsedRecord {
564 : pub xid: TransactionId,
565 : pub info: u8,
566 : pub xact_time: TimestampTz,
567 : pub xinfo: u32,
568 :
569 : pub db_id: Oid,
570 : /* MyDatabaseId */
571 : pub ts_id: Oid,
572 : /* MyDatabaseTableSpace */
573 : pub subxacts: Vec<TransactionId>,
574 :
575 : pub xnodes: Vec<RelFileNode>,
576 : }
577 :
578 : impl XlXactParsedRecord {
579 : /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
580 : /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
581 : /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
582 8 : pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
583 8 : let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
584 8 : // The record starts with time of commit/abort
585 8 : let xact_time = buf.get_i64_le();
586 8 : let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
587 8 : buf.get_u32_le()
588 : } else {
589 0 : 0
590 : };
591 : let db_id;
592 : let ts_id;
593 8 : if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
594 8 : db_id = buf.get_u32_le();
595 8 : ts_id = buf.get_u32_le();
596 8 : } else {
597 0 : db_id = 0;
598 0 : ts_id = 0;
599 0 : }
600 8 : let mut subxacts = Vec::<TransactionId>::new();
601 8 : if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
602 0 : let nsubxacts = buf.get_i32_le();
603 0 : for _i in 0..nsubxacts {
604 0 : let subxact = buf.get_u32_le();
605 0 : subxacts.push(subxact);
606 0 : }
607 8 : }
608 8 : let mut xnodes = Vec::<RelFileNode>::new();
609 8 : if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
610 0 : let nrels = buf.get_i32_le();
611 0 : for _i in 0..nrels {
612 0 : let spcnode = buf.get_u32_le();
613 0 : let dbnode = buf.get_u32_le();
614 0 : let relnode = buf.get_u32_le();
615 0 : trace!(
616 0 : "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
617 : spcnode,
618 : dbnode,
619 : relnode
620 : );
621 0 : xnodes.push(RelFileNode {
622 0 : spcnode,
623 0 : dbnode,
624 0 : relnode,
625 0 : });
626 : }
627 8 : }
628 :
629 8 : if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
630 0 : let nitems = buf.get_i32_le();
631 0 : debug!(
632 0 : "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
633 : nitems
634 : );
635 0 : let sizeof_xl_xact_stats_item = 12;
636 0 : buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
637 8 : }
638 :
639 8 : if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
640 8 : let nmsgs = buf.get_i32_le();
641 8 : let sizeof_shared_invalidation_message = 16;
642 8 : buf.advance(
643 8 : (nmsgs * sizeof_shared_invalidation_message)
644 8 : .try_into()
645 8 : .unwrap(),
646 8 : );
647 8 : }
648 :
649 8 : if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
650 0 : xid = buf.get_u32_le();
651 0 : debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
652 8 : }
653 :
654 8 : XlXactParsedRecord {
655 8 : xid,
656 8 : info,
657 8 : xact_time,
658 8 : xinfo,
659 8 : db_id,
660 8 : ts_id,
661 8 : subxacts,
662 8 : xnodes,
663 8 : }
664 8 : }
665 : }
666 :
667 : #[repr(C)]
668 : #[derive(Debug)]
669 : pub struct XlClogTruncate {
670 : pub pageno: u32,
671 : pub oldest_xid: TransactionId,
672 : pub oldest_xid_db: Oid,
673 : }
674 :
675 : impl XlClogTruncate {
676 0 : pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
677 0 : XlClogTruncate {
678 0 : pageno: buf.get_u32_le(),
679 0 : oldest_xid: buf.get_u32_le(),
680 0 : oldest_xid_db: buf.get_u32_le(),
681 0 : }
682 0 : }
683 : }
684 :
685 : #[repr(C)]
686 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
687 : pub struct MultiXactMember {
688 : pub xid: TransactionId,
689 : pub status: MultiXactStatus,
690 : }
691 :
692 : impl MultiXactMember {
693 0 : pub fn decode(buf: &mut Bytes) -> MultiXactMember {
694 0 : MultiXactMember {
695 0 : xid: buf.get_u32_le(),
696 0 : status: buf.get_u32_le(),
697 0 : }
698 0 : }
699 : }
700 :
701 : #[repr(C)]
702 : #[derive(Debug)]
703 : pub struct XlMultiXactCreate {
704 : pub mid: MultiXactId,
705 : /* new MultiXact's ID */
706 : pub moff: MultiXactOffset,
707 : /* its starting offset in members file */
708 : pub nmembers: u32,
709 : /* number of member XIDs */
710 : pub members: Vec<MultiXactMember>,
711 : }
712 :
713 : impl XlMultiXactCreate {
714 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
715 0 : let mid = buf.get_u32_le();
716 0 : let moff = buf.get_u32_le();
717 0 : let nmembers = buf.get_u32_le();
718 0 : let mut members = Vec::new();
719 0 : for _ in 0..nmembers {
720 0 : members.push(MultiXactMember::decode(buf));
721 0 : }
722 0 : XlMultiXactCreate {
723 0 : mid,
724 0 : moff,
725 0 : nmembers,
726 0 : members,
727 0 : }
728 0 : }
729 : }
730 :
731 : #[repr(C)]
732 : #[derive(Debug)]
733 : pub struct XlMultiXactTruncate {
734 : pub oldest_multi_db: Oid,
735 : /* to-be-truncated range of multixact offsets */
736 : pub start_trunc_off: MultiXactId,
737 : /* just for completeness' sake */
738 : pub end_trunc_off: MultiXactId,
739 :
740 : /* to-be-truncated range of multixact members */
741 : pub start_trunc_memb: MultiXactOffset,
742 : pub end_trunc_memb: MultiXactOffset,
743 : }
744 :
745 : impl XlMultiXactTruncate {
746 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
747 0 : XlMultiXactTruncate {
748 0 : oldest_multi_db: buf.get_u32_le(),
749 0 : start_trunc_off: buf.get_u32_le(),
750 0 : end_trunc_off: buf.get_u32_le(),
751 0 : start_trunc_memb: buf.get_u32_le(),
752 0 : end_trunc_memb: buf.get_u32_le(),
753 0 : }
754 0 : }
755 : }
756 :
757 : #[repr(C)]
758 : #[derive(Debug)]
759 : pub struct XlLogicalMessage {
760 : pub db_id: Oid,
761 : pub transactional: bool,
762 : pub prefix_size: usize,
763 : pub message_size: usize,
764 : }
765 :
766 : impl XlLogicalMessage {
767 0 : pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
768 0 : XlLogicalMessage {
769 0 : db_id: buf.get_u32_le(),
770 0 : transactional: buf.get_u32_le() != 0, // 4-bytes alignment
771 0 : prefix_size: buf.get_u64_le() as usize,
772 0 : message_size: buf.get_u64_le() as usize,
773 0 : }
774 0 : }
775 : }
776 :
777 : #[repr(C)]
778 : #[derive(Debug)]
779 : pub struct XlRunningXacts {
780 : pub xcnt: u32,
781 : pub subxcnt: u32,
782 : pub subxid_overflow: bool,
783 : pub next_xid: TransactionId,
784 : pub oldest_running_xid: TransactionId,
785 : pub latest_completed_xid: TransactionId,
786 : pub xids: Vec<TransactionId>,
787 : }
788 :
789 : impl XlRunningXacts {
790 0 : pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
791 0 : let xcnt = buf.get_u32_le();
792 0 : let subxcnt = buf.get_u32_le();
793 0 : let subxid_overflow = buf.get_u32_le() != 0;
794 0 : let next_xid = buf.get_u32_le();
795 0 : let oldest_running_xid = buf.get_u32_le();
796 0 : let latest_completed_xid = buf.get_u32_le();
797 0 : let mut xids = Vec::new();
798 0 : for _ in 0..(xcnt + subxcnt) {
799 0 : xids.push(buf.get_u32_le());
800 0 : }
801 0 : XlRunningXacts {
802 0 : xcnt,
803 0 : subxcnt,
804 0 : subxid_overflow,
805 0 : next_xid,
806 0 : oldest_running_xid,
807 0 : latest_completed_xid,
808 0 : xids,
809 0 : }
810 0 : }
811 : }
812 :
/// Main routine to decode a WAL record and figure out which blocks are modified
///
/// Fills `decoded` in place: `decoded.blocks` is cleared and repopulated with
/// one entry per block reference, and the header fields, the raw `record`
/// and `main_data_offset` are stored at the end.
///
/// # Errors
/// Returns an error if `XLogRecord::from_bytes` or
/// `postgres_ffi::bkpimage_is_compressed` fails.
///
/// # Panics
/// Panics if the buffer runs out of bytes while headers are being read, or
/// if one of the `assert_eq!` consistency checks near the end fails. Most of
/// the validity checks that PostgreSQL performs are still TODOs here, so a
/// malformed record is not rejected gracefully.
//
// See xlogrecord.h for details
// The overall layout of an XLOG record is:
//		Fixed-size header (XLogRecord struct)
//      XLogRecordBlockHeader struct
//          If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
//	           If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
//	           XLogRecordBlockCompressHeader struct follows.
//          If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
//          BlockNumber follows
//      XLogRecordBlockHeader struct
//      ...
//      XLogRecordDataHeader[Short|Long] struct
//      block data
//      block data
//      ...
//      main data
//
//
// For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
// It would be more natural for this function to return a DecodedWALRecord as return value,
// but reusing the caller-supplied struct avoids an allocation.
// This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
//
pub fn decode_wal_record(
    record: Bytes,
    decoded: &mut DecodedWALRecord,
    pg_version: u32,
) -> Result<()> {
    // Relation identifier of the most recent block header that carried one;
    // reused by later headers that have BKPBLOCK_SAME_REL set.
    let mut rnode_spcnode: u32 = 0;
    let mut rnode_dbnode: u32 = 0;
    let mut rnode_relnode: u32 = 0;
    let mut got_rnode = false;

    // `buf` is a cursor over the record; `record` itself is kept untouched so
    // it can be stored in `decoded` and used to compute offsets.
    let mut buf = record.clone();

    // 1. Parse XLogRecord struct

    // FIXME: assume little-endian here
    let xlogrec = XLogRecord::from_bytes(&mut buf)?;

    trace!(
        "decode_wal_record xl_rmid = {} xl_info = {}",
        xlogrec.xl_rmid,
        xlogrec.xl_info
    );

    // NOTE(review): this subtraction underflows if xl_tot_len is smaller than
    // the fixed header size; nothing validates that beforehand.
    let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;

    if buf.remaining() != remaining {
        //TODO error
    }

    let mut max_block_id = 0;
    // Sum of all block data and block image lengths (bytes to skip in step 3).
    let mut blocks_total_len: u32 = 0;
    let mut main_data_len = 0;
    // Total payload length announced so far (block data + images + main data).
    // The header section ends once only that many bytes are left in `buf`.
    let mut datatotal: u32 = 0;
    decoded.blocks.clear();

    // 2. Decode the headers.
    // XLogRecordBlockHeaders if any,
    // XLogRecordDataHeader[Short|Long]
    while buf.remaining() > datatotal as usize {
        let block_id = buf.get_u8();

        match block_id {
            pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
                /* XLogRecordDataHeaderShort */
                main_data_len = buf.get_u8() as u32;
                datatotal += main_data_len;
            }

            pg_constants::XLR_BLOCK_ID_DATA_LONG => {
                /* XLogRecordDataHeaderLong */
                main_data_len = buf.get_u32_le();
                datatotal += main_data_len;
            }

            pg_constants::XLR_BLOCK_ID_ORIGIN => {
                // RepOriginId is uint16
                buf.advance(2);
            }

            pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
                // TransactionId is uint32
                buf.advance(4);
            }

            0..=pg_constants::XLR_MAX_BLOCK_ID => {
                /* XLogRecordBlockHeader */
                let mut blk = DecodedBkpBlock::new();

                if block_id <= max_block_id {
                    // TODO
                    //report_invalid_record(state,
                    //			  "out-of-order block_id %u at %X/%X",
                    //			  block_id,
                    //			  (uint32) (state->ReadRecPtr >> 32),
                    //			  (uint32) state->ReadRecPtr);
                    //    goto err;
                }
                max_block_id = block_id;

                let fork_flags: u8 = buf.get_u8();
                blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
                blk.flags = fork_flags;
                blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
                blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
                blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
                blk.data_len = buf.get_u16_le();

                /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */

                datatotal += blk.data_len as u32;
                blocks_total_len += blk.data_len as u32;

                if blk.has_image {
                    // XLogRecordBlockImageHeader
                    blk.bimg_len = buf.get_u16_le();
                    blk.hole_offset = buf.get_u16_le();
                    blk.bimg_info = buf.get_u8();

                    blk.apply_image = dispatch_pgversion!(
                        pg_version,
                        (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
                    );

                    let blk_img_is_compressed =
                        postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;

                    if blk_img_is_compressed {
                        debug!("compressed block image , pg_version = {}", pg_version);
                    }

                    if blk_img_is_compressed {
                        // A compressed image with a hole carries an explicit
                        // hole length; otherwise there is no hole.
                        if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
                            blk.hole_length = buf.get_u16_le();
                        } else {
                            blk.hole_length = 0;
                        }
                    } else {
                        // Uncompressed: the hole is whatever the image is
                        // short of a full block.
                        // NOTE(review): underflows if bimg_len > BLCKSZ; not
                        // validated beforehand.
                        blk.hole_length = BLCKSZ - blk.bimg_len;
                    }
                    datatotal += blk.bimg_len as u32;
                    blocks_total_len += blk.bimg_len as u32;

                    /*
                     * cross-check that hole_offset > 0, hole_length > 0 and
                     * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
                     */
                    if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
                        && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
                    {
                        // TODO
                        /*
                        report_invalid_record(state,
                                      "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
                                      (unsigned int) blk->hole_offset,
                                      (unsigned int) blk->hole_length,
                                      (unsigned int) blk->bimg_len,
                                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
                        goto err;
                                     */
                    }

                    /*
                     * cross-check that hole_offset == 0 and hole_length == 0 if
                     * the HAS_HOLE flag is not set.
                     */
                    if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
                        && (blk.hole_offset != 0 || blk.hole_length != 0)
                    {
                        // TODO
                        /*
                        report_invalid_record(state,
                                      "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
                                      (unsigned int) blk->hole_offset,
                                      (unsigned int) blk->hole_length,
                                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
                        goto err;
                                     */
                    }

                    /*
                     * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
                     * flag is set.
                     */
                    // NOTE(review): the condition below tests the
                    // *uncompressed* case, which looks inverted relative to
                    // the comment above. The branch is currently empty, so it
                    // has no effect — fix the condition before filling it in.
                    if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
                        // TODO
                        /*
                        report_invalid_record(state,
                                      "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
                                      (unsigned int) blk->bimg_len,
                                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
                        goto err;
                                     */
                    }

                    /*
                     * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
                     * IS_COMPRESSED flag is set.
                     */
                    if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
                        && !blk_img_is_compressed
                        && blk.bimg_len != BLCKSZ
                    {
                        // TODO
                        /*
                        report_invalid_record(state,
                                      "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
                                      (unsigned int) blk->data_len,
                                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
                        goto err;
                                     */
                    }
                }
                if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
                    // This header carries its own relation identifier.
                    rnode_spcnode = buf.get_u32_le();
                    rnode_dbnode = buf.get_u32_le();
                    rnode_relnode = buf.get_u32_le();
                    got_rnode = true;
                } else if !got_rnode {
                    // TODO
                    /*
                    report_invalid_record(state,
                                    "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
                                    (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
                    goto err;           */
                }

                blk.rnode_spcnode = rnode_spcnode;
                blk.rnode_dbnode = rnode_dbnode;
                blk.rnode_relnode = rnode_relnode;

                blk.blkno = buf.get_u32_le();
                trace!(
                    "this record affects {}/{}/{} blk {}",
                    rnode_spcnode,
                    rnode_dbnode,
                    rnode_relnode,
                    blk.blkno
                );

                decoded.blocks.push(blk);
            }

            _ => {
                // TODO: invalid block_id
            }
        }
    }

    // 3. Decode blocks.
    // `ptr` is the offset within `record` of the first payload byte; walk the
    // blocks in order to record where each full-page image starts.
    let mut ptr = record.len() - buf.remaining();
    for blk in decoded.blocks.iter_mut() {
        if blk.has_image {
            blk.bimg_offset = ptr as u32;
            ptr += blk.bimg_len as usize;
        }
        // NOTE(review): `blocks_total_len` counted `data_len` unconditionally
        // above, while this only counts it when HAS_DATA is set; a header
        // with data_len > 0 but no HAS_DATA flag trips the assert below.
        if blk.has_data {
            ptr += blk.data_len as usize;
        }
    }
    // We don't need them, so just skip blocks_total_len bytes
    buf.advance(blocks_total_len as usize);
    assert_eq!(ptr, record.len() - buf.remaining());

    let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;

    // 4. Decode main_data
    if main_data_len > 0 {
        assert_eq!(buf.remaining(), main_data_len as usize);
    }

    decoded.xl_xid = xlogrec.xl_xid;
    decoded.xl_info = xlogrec.xl_info;
    decoded.xl_rmid = xlogrec.xl_rmid;
    decoded.record = record;
    decoded.main_data_offset = main_data_offset;

    Ok(())
}
1095 :
1096 : ///
1097 : /// Build a human-readable string to describe a WAL record
1098 : ///
1099 : /// For debugging purposes
1100 0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
1101 0 : match rec {
1102 0 : NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
1103 0 : "will_init: {}, {}",
1104 0 : will_init,
1105 0 : describe_postgres_wal_record(rec)?
1106 : )),
1107 0 : _ => Ok(format!("{:?}", rec)),
1108 : }
1109 0 : }
1110 :
1111 0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
1112 0 : // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
1113 0 : // Maybe use the postgres wal redo process, the same used for replaying WAL records?
1114 0 : // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
1115 0 : // without worrying about security?
1116 0 : //
1117 0 : // But for now, we have a hand-written code for a few common WAL record types here.
1118 0 :
1119 0 : let mut buf = record.clone();
1120 :
1121 : // 1. Parse XLogRecord struct
1122 :
1123 : // FIXME: assume little-endian here
1124 0 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
1125 :
1126 : let unknown_str: String;
1127 :
1128 0 : let result: &str = match xlogrec.xl_rmid {
1129 : pg_constants::RM_HEAP2_ID => {
1130 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1131 0 : match info {
1132 0 : pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
1133 0 : pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
1134 : _ => {
1135 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1136 0 : &unknown_str
1137 : }
1138 : }
1139 : }
1140 : pg_constants::RM_HEAP_ID => {
1141 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1142 0 : match info {
1143 0 : pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
1144 0 : pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
1145 0 : pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
1146 0 : pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
1147 : _ => {
1148 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1149 0 : &unknown_str
1150 : }
1151 : }
1152 : }
1153 : pg_constants::RM_XLOG_ID => {
1154 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1155 0 : match info {
1156 0 : pg_constants::XLOG_FPI => "XLOG FPI",
1157 0 : pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
1158 : _ => {
1159 0 : unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
1160 0 : &unknown_str
1161 : }
1162 : }
1163 : }
1164 0 : rmid => {
1165 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1166 0 :
1167 0 : unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
1168 0 : &unknown_str
1169 : }
1170 : };
1171 :
1172 0 : Ok(String::from(result))
1173 0 : }
|