Line data Source code
1 : //!
2 : //! Functions for parsing WAL records.
3 : //!
4 :
5 : use anyhow::Result;
6 : use bytes::{Buf, Bytes};
7 : use postgres_ffi::dispatch_pgversion;
8 : use postgres_ffi::pg_constants;
9 : use postgres_ffi::BLCKSZ;
10 : use postgres_ffi::{BlockNumber, TimestampTz};
11 : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
12 : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
13 : use serde::{Deserialize, Serialize};
14 : use tracing::*;
15 : use utils::bin_ser::DeserializeError;
16 :
17 : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
18 : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
19 379215474 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
20 : pub enum NeonWalRecord {
21 : /// Native PostgreSQL WAL record
22 : Postgres { will_init: bool, rec: Bytes },
23 :
24 : /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
25 : ClearVisibilityMapFlags {
26 : new_heap_blkno: Option<u32>,
27 : old_heap_blkno: Option<u32>,
28 : flags: u8,
29 : },
30 : /// Mark transaction IDs as committed on a CLOG page
31 : ClogSetCommitted {
32 : xids: Vec<TransactionId>,
33 : timestamp: TimestampTz,
34 : },
35 : /// Mark transaction IDs as aborted on a CLOG page
36 : ClogSetAborted { xids: Vec<TransactionId> },
37 : /// Extend multixact offsets SLRU
38 : MultixactOffsetCreate {
39 : mid: MultiXactId,
40 : moff: MultiXactOffset,
41 : },
42 : /// Extend multixact members SLRU.
43 : MultixactMembersCreate {
44 : moff: MultiXactOffset,
45 : members: Vec<MultiXactMember>,
46 : },
47 : /// Update the map of AUX files, either writing or dropping an entry
48 : AuxFile {
49 : file_path: String,
50 : content: Option<Bytes>,
51 : },
52 : }
53 :
54 : impl NeonWalRecord {
55 : /// Does replaying this WAL record initialize the page from scratch, or does
56 : /// it need to be applied over the previous image of the page?
57 128218110 : pub fn will_init(&self) -> bool {
58 128218110 : match self {
59 108164222 : NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
60 :
61 : // None of the special neon record types currently initialize the page
62 20053888 : _ => false,
63 : }
64 128218110 : }
65 : }
66 :
/// DecodedBkpBlock represents per-page data contained in a WAL record: one
/// block reference parsed from an `XLogRecordBlockHeader` by `decode_wal_record`.
#[derive(Default)]
pub struct DecodedBkpBlock {
    // Identity of the referenced block. The relation fields either come from
    // this block's own header or are inherited from the previous block
    // reference when BKPBLOCK_SAME_REL is set.
    pub rnode_spcnode: u32,
    pub rnode_dbnode: u32,
    pub rnode_relnode: u32,
    /// Fork number (the BKPBLOCK_FORK_MASK bits of `flags`). Note that we have a
    /// few special forknum values for non-rel files.
    pub forknum: u8,
    pub blkno: u32,

    /// Copy of the `fork_flags` field from the XLogRecordBlockHeader.
    pub flags: u8,

    /// The record carries a full-page image of this block (possibly only for
    /// consistency checking).
    pub has_image: bool,
    /// The image should be restored during replay.
    pub apply_image: bool,
    /// Replaying the record doesn't need the previous page version.
    pub will_init: bool,
    /// Position of the "hole" (bytes of the page left out of the image).
    pub hole_offset: u16,
    /// Length of the hole.
    pub hole_length: u16,
    /// Byte offset of the image from the start of the whole record.
    pub bimg_offset: u32,
    /// Length of the stored image in bytes.
    pub bimg_len: u16,
    /// Image flags (BKPIMAGE_*).
    pub bimg_info: u8,

