Line data Source code
1 : //!
2 : //! Functions for parsing WAL records.
3 : //!
4 :
5 : use anyhow::Result;
6 : use bytes::{Buf, Bytes};
7 : use postgres_ffi::pg_constants;
8 : use postgres_ffi::BLCKSZ;
9 : use postgres_ffi::{BlockNumber, OffsetNumber, TimestampTz};
10 : use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
11 : use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
12 : use serde::{Deserialize, Serialize};
13 : use tracing::*;
14 : use utils::bin_ser::DeserializeError;
15 :
16 : /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
17 : /// around a PostgreSQL WAL record, or a custom neon-specific "record".
18 878685652 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
19 : pub enum NeonWalRecord {
20 : /// Native PostgreSQL WAL record
21 : Postgres { will_init: bool, rec: Bytes },
22 :
23 : /// Clear bits in heap visibility map. ('flags' is bitmap of bits to clear)
24 : ClearVisibilityMapFlags {
25 : new_heap_blkno: Option<u32>,
26 : old_heap_blkno: Option<u32>,
27 : flags: u8,
28 : },
29 : /// Mark transaction IDs as committed on a CLOG page
30 : ClogSetCommitted {
31 : xids: Vec<TransactionId>,
32 : timestamp: TimestampTz,
33 : },
34 : /// Mark transaction IDs as aborted on a CLOG page
35 : ClogSetAborted { xids: Vec<TransactionId> },
36 : /// Extend multixact offsets SLRU
37 : MultixactOffsetCreate {
38 : mid: MultiXactId,
39 : moff: MultiXactOffset,
40 : },
41 : /// Extend multixact members SLRU.
42 : MultixactMembersCreate {
43 : moff: MultiXactOffset,
44 : members: Vec<MultiXactMember>,
45 : },
46 : }
47 :
48 : impl NeonWalRecord {
49 : /// Does replaying this WAL record initialize the page from scratch, or does
50 : /// it need to be applied over the previous image of the page?
51 294763596 : pub fn will_init(&self) -> bool {
52 294763596 : match self {
53 265556861 : NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
54 :
55 : // None of the special neon record types currently initialize the page
56 29206735 : _ => false,
57 : }
58 294763596 : }
59 : }
60 :
/// Per-page data contained in a WAL record (one entry per block reference).
#[derive(Default)]
pub struct DecodedBkpBlock {
    // Note: there is no `in_use` flag here; only block references that were
    // actually decoded get stored.

    // --- Identity of the block this refers to ---
    pub rnode_spcnode: u32,
    pub rnode_dbnode: u32,
    pub rnode_relnode: u32,
    /// Fork number. Note that we have a few special forknum values for
    /// non-rel files.
    pub forknum: u8,
    pub blkno: u32,

    /// Copy of the fork_flags field from the XLogRecordBlockHeader.
    pub flags: u8,

    // --- Information on the full-page image, if any ---
    /// Has image, even if only for consistency checking.
    pub has_image: bool,
    /// Has image that should be restored.
    pub apply_image: bool,
    /// Record doesn't need the previous page version to apply.
    pub will_init: bool,
    pub hole_offset: u16,
    pub hole_length: u16,
    pub bimg_offset: u32,
    pub bimg_len: u16,
    pub bimg_info: u8,

    // --- rmgr-specific data associated with this block ---
    has_data: bool,
    data_len: u16,
}

