LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - waldecoder_handler.rs (source / functions) Coverage Total Hit
Test: 52d9d4a58355424a48c56cb9ba9670a073f618b9.info Lines: 73.5 % 155 114
Test Date: 2024-11-21 08:31:22 Functions: 50.0 % 32 16

            Line data    Source code
       1              : //!
       2              : //! Basic WAL stream decoding.
       3              : //!
       4              : //! This understands the WAL page and record format, enough to figure out where the WAL record
       5              : //! boundaries are, and to reassemble WAL records that cross page boundaries.
       6              : //!
       7              : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
       8              : //! to look deeper into the WAL records to also understand which blocks they modify, the code
       9              : //! for that is in pageserver/src/walrecord.rs
      10              : //!
      11              : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
      12              : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
      13              : use super::xlog_utils::*;
      14              : use crate::WAL_SEGMENT_SIZE;
      15              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      16              : use crc32c::*;
      17              : use log::*;
      18              : use std::cmp::min;
      19              : use std::num::NonZeroU32;
      20              : use utils::lsn::Lsn;
      21              : 
      22              : pub trait WalStreamDecoderHandler {
      23              :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
      24              :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
      25              :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
      26              : }
      27              : 
      28              : //
      29              : // This is a trick to support several postgres versions simultaneously.
      30              : //
      31              : // Page decoding code depends on postgres bindings, so it is compiled for each version.
      32              : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
      33              : // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
      34              : // Other methods are internal and are not dispatched.
      35              : //
      36              : // It is similar to having several impl blocks for the same struct,
      37              : // but the impls here are in different modules, so need to use a trait.
      38              : //
      39              : impl WalStreamDecoderHandler for WalStreamDecoder {
      40        19520 :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
      41        19520 :         let validate_impl = || {
      42        19520 :             if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
      43            0 :                 return Err(format!(
      44            0 :                     "invalid xlog page header: xlp_magic={}, expected {}",
      45            0 :                     hdr.xlp_magic, XLOG_PAGE_MAGIC
      46            0 :                 ));
      47        19520 :             }
      48        19520 :             if hdr.xlp_pageaddr != self.lsn.0 {
      49            0 :                 return Err(format!(
      50            0 :                     "invalid xlog page header: xlp_pageaddr={}, expected {}",
      51            0 :                     hdr.xlp_pageaddr, self.lsn
      52            0 :                 ));
      53        19520 :             }
      54        19520 :             match self.state {
      55              :                 State::WaitingForRecord => {
      56          160 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
      57            0 :                         return Err(
      58            0 :                             "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
      59            0 :                         );
      60          160 :                     }
      61          160 :                     if hdr.xlp_rem_len != 0 {
      62            0 :                         return Err(format!(
      63            0 :                             "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
      64            0 :                             hdr.xlp_rem_len
      65            0 :                         ));
      66          160 :                     }
      67              :                 }
      68        19360 :                 State::ReassemblingRecord { contlen, .. } => {
      69        19360 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
      70            0 :                         return Err(
      71            0 :                             "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
      72            0 :                                 .into(),
      73            0 :                         );
      74        19360 :                     }
      75        19360 :                     if hdr.xlp_rem_len != contlen.get() {
      76            0 :                         return Err(format!(
      77            0 :                             "invalid xlog page header: xlp_rem_len={}, expected {}",
      78            0 :                             hdr.xlp_rem_len,
      79            0 :                             contlen.get()
      80            0 :                         ));
      81        19360 :                     }
      82              :                 }
      83              :                 State::SkippingEverything { .. } => {
      84            0 :                     panic!("Should not be validating page header in the SkippingEverything state");
      85              :                 }
      86              :             };
      87        19520 :             Ok(())
      88        19520 :         };
      89        19520 :         validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
      90        19520 :     }
      91              : 
      92              :     /// Attempt to decode another WAL record from the input that has been fed to the
      93              :     /// decoder so far.
      94              :     ///
      95              :     /// Returns one of the following:
      96              :     ///     Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
      97              :     ///     Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
      98              :     ///     Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
      99              :     ///
     100       663436 :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
     101              :         // Run state machine that validates page headers, and reassembles records
     102              :         // that cross page boundaries.
     103              :         loop {
     104              :             // parse and verify page boundaries as we go
     105              :             // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
     106       999645 :             match self.state {
     107              :                 State::WaitingForRecord | State::ReassemblingRecord { .. } => {
     108       817183 :                     if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     109              :                         // parse long header
     110              : 
     111           18 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
     112           10 :                             return Ok(None);
     113            8 :                         }
     114              : 
     115            8 :                         let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
     116            8 :                             |e| WalDecodeError {
     117            0 :                                 msg: format!("long header deserialization failed {}", e),
     118            0 :                                 lsn: self.lsn,
     119            8 :                             },
     120            8 :                         )?;
     121              : 
     122            8 :                         self.validate_page_header(&hdr.std)?;
     123              : 
     124            8 :                         self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     125       817165 :                     } else if self.lsn.block_offset() == 0 {
     126        26714 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
     127         7202 :                             return Ok(None);
     128        19512 :                         }
     129              : 
     130        19512 :                         let hdr =
     131        19512 :                             XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
     132            0 :                                 WalDecodeError {
     133            0 :                                     msg: format!("header deserialization failed {}", e),
     134            0 :                                     lsn: self.lsn,
     135            0 :                                 }
     136        19512 :                             })?;
     137              : 
     138        19512 :                         self.validate_page_header(&hdr)?;
     139              : 
     140        19512 :                         self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     141       790451 :                     }
     142              :                 }
     143       182462 :                 State::SkippingEverything { .. } => {}
     144              :             }
     145              :             // now read page contents
     146       992433 :             match &mut self.state {
     147              :                 State::WaitingForRecord => {
     148              :                     // need to have at least the xl_tot_len field
     149       183916 :                     if self.inputbuf.remaining() < 4 {
     150        12466 :                         return Ok(None);
     151       171450 :                     }
     152       171450 : 
     153       171450 :                     // peek xl_tot_len at the beginning of the record.
     154       171450 :                     // FIXME: assumes little-endian
     155       171450 :                     let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
     156       171450 :                     if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
     157          640 :                         return Err(WalDecodeError {
     158          640 :                             msg: format!("invalid xl_tot_len {}", xl_tot_len),
     159          640 :                             lsn: self.lsn,
     160          640 :                         });
     161       170810 :                     }
     162       170810 :                     // Fast path for the common case that the whole record fits on the page.
     163       170810 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     164       170810 :                     if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
     165        24769 :                         self.lsn += xl_tot_len as u64;
     166        24769 :                         let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
     167        24769 :                         return Ok(Some(self.complete_record(recordbuf)?));
     168              :                     } else {
     169              :                         // Need to assemble the record from pieces. Remember the size of the
     170              :                         // record, and loop back. On next iterations, we will reach the branch
     171              :                         // below, and copy the part of the record that was on this or next page(s)
     172              :                         // to 'recordbuf'.  Subsequent iterations will skip page headers, and
     173              :                         // append the continuations from the next pages to 'recordbuf'.
     174       146041 :                         self.state = State::ReassemblingRecord {
     175       146041 :                             recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
     176       146041 :                             contlen: NonZeroU32::new(xl_tot_len).unwrap(),
     177       146041 :                         }
     178              :                     }
     179              :                 }
     180       626055 :                 State::ReassemblingRecord { recordbuf, contlen } => {
     181       626055 :                     // we're continuing a record, possibly from previous page.
     182       626055 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     183       626055 : 
     184       626055 :                     // read the rest of the record, or as much as fits on this page.
     185       626055 :                     let n = min(contlen.get(), pageleft) as usize;
     186       626055 : 
     187       626055 :                     if self.inputbuf.remaining() < n {
     188       460654 :                         return Ok(None);
     189       165401 :                     }
     190       165401 : 
     191       165401 :                     recordbuf.put(self.inputbuf.split_to(n));
     192       165401 :                     self.lsn += n as u64;
     193       165401 :                     *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
     194        19362 :                         Some(x) => x,
     195              :                         None => {
     196              :                             // The record is now complete.
     197       146039 :                             let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
     198       146039 :                             return Ok(Some(self.complete_record(recordbuf)?));
     199              :                         }
     200              :                     }
     201              :                 }
     202       182462 :                 State::SkippingEverything { skip_until_lsn } => {
     203       182462 :                     assert!(*skip_until_lsn >= self.lsn);
     204       182462 :                     let n = skip_until_lsn.0 - self.lsn.0;
     205       182462 :                     if self.inputbuf.remaining() < n as usize {
     206        11656 :                         return Ok(None);
     207       170806 :                     }
     208       170806 :                     self.inputbuf.advance(n as usize);
     209       170806 :                     self.lsn += n;
     210       170806 :                     self.state = State::WaitingForRecord;
     211              :                 }
     212              :             }
     213              :         }
     214       663436 :     }
     215              : 
     216       170808 :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
     217              :         // We now have a record in the 'recordbuf' local variable.
     218       170808 :         let xlogrec =
     219       170808 :             XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
     220            0 :                 WalDecodeError {
     221            0 :                     msg: format!("xlog record deserialization failed {}", e),
     222            0 :                     lsn: self.lsn,
     223            0 :                 }
     224       170808 :             })?;
     225              : 
     226       170808 :         let mut crc = 0;
     227       170808 :         crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
     228       170808 :         crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
     229       170808 :         if crc != xlogrec.xl_crc {
     230            0 :             return Err(WalDecodeError {
     231            0 :                 msg: "WAL record crc mismatch".into(),
     232            0 :                 lsn: self.lsn,
     233            0 :             });
     234       170808 :         }
     235              : 
     236              :         // XLOG_SWITCH records are special. If we see one, we need to skip
     237              :         // to the next WAL segment.
     238       170808 :         let next_lsn = if xlogrec.is_xlog_switch_record() {
     239            0 :             trace!("saw xlog switch record at {}", self.lsn);
     240            0 :             self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
     241              :         } else {
     242              :             // Pad to an 8-byte boundary
     243       170808 :             self.lsn.align()
     244              :         };
     245       170808 :         self.state = State::SkippingEverything {
     246       170808 :             skip_until_lsn: next_lsn,
     247       170808 :         };
     248       170808 : 
     249       170808 :         // We should return LSN of the next record, not the last byte of this record or
     250       170808 :         // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
     251       170808 :         // records, the former "spans" until the next WAL segment (see test_xlog_switch).
     252       170808 :         Ok((next_lsn, recordbuf))
     253       170808 :     }
     254              : }
        

Generated by: LCOV version 2.1-beta