Line data Source code
1 : //!
2 : //! Functions for parsing WAL records.
3 : //!
4 :
5 : use anyhow::Result;
6 : use bytes::{Buf, Bytes};
7 : use postgres_ffi::dispatch_pgversion;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::BLCKSZ;
10 : use postgres_ffi::{BlockNumber, TimestampTz};
11 : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
12 : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
13 : use serde::{Deserialize, Serialize};
14 : use tracing::*;
15 : use utils::bin_ser::DeserializeError;
16 :
/// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
/// around a PostgreSQL WAL record, or a custom neon-specific "record".
//
// NOTE(review): this enum derives Serialize/Deserialize. Depending on the serde
// backend used by callers, the order of variants and of fields within each variant
// may be part of the on-disk/wire format — do not reorder without confirming.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum NeonWalRecord {
    /// Native PostgreSQL WAL record
    ///
    /// `will_init` is what [`NeonWalRecord::will_init`] reports for this record;
    /// `rec` is the raw record, which `describe_wal_record` parses starting with an
    /// `XLogRecord` header.
    Postgres { will_init: bool, rec: Bytes },

    /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
    ClearVisibilityMapFlags {
        new_heap_blkno: Option<u32>,
        old_heap_blkno: Option<u32>,
        flags: u8,
    },
    /// Mark transaction IDs as committed on a CLOG page
    ClogSetCommitted {
        xids: Vec<TransactionId>,
        timestamp: TimestampTz,
    },
    /// Mark transaction IDs as aborted on a CLOG page
    ClogSetAborted { xids: Vec<TransactionId> },
    /// Extend multixact offsets SLRU
    MultixactOffsetCreate {
        mid: MultiXactId,
        moff: MultiXactOffset,
    },
    /// Extend multixact members SLRU.
    MultixactMembersCreate {
        moff: MultiXactOffset,
        members: Vec<MultiXactMember>,
    },
}
48 :
49 : impl NeonWalRecord {
50 : /// Does replaying this WAL record initialize the page from scratch, or does
51 : /// it need to be applied over the previous image of the page?
52 129335829 : pub fn will_init(&self) -> bool {
53 129335829 : match self {
54 110000944 : NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
55 :
56 : // None of the special neon record types currently initialize the page
57 19334885 : _ => false,
58 : }
59 129335829 : }
60 : }
61 :
/// DecodedBkpBlock represents per-page data contained in a WAL record.
///
/// Instances are filled in by `decode_wal_record` from one
/// XLogRecordBlockHeader (plus the optional image header and RelFileNode).
#[derive(Default)]
pub struct DecodedBkpBlock {
    /* Is this block ref in use? */
    //in_use: bool,

    /* Identify the block this refers to */
    pub rnode_spcnode: u32,
    pub rnode_dbnode: u32,
    pub rnode_relnode: u32,
    // Note that we have a few special forknum values for non-rel files.
    /// Fork number: `fork_flags & BKPBLOCK_FORK_MASK`.
    pub forknum: u8,
    pub blkno: u32,

    /// copy of the fork_flags field from the XLogRecordBlockHeader
    pub flags: u8,

    /* Information on full-page image, if any */
    /// has image, even for consistency checking (`BKPBLOCK_HAS_IMAGE`)
    pub has_image: bool,
    /// has image that should be restored (`BKPIMAGE_APPLY` bit of `bimg_info`)
    pub apply_image: bool,
    /// record doesn't need previous page version to apply (`BKPBLOCK_WILL_INIT`)
    pub will_init: bool,
    //char *bkp_image;
    pub hole_offset: u16,
    pub hole_length: u16,
    /// Byte offset of the block image within the whole raw record
    /// (`DecodedWALRecord::record`); only meaningful when `has_image` is set.
    pub bimg_offset: u32,
    pub bimg_len: u16,
    pub bimg_info: u8,

    /* Buffer holding the rmgr-specific data associated with this block */
    /// Set from `BKPBLOCK_HAS_DATA`.
    has_data: bool,
    /// Length of this block's rmgr-specific data, as declared in the block header.
    data_len: u16,
}
97 :
98 : impl DecodedBkpBlock {
99 67216973 : pub fn new() -> DecodedBkpBlock {
100 67216973 : Default::default()
101 67216973 : }
102 : }
103 :
/// Output of `decode_wal_record`: the header fields of one WAL record plus the
/// block references it contains.
///
/// The caller supplies and may reuse one instance across calls. On success,
/// `decode_wal_record` clears `blocks` and then refills it, and overwrites every
/// other field.
#[derive(Default)]
pub struct DecodedWALRecord {
    pub xl_xid: TransactionId,
    pub xl_info: u8,
    pub xl_rmid: u8,
    pub record: Bytes, // raw XLogRecord

