LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - waldecoder_handler.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 68.1 % 141 96
Test Date: 2025-07-31 15:59:03 Functions: 50.0 % 32 16

            Line data    Source code
       1              : //!
       2              : //! Basic WAL stream decoding.
       3              : //!
       4              : //! This understands the WAL page and record format, enough to figure out where the WAL record
       5              : //! boundaries are, and to reassemble WAL records that cross page boundaries.
       6              : //!
       7              : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
       8              : //! to look deeper into the WAL records to also understand which blocks they modify, the code
       9              : //! for that is in pageserver/src/walrecord.rs
      10              : //!
      11              : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
      12              : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
      13              : use super::xlog_utils::*;
      14              : use crate::WAL_SEGMENT_SIZE;
      15              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      16              : use crc32c::*;
      17              : use std::cmp::min;
      18              : use std::num::NonZeroU32;
      19              : use utils::lsn::Lsn;
      20              : 
      21              : pub trait WalStreamDecoderHandler {
      22              :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
      23              :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
      24              :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
      25              : }
      26              : 
      27              : //
      28              : // This is a trick to support several postgres versions simultaneously.
      29              : //
      30              : // Page decoding code depends on postgres bindings, so it is compiled for each version.
      31              : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
      32              : // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
      33              : // Other methods are internal and are not dispatched.
      34              : //
      35              : // It is similar to having several impl blocks for the same struct,
      36              : // but the impls here are in different modules, so need to use a trait.
      37              : //
      38              : impl WalStreamDecoderHandler for WalStreamDecoder {
      39        19470 :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
      40        19470 :         let validate_impl = || {
      41        19470 :             if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
      42            0 :                 return Err(format!(
      43            0 :                     "invalid xlog page header: xlp_magic={}, expected {}",
      44            0 :                     hdr.xlp_magic, XLOG_PAGE_MAGIC
      45            0 :                 ));
      46        19470 :             }
      47        19470 :             if hdr.xlp_pageaddr != self.lsn.0 {
      48            0 :                 return Err(format!(
      49            0 :                     "invalid xlog page header: xlp_pageaddr={}, expected {}",
      50            0 :                     hdr.xlp_pageaddr, self.lsn
      51            0 :                 ));
      52        19470 :             }
      53        19470 :             match self.state {
      54              :                 State::WaitingForRecord => {
      55          100 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
      56            0 :                         return Err(
      57            0 :                             "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
      58            0 :                         );
      59          100 :                     }
      60          100 :                     if hdr.xlp_rem_len != 0 {
      61            0 :                         return Err(format!(
      62            0 :                             "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
      63            0 :                             hdr.xlp_rem_len
      64            0 :                         ));
      65          100 :                     }
      66              :                 }
      67        19370 :                 State::ReassemblingRecord { contlen, .. } => {
      68        19370 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
      69            0 :                         return Err(
      70            0 :                             "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
      71            0 :                                 .into(),
      72            0 :                         );
      73        19370 :                     }
      74        19370 :                     if hdr.xlp_rem_len != contlen.get() {
      75            0 :                         return Err(format!(
      76            0 :                             "invalid xlog page header: xlp_rem_len={}, expected {}",
      77            0 :                             hdr.xlp_rem_len,
      78            0 :                             contlen.get()
      79            0 :                         ));
      80        19370 :                     }
      81              :                 }
      82              :                 State::SkippingEverything { .. } => {
      83            0 :                     panic!("Should not be validating page header in the SkippingEverything state");
      84              :                 }
      85              :             };
      86        19470 :             Ok(())
      87        19470 :         };
      88        19470 :         validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
      89        19470 :     }
      90              : 
      91              :     /// Attempt to decode another WAL record from the input that has been fed to the
      92              :     /// decoder so far.
      93              :     ///
      94              :     /// Returns one of the following:
      95              :     ///     Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
      96              :     ///     Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
      97              :     ///     Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
      98              :     ///
      99       364293 :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
     100              :         // Run state machine that validates page headers, and reassembles records
     101              :         // that cross page boundaries.
     102              :         loop {
     103              :             // parse and verify page boundaries as we go
     104              :             // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
     105       566280 :             match self.state {
     106              :                 State::WaitingForRecord | State::ReassemblingRecord { .. } => {
     107       452254 :                     if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     108              :                         // parse long header
     109              : 
     110           17 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
     111            9 :                             return Ok(None);
     112            8 :                         }
     113              : 
     114            8 :                         let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
     115              :                             |e| WalDecodeError {
     116            0 :                                 msg: format!("long header deserialization failed {e}"),
     117            0 :                                 lsn: self.lsn,
     118            0 :                             },
     119            0 :                         )?;
     120              : 
     121            8 :                         self.validate_page_header(&hdr.std)?;
     122              : 
     123            8 :                         self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     124       452237 :                     } else if self.lsn.block_offset() == 0 {
     125        26017 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
     126         6555 :                             return Ok(None);
     127        19462 :                         }
     128              : 
     129        19462 :                         let hdr =
     130        19462 :                             XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
     131            0 :                                 WalDecodeError {
     132            0 :                                     msg: format!("header deserialization failed {e}"),
     133            0 :                                     lsn: self.lsn,
     134            0 :                                 }
     135            0 :                             })?;
     136              : 
     137        19462 :                         self.validate_page_header(&hdr)?;
     138              : 
     139        19462 :                         self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     140       426220 :                     }
     141              :                 }
     142       114026 :                 State::SkippingEverything { .. } => {}
     143              :             }
     144              :             // now read page contents
     145       559716 :             match &mut self.state {
     146              :                 State::WaitingForRecord => {
     147              :                     // need to have at least the xl_tot_len field
     148       116255 :                     if self.inputbuf.remaining() < 4 {
     149         7396 :                         return Ok(None);
     150       108859 :                     }
     151              : 
     152              :                     // peek xl_tot_len at the beginning of the record.
     153              :                     // FIXME: assumes little-endian
     154       108859 :                     let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
     155       108859 :                     if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
     156          657 :                         return Err(WalDecodeError {
     157          657 :                             msg: format!("invalid xl_tot_len {xl_tot_len}"),
     158          657 :                             lsn: self.lsn,
     159          657 :                         });
     160       108202 :                     }
     161              :                     // Fast path for the common case that the whole record fits on the page.
     162       108202 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     163       108202 :                     if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
     164        33786 :                         self.lsn += xl_tot_len as u64;
     165        33786 :                         let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
     166        33786 :                         return Ok(Some(self.complete_record(recordbuf)?));
     167              :                     } else {
     168              :                         // Need to assemble the record from pieces. Remember the size of the
     169              :                         // record, and loop back. On next iterations, we will reach the branch
     170              :                         // below, and copy the part of the record that was on this or next page(s)
     171              :                         // to 'recordbuf'.  Subsequent iterations will skip page headers, and
     172              :                         // append the continuations from the next pages to 'recordbuf'.
     173        74416 :                         self.state = State::ReassemblingRecord {
     174        74416 :                             recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
     175        74416 :                             contlen: NonZeroU32::new(xl_tot_len).unwrap(),
     176        74416 :                         }
     177              :                     }
     178              :                 }
     179       329435 :                 State::ReassemblingRecord { recordbuf, contlen } => {
     180              :                     // we're continuing a record, possibly from previous page.
     181       329435 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     182              : 
     183              :                     // read the rest of the record, or as much as fits on this page.
     184       329435 :                     let n = min(contlen.get(), pageleft) as usize;
     185              : 
     186       329435 :                     if self.inputbuf.remaining() < n {
     187       235649 :                         return Ok(None);
     188        93786 :                     }
     189              : 
     190        93786 :                     recordbuf.put(self.inputbuf.split_to(n));
     191        93786 :                     self.lsn += n as u64;
     192        93786 :                     *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
     193        19373 :                         Some(x) => x,
     194              :                         None => {
     195              :                             // The record is now complete.
     196        74413 :                             let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
     197        74413 :                             return Ok(Some(self.complete_record(recordbuf)?));
     198              :                         }
     199              :                     }
     200              :                 }
     201       114026 :                 State::SkippingEverything { skip_until_lsn } => {
     202       114026 :                     assert!(*skip_until_lsn >= self.lsn);
     203       114026 :                     let n = skip_until_lsn.0 - self.lsn.0;
     204       114026 :                     if self.inputbuf.remaining() < n as usize {
     205         5828 :                         return Ok(None);
     206       108198 :                     }
     207       108198 :                     self.inputbuf.advance(n as usize);
     208       108198 :                     self.lsn += n;
     209       108198 :                     self.state = State::WaitingForRecord;
     210              :                 }
     211              :             }
     212              :         }
     213       364293 :     }
     214              : 
     215       108199 :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
     216              :         // We now have a record in the 'recordbuf' local variable.
     217       108199 :         let xlogrec =
     218       108199 :             XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
     219            0 :                 WalDecodeError {
     220            0 :                     msg: format!("xlog record deserialization failed {e}"),
     221            0 :                     lsn: self.lsn,
     222            0 :                 }
     223            0 :             })?;
     224              : 
     225       108199 :         let mut crc = 0;
     226       108199 :         crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
     227       108199 :         crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
     228       108199 :         if crc != xlogrec.xl_crc {
     229            0 :             return Err(WalDecodeError {
     230            0 :                 msg: "WAL record crc mismatch".into(),
     231            0 :                 lsn: self.lsn,
     232            0 :             });
     233       108199 :         }
     234              : 
     235              :         // XLOG_SWITCH records are special. If we see one, we need to skip
     236              :         // to the next WAL segment.
     237       108199 :         let next_lsn = if xlogrec.is_xlog_switch_record() {
     238            0 :             tracing::trace!("saw xlog switch record at {}", self.lsn);
     239            0 :             self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
     240              :         } else {
     241              :             // Pad to an 8-byte boundary
     242       108199 :             self.lsn.align()
     243              :         };
     244       108199 :         self.state = State::SkippingEverything {
     245       108199 :             skip_until_lsn: next_lsn,
     246       108199 :         };
     247              : 
     248              :         // We should return LSN of the next record, not the last byte of this record or
     249              :         // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
     250              :         // records, the former "spans" until the next WAL segment (see test_xlog_switch).
     251       108199 :         Ok((next_lsn, recordbuf))
     252       108199 :     }
     253              : }
        

Generated by: LCOV version 2.1-beta