    // Whether rmgr-specific data is attached to this block, and its length.
    has_data: bool,
    data_len: u16,
}
102 :
103 : impl DecodedBkpBlock {
104 66719524 : pub fn new() -> DecodedBkpBlock {
105 66719524 : Default::default()
106 66719524 : }
107 : }
108 :
109 749223 : #[derive(Default)]
110 : pub struct DecodedWALRecord {
111 : pub xl_xid: TransactionId,
112 : pub xl_info: u8,
113 : pub xl_rmid: u8,
114 : pub record: Bytes, // raw XLogRecord
115 :
116 : pub blocks: Vec<DecodedBkpBlock>,
117 : pub main_data_offset: usize,
118 : }
119 :
120 : #[repr(C)]
121 0 : #[derive(Debug, Clone, Copy)]
122 : pub struct RelFileNode {
123 : pub spcnode: Oid, /* tablespace */
124 : pub dbnode: Oid, /* database */
125 : pub relnode: Oid, /* relation */
126 : }
127 :
128 : #[repr(C)]
129 0 : #[derive(Debug)]
130 : pub struct XlRelmapUpdate {
131 : pub dbid: Oid, /* database ID, or 0 for shared map */
132 : pub tsid: Oid, /* database's tablespace, or pg_global */
133 : pub nbytes: i32, /* size of relmap data */
134 : }
135 :
136 : impl XlRelmapUpdate {
137 62 : pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
138 62 : XlRelmapUpdate {
139 62 : dbid: buf.get_u32_le(),
140 62 : tsid: buf.get_u32_le(),
141 62 : nbytes: buf.get_i32_le(),
142 62 : }
143 62 : }
144 : }
145 :
146 : pub mod v14 {
147 : use bytes::{Buf, Bytes};
148 : use postgres_ffi::{OffsetNumber, TransactionId};
149 :
150 : #[repr(C)]
151 0 : #[derive(Debug)]
152 : pub struct XlHeapInsert {
153 : pub offnum: OffsetNumber,
154 : pub flags: u8,
155 : }
156 :
157 : impl XlHeapInsert {
158 41797477 : pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
159 41797477 : XlHeapInsert {
160 41797477 : offnum: buf.get_u16_le(),
161 41797477 : flags: buf.get_u8(),
162 41797477 : }
163 41797477 : }
164 : }
165 :
166 : #[repr(C)]
167 0 : #[derive(Debug)]
168 : pub struct XlHeapMultiInsert {
169 : pub flags: u8,
170 : pub _padding: u8,
171 : pub ntuples: u16,
172 : }
173 :
174 : impl XlHeapMultiInsert {
175 1126297 : pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
176 1126297 : XlHeapMultiInsert {
177 1126297 : flags: buf.get_u8(),
178 1126297 : _padding: buf.get_u8(),
179 1126297 : ntuples: buf.get_u16_le(),
180 1126297 : }
181 1126297 : }
182 : }
183 :
184 : #[repr(C)]
185 0 : #[derive(Debug)]
186 : pub struct XlHeapDelete {
187 : pub xmax: TransactionId,
188 : pub offnum: OffsetNumber,
189 : pub _padding: u16,
190 : pub t_cid: u32,
191 : pub infobits_set: u8,
192 : pub flags: u8,
193 : }
194 :
195 : impl XlHeapDelete {
196 2303978 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
197 2303978 : XlHeapDelete {
198 2303978 : xmax: buf.get_u32_le(),
199 2303978 : offnum: buf.get_u16_le(),
200 2303978 : _padding: buf.get_u16_le(),
201 2303978 : t_cid: buf.get_u32_le(),
202 2303978 : infobits_set: buf.get_u8(),
203 2303978 : flags: buf.get_u8(),
204 2303978 : }
205 2303978 : }
206 : }
207 :
208 : #[repr(C)]
209 0 : #[derive(Debug)]
210 : pub struct XlHeapUpdate {
211 : pub old_xmax: TransactionId,
212 : pub old_offnum: OffsetNumber,
213 : pub old_infobits_set: u8,
214 : pub flags: u8,
215 : pub t_cid: u32,
216 : pub new_xmax: TransactionId,
217 : pub new_offnum: OffsetNumber,
218 : }
219 :
220 : impl XlHeapUpdate {
221 3248613 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
222 3248613 : XlHeapUpdate {
223 3248613 : old_xmax: buf.get_u32_le(),
224 3248613 : old_offnum: buf.get_u16_le(),
225 3248613 : old_infobits_set: buf.get_u8(),
226 3248613 : flags: buf.get_u8(),
227 3248613 : t_cid: buf.get_u32_le(),
228 3248613 : new_xmax: buf.get_u32_le(),
229 3248613 : new_offnum: buf.get_u16_le(),
230 3248613 : }
231 3248613 : }
232 : }
233 :
234 : #[repr(C)]
235 0 : #[derive(Debug)]
236 : pub struct XlHeapLock {
237 : pub locking_xid: TransactionId,
238 : pub offnum: OffsetNumber,
239 : pub _padding: u16,
240 : pub t_cid: u32,
241 : pub infobits_set: u8,
242 : pub flags: u8,
243 : }
244 :
245 : impl XlHeapLock {
246 2740111 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
247 2740111 : XlHeapLock {
248 2740111 : locking_xid: buf.get_u32_le(),
249 2740111 : offnum: buf.get_u16_le(),
250 2740111 : _padding: buf.get_u16_le(),
251 2740111 : t_cid: buf.get_u32_le(),
252 2740111 : infobits_set: buf.get_u8(),
253 2740111 : flags: buf.get_u8(),
254 2740111 : }
255 2740111 : }
256 : }
257 :
258 : #[repr(C)]
259 0 : #[derive(Debug)]
260 : pub struct XlHeapLockUpdated {
261 : pub xmax: TransactionId,
262 : pub offnum: OffsetNumber,
263 : pub infobits_set: u8,
264 : pub flags: u8,
265 : }
266 :
267 : impl XlHeapLockUpdated {
268 4146 : pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
269 4146 : XlHeapLockUpdated {
270 4146 : xmax: buf.get_u32_le(),
271 4146 : offnum: buf.get_u16_le(),
272 4146 : infobits_set: buf.get_u8(),
273 4146 : flags: buf.get_u8(),
274 4146 : }
275 4146 : }
276 : }
277 : }
278 :
279 : pub mod v15 {
280 : pub use super::v14::{
281 : XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
282 : };
283 : }
284 :
285 : pub mod v16 {
286 : pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
287 : use bytes::{Buf, Bytes};
288 : use postgres_ffi::{OffsetNumber, TransactionId};
289 :
290 : pub struct XlHeapDelete {
291 : pub xmax: TransactionId,
292 : pub offnum: OffsetNumber,
293 : pub infobits_set: u8,
294 : pub flags: u8,
295 : }
296 :
297 : impl XlHeapDelete {
298 0 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
299 0 : XlHeapDelete {
300 0 : xmax: buf.get_u32_le(),
301 0 : offnum: buf.get_u16_le(),
302 0 : infobits_set: buf.get_u8(),
303 0 : flags: buf.get_u8(),
304 0 : }
305 0 : }
306 : }
307 :
308 : #[repr(C)]
309 0 : #[derive(Debug)]
310 : pub struct XlHeapUpdate {
311 : pub old_xmax: TransactionId,
312 : pub old_offnum: OffsetNumber,
313 : pub old_infobits_set: u8,
314 : pub flags: u8,
315 : pub new_xmax: TransactionId,
316 : pub new_offnum: OffsetNumber,
317 : }
318 :
319 : impl XlHeapUpdate {
320 0 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
321 0 : XlHeapUpdate {
322 0 : old_xmax: buf.get_u32_le(),
323 0 : old_offnum: buf.get_u16_le(),
324 0 : old_infobits_set: buf.get_u8(),
325 0 : flags: buf.get_u8(),
326 0 : new_xmax: buf.get_u32_le(),
327 0 : new_offnum: buf.get_u16_le(),
328 0 : }
329 0 : }
330 : }
331 :
332 : #[repr(C)]
333 0 : #[derive(Debug)]
334 : pub struct XlHeapLock {
335 : pub locking_xid: TransactionId,
336 : pub offnum: OffsetNumber,
337 : pub infobits_set: u8,
338 : pub flags: u8,
339 : }
340 :
341 : impl XlHeapLock {
342 0 : pub fn decode(buf: &mut Bytes) -> XlHeapLock {
343 0 : XlHeapLock {
344 0 : locking_xid: buf.get_u32_le(),
345 0 : offnum: buf.get_u16_le(),
346 0 : infobits_set: buf.get_u8(),
347 0 : flags: buf.get_u8(),
348 0 : }
349 0 : }
350 : }
351 :
352 : /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
353 : pub mod rm_neon {
354 : use bytes::{Buf, Bytes};
355 : use postgres_ffi::{OffsetNumber, TransactionId};
356 :
357 : #[repr(C)]
358 0 : #[derive(Debug)]
359 : pub struct XlNeonHeapInsert {
360 : pub offnum: OffsetNumber,
361 : pub flags: u8,
362 : }
363 :
364 : impl XlNeonHeapInsert {
365 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapInsert {
366 0 : XlNeonHeapInsert {
367 0 : offnum: buf.get_u16_le(),
368 0 : flags: buf.get_u8(),
369 0 : }
370 0 : }
371 : }
372 :
373 : #[repr(C)]
374 0 : #[derive(Debug)]
375 : pub struct XlNeonHeapMultiInsert {
376 : pub flags: u8,
377 : pub _padding: u8,
378 : pub ntuples: u16,
379 : pub t_cid: u32,
380 : }
381 :
382 : impl XlNeonHeapMultiInsert {
383 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapMultiInsert {
384 0 : XlNeonHeapMultiInsert {
385 0 : flags: buf.get_u8(),
386 0 : _padding: buf.get_u8(),
387 0 : ntuples: buf.get_u16_le(),
388 0 : t_cid: buf.get_u32_le(),
389 0 : }
390 0 : }
391 : }
392 :
393 : #[repr(C)]
394 0 : #[derive(Debug)]
395 : pub struct XlNeonHeapDelete {
396 : pub xmax: TransactionId,
397 : pub offnum: OffsetNumber,
398 : pub infobits_set: u8,
399 : pub flags: u8,
400 : pub t_cid: u32,
401 : }
402 :
403 : impl XlNeonHeapDelete {
404 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapDelete {
405 0 : XlNeonHeapDelete {
406 0 : xmax: buf.get_u32_le(),
407 0 : offnum: buf.get_u16_le(),
408 0 : infobits_set: buf.get_u8(),
409 0 : flags: buf.get_u8(),
410 0 : t_cid: buf.get_u32_le(),
411 0 : }
412 0 : }
413 : }
414 :
415 : #[repr(C)]
416 0 : #[derive(Debug)]
417 : pub struct XlNeonHeapUpdate {
418 : pub old_xmax: TransactionId,
419 : pub old_offnum: OffsetNumber,
420 : pub old_infobits_set: u8,
421 : pub flags: u8,
422 : pub t_cid: u32,
423 : pub new_xmax: TransactionId,
424 : pub new_offnum: OffsetNumber,
425 : }
426 :
427 : impl XlNeonHeapUpdate {
428 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapUpdate {
429 0 : XlNeonHeapUpdate {
430 0 : old_xmax: buf.get_u32_le(),
431 0 : old_offnum: buf.get_u16_le(),
432 0 : old_infobits_set: buf.get_u8(),
433 0 : flags: buf.get_u8(),
434 0 : t_cid: buf.get_u32(),
435 0 : new_xmax: buf.get_u32_le(),
436 0 : new_offnum: buf.get_u16_le(),
437 0 : }
438 0 : }
439 : }
440 :
441 : #[repr(C)]
442 0 : #[derive(Debug)]
443 : pub struct XlNeonHeapLock {
444 : pub locking_xid: TransactionId,
445 : pub t_cid: u32,
446 : pub offnum: OffsetNumber,
447 : pub infobits_set: u8,
448 : pub flags: u8,
449 : }
450 :
451 : impl XlNeonHeapLock {
452 0 : pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
453 0 : XlNeonHeapLock {
454 0 : locking_xid: buf.get_u32_le(),
455 0 : t_cid: buf.get_u32_le(),
456 0 : offnum: buf.get_u16_le(),
457 0 : infobits_set: buf.get_u8(),
458 0 : flags: buf.get_u8(),
459 0 : }
460 0 : }
461 : }
462 : }
463 : }
464 :
465 : #[repr(C)]
466 0 : #[derive(Debug)]
467 : pub struct XlSmgrCreate {
468 : pub rnode: RelFileNode,
469 : // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
470 : // well-defined size?
471 : pub forknum: u8,
472 : }
473 :
474 : impl XlSmgrCreate {
475 75385 : pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
476 75385 : XlSmgrCreate {
477 75385 : rnode: RelFileNode {
478 75385 : spcnode: buf.get_u32_le(), /* tablespace */
479 75385 : dbnode: buf.get_u32_le(), /* database */
480 75385 : relnode: buf.get_u32_le(), /* relation */
481 75385 : },
482 75385 : forknum: buf.get_u32_le() as u8,
483 75385 : }
484 75385 : }
485 : }
486 :
487 : #[repr(C)]
488 0 : #[derive(Debug)]
489 : pub struct XlSmgrTruncate {
490 : pub blkno: BlockNumber,
491 : pub rnode: RelFileNode,
492 : pub flags: u32,
493 : }
494 :
495 : impl XlSmgrTruncate {
496 212 : pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
497 212 : XlSmgrTruncate {
498 212 : blkno: buf.get_u32_le(),
499 212 : rnode: RelFileNode {
500 212 : spcnode: buf.get_u32_le(), /* tablespace */
501 212 : dbnode: buf.get_u32_le(), /* database */
502 212 : relnode: buf.get_u32_le(), /* relation */
503 212 : },
504 212 : flags: buf.get_u32_le(),
505 212 : }
506 212 : }
507 : }
508 :
509 : #[repr(C)]
510 0 : #[derive(Debug)]
511 : pub struct XlCreateDatabase {
512 : pub db_id: Oid,
513 : pub tablespace_id: Oid,
514 : pub src_db_id: Oid,
515 : pub src_tablespace_id: Oid,
516 : }
517 :
518 : impl XlCreateDatabase {
519 24 : pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
520 24 : XlCreateDatabase {
521 24 : db_id: buf.get_u32_le(),
522 24 : tablespace_id: buf.get_u32_le(),
523 24 : src_db_id: buf.get_u32_le(),
524 24 : src_tablespace_id: buf.get_u32_le(),
525 24 : }
526 24 : }
527 : }
528 :
529 : #[repr(C)]
530 0 : #[derive(Debug)]
531 : pub struct XlDropDatabase {
532 : pub db_id: Oid,
533 : pub n_tablespaces: Oid, /* number of tablespace IDs */
534 : pub tablespace_ids: Vec<Oid>,
535 : }
536 :
537 : impl XlDropDatabase {
538 3 : pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
539 3 : let mut rec = XlDropDatabase {
540 3 : db_id: buf.get_u32_le(),
541 3 : n_tablespaces: buf.get_u32_le(),
542 3 : tablespace_ids: Vec::<Oid>::new(),
543 3 : };
544 :
545 3 : for _i in 0..rec.n_tablespaces {
546 3 : let id = buf.get_u32_le();
547 3 : rec.tablespace_ids.push(id);
548 3 : }
549 :
550 3 : rec
551 3 : }
552 : }
553 :
554 : ///
555 : /// Note: Parsing some fields is missing, because they're not needed.
556 : ///
557 : /// This is similar to the xl_xact_parsed_commit and
558 : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
559 : /// struct for commits and aborts.
560 : ///
561 0 : #[derive(Debug)]
562 : pub struct XlXactParsedRecord {
563 : pub xid: TransactionId,
564 : pub info: u8,
565 : pub xact_time: TimestampTz,
566 : pub xinfo: u32,
567 :
568 : pub db_id: Oid,
569 : /* MyDatabaseId */
570 : pub ts_id: Oid,
571 : /* MyDatabaseTableSpace */
572 : pub subxacts: Vec<TransactionId>,
573 :
574 : pub xnodes: Vec<RelFileNode>,
575 : }
576 :
577 : impl XlXactParsedRecord {
578 : /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
579 : /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
580 : /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
581 6041345 : pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
582 6041345 : let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
583 6041345 : // The record starts with time of commit/abort
584 6041345 : let xact_time = buf.get_i64_le();
585 6041345 : let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
586 6036723 : buf.get_u32_le()
587 : } else {
588 4622 : 0
589 : };
590 : let db_id;
591 : let ts_id;
592 6041345 : if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
593 6032476 : db_id = buf.get_u32_le();
594 6032476 : ts_id = buf.get_u32_le();
595 6032476 : } else {
596 8869 : db_id = 0;
597 8869 : ts_id = 0;
598 8869 : }
599 6041345 : let mut subxacts = Vec::<TransactionId>::new();
600 6041345 : if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
601 321 : let nsubxacts = buf.get_i32_le();
602 110245 : for _i in 0..nsubxacts {
603 110245 : let subxact = buf.get_u32_le();
604 110245 : subxacts.push(subxact);
605 110245 : }
606 6041024 : }
607 6041345 : let mut xnodes = Vec::<RelFileNode>::new();
608 6041345 : if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
609 19498 : let nrels = buf.get_i32_le();
610 65435 : for _i in 0..nrels {
611 65435 : let spcnode = buf.get_u32_le();
612 65435 : let dbnode = buf.get_u32_le();
613 65435 : let relnode = buf.get_u32_le();
614 65435 : trace!(
615 0 : "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
616 0 : spcnode,
617 0 : dbnode,
618 0 : relnode
619 0 : );
620 65435 : xnodes.push(RelFileNode {
621 65435 : spcnode,
622 65435 : dbnode,
623 65435 : relnode,
624 65435 : });
625 : }
626 6021847 : }
627 :
628 6041345 : if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
629 0 : let nitems = buf.get_i32_le();
630 0 : debug!(
631 0 : "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
632 0 : nitems
633 0 : );
634 0 : let sizeof_xl_xact_stats_item = 12;
635 0 : buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
636 6041345 : }
637 :
638 6041345 : if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
639 82770 : let nmsgs = buf.get_i32_le();
640 82770 : let sizeof_shared_invalidation_message = 16;
641 82770 : buf.advance(
642 82770 : (nmsgs * sizeof_shared_invalidation_message)
643 82770 : .try_into()
644 82770 : .unwrap(),
645 82770 : );
646 5958575 : }
647 :
648 6041345 : if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
649 2 : xid = buf.get_u32_le();
650 2 : debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
651 6041343 : }
652 :
653 6041345 : XlXactParsedRecord {
654 6041345 : xid,
655 6041345 : info,
656 6041345 : xact_time,
657 6041345 : xinfo,
658 6041345 : db_id,
659 6041345 : ts_id,
660 6041345 : subxacts,
661 6041345 : xnodes,
662 6041345 : }
663 6041345 : }
664 : }
665 :
666 : #[repr(C)]
667 0 : #[derive(Debug)]
668 : pub struct XlClogTruncate {
669 : pub pageno: u32,
670 : pub oldest_xid: TransactionId,
671 : pub oldest_xid_db: Oid,
672 : }
673 :
674 : impl XlClogTruncate {
675 3 : pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
676 3 : XlClogTruncate {
677 3 : pageno: buf.get_u32_le(),
678 3 : oldest_xid: buf.get_u32_le(),
679 3 : oldest_xid_db: buf.get_u32_le(),
680 3 : }
681 3 : }
682 : }
683 :
684 : #[repr(C)]
685 945060 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
686 : pub struct MultiXactMember {
687 : pub xid: TransactionId,
688 : pub status: MultiXactStatus,
689 : }
690 :
691 : impl MultiXactMember {
692 474674 : pub fn decode(buf: &mut Bytes) -> MultiXactMember {
693 474674 : MultiXactMember {
694 474674 : xid: buf.get_u32_le(),
695 474674 : status: buf.get_u32_le(),
696 474674 : }
697 474674 : }
698 : }
699 :
700 : #[repr(C)]
701 0 : #[derive(Debug)]
702 : pub struct XlMultiXactCreate {
703 : pub mid: MultiXactId,
704 : /* new MultiXact's ID */
705 : pub moff: MultiXactOffset,
706 : /* its starting offset in members file */
707 : pub nmembers: u32,
708 : /* number of member XIDs */
709 : pub members: Vec<MultiXactMember>,
710 : }
711 :
712 : impl XlMultiXactCreate {
713 24816 : pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
714 24816 : let mid = buf.get_u32_le();
715 24816 : let moff = buf.get_u32_le();
716 24816 : let nmembers = buf.get_u32_le();
717 24816 : let mut members = Vec::new();
718 474674 : for _ in 0..nmembers {
719 474674 : members.push(MultiXactMember::decode(buf));
720 474674 : }
721 24816 : XlMultiXactCreate {
722 24816 : mid,
723 24816 : moff,
724 24816 : nmembers,
725 24816 : members,
726 24816 : }
727 24816 : }
728 : }
729 :
730 : #[repr(C)]
731 0 : #[derive(Debug)]
732 : pub struct XlMultiXactTruncate {
733 : pub oldest_multi_db: Oid,
734 : /* to-be-truncated range of multixact offsets */
735 : pub start_trunc_off: MultiXactId,
736 : /* just for completeness' sake */
737 : pub end_trunc_off: MultiXactId,
738 :
739 : /* to-be-truncated range of multixact members */
740 : pub start_trunc_memb: MultiXactOffset,
741 : pub end_trunc_memb: MultiXactOffset,
742 : }
743 :
744 : impl XlMultiXactTruncate {
745 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
746 0 : XlMultiXactTruncate {
747 0 : oldest_multi_db: buf.get_u32_le(),
748 0 : start_trunc_off: buf.get_u32_le(),
749 0 : end_trunc_off: buf.get_u32_le(),
750 0 : start_trunc_memb: buf.get_u32_le(),
751 0 : end_trunc_memb: buf.get_u32_le(),
752 0 : }
753 0 : }
754 : }
755 :
756 : #[repr(C)]
757 0 : #[derive(Debug)]
758 : pub struct XlLogicalMessage {
759 : pub db_id: Oid,
760 : pub transactional: bool,
761 : pub prefix_size: usize,
762 : pub message_size: usize,
763 : }
764 :
765 : impl XlLogicalMessage {
766 127 : pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
767 127 : XlLogicalMessage {
768 127 : db_id: buf.get_u32_le(),
769 127 : transactional: buf.get_u32_le() != 0, // 4-bytes alignment
770 127 : prefix_size: buf.get_u64_le() as usize,
771 127 : message_size: buf.get_u64_le() as usize,
772 127 : }
773 127 : }
774 : }
775 :
776 : /// Main routine to decode a WAL record and figure out which blocks are modified
777 : //
778 : // See xlogrecord.h for details
779 : // The overall layout of an XLOG record is:
780 : // Fixed-size header (XLogRecord struct)
781 : // XLogRecordBlockHeader struct
782 : // If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
783 : // If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
784 : // XLogRecordBlockCompressHeader struct follows.
785 : // If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
786 : // BlockNumber follows
787 : // XLogRecordBlockHeader struct
788 : // ...
789 : // XLogRecordDataHeader[Short|Long] struct
790 : // block data
791 : // block data
792 : // ...
793 : // main data
794 : //
795 : //
796 : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
797 : // It would be more natural for this function to return a DecodedWALRecord as return value,
798 : // but reusing the caller-supplied struct avoids an allocation.
799 : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
800 : //
801 72693846 : pub fn decode_wal_record(
802 72693846 : record: Bytes,
803 72693846 : decoded: &mut DecodedWALRecord,
804 72693846 : pg_version: u32,
805 72693846 : ) -> Result<()> {
806 72693846 : let mut rnode_spcnode: u32 = 0;
807 72693846 : let mut rnode_dbnode: u32 = 0;
808 72693846 : let mut rnode_relnode: u32 = 0;
809 72693846 : let mut got_rnode = false;
810 72693846 :
811 72693846 : let mut buf = record.clone();
812 :
813 : // 1. Parse XLogRecord struct
814 :
815 : // FIXME: assume little-endian here
816 72693846 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
817 :
818 72693846 : trace!(
819 0 : "decode_wal_record xl_rmid = {} xl_info = {}",
820 0 : xlogrec.xl_rmid,
821 0 : xlogrec.xl_info
822 0 : );
823 :
824 72693846 : let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
825 72693846 :
826 72693846 : if buf.remaining() != remaining {
827 0 : //TODO error
828 72693846 : }
829 :
830 72693846 : let mut max_block_id = 0;
831 72693846 : let mut blocks_total_len: u32 = 0;
832 72693846 : let mut main_data_len = 0;
833 72693846 : let mut datatotal: u32 = 0;
834 72693846 : decoded.blocks.clear();
835 :
836 : // 2. Decode the headers.
837 : // XLogRecordBlockHeaders if any,
838 : // XLogRecordDataHeader[Short|Long]
839 211978415 : while buf.remaining() > datatotal as usize {
840 139284569 : let block_id = buf.get_u8();
841 139284569 :
842 139284569 : match block_id {
843 72304744 : pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
844 72304744 : /* XLogRecordDataHeaderShort */
845 72304744 : main_data_len = buf.get_u8() as u32;
846 72304744 : datatotal += main_data_len;
847 72304744 : }
848 :
849 149143 : pg_constants::XLR_BLOCK_ID_DATA_LONG => {
850 149143 : /* XLogRecordDataHeaderLong */
851 149143 : main_data_len = buf.get_u32_le();
852 149143 : datatotal += main_data_len;
853 149143 : }
854 :
855 8 : pg_constants::XLR_BLOCK_ID_ORIGIN => {
856 8 : // RepOriginId is uint16
857 8 : buf.advance(2);
858 8 : }
859 :
860 111150 : pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
861 111150 : // TransactionId is uint32
862 111150 : buf.advance(4);
863 111150 : }
864 :
865 66719524 : 0..=pg_constants::XLR_MAX_BLOCK_ID => {
866 : /* XLogRecordBlockHeader */
867 66719524 : let mut blk = DecodedBkpBlock::new();
868 66719524 :
869 66719524 : if block_id <= max_block_id {
870 63858708 : // TODO
871 63858708 : //report_invalid_record(state,
872 63858708 : // "out-of-order block_id %u at %X/%X",
873 63858708 : // block_id,
874 63858708 : // (uint32) (state->ReadRecPtr >> 32),
875 63858708 : // (uint32) state->ReadRecPtr);
876 63858708 : // goto err;
877 63858708 : }
878 66719524 : max_block_id = block_id;
879 66719524 :
880 66719524 : let fork_flags: u8 = buf.get_u8();
881 66719524 : blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
882 66719524 : blk.flags = fork_flags;
883 66719524 : blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
884 66719524 : blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
885 66719524 : blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
886 66719524 : blk.data_len = buf.get_u16_le();
887 66719524 :
888 66719524 : /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
889 66719524 :
890 66719524 : datatotal += blk.data_len as u32;
891 66719524 : blocks_total_len += blk.data_len as u32;
892 66719524 :
893 66719524 : if blk.has_image {
894 275690 : blk.bimg_len = buf.get_u16_le();
895 275690 : blk.hole_offset = buf.get_u16_le();
896 275690 : blk.bimg_info = buf.get_u8();
897 :
898 0 : blk.apply_image = dispatch_pgversion!(
899 275690 : pg_version,
900 275690 : (blk.bimg_info & pgv::bindings::BKPIMAGE_APPLY) != 0
901 : );
902 :
903 275690 : let blk_img_is_compressed =
904 275690 : postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
905 :
906 275690 : if blk_img_is_compressed {
907 0 : debug!("compressed block image , pg_version = {}", pg_version);
908 275690 : }
909 :
910 275690 : if blk_img_is_compressed {
911 0 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
912 0 : blk.hole_length = buf.get_u16_le();
913 0 : } else {
914 0 : blk.hole_length = 0;
915 0 : }
916 275690 : } else {
917 275690 : blk.hole_length = BLCKSZ - blk.bimg_len;
918 275690 : }
919 275690 : datatotal += blk.bimg_len as u32;
920 275690 : blocks_total_len += blk.bimg_len as u32;
921 275690 :
922 275690 : /*
923 275690 : * cross-check that hole_offset > 0, hole_length > 0 and
924 275690 : * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
925 275690 : */
926 275690 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
927 212337 : && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
928 0 : {
929 0 : // TODO
930 0 : /*
931 0 : report_invalid_record(state,
932 0 : "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
933 0 : (unsigned int) blk->hole_offset,
934 0 : (unsigned int) blk->hole_length,
935 0 : (unsigned int) blk->bimg_len,
936 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
937 0 : goto err;
938 0 : */
939 275690 : }
940 :
941 : /*
942 : * cross-check that hole_offset == 0 and hole_length == 0 if
943 : * the HAS_HOLE flag is not set.
944 : */
945 275690 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
946 63353 : && (blk.hole_offset != 0 || blk.hole_length != 0)
947 0 : {
948 0 : // TODO
949 0 : /*
950 0 : report_invalid_record(state,
951 0 : "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
952 0 : (unsigned int) blk->hole_offset,
953 0 : (unsigned int) blk->hole_length,
954 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
955 0 : goto err;
956 0 : */
957 275690 : }
958 :
959 : /*
960 : * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
961 : * flag is set.
962 : */
963 275690 : if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
964 63353 : // TODO
965 63353 : /*
966 63353 : report_invalid_record(state,
967 63353 : "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
968 63353 : (unsigned int) blk->bimg_len,
969 63353 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
970 63353 : goto err;
971 63353 : */
972 212337 : }
973 :
974 : /*
975 : * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
976 : * IS_COMPRESSED flag is set.
977 : */
978 275690 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
979 63353 : && !blk_img_is_compressed
980 63353 : && blk.bimg_len != BLCKSZ
981 0 : {
982 0 : // TODO
983 0 : /*
984 0 : report_invalid_record(state,
985 0 : "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
986 0 : (unsigned int) blk->data_len,
987 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
988 0 : goto err;
989 0 : */
990 275690 : }
991 66443834 : }
992 66719524 : if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
993 63863445 : rnode_spcnode = buf.get_u32_le();
994 63863445 : rnode_dbnode = buf.get_u32_le();
995 63863445 : rnode_relnode = buf.get_u32_le();
996 63863445 : got_rnode = true;
997 63863445 : } else if !got_rnode {
998 0 : // TODO
999 0 : /*
1000 0 : report_invalid_record(state,
1001 0 : "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1002 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1003 0 : goto err; */
1004 2856079 : }
1005 :
1006 66719524 : blk.rnode_spcnode = rnode_spcnode;
1007 66719524 : blk.rnode_dbnode = rnode_dbnode;
1008 66719524 : blk.rnode_relnode = rnode_relnode;
1009 66719524 :
1010 66719524 : blk.blkno = buf.get_u32_le();
1011 66719524 : trace!(
1012 0 : "this record affects {}/{}/{} blk {}",
1013 0 : rnode_spcnode,
1014 0 : rnode_dbnode,
1015 0 : rnode_relnode,
1016 0 : blk.blkno
1017 0 : );
1018 :
1019 66719524 : decoded.blocks.push(blk);
1020 : }
1021 :
1022 0 : _ => {
1023 0 : // TODO: invalid block_id
1024 0 : }
1025 : }
1026 : }
1027 :
1028 : // 3. Decode blocks.
1029 72693846 : let mut ptr = record.len() - buf.remaining();
1030 72693846 : for blk in decoded.blocks.iter_mut() {
1031 66719524 : if blk.has_image {
1032 275690 : blk.bimg_offset = ptr as u32;
1033 275690 : ptr += blk.bimg_len as usize;
1034 66443834 : }
1035 66719524 : if blk.has_data {
1036 57372387 : ptr += blk.data_len as usize;
1037 57372387 : }
1038 : }
1039 : // We don't need them, so just skip blocks_total_len bytes
1040 72693846 : buf.advance(blocks_total_len as usize);
1041 72693846 : assert_eq!(ptr, record.len() - buf.remaining());
1042 :
1043 72693846 : let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
1044 72693846 :
1045 72693846 : // 4. Decode main_data
1046 72693846 : if main_data_len > 0 {
1047 72453887 : assert_eq!(buf.remaining(), main_data_len as usize);
1048 239959 : }
1049 :
1050 72693846 : decoded.xl_xid = xlogrec.xl_xid;
1051 72693846 : decoded.xl_info = xlogrec.xl_info;
1052 72693846 : decoded.xl_rmid = xlogrec.xl_rmid;
1053 72693846 : decoded.record = record;
1054 72693846 : decoded.main_data_offset = main_data_offset;
1055 72693846 :
1056 72693846 : Ok(())
1057 72693846 : }
1058 :
1059 : ///
1060 : /// Build a human-readable string to describe a WAL record
1061 : ///
1062 : /// For debugging purposes
1063 0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
1064 0 : match rec {
1065 0 : NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
1066 0 : "will_init: {}, {}",
1067 0 : will_init,
1068 0 : describe_postgres_wal_record(rec)?
1069 : )),
1070 0 : _ => Ok(format!("{:?}", rec)),
1071 : }
1072 0 : }
1073 :
1074 0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
1075 0 : // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
1076 0 : // Maybe use the postgres wal redo process, the same used for replaying WAL records?
1077 0 : // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
1078 0 : // without worrying about security?
1079 0 : //
1080 0 : // But for now, we have a hand-written code for a few common WAL record types here.
1081 0 :
1082 0 : let mut buf = record.clone();
1083 :
1084 : // 1. Parse XLogRecord struct
1085 :
1086 : // FIXME: assume little-endian here
1087 0 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
1088 :
1089 : let unknown_str: String;
1090 :
1091 0 : let result: &str = match xlogrec.xl_rmid {
1092 : pg_constants::RM_HEAP2_ID => {
1093 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1094 0 : match info {
1095 0 : pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
1096 0 : pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
1097 : _ => {
1098 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1099 0 : &unknown_str
1100 : }
1101 : }
1102 : }
1103 : pg_constants::RM_HEAP_ID => {
1104 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
1105 0 : match info {
1106 0 : pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
1107 0 : pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
1108 0 : pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
1109 0 : pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
1110 : _ => {
1111 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
1112 0 : &unknown_str
1113 : }
1114 : }
1115 : }
1116 : pg_constants::RM_XLOG_ID => {
1117 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1118 0 : match info {
1119 0 : pg_constants::XLOG_FPI => "XLOG FPI",
1120 0 : pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
1121 : _ => {
1122 0 : unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
1123 0 : &unknown_str
1124 : }
1125 : }
1126 : }
1127 0 : rmid => {
1128 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
1129 0 :
1130 0 : unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
1131 0 : &unknown_str
1132 : }
1133 : };
1134 :
1135 0 : Ok(String::from(result))
1136 0 : }
|