LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - waldecoder_handler.rs (source / functions) Coverage Total Hit
Test: e402c46de0a007db6b48dddbde450ddbb92e6ceb.info Lines: 74.1 % 158 117
Test Date: 2024-06-25 10:31:23 Functions: 50.0 % 24 12

            Line data    Source code
       1              : //!
       2              : //! Basic WAL stream decoding.
       3              : //!
       4              : //! This understands the WAL page and record format, enough to figure out where the WAL record
       5              : //! boundaries are, and to reassemble WAL records that cross page boundaries.
       6              : //!
       7              : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
       8              : //! to look deeper into the WAL records to also understand which blocks they modify; the code
       9              : //! for that is in pageserver/src/walrecord.rs
      10              : //!
      11              : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
      12              : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
      13              : use super::xlog_utils::*;
      14              : use crate::WAL_SEGMENT_SIZE;
      15              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      16              : use crc32c::*;
      17              : use log::*;
      18              : use std::cmp::min;
      19              : use std::num::NonZeroU32;
      20              : use utils::lsn::Lsn;
      21              : 
      22              : pub trait WalStreamDecoderHandler {
      23              :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
      24              :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
      25              :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
      26              : }
      27              : 
      28              : //
      29              : // This is a trick to support several postgres versions simultaneously.
      30              : //
      31              : // Page decoding code depends on postgres bindings, so it is compiled for each version.
      32              : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
      33              : // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
      34              : // Other methods are internal and are not dispatched.
      35              : //
      36              : // It is similar to having several impl blocks for the same struct,
      37              : // but the impls here are in different modules, so need to use a trait.
      38              : //
      39              : impl WalStreamDecoderHandler for WalStreamDecoder {
      40        27982 :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
      41        27982 :         let validate_impl = || {
      42        27982 :             if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
      43            0 :                 return Err(format!(
      44            0 :                     "invalid xlog page header: xlp_magic={}, expected {}",
      45            0 :                     hdr.xlp_magic, XLOG_PAGE_MAGIC
      46            0 :                 ));
      47        27982 :             }
      48        27982 :             if hdr.xlp_pageaddr != self.lsn.0 {
      49            0 :                 return Err(format!(
      50            0 :                     "invalid xlog page header: xlp_pageaddr={}, expected {}",
      51            0 :                     hdr.xlp_pageaddr, self.lsn
      52            0 :                 ));
      53        27982 :             }
      54        27982 :             match self.state {
      55              :                 State::WaitingForRecord => {
      56          180 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
      57            0 :                         return Err(
      58            0 :                             "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
      59            0 :                         );
      60          180 :                     }
      61          180 :                     if hdr.xlp_rem_len != 0 {
      62            0 :                         return Err(format!(
      63            0 :                             "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
      64            0 :                             hdr.xlp_rem_len
      65            0 :                         ));
      66          180 :                     }
      67              :                 }
      68        27802 :                 State::ReassemblingRecord { contlen, .. } => {
      69        27802 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
      70            0 :                         return Err(
      71            0 :                             "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
      72            0 :                                 .into(),
      73            0 :                         );
      74        27802 :                     }
      75        27802 :                     if hdr.xlp_rem_len != contlen.get() {
      76            0 :                         return Err(format!(
      77            0 :                             "invalid xlog page header: xlp_rem_len={}, expected {}",
      78            0 :                             hdr.xlp_rem_len,
      79            0 :                             contlen.get()
      80            0 :                         ));
      81        27802 :                     }
      82              :                 }
      83              :                 State::SkippingEverything { .. } => {
      84            0 :                     panic!("Should not be validating page header in the SkippingEverything state");
      85              :                 }
      86              :             };
      87        27982 :             Ok(())
      88        27982 :         };
      89        27982 :         validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
      90        27982 :     }
      91              : 
      92              :     /// Attempt to decode another WAL record from the input that has been fed to the
      93              :     /// decoder so far.
      94              :     ///
      95              :     /// Returns one of the following:
      96              :     ///     Ok(Some((Lsn, Bytes))): a tuple containing the LSN of the next record, and the record itself
      97              :     ///     Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
      98              :     ///     Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
      99              :     ///
     100       728328 :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
     101              :         // Run state machine that validates page headers, and reassembles records
     102              :         // that cross page boundaries.
     103      1123206 :         loop {
     104      1123206 :             // Parse and verify page boundaries as we go.
     105      1123206 :             // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
     106      1123206 :             match self.state {
     107              :                 State::WaitingForRecord | State::ReassemblingRecord { .. } => {
     108       890750 :                     if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     109              :                         // parse long header
     110              : 
     111           26 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
     112           14 :                             return Ok(None);
     113           12 :                         }
     114              : 
     115           12 :                         let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
     116           12 :                             |e| WalDecodeError {
     117            0 :                                 msg: format!("long header deserialization failed {}", e),
     118            0 :                                 lsn: self.lsn,
     119           12 :                             },
     120           12 :                         )?;
     121              : 
     122           12 :                         self.validate_page_header(&hdr.std)?;
     123              : 
     124           12 :                         self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     125       890724 :                     } else if self.lsn.block_offset() == 0 {
     126        38332 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
     127        10362 :                             return Ok(None);
     128        27970 :                         }
     129              : 
     130        27970 :                         let hdr =
     131        27970 :                             XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
     132            0 :                                 WalDecodeError {
     133            0 :                                     msg: format!("header deserialization failed {}", e),
     134            0 :                                     lsn: self.lsn,
     135            0 :                                 }
     136        27970 :                             })?;
     137              : 
     138        27970 :                         self.validate_page_header(&hdr)?;
     139              : 
     140        27970 :                         self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     141       852392 :                     }
     142              :                 }
     143       232456 :                 State::SkippingEverything { .. } => {}
     144              :             }
     145              :             // now read page contents
     146      1112830 :             match &mut self.state {
     147              :                 State::WaitingForRecord => {
     148              :                     // need to have at least the xl_tot_len field
     149       240574 :                     if self.inputbuf.remaining() < 4 {
     150        15461 :                         return Ok(None);
     151       225113 :                     }
     152       225113 : 
     153       225113 :                     // peek xl_tot_len at the beginning of the record.
     154       225113 :                     // FIXME: assumes little-endian
     155       225113 :                     let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
     156       225113 :                     if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
     157         4309 :                         return Err(WalDecodeError {
     158         4309 :                             msg: format!("invalid xl_tot_len {}", xl_tot_len),
     159         4309 :                             lsn: self.lsn,
     160         4309 :                         });
     161       220804 :                     }
     162       220804 :                     // Fast path for the common case that the whole record fits on the page.
     163       220804 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     164       220804 :                     if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
     165        74530 :                         self.lsn += xl_tot_len as u64;
     166        74530 :                         let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
     167        74530 :                         return Ok(Some(self.complete_record(recordbuf)?));
     168              :                     } else {
     169              :                         // Need to assemble the record from pieces. Remember the size of the
     170              :                         // record, and loop back. On next iterations, we will reach the branch
     171              :                         // below, and copy the part of the record that was on this or next page(s)
     172              :                         // to 'recordbuf'.  Subsequent iterations will skip page headers, and
     173              :                         // append the continuations from the next pages to 'recordbuf'.
     174       146274 :                         self.state = State::ReassemblingRecord {
     175       146274 :                             recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
     176       146274 :                             contlen: NonZeroU32::new(xl_tot_len).unwrap(),
     177       146274 :                         }
     178              :                     }
     179              :                 }
     180       639800 :                 State::ReassemblingRecord { recordbuf, contlen } => {
     181       639800 :                     // we're continuing a record, possibly from previous page.
     182       639800 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     183       639800 : 
     184       639800 :                     // read the rest of the record, or as much as fits on this page.
     185       639800 :                     let n = min(contlen.get(), pageleft) as usize;
     186       639800 : 
     187       639800 :                     if self.inputbuf.remaining() < n {
     188       465724 :                         return Ok(None);
     189       174076 :                     }
     190       174076 : 
     191       174076 :                     recordbuf.put(self.inputbuf.split_to(n));
     192       174076 :                     self.lsn += n as u64;
     193       174076 :                     *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
     194        27804 :                         Some(x) => x,
     195              :                         None => {
     196              :                             // The record is now complete.
     197       146272 :                             let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
     198       146272 :                             return Ok(Some(self.complete_record(recordbuf)?));
     199              :                         }
     200              :                     }
     201              :                 }
     202       232456 :                 State::SkippingEverything { skip_until_lsn } => {
     203       232456 :                     assert!(*skip_until_lsn >= self.lsn);
     204       232456 :                     let n = skip_until_lsn.0 - self.lsn.0;
     205       232456 :                     if self.inputbuf.remaining() < n as usize {
     206        11656 :                         return Ok(None);
     207       220800 :                     }
     208       220800 :                     self.inputbuf.advance(n as usize);
     209       220800 :                     self.lsn += n;
     210       220800 :                     self.state = State::WaitingForRecord;
     211              :                 }
     212              :             }
     213              :         }
     214       728328 :     }
     215              : 
     216       220802 :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
     217              :         // We now have a record in the 'recordbuf' local variable.
     218       220802 :         let xlogrec =
     219       220802 :             XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
     220            0 :                 WalDecodeError {
     221            0 :                     msg: format!("xlog record deserialization failed {}", e),
     222            0 :                     lsn: self.lsn,
     223            0 :                 }
     224       220802 :             })?;
     225              : 
     226       220802 :         let mut crc = 0;
     227       220802 :         crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
     228       220802 :         crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
     229       220802 :         if crc != xlogrec.xl_crc {
     230            0 :             return Err(WalDecodeError {
     231            0 :                 msg: "WAL record crc mismatch".into(),
     232            0 :                 lsn: self.lsn,
     233            0 :             });
     234       220802 :         }
     235              : 
     236              :         // XLOG_SWITCH records are special. If we see one, we need to skip
     237              :         // to the next WAL segment.
     238       220802 :         let next_lsn = if xlogrec.is_xlog_switch_record() {
     239            0 :             trace!("saw xlog switch record at {}", self.lsn);
     240            0 :             self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
     241              :         } else {
     242              :             // Pad to an 8-byte boundary
     243       220802 :             self.lsn.align()
     244              :         };
     245       220802 :         self.state = State::SkippingEverything {
     246       220802 :             skip_until_lsn: next_lsn,
     247       220802 :         };
     248       220802 : 
     249       220802 :         // We should return LSN of the next record, not the last byte of this record or
     250       220802 :         // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
     251       220802 :         // records, the former "spans" until the next WAL segment (see test_xlog_switch).
     252       220802 :         Ok((next_lsn, recordbuf))
     253       220802 :     }
     254              : }
        

Generated by: LCOV version 2.1-beta