    pub blocks: Vec<DecodedBkpBlock>,
    /// Offset of the record's main data within `record` (`xl_tot_len - main_data_len`);
    /// the main data occupies the tail of the record.
    pub main_data_offset: usize,
}
114 :
/// Physical identity of a relation: tablespace, database and relation OIDs.
///
/// `#[repr(C)]` keeps the field order fixed; the decoders in this file read the
/// three fields in declaration order.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {
    pub spcnode: Oid, /* tablespace */
    pub dbnode: Oid,  /* database */
    pub relnode: Oid, /* relation */
}
122 :
123 : #[repr(C)]
124 0 : #[derive(Debug)]
125 : pub struct XlRelmapUpdate {
126 : pub dbid: Oid, /* database ID, or 0 for shared map */
127 : pub tsid: Oid, /* database's tablespace, or pg_global */
128 : pub nbytes: i32, /* size of relmap data */
129 : }
130 :
131 : impl XlRelmapUpdate {
132 62 : pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
133 62 : XlRelmapUpdate {
134 62 : dbid: buf.get_u32_le(),
135 62 : tsid: buf.get_u32_le(),
136 62 : nbytes: buf.get_i32_le(),
137 62 : }
138 62 : }
139 : }
140 :
141 : pub mod v14 {
142 : use bytes::{Buf, Bytes};
143 : use postgres_ffi::{OffsetNumber, TransactionId};
144 :
145 : #[repr(C)]
146 0 : #[derive(Debug)]
147 : pub struct XlHeapInsert {
148 : pub offnum: OffsetNumber,
149 : pub flags: u8,
150 : }
151 :
152 : impl XlHeapInsert {
153 41933712 : pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
154 41933712 : XlHeapInsert {
155 41933712 : offnum: buf.get_u16_le(),
156 41933712 : flags: buf.get_u8(),
157 41933712 : }
158 41933712 : }
159 : }
160 :
161 : #[repr(C)]
162 0 : #[derive(Debug)]
163 : pub struct XlHeapMultiInsert {
164 : pub flags: u8,
165 : pub _padding: u8,
166 : pub ntuples: u16,
167 : }
168 :
169 : impl XlHeapMultiInsert {
170 1160097 : pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
171 1160097 : XlHeapMultiInsert {
172 1160097 : flags: buf.get_u8(),
173 1160097 : _padding: buf.get_u8(),
174 1160097 : ntuples: buf.get_u16_le(),
175 1160097 : }
176 1160097 : }
177 : }
178 :
179 : #[repr(C)]
180 0 : #[derive(Debug)]
181 : pub struct XlHeapDelete {
182 : pub xmax: TransactionId,
183 : pub offnum: OffsetNumber,
184 : pub _padding: u16,
185 : pub t_cid: u32,
186 : pub infobits_set: u8,
187 : pub flags: u8,
188 : }
189 :
190 : impl XlHeapDelete {
191 2302613 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
192 2302613 : XlHeapDelete {
193 2302613 : xmax: buf.get_u32_le(),
194 2302613 : offnum: buf.get_u16_le(),
195 2302613 : _padding: buf.get_u16_le(),
196 2302613 : t_cid: buf.get_u32_le(),
197 2302613 : infobits_set: buf.get_u8(),
198 2302613 : flags: buf.get_u8(),
199 2302613 : }
200 2302613 : }
201 : }
202 :
203 : #[repr(C)]
204 0 : #[derive(Debug)]
205 : pub struct XlHeapUpdate {
206 : pub old_xmax: TransactionId,
207 : pub old_offnum: OffsetNumber,
208 : pub old_infobits_set: u8,
209 : pub flags: u8,
210 : pub t_cid: u32,
211 : pub new_xmax: TransactionId,
212 : pub new_offnum: OffsetNumber,
213 : }
214 :
215 : impl XlHeapUpdate {
216 3413367 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
217 3413367 : XlHeapUpdate {
218 3413367 : old_xmax: buf.get_u32_le(),
219 3413367 : old_offnum: buf.get_u16_le(),
220 3413367 : old_infobits_set: buf.get_u8(),
221 3413367 : flags: buf.get_u8(),
222 3413367 : t_cid: buf.get_u32_le(),
223 3413367 : new_xmax: buf.get_u32_le(),
224 3413367 : new_offnum: buf.get_u16_le(),
225 3413367 : }
226 3413367 : }
227 : }
228 :
229 : #[repr(C)]
230 0 : #[derive(Debug)]
231 : pub struct XlHeapLock {
232 : pub locking_xid: TransactionId,
233 : pub offnum: OffsetNumber,
234 : pub _padding: u16,
235 : pub t_cid: u32,
236 : pub infobits_set: u8,
237 : pub flags: u8,
238 : }
239 :
240 : impl XlHeapLock {
241 2765675 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
242 2765675 : XlHeapLock {
243 2765675 : locking_xid: buf.get_u32_le(),
244 2765675 : offnum: buf.get_u16_le(),
245 2765675 : _padding: buf.get_u16_le(),
246 2765675 : t_cid: buf.get_u32_le(),
247 2765675 : infobits_set: buf.get_u8(),
248 2765675 : flags: buf.get_u8(),
249 2765675 : }
250 2765675 : }
251 : }
252 :
253 : #[repr(C)]
254 0 : #[derive(Debug)]
255 : pub struct XlHeapLockUpdated {
256 : pub xmax: TransactionId,
257 : pub offnum: OffsetNumber,
258 : pub infobits_set: u8,
259 : pub flags: u8,
260 : }
261 :
262 : impl XlHeapLockUpdated {
263 4146 : pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
264 4146 : XlHeapLockUpdated {
265 4146 : xmax: buf.get_u32_le(),
266 4146 : offnum: buf.get_u16_le(),
267 4146 : infobits_set: buf.get_u8(),
268 4146 : flags: buf.get_u8(),
269 4146 : }
270 4146 : }
271 : }
272 : }
273 :
274 : pub mod v15 {
275 : pub use super::v14::{
276 : XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
277 : };
278 : }
279 :
280 : pub mod v16 {
281 : pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
282 : use bytes::{Buf, Bytes};
283 : use postgres_ffi::{OffsetNumber, TransactionId};
284 :
285 : pub struct XlHeapDelete {
286 : pub xmax: TransactionId,
287 : pub offnum: OffsetNumber,
288 : pub infobits_set: u8,
289 : pub flags: u8,
290 : }
291 :
292 : impl XlHeapDelete {
293 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
294 0 : XlHeapDelete {
295 0 : xmax: buf.get_u32_le(),
296 0 : offnum: buf.get_u16_le(),
297 0 : infobits_set: buf.get_u8(),
298 0 : flags: buf.get_u8(),
299 0 : }
300 0 : }
301 : }
302 :
303 : #[repr(C)]
304 0 : #[derive(Debug)]
305 : pub struct XlHeapUpdate {
306 : pub old_xmax: TransactionId,
307 : pub old_offnum: OffsetNumber,
308 : pub old_infobits_set: u8,
309 : pub flags: u8,
310 : pub new_xmax: TransactionId,
311 : pub new_offnum: OffsetNumber,
312 : }
313 :
314 : impl XlHeapUpdate {
315 0 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
316 0 : XlHeapUpdate {
317 0 : old_xmax: buf.get_u32_le(),
318 0 : old_offnum: buf.get_u16_le(),
319 0 : old_infobits_set: buf.get_u8(),
320 0 : flags: buf.get_u8(),
321 0 : new_xmax: buf.get_u32_le(),
322 0 : new_offnum: buf.get_u16_le(),
323 0 : }
324 0 : }
325 : }
326 :
327 : #[repr(C)]
328 0 : #[derive(Debug)]
329 : pub struct XlHeapLock {
330 : pub locking_xid: TransactionId,
331 : pub offnum: OffsetNumber,
332 : pub infobits_set: u8,
333 : pub flags: u8,
334 : }
335 :
336 : impl XlHeapLock {
337 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
338 0 : XlHeapLock {
339 0 : locking_xid: buf.get_u32_le(),
340 0 : offnum: buf.get_u16_le(),
341 0 : infobits_set: buf.get_u8(),
342 0 : flags: buf.get_u8(),
343 0 : }
344 0 : }
345 : }
346 :
347 : /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
348 : pub mod rm_neon {
349 : use bytes::{Buf, Bytes};
350 : use postgres_ffi::{OffsetNumber, TransactionId};
351 :
352 : #[repr(C)]
353 0 : #[derive(Debug)]
354 : pub struct XlNeonHeapInsert {
355 : pub offnum: OffsetNumber,
356 : pub flags: u8,
357 : }
358 :
359 : impl XlNeonHeapInsert {
360 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
361 0 : XlNeonHeapInsert {
362 0 : offnum: buf.get_u16_le(),
363 0 : flags: buf.get_u8(),
364 0 : }
365 0 : }
366 : }
367 :
368 : #[repr(C)]
369 0 : #[derive(Debug)]
370 : pub struct XlNeonHeapMultiInsert {
371 : pub flags: u8,
372 : pub _padding: u8,
373 : pub ntuples: u16,
374 : pub t_cid: u32,
375 : }
376 :
377 : impl XlNeonHeapMultiInsert {
378 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
379 0 : XlNeonHeapMultiInsert {
380 0 : flags: buf.get_u8(),
381 0 : _padding: buf.get_u8(),
382 0 : ntuples: buf.get_u16_le(),
383 0 : t_cid: buf.get_u32_le(),
384 0 : }
385 0 : }
386 : }
387 :
388 : #[repr(C)]
389 0 : #[derive(Debug)]
390 : pub struct XlNeonHeapDelete {
391 : pub xmax: TransactionId,
392 : pub offnum: OffsetNumber,
393 : pub infobits_set: u8,
394 : pub flags: u8,
395 : pub t_cid: u32,
396 : }
397 :
398 : impl XlNeonHeapDelete {
399 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
400 0 : XlNeonHeapDelete {
401 0 : xmax: buf.get_u32_le(),
402 0 : offnum: buf.get_u16_le(),
403 0 : infobits_set: buf.get_u8(),
404 0 : flags: buf.get_u8(),
405 0 : t_cid: buf.get_u32_le(),
406 0 : }
407 0 : }
408 : }
409 :
410 : #[repr(C)]
411 0 : #[derive(Debug)]
412 : pub struct XlNeonHeapUpdate {
413 : pub old_xmax: TransactionId,
414 : pub old_offnum: OffsetNumber,
415 : pub old_infobits_set: u8,
416 : pub flags: u8,
417 : pub t_cid: u32,
418 : pub new_xmax: TransactionId,
419 : pub new_offnum: OffsetNumber,
420 : }
421 :
422 : impl XlNeonHeapUpdate {
423 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
424 0 : XlNeonHeapUpdate {
425 0 : old_xmax: buf.get_u32_le(),
426 0 : old_offnum: buf.get_u16_le(),
427 0 : old_infobits_set: buf.get_u8(),
428 0 : flags: buf.get_u8(),
429 0 : t_cid: buf.get_u32(),
430 0 : new_xmax: buf.get_u32_le(),
431 0 : new_offnum: buf.get_u16_le(),
432 0 : }
433 0 : }
434 : }
435 :
436 : #[repr(C)]
437 0 : #[derive(Debug)]
438 : pub struct XlNeonHeapLock {
439 : pub locking_xid: TransactionId,
440 : pub t_cid: u32,
441 : pub offnum: OffsetNumber,
442 : pub infobits_set: u8,
443 : pub flags: u8,
444 : }
445 :
446 : impl XlNeonHeapLock {
447 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
448 0 : XlNeonHeapLock {
449 0 : locking_xid: buf.get_u32_le(),
450 0 : t_cid: buf.get_u32_le(),
451 0 : offnum: buf.get_u16_le(),
452 0 : infobits_set: buf.get_u8(),
453 0 : flags: buf.get_u8(),
454 0 : }
455 0 : }
456 : }
457 : }
458 : }
459 :
460 : #[repr(C)]
461 0 : #[derive(Debug)]
462 : pub struct XlSmgrCreate {
463 : pub rnode: RelFileNode,
464 : // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
465 : // well-defined size?
466 : pub forknum: u8,
467 : }
468 :
469 : impl XlSmgrCreate {
470 75290 : pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
471 75290 : XlSmgrCreate {
472 75290 : rnode: RelFileNode {
473 75290 : spcnode: buf.get_u32_le(), /* tablespace */
474 75290 : dbnode: buf.get_u32_le(), /* database */
475 75290 : relnode: buf.get_u32_le(), /* relation */
476 75290 : },
477 75290 : forknum: buf.get_u32_le() as u8,
478 75290 : }
479 75290 : }
480 : }
481 :
482 : #[repr(C)]
483 0 : #[derive(Debug)]
484 : pub struct XlSmgrTruncate {
485 : pub blkno: BlockNumber,
486 : pub rnode: RelFileNode,
487 : pub flags: u32,
488 : }
489 :
490 : impl XlSmgrTruncate {
491 202 : pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
492 202 : XlSmgrTruncate {
493 202 : blkno: buf.get_u32_le(),
494 202 : rnode: RelFileNode {
495 202 : spcnode: buf.get_u32_le(), /* tablespace */
496 202 : dbnode: buf.get_u32_le(), /* database */
497 202 : relnode: buf.get_u32_le(), /* relation */
498 202 : },
499 202 : flags: buf.get_u32_le(),
500 202 : }
501 202 : }
502 : }
503 :
504 : #[repr(C)]
505 0 : #[derive(Debug)]
506 : pub struct XlCreateDatabase {
507 : pub db_id: Oid,
508 : pub tablespace_id: Oid,
509 : pub src_db_id: Oid,
510 : pub src_tablespace_id: Oid,
511 : }
512 :
513 : impl XlCreateDatabase {
514 24 : pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
515 24 : XlCreateDatabase {
516 24 : db_id: buf.get_u32_le(),
517 24 : tablespace_id: buf.get_u32_le(),
518 24 : src_db_id: buf.get_u32_le(),
519 24 : src_tablespace_id: buf.get_u32_le(),
520 24 : }
521 24 : }
522 : }
523 :
524 : #[repr(C)]
525 0 : #[derive(Debug)]
526 : pub struct XlDropDatabase {
527 : pub db_id: Oid,
528 : pub n_tablespaces: Oid, /* number of tablespace IDs */
529 : pub tablespace_ids: Vec<Oid>,
530 : }
531 :
532 : impl XlDropDatabase {
533 3 : pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
534 3 : let mut rec = XlDropDatabase {
535 3 : db_id: buf.get_u32_le(),
536 3 : n_tablespaces: buf.get_u32_le(),
537 3 : tablespace_ids: Vec::<Oid>::new(),
538 3 : };
539 :
540 3 : for _i in 0..rec.n_tablespaces {
541 3 : let id = buf.get_u32_le();
542 3 : rec.tablespace_ids.push(id);
543 3 : }
544 :
545 3 : rec
546 3 : }
547 : }
548 :
549 : ///
550 : /// Note: Parsing some fields is missing, because they're not needed.
551 : ///
552 : /// This is similar to the xl_xact_parsed_commit and
553 : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
554 : /// struct for commits and aborts.
555 : ///
556 0 : #[derive(Debug)]
557 : pub struct XlXactParsedRecord {
558 : pub xid: TransactionId,
559 : pub info: u8,
560 : pub xact_time: TimestampTz,
561 : pub xinfo: u32,
562 :
563 : pub db_id: Oid,
564 : /* MyDatabaseId */
565 : pub ts_id: Oid,
566 : /* MyDatabaseTableSpace */
567 : pub subxacts: Vec<TransactionId>,
568 :
569 : pub xnodes: Vec<RelFileNode>,
570 : }
571 :
572 : impl XlXactParsedRecord {
573 : /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
574 : /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
575 : /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
576 6113872 : pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
577 6113872 : let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
578 6113872 : // The record starts with time of commit/abort
579 6113872 : let xact_time = buf.get_i64_le();
580 6113872 : let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
581 6109246 : buf.get_u32_le()
582 : } else {
583 4626 : 0
584 : };
585 : let db_id;
586 : let ts_id;
587 6113872 : if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
588 6105005 : db_id = buf.get_u32_le();
589 6105005 : ts_id = buf.get_u32_le();
590 6105005 : } else {
591 8867 : db_id = 0;
592 8867 : ts_id = 0;
593 8867 : }
594 6113872 : let mut subxacts = Vec::<TransactionId>::new();
595 6113872 : if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
596 321 : let nsubxacts = buf.get_i32_le();
597 110245 : for _i in 0..nsubxacts {
598 110245 : let subxact = buf.get_u32_le();
599 110245 : subxacts.push(subxact);
600 110245 : }
601 6113551 : }
602 6113872 : let mut xnodes = Vec::<RelFileNode>::new();
603 6113872 : if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
604 19461 : let nrels = buf.get_i32_le();
605 65359 : for _i in 0..nrels {
606 65359 : let spcnode = buf.get_u32_le();
607 65359 : let dbnode = buf.get_u32_le();
608 65359 : let relnode = buf.get_u32_le();
609 65359 : trace!(
610 0 : "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
611 0 : spcnode,
612 0 : dbnode,
613 0 : relnode
614 0 : );
615 65359 : xnodes.push(RelFileNode {
616 65359 : spcnode,
617 65359 : dbnode,
618 65359 : relnode,
619 65359 : });
620 : }
621 6094411 : }
622 :
623 6113872 : if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
624 0 : let nitems = buf.get_i32_le();
625 0 : debug!(
626 0 : "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
627 0 : nitems
628 0 : );
629 0 : let sizeof_xl_xact_stats_item = 12;
630 0 : buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
631 6113872 : }
632 :
633 6113872 : if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
634 82511 : let nmsgs = buf.get_i32_le();
635 82511 : let sizeof_shared_invalidation_message = 16;
636 82511 : buf.advance(
637 82511 : (nmsgs * sizeof_shared_invalidation_message)
638 82511 : .try_into()
639 82511 : .unwrap(),
640 82511 : );
641 6031361 : }
642 :
643 6113872 : if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
644 2 : xid = buf.get_u32_le();
645 2 : debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
646 6113870 : }
647 :
648 6113872 : XlXactParsedRecord {
649 6113872 : xid,
650 6113872 : info,
651 6113872 : xact_time,
652 6113872 : xinfo,
653 6113872 : db_id,
654 6113872 : ts_id,
655 6113872 : subxacts,
656 6113872 : xnodes,
657 6113872 : }
658 6113872 : }
659 : }
660 :
661 : #[repr(C)]
662 0 : #[derive(Debug)]
663 : pub struct XlClogTruncate {
664 : pub pageno: u32,
665 : pub oldest_xid: TransactionId,
666 : pub oldest_xid_db: Oid,
667 : }
668 :
669 : impl XlClogTruncate {
670 2 : pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
671 2 : XlClogTruncate {
672 2 : pageno: buf.get_u32_le(),
673 2 : oldest_xid: buf.get_u32_le(),
674 2 : oldest_xid_db: buf.get_u32_le(),
675 2 : }
676 2 : }
677 : }
678 :
679 : #[repr(C)]
680 945060 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
681 : pub struct MultiXactMember {
682 : pub xid: TransactionId,
683 : pub status: MultiXactStatus,
684 : }
685 :
686 : impl MultiXactMember {
687 474674 : pub fn decode(buf: &mut Bytes) -> MultiXactMember {
688 474674 : MultiXactMember {
689 474674 : xid: buf.get_u32_le(),
690 474674 : status: buf.get_u32_le(),
691 474674 : }
692 474674 : }
693 : }
694 :
695 : #[repr(C)]
696 0 : #[derive(Debug)]
697 : pub struct XlMultiXactCreate {
698 : pub mid: MultiXactId,
699 : /* new MultiXact's ID */
700 : pub moff: MultiXactOffset,
701 : /* its starting offset in members file */
702 : pub nmembers: u32,
703 : /* number of member XIDs */
704 : pub members: Vec<MultiXactMember>,
705 : }
706 :
707 : impl XlMultiXactCreate {
708 24816 : pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
709 24816 : let mid = buf.get_u32_le();
710 24816 : let moff = buf.get_u32_le();
711 24816 : let nmembers = buf.get_u32_le();
712 24816 : let mut members = Vec::new();
713 474674 : for _ in 0..nmembers {
714 474674 : members.push(MultiXactMember::decode(buf));
715 474674 : }
716 24816 : XlMultiXactCreate {
717 24816 : mid,
718 24816 : moff,
719 24816 : nmembers,
720 24816 : members,
721 24816 : }
722 24816 : }
723 : }
724 :
725 : #[repr(C)]
726 0 : #[derive(Debug)]
727 : pub struct XlMultiXactTruncate {
728 : pub oldest_multi_db: Oid,
729 : /* to-be-truncated range of multixact offsets */
730 : pub start_trunc_off: MultiXactId,
731 : /* just for completeness' sake */
732 : pub end_trunc_off: MultiXactId,
733 :
734 : /* to-be-truncated range of multixact members */
735 : pub start_trunc_memb: MultiXactOffset,
736 : pub end_trunc_memb: MultiXactOffset,
737 : }
738 :
739 : impl XlMultiXactTruncate {
740 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
741 0 : XlMultiXactTruncate {
742 0 : oldest_multi_db: buf.get_u32_le(),
743 0 : start_trunc_off: buf.get_u32_le(),
744 0 : end_trunc_off: buf.get_u32_le(),
745 0 : start_trunc_memb: buf.get_u32_le(),
746 0 : end_trunc_memb: buf.get_u32_le(),
747 0 : }
748 0 : }
749 : }
750 :
751 : #[repr(C)]
752 0 : #[derive(Debug)]
753 : pub struct XlLogicalMessage {
754 : pub db_id: Oid,
755 : pub transactional: bool,
756 : pub prefix_size: usize,
757 : pub message_size: usize,
758 : }
759 :
760 : impl XlLogicalMessage {
761 128 : pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
762 128 : XlLogicalMessage {
763 128 : db_id: buf.get_u32_le(),
764 128 : transactional: buf.get_u32_le() != 0, // 4-bytes alignment
765 128 : prefix_size: buf.get_u64_le() as usize,
766 128 : message_size: buf.get_u64_le() as usize,
767 128 : }
768 128 : }
769 : }
770 :
771 : /// Main routine to decode a WAL record and figure out which blocks are modified
772 : //
773 : // See xlogrecord.h for details
774 : // The overall layout of an XLOG record is:
775 : // Fixed-size header (XLogRecord struct)
776 : // XLogRecordBlockHeader struct
777 : // If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
778 : // If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
779 : // XLogRecordBlockCompressHeader struct follows.
780 : // If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
781 : // BlockNumber follows
782 : // XLogRecordBlockHeader struct
783 : // ...
784 : // XLogRecordDataHeader[Short|Long] struct
785 : // block data
786 : // block data
787 : // ...
788 : // main data
789 : //
790 : //
791 : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
792 : // It would be more natural for this function to return a DecodedWALRecord as return value,
793 : // but reusing the caller-supplied struct avoids an allocation.
794 : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
795 : //
796 73215430 : pub fn decode_wal_record(
797 73215430 : record: Bytes,
798 73215430 : decoded: &mut DecodedWALRecord,
799 73215430 : pg_version: u32,
800 73215430 : ) -> Result<()> {
801 73215430 : let mut rnode_spcnode: u32 = 0;
802 73215430 : let mut rnode_dbnode: u32 = 0;
803 73215430 : let mut rnode_relnode: u32 = 0;
804 73215430 : let mut got_rnode = false;
805 73215430 :
806 73215430 : let mut buf = record.clone();
807 :
808 : // 1. Parse XLogRecord struct
809 :
810 : // FIXME: assume little-endian here
811 73215430 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
812 :
813 73215430 : trace!(
814 0 : "decode_wal_record xl_rmid = {} xl_info = {}",
815 0 : xlogrec.xl_rmid,
816 0 : xlogrec.xl_info
817 0 : );
818 :
819 73215430 : let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
820 73215430 :
821 73215430 : if buf.remaining() != remaining {
822 0 : //TODO error
823 73215430 : }
824 :
825 73215430 : let mut max_block_id = 0;
826 73215430 : let mut blocks_total_len: u32 = 0;
827 73215430 : let mut main_data_len = 0;
828 73215430 : let mut datatotal: u32 = 0;
829 73215430 : decoded.blocks.clear();
830 :
831 : // 2. Decode the headers.
832 : // XLogRecordBlockHeaders if any,
833 : // XLogRecordDataHeader[Short|Long]
834 213515724 : while buf.remaining() > datatotal as usize {
835 140300294 : let block_id = buf.get_u8();
836 140300294 :
837 140300294 : match block_id {
838 72822961 : pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
839 72822961 : /* XLogRecordDataHeaderShort */
840 72822961 : main_data_len = buf.get_u8() as u32;
841 72822961 : datatotal += main_data_len;
842 72822961 : }
843 :
844 149202 : pg_constants::XLR_BLOCK_ID_DATA_LONG => {
845 149202 : /* XLogRecordDataHeaderLong */
846 149202 : main_data_len = buf.get_u32_le();
847 149202 : datatotal += main_data_len;
848 149202 : }
849 :
850 8 : pg_constants::XLR_BLOCK_ID_ORIGIN => {
851 8 : // RepOriginId is uint16
852 8 : buf.advance(2);
853 8 : }
854 :
855 111150 : pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
856 111150 : // TransactionId is uint32
857 111150 : buf.advance(4);
858 111150 : }
859 :
860 67216973 : 0..=pg_constants::XLR_MAX_BLOCK_ID => {
861 : /* XLogRecordBlockHeader */
862 67216973 : let mut blk = DecodedBkpBlock::new();
863 67216973 :
864 67216973 : if block_id <= max_block_id {
865 64317008 : // TODO
866 64317008 : //report_invalid_record(state,
867 64317008 : // "out-of-order block_id %u at %X/%X",
868 64317008 : // block_id,
869 64317008 : // (uint32) (state->ReadRecPtr >> 32),
870 64317008 : // (uint32) state->ReadRecPtr);
871 64317008 : // goto err;
872 64317008 : }
873 67216973 : max_block_id = block_id;
874 67216973 :
875 67216973 : let fork_flags: u8 = buf.get_u8();
876 67216973 : blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
877 67216973 : blk.flags = fork_flags;
878 67216973 : blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
879 67216973 : blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
880 67216973 : blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
881 67216973 : blk.data_len = buf.get_u16_le();
882 67216973 :
883 67216973 : /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
884 67216973 :
885 67216973 : datatotal += blk.data_len as u32;
886 67216973 : blocks_total_len += blk.data_len as u32;
887 67216973 :
888 67216973 : if blk.has_image {
889 279552 : blk.bimg_len = buf.get_u16_le();
890 279552 : blk.hole_offset = buf.get_u16_le();
891 279552 : blk.bimg_info = buf.get_u8();
892 :
893 0 : blk.apply_image = dispatch_pgversion!(
894 279552 : pg_version,
895 279552 : (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
896 : );
897 :
898 279552 : let blk_img_is_compressed =
899 279552 : postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
900 :
901 279552 : if blk_img_is_compressed {
902 0 : debug!("compressed block image , pg_version = {}", pg_version);
903 279552 : }
904 :
905 279552 : if blk_img_is_compressed {
906 0 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
907 0 : blk.hole_length = buf.get_u16_le();
908 0 : } else {
909 0 : blk.hole_length = 0;
910 0 : }
911 279552 : } else {
912 279552 : blk.hole_length = BLCKSZ - blk.bimg_len;
913 279552 : }
914 279552 : datatotal += blk.bimg_len as u32;
915 279552 : blocks_total_len += blk.bimg_len as u32;
916 279552 :
917 279552 : /*
918 279552 : * cross-check that hole_offset > 0, hole_length > 0 and
919 279552 : * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
920 279552 : */
921 279552 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
922 217902 : && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
923 0 : {
924 0 : // TODO
925 0 : /*
926 0 : report_invalid_record(state,
927 0 : "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
928 0 : (unsigned int) blk->hole_offset,
929 0 : (unsigned int) blk->hole_length,
930 0 : (unsigned int) blk->bimg_len,
931 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
932 0 : goto err;
933 0 : */
934 279552 : }
935 :
936 : /*
937 : * cross-check that hole_offset == 0 and hole_length == 0 if
938 : * the HAS_HOLE flag is not set.
939 : */
940 279552 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
941 61650 : && (blk.hole_offset != 0 || blk.hole_length != 0)
942 0 : {
943 0 : // TODO
944 0 : /*
945 0 : report_invalid_record(state,
946 0 : "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
947 0 : (unsigned int) blk->hole_offset,
948 0 : (unsigned int) blk->hole_length,
949 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
950 0 : goto err;
951 0 : */
952 279552 : }
953 :
954 : /*
955 : * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
956 : * flag is set.
957 : */
958 279552 : if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
959 61650 : // TODO
960 61650 : /*
961 61650 : report_invalid_record(state,
962 61650 : "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
963 61650 : (unsigned int) blk->bimg_len,
964 61650 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
965 61650 : goto err;
966 61650 : */
967 217902 : }
968 :
969 : /*
970 : * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
971 : * IS_COMPRESSED flag is set.
972 : */
973 279552 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
974 61650 : && !blk_img_is_compressed
975 61650 : && blk.bimg_len != BLCKSZ
976 0 : {
977 0 : // TODO
978 0 : /*
979 0 : report_invalid_record(state,
980 0 : "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
981 0 : (unsigned int) blk->data_len,
982 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
983 0 : goto err;
984 0 : */
985 279552 : }
986 66937421 : }
987 67216973 : if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
988 64321742 : rnode_spcnode = buf.get_u32_le();
989 64321742 : rnode_dbnode = buf.get_u32_le();
990 64321742 : rnode_relnode = buf.get_u32_le();
991 64321742 : got_rnode = true;
992 64321742 : } else if !got_rnode {
993 0 : // TODO
994 0 : /*
995 0 : report_invalid_record(state,
996 0 : "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
997 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
998 0 : goto err; */
999 2895231 : }
1000 :
1001 67216973 : blk.rnode_spcnode = rnode_spcnode;
1002 67216973 : blk.rnode_dbnode = rnode_dbnode;
1003 67216973 : blk.rnode_relnode = rnode_relnode;
1004 67216973 :
1005 67216973 : blk.blkno = buf.get_u32_le();
1006 67216973 : trace!(
1007 0 : "this record affects {}/{}/{} blk {}",
1008 0 : rnode_spcnode,
1009 0 : rnode_dbnode,
1010 0 : rnode_relnode,
1011 0 : blk.blkno
1012 0 : );
1013 :
1014 67216973 : decoded.blocks.push(blk);
1015 : }
1016 :
1017 0 : _ => {
1018 0 : // TODO: invalid block_id
1019 0 : }
1020 : }
1021 : }
1022 :
1023 : // 3. Decode blocks.
1024 73215430 : let mut ptr = record.len() - buf.remaining();
1025 73215430 : for blk in decoded.blocks.iter_mut() {
1026 67216973 : if blk.has_image {
1027 279552 : blk.bimg_offset = ptr as u32;
1028 279552 : ptr += blk.bimg_len as usize;
1029 66937421 : }
1030 67216973 : if blk.has_data {
1031 57772376 : ptr += blk.data_len as usize;
1032 57772376 : }
1033 : }
1034 : // We don't need them, so just skip blocks_total_len bytes
1035 73215430 : buf.advance(blocks_total_len as usize);
1036 73215430 : assert_eq!(ptr, record.len() - buf.remaining());
1037 :
1038 73215430 : let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
1039 73215430 :
1040 73215430 : // 4. Decode main_data
1041 73215430 : if main_data_len > 0 {
1042 72972163 : assert_eq!(buf.remaining(), main_data_len as usize);
1043 243267 : }
1044 :
1045 73215430 : decoded.xl_xid = xlogrec.xl_xid;
1046 73215430 : decoded.xl_info = xlogrec.xl_info;
1047 73215430 : decoded.xl_rmid = xlogrec.xl_rmid;
1048 73215430 : decoded.record = record;
1049 73215430 : decoded.main_data_offset = main_data_offset;
1050 73215430 :
1051 73215430 : Ok(())
1052 73215430 : }
1053 :
1054 : ///
1055 : /// Build a human-readable string to describe a WAL record
1056 : ///
1057 : /// For debugging purposes
1058 0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
1059 0 : match rec {
1060 0 : NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
1061 0 : "will_init: {}, {}",
1062 0 : will_init,
1063 0 : describe_postgres_wal_record(rec)?
1064 : )),
1065 0 : _ => Ok(format!("{:?}", rec)),
1066 : }
1067 0 : }
1068 :
1069 0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
1070 0 : // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
1071 0 : // Maybe use the postgres wal redo process, the same used for replaying WAL records?
1072 0 : // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
1073 0 : // without worrying about security?
1074 0 : //
1075 0 : // But for now, we have a hand-written code for a few common WAL record types here.
1076 0 :
1077 0 : let mut buf = record.clone();
1078 :
1079 : // 1. Parse XLogRecord struct
1080 :
1081 : // FIXME: assume little-endian here
1082 0 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
1083 :
1084 : let unknown_str: String;
1085 :
1086 0 : let result: &str = match xlogrec.xl_rmid {
1087 : pg_constants::RM_HEAP2_ID => {
1088 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1089 0 : match info {
1090 0 : pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
1091 0 : pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
1092 : _ => {
1093 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1094 0 : &unknown_str
1095 : }
1096 : }
1097 : }
1098 : pg_constants::RM_HEAP_ID => {
1099 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1100 0 : match info {
1101 0 : pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
1102 0 : pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
1103 0 : pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
1104 0 : pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
1105 : _ => {
1106 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1107 0 : &unknown_str
1108 : }
1109 : }
1110 : }
1111 : pg_constants::RM_XLOG_ID => {
1112 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1113 0 : match info {
1114 0 : pg_constants::XLOG_FPI => "XLOG FPI",
1115 0 : pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
1116 : _ => {
1117 0 : unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
1118 0 : &unknown_str
1119 : }
1120 : }
1121 : }
1122 0 : rmid => {
1123 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1124 0 :
1125 0 : unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
1126 0 : &unknown_str
1127 : }
1128 : };
1129 :
1130 0 : Ok(String::from(result))
1131 0 : }
|