impl DecodedBkpBlock {
    /// Creates a block reference with every field at its `Default` value.
    pub fn new() -> DecodedBkpBlock {
        Self::default()
    }
}
99 :
100 733013 : #[derive(Default)]
101 : pub struct DecodedWALRecord {
102 : pub xl_xid: TransactionId,
103 : pub xl_info: u8,
104 : pub xl_rmid: u8,
105 : pub record: Bytes, // raw XLogRecord
106 :
107 : pub blocks: Vec<DecodedBkpBlock>,
108 : pub main_data_offset: usize,
109 : }
110 :
111 : #[repr(C)]
112 0 : #[derive(Debug, Clone, Copy)]
113 : pub struct RelFileNode {
114 : pub spcnode: Oid, /* tablespace */
115 : pub dbnode: Oid, /* database */
116 : pub relnode: Oid, /* relation */
117 : }
118 :
119 : #[repr(C)]
120 0 : #[derive(Debug)]
121 : pub struct XlRelmapUpdate {
122 : pub dbid: Oid, /* database ID, or 0 for shared map */
123 : pub tsid: Oid, /* database's tablespace, or pg_global */
124 : pub nbytes: i32, /* size of relmap data */
125 : }
126 :
127 : impl XlRelmapUpdate {
128 45 : pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
129 45 : XlRelmapUpdate {
130 45 : dbid: buf.get_u32_le(),
131 45 : tsid: buf.get_u32_le(),
132 45 : nbytes: buf.get_i32_le(),
133 45 : }
134 45 : }
135 : }
136 :
137 : #[repr(C)]
138 0 : #[derive(Debug)]
139 : pub struct XlSmgrCreate {
140 : pub rnode: RelFileNode,
141 : // FIXME: This is ForkNumber in storage_xlog.h. That's an enum. Does it have
142 : // well-defined size?
143 : pub forknum: u8,
144 : }
145 :
146 : impl XlSmgrCreate {
147 21950 : pub fn decode(buf: &mut Bytes) -> XlSmgrCreate {
148 21950 : XlSmgrCreate {
149 21950 : rnode: RelFileNode {
150 21950 : spcnode: buf.get_u32_le(), /* tablespace */
151 21950 : dbnode: buf.get_u32_le(), /* database */
152 21950 : relnode: buf.get_u32_le(), /* relation */
153 21950 : },
154 21950 : forknum: buf.get_u32_le() as u8,
155 21950 : }
156 21950 : }
157 : }
158 :
159 : #[repr(C)]
160 0 : #[derive(Debug)]
161 : pub struct XlSmgrTruncate {
162 : pub blkno: BlockNumber,
163 : pub rnode: RelFileNode,
164 : pub flags: u32,
165 : }
166 :
167 : impl XlSmgrTruncate {
168 45 : pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
169 45 : XlSmgrTruncate {
170 45 : blkno: buf.get_u32_le(),
171 45 : rnode: RelFileNode {
172 45 : spcnode: buf.get_u32_le(), /* tablespace */
173 45 : dbnode: buf.get_u32_le(), /* database */
174 45 : relnode: buf.get_u32_le(), /* relation */
175 45 : },
176 45 : flags: buf.get_u32_le(),
177 45 : }
178 45 : }
179 : }
180 :
181 : #[repr(C)]
182 0 : #[derive(Debug)]
183 : pub struct XlCreateDatabase {
184 : pub db_id: Oid,
185 : pub tablespace_id: Oid,
186 : pub src_db_id: Oid,
187 : pub src_tablespace_id: Oid,
188 : }
189 :
190 : impl XlCreateDatabase {
191 12 : pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
192 12 : XlCreateDatabase {
193 12 : db_id: buf.get_u32_le(),
194 12 : tablespace_id: buf.get_u32_le(),
195 12 : src_db_id: buf.get_u32_le(),
196 12 : src_tablespace_id: buf.get_u32_le(),
197 12 : }
198 12 : }
199 : }
200 :
201 : #[repr(C)]
202 0 : #[derive(Debug)]
203 : pub struct XlDropDatabase {
204 : pub db_id: Oid,
205 : pub n_tablespaces: Oid, /* number of tablespace IDs */
206 : pub tablespace_ids: Vec<Oid>,
207 : }
208 :
209 : impl XlDropDatabase {
210 2 : pub fn decode(buf: &mut Bytes) -> XlDropDatabase {
211 2 : let mut rec = XlDropDatabase {
212 2 : db_id: buf.get_u32_le(),
213 2 : n_tablespaces: buf.get_u32_le(),
214 2 : tablespace_ids: Vec::<Oid>::new(),
215 2 : };
216 :
217 2 : for _i in 0..rec.n_tablespaces {
218 2 : let id = buf.get_u32_le();
219 2 : rec.tablespace_ids.push(id);
220 2 : }
221 :
222 2 : rec
223 2 : }
224 : }
225 :
226 : #[repr(C)]
227 0 : #[derive(Debug)]
228 : pub struct XlHeapInsert {
229 : pub offnum: OffsetNumber,
230 : pub flags: u8,
231 : }
232 :
233 : impl XlHeapInsert {
234 47303205 : pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
235 47303205 : XlHeapInsert {
236 47303205 : offnum: buf.get_u16_le(),
237 47303205 : flags: buf.get_u8(),
238 47303205 : }
239 47303205 : }
240 : }
241 :
242 : #[repr(C)]
243 0 : #[derive(Debug)]
244 : pub struct XlHeapMultiInsert {
245 : pub flags: u8,
246 : pub _padding: u8,
247 : pub ntuples: u16,
248 : }
249 :
250 : impl XlHeapMultiInsert {
251 692488 : pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
252 692488 : XlHeapMultiInsert {
253 692488 : flags: buf.get_u8(),
254 692488 : _padding: buf.get_u8(),
255 692488 : ntuples: buf.get_u16_le(),
256 692488 : }
257 692488 : }
258 : }
259 :
260 : #[repr(C)]
261 0 : #[derive(Debug)]
262 : pub struct XlHeapDelete {
263 : pub xmax: TransactionId,
264 : pub offnum: OffsetNumber,
265 : pub _padding: u16,
266 : pub t_cid: u32,
267 : pub infobits_set: u8,
268 : pub flags: u8,
269 : }
270 :
271 : impl XlHeapDelete {
272 535296 : pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
273 535296 : XlHeapDelete {
274 535296 : xmax: buf.get_u32_le(),
275 535296 : offnum: buf.get_u16_le(),
276 535296 : _padding: buf.get_u16_le(),
277 535296 : t_cid: buf.get_u32_le(),
278 535296 : infobits_set: buf.get_u8(),
279 535296 : flags: buf.get_u8(),
280 535296 : }
281 535296 : }
282 : }
283 :
284 : #[repr(C)]
285 0 : #[derive(Debug)]
286 : pub struct XlHeapUpdate {
287 : pub old_xmax: TransactionId,
288 : pub old_offnum: OffsetNumber,
289 : pub old_infobits_set: u8,
290 : pub flags: u8,
291 : pub t_cid: u32,
292 : pub new_xmax: TransactionId,
293 : pub new_offnum: OffsetNumber,
294 : }
295 :
296 : impl XlHeapUpdate {
297 5355792 : pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
298 5355792 : XlHeapUpdate {
299 5355792 : old_xmax: buf.get_u32_le(),
300 5355792 : old_offnum: buf.get_u16_le(),
301 5355792 : old_infobits_set: buf.get_u8(),
302 5355792 : flags: buf.get_u8(),
303 5355792 : t_cid: buf.get_u32(),
304 5355792 : new_xmax: buf.get_u32_le(),
305 5355792 : new_offnum: buf.get_u16_le(),
306 5355792 : }
307 5355792 : }
308 : }
309 :
310 : ///
311 : /// Note: Parsing some fields is missing, because they're not needed.
312 : ///
313 : /// This is similar to the xl_xact_parsed_commit and
314 : /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
315 : /// struct for commits and aborts.
316 : ///
317 0 : #[derive(Debug)]
318 : pub struct XlXactParsedRecord {
319 : pub xid: TransactionId,
320 : pub info: u8,
321 : pub xact_time: TimestampTz,
322 : pub xinfo: u32,
323 :
324 : pub db_id: Oid, /* MyDatabaseId */
325 : pub ts_id: Oid, /* MyDatabaseTableSpace */
326 :
327 : pub subxacts: Vec<TransactionId>,
328 :
329 : pub xnodes: Vec<RelFileNode>,
330 : }
331 :
332 : impl XlXactParsedRecord {
333 : /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
334 : /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
335 : /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
336 2265099 : pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
337 2265099 : let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
338 2265099 : // The record starts with time of commit/abort
339 2265099 : let xact_time = buf.get_i64_le();
340 2265099 : let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
341 23705 : buf.get_u32_le()
342 : } else {
343 2241394 : 0
344 : };
345 : let db_id;
346 : let ts_id;
347 2265099 : if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
348 22656 : db_id = buf.get_u32_le();
349 22656 : ts_id = buf.get_u32_le();
350 2242443 : } else {
351 2242443 : db_id = 0;
352 2242443 : ts_id = 0;
353 2242443 : }
354 2265099 : let mut subxacts = Vec::<TransactionId>::new();
355 2265099 : if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
356 144 : let nsubxacts = buf.get_i32_le();
357 100049 : for _i in 0..nsubxacts {
358 100049 : let subxact = buf.get_u32_le();
359 100049 : subxacts.push(subxact);
360 100049 : }
361 2264955 : }
362 2265099 : let mut xnodes = Vec::<RelFileNode>::new();
363 2265099 : if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
364 6656 : let nrels = buf.get_i32_le();
365 16872 : for _i in 0..nrels {
366 16872 : let spcnode = buf.get_u32_le();
367 16872 : let dbnode = buf.get_u32_le();
368 16872 : let relnode = buf.get_u32_le();
369 16872 : trace!(
370 0 : "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
371 0 : spcnode,
372 0 : dbnode,
373 0 : relnode
374 0 : );
375 16872 : xnodes.push(RelFileNode {
376 16872 : spcnode,
377 16872 : dbnode,
378 16872 : relnode,
379 16872 : });
380 : }
381 2258443 : }
382 :
383 2265099 : if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
384 0 : let nitems = buf.get_i32_le();
385 0 : debug!(
386 0 : "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
387 0 : nitems
388 0 : );
389 0 : let sizeof_xl_xact_stats_item = 12;
390 0 : buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
391 2265099 : }
392 :
393 2265099 : if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
394 22656 : let nmsgs = buf.get_i32_le();
395 22656 : let sizeof_shared_invalidation_message = 16;
396 22656 : buf.advance(
397 22656 : (nmsgs * sizeof_shared_invalidation_message)
398 22656 : .try_into()
399 22656 : .unwrap(),
400 22656 : );
401 2242443 : }
402 :
403 2265099 : if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
404 2 : xid = buf.get_u32_le();
405 2 : debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
406 2265097 : }
407 :
408 2265099 : XlXactParsedRecord {
409 2265099 : xid,
410 2265099 : info,
411 2265099 : xact_time,
412 2265099 : xinfo,
413 2265099 : db_id,
414 2265099 : ts_id,
415 2265099 : subxacts,
416 2265099 : xnodes,
417 2265099 : }
418 2265099 : }
419 : }
420 :
421 : #[repr(C)]
422 0 : #[derive(Debug)]
423 : pub struct XlClogTruncate {
424 : pub pageno: u32,
425 : pub oldest_xid: TransactionId,
426 : pub oldest_xid_db: Oid,
427 : }
428 :
429 : impl XlClogTruncate {
430 1 : pub fn decode(buf: &mut Bytes) -> XlClogTruncate {
431 1 : XlClogTruncate {
432 1 : pageno: buf.get_u32_le(),
433 1 : oldest_xid: buf.get_u32_le(),
434 1 : oldest_xid_db: buf.get_u32_le(),
435 1 : }
436 1 : }
437 : }
438 :
439 : #[repr(C)]
440 945465 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
441 : pub struct MultiXactMember {
442 : pub xid: TransactionId,
443 : pub status: MultiXactStatus,
444 : }
445 :
446 : impl MultiXactMember {
447 472948 : pub fn decode(buf: &mut Bytes) -> MultiXactMember {
448 472948 : MultiXactMember {
449 472948 : xid: buf.get_u32_le(),
450 472948 : status: buf.get_u32_le(),
451 472948 : }
452 472948 : }
453 : }
454 :
455 : #[repr(C)]
456 0 : #[derive(Debug)]
457 : pub struct XlMultiXactCreate {
458 : pub mid: MultiXactId, /* new MultiXact's ID */
459 : pub moff: MultiXactOffset, /* its starting offset in members file */
460 : pub nmembers: u32, /* number of member XIDs */
461 : pub members: Vec<MultiXactMember>,
462 : }
463 :
464 : impl XlMultiXactCreate {
465 24027 : pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
466 24027 : let mid = buf.get_u32_le();
467 24027 : let moff = buf.get_u32_le();
468 24027 : let nmembers = buf.get_u32_le();
469 24027 : let mut members = Vec::new();
470 472948 : for _ in 0..nmembers {
471 472948 : members.push(MultiXactMember::decode(buf));
472 472948 : }
473 24027 : XlMultiXactCreate {
474 24027 : mid,
475 24027 : moff,
476 24027 : nmembers,
477 24027 : members,
478 24027 : }
479 24027 : }
480 : }
481 :
482 : #[repr(C)]
483 0 : #[derive(Debug)]
484 : pub struct XlMultiXactTruncate {
485 : pub oldest_multi_db: Oid,
486 : /* to-be-truncated range of multixact offsets */
487 : pub start_trunc_off: MultiXactId, /* just for completeness' sake */
488 : pub end_trunc_off: MultiXactId,
489 :
490 : /* to-be-truncated range of multixact members */
491 : pub start_trunc_memb: MultiXactOffset,
492 : pub end_trunc_memb: MultiXactOffset,
493 : }
494 :
495 : impl XlMultiXactTruncate {
496 0 : pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
497 0 : XlMultiXactTruncate {
498 0 : oldest_multi_db: buf.get_u32_le(),
499 0 : start_trunc_off: buf.get_u32_le(),
500 0 : end_trunc_off: buf.get_u32_le(),
501 0 : start_trunc_memb: buf.get_u32_le(),
502 0 : end_trunc_memb: buf.get_u32_le(),
503 0 : }
504 0 : }
505 : }
506 :
507 : /// Main routine to decode a WAL record and figure out which blocks are modified
508 : //
509 : // See xlogrecord.h for details
510 : // The overall layout of an XLOG record is:
511 : // Fixed-size header (XLogRecord struct)
512 : // XLogRecordBlockHeader struct
513 : // If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
514 : // If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
515 : // XLogRecordBlockCompressHeader struct follows.
516 : // If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
517 : // BlockNumber follows
518 : // XLogRecordBlockHeader struct
519 : // ...
520 : // XLogRecordDataHeader[Short|Long] struct
521 : // block data
522 : // block data
523 : // ...
524 : // main data
525 : //
526 : //
527 : // For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
528 : // It would be more natural for this function to return a DecodedWALRecord as return value,
529 : // but reusing the caller-supplied struct avoids an allocation.
530 : // This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
531 : //
532 73526713 : pub fn decode_wal_record(
533 73526713 : record: Bytes,
534 73526713 : decoded: &mut DecodedWALRecord,
535 73526713 : pg_version: u32,
536 73526713 : ) -> Result<()> {
537 73526713 : let mut rnode_spcnode: u32 = 0;
538 73526713 : let mut rnode_dbnode: u32 = 0;
539 73526713 : let mut rnode_relnode: u32 = 0;
540 73526713 : let mut got_rnode = false;
541 73526713 :
542 73526713 : let mut buf = record.clone();
543 :
544 : // 1. Parse XLogRecord struct
545 :
546 : // FIXME: assume little-endian here
547 73526713 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
548 :
549 73526713 : trace!(
550 0 : "decode_wal_record xl_rmid = {} xl_info = {}",
551 0 : xlogrec.xl_rmid,
552 0 : xlogrec.xl_info
553 0 : );
554 :
555 73526712 : let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
556 73526712 :
557 73526712 : if buf.remaining() != remaining {
558 0 : //TODO error
559 73526712 : }
560 :
561 73526712 : let mut max_block_id = 0;
562 73526712 : let mut blocks_total_len: u32 = 0;
563 73526712 : let mut main_data_len = 0;
564 73526712 : let mut datatotal: u32 = 0;
565 73526712 : decoded.blocks.clear();
566 :
567 : // 2. Decode the headers.
568 : // XLogRecordBlockHeaders if any,
569 : // XLogRecordDataHeader[Short|Long]
570 222248244 : while buf.remaining() > datatotal as usize {
571 148721530 : let block_id = buf.get_u8();
572 148721530 :
573 148721530 : match block_id {
574 73367364 : pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
575 73367364 : /* XLogRecordDataHeaderShort */
576 73367364 : main_data_len = buf.get_u8() as u32;
577 73367364 : datatotal += main_data_len;
578 73367364 : }
579 :
580 17585 : pg_constants::XLR_BLOCK_ID_DATA_LONG => {
581 17585 : /* XLogRecordDataHeaderLong */
582 17585 : main_data_len = buf.get_u32_le();
583 17585 : datatotal += main_data_len;
584 17585 : }
585 :
586 0 : pg_constants::XLR_BLOCK_ID_ORIGIN => {
587 0 : // RepOriginId is uint16
588 0 : buf.advance(2);
589 0 : }
590 :
591 0 : pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
592 0 : // TransactionId is uint32
593 0 : buf.advance(4);
594 0 : }
595 :
596 75336586 : 0..=pg_constants::XLR_MAX_BLOCK_ID => {
597 : /* XLogRecordBlockHeader */
598 75336586 : let mut blk = DecodedBkpBlock::new();
599 75336586 :
600 75336586 : if block_id <= max_block_id {
601 71163567 : // TODO
602 71163567 : //report_invalid_record(state,
603 71163567 : // "out-of-order block_id %u at %X/%X",
604 71163567 : // block_id,
605 71163567 : // (uint32) (state->ReadRecPtr >> 32),
606 71163567 : // (uint32) state->ReadRecPtr);
607 71163567 : // goto err;
608 71163567 : }
609 75336586 : max_block_id = block_id;
610 75336586 :
611 75336586 : let fork_flags: u8 = buf.get_u8();
612 75336586 : blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
613 75336586 : blk.flags = fork_flags;
614 75336586 : blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
615 75336586 : blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
616 75336586 : blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
617 75336586 : blk.data_len = buf.get_u16_le();
618 75336586 :
619 75336586 : /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
620 75336586 :
621 75336586 : datatotal += blk.data_len as u32;
622 75336586 : blocks_total_len += blk.data_len as u32;
623 75336586 :
624 75336586 : if blk.has_image {
625 170593 : blk.bimg_len = buf.get_u16_le();
626 170593 : blk.hole_offset = buf.get_u16_le();
627 170593 : blk.bimg_info = buf.get_u8();
628 170593 :
629 170593 : blk.apply_image = if pg_version == 14 {
630 170593 : (blk.bimg_info & postgres_ffi::v14::bindings::BKPIMAGE_APPLY) != 0
631 : } else {
632 0 : assert_eq!(pg_version, 15);
633 0 : (blk.bimg_info & postgres_ffi::v15::bindings::BKPIMAGE_APPLY) != 0
634 : };
635 :
636 170593 : let blk_img_is_compressed =
637 170593 : postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
638 :
639 170593 : if blk_img_is_compressed {
640 0 : debug!("compressed block image , pg_version = {}", pg_version);
641 170593 : }
642 :
643 170593 : if blk_img_is_compressed {
644 0 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
645 0 : blk.hole_length = buf.get_u16_le();
646 0 : } else {
647 0 : blk.hole_length = 0;
648 0 : }
649 170593 : } else {
650 170593 : blk.hole_length = BLCKSZ - blk.bimg_len;
651 170593 : }
652 170593 : datatotal += blk.bimg_len as u32;
653 170593 : blocks_total_len += blk.bimg_len as u32;
654 170593 :
655 170593 : /*
656 170593 : * cross-check that hole_offset > 0, hole_length > 0 and
657 170593 : * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
658 170593 : */
659 170593 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
660 143247 : && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
661 0 : {
662 0 : // TODO
663 0 : /*
664 0 : report_invalid_record(state,
665 0 : "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
666 0 : (unsigned int) blk->hole_offset,
667 0 : (unsigned int) blk->hole_length,
668 0 : (unsigned int) blk->bimg_len,
669 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
670 0 : goto err;
671 0 : */
672 170593 : }
673 :
674 : /*
675 : * cross-check that hole_offset == 0 and hole_length == 0 if
676 : * the HAS_HOLE flag is not set.
677 : */
678 170593 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
679 27346 : && (blk.hole_offset != 0 || blk.hole_length != 0)
680 0 : {
681 0 : // TODO
682 0 : /*
683 0 : report_invalid_record(state,
684 0 : "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
685 0 : (unsigned int) blk->hole_offset,
686 0 : (unsigned int) blk->hole_length,
687 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
688 0 : goto err;
689 0 : */
690 170593 : }
691 :
692 : /*
693 : * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
694 : * flag is set.
695 : */
696 170593 : if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
697 27346 : // TODO
698 27346 : /*
699 27346 : report_invalid_record(state,
700 27346 : "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
701 27346 : (unsigned int) blk->bimg_len,
702 27346 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
703 27346 : goto err;
704 27346 : */
705 143247 : }
706 :
707 : /*
708 : * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
709 : * IS_COMPRESSED flag is set.
710 : */
711 170593 : if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
712 27346 : && !blk_img_is_compressed
713 27346 : && blk.bimg_len != BLCKSZ
714 0 : {
715 0 : // TODO
716 0 : /*
717 0 : report_invalid_record(state,
718 0 : "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
719 0 : (unsigned int) blk->data_len,
720 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
721 0 : goto err;
722 0 : */
723 170593 : }
724 75165993 : }
725 75336586 : if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
726 71164517 : rnode_spcnode = buf.get_u32_le();
727 71164517 : rnode_dbnode = buf.get_u32_le();
728 71164517 : rnode_relnode = buf.get_u32_le();
729 71164517 : got_rnode = true;
730 71164517 : } else if !got_rnode {
731 0 : // TODO
732 0 : /*
733 0 : report_invalid_record(state,
734 0 : "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
735 0 : (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
736 0 : goto err; */
737 4172069 : }
738 :
739 75336586 : blk.rnode_spcnode = rnode_spcnode;
740 75336586 : blk.rnode_dbnode = rnode_dbnode;
741 75336586 : blk.rnode_relnode = rnode_relnode;
742 75336586 :
743 75336586 : blk.blkno = buf.get_u32_le();
744 75336586 : trace!(
745 0 : "this record affects {}/{}/{} blk {}",
746 0 : rnode_spcnode,
747 0 : rnode_dbnode,
748 0 : rnode_relnode,
749 0 : blk.blkno
750 0 : );
751 :
752 75336583 : decoded.blocks.push(blk);
753 : }
754 :
755 0 : _ => {
756 0 : // TODO: invalid block_id
757 0 : }
758 : }
759 : }
760 :
761 : // 3. Decode blocks.
762 73526714 : let mut ptr = record.len() - buf.remaining();
763 75336585 : for blk in decoded.blocks.iter_mut() {
764 75336585 : if blk.has_image {
765 170593 : blk.bimg_offset = ptr as u32;
766 170593 : ptr += blk.bimg_len as usize;
767 75165992 : }
768 75336585 : if blk.has_data {
769 64564645 : ptr += blk.data_len as usize;
770 64564645 : }
771 : }
772 : // We don't need them, so just skip blocks_total_len bytes
773 73526714 : buf.advance(blocks_total_len as usize);
774 73526714 : assert_eq!(ptr, record.len() - buf.remaining());
775 :
776 73526714 : let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
777 73526714 :
778 73526714 : // 4. Decode main_data
779 73526714 : if main_data_len > 0 {
780 73384951 : assert_eq!(buf.remaining(), main_data_len as usize);
781 141763 : }
782 :
783 73526712 : decoded.xl_xid = xlogrec.xl_xid;
784 73526712 : decoded.xl_info = xlogrec.xl_info;
785 73526712 : decoded.xl_rmid = xlogrec.xl_rmid;
786 73526712 : decoded.record = record;
787 73526712 : decoded.main_data_offset = main_data_offset;
788 73526712 :
789 73526712 : Ok(())
790 73526712 : }
791 :
792 : ///
793 : /// Build a human-readable string to describe a WAL record
794 : ///
795 : /// For debugging purposes
796 0 : pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeError> {
797 0 : match rec {
798 0 : NeonWalRecord::Postgres { will_init, rec } => Ok(format!(
799 0 : "will_init: {}, {}",
800 0 : will_init,
801 0 : describe_postgres_wal_record(rec)?
802 : )),
803 0 : _ => Ok(format!("{:?}", rec)),
804 : }
805 0 : }
806 :
807 0 : fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
808 0 : // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
809 0 : // Maybe use the postgres wal redo process, the same used for replaying WAL records?
810 0 : // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
811 0 : // without worrying about security?
812 0 : //
813 0 : // But for now, we have a hand-written code for a few common WAL record types here.
814 0 :
815 0 : let mut buf = record.clone();
816 :
817 : // 1. Parse XLogRecord struct
818 :
819 : // FIXME: assume little-endian here
820 0 : let xlogrec = XLogRecord::from_bytes(&mut buf)?;
821 :
822 : let unknown_str: String;
823 :
824 0 : let result: &str = match xlogrec.xl_rmid {
825 : pg_constants::RM_HEAP2_ID => {
826 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
827 0 : match info {
828 0 : pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
829 0 : pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
830 : _ => {
831 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
832 0 : &unknown_str
833 : }
834 : }
835 : }
836 : pg_constants::RM_HEAP_ID => {
837 0 : let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK;
838 0 : match info {
839 0 : pg_constants::XLOG_HEAP_INSERT => "HEAP INSERT",
840 0 : pg_constants::XLOG_HEAP_DELETE => "HEAP DELETE",
841 0 : pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
842 0 : pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
843 : _ => {
844 0 : unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
845 0 : &unknown_str
846 : }
847 : }
848 : }
849 : pg_constants::RM_XLOG_ID => {
850 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
851 0 : match info {
852 0 : pg_constants::XLOG_FPI => "XLOG FPI",
853 0 : pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
854 : _ => {
855 0 : unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
856 0 : &unknown_str
857 : }
858 : }
859 : }
860 0 : rmid => {
861 0 : let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
862 0 :
863 0 : unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
864 0 : &unknown_str
865 : }
866 : };
867 :
868 0 : Ok(String::from(result))
869 0 : }
|