LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - waldecoder_handler.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 77.8 % 158 123
Test Date: 2024-02-07 07:37:29 Functions: 54.2 % 24 13

            Line data    Source code
       1              : //!
       2              : //! Basic WAL stream decoding.
       3              : //!
       4              : //! This understands the WAL page and record format, enough to figure out where the WAL record
       5              : //! boundaries are, and to reassemble WAL records that cross page boundaries.
       6              : //!
       7              : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
       8              : //! to look deeper into the WAL records to also understand which blocks they modify; the code
       9              : //! for that is in pageserver/src/walrecord.rs.
      10              : //!
      11              : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
      12              : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
      13              : use super::xlog_utils::*;
      14              : use crate::WAL_SEGMENT_SIZE;
      15              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      16              : use crc32c::*;
      17              : use log::*;
      18              : use std::cmp::min;
      19              : use std::num::NonZeroU32;
      20              : use utils::lsn::Lsn;
      21              : 
      22              : pub trait WalStreamDecoderHandler {
      23              :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
      24              :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
      25              :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
      26              : }
      27              : 
      28              : //
      29              : // This is a trick to support several postgres versions simultaneously.
      30              : //
      31              : // Page decoding code depends on postgres bindings, so it is compiled for each version.
      32              : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
      33              : // WalStreamDecoder's poll_decode() method dispatches to the right handler based on the postgres version.
      34              : // Other methods are internal and are not dispatched.
      35              : //
      36              : // It is similar to having several impl blocks for the same struct,
      37              : // but the impls here are in different modules, so we need to use a trait.
      38              : //
      39              : impl WalStreamDecoderHandler for WalStreamDecoder {
      40      3306609 :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
      41      3306609 :         let validate_impl = || {
      42      3306609 :             if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
      43            1 :                 return Err(format!(
      44            1 :                     "invalid xlog page header: xlp_magic={}, expected {}",
      45            1 :                     hdr.xlp_magic, XLOG_PAGE_MAGIC
      46            1 :                 ));
      47      3306608 :             }
      48      3306608 :             if hdr.xlp_pageaddr != self.lsn.0 {
      49            0 :                 return Err(format!(
      50            0 :                     "invalid xlog page header: xlp_pageaddr={}, expected {}",
      51            0 :                     hdr.xlp_pageaddr, self.lsn
      52            0 :                 ));
      53      3306608 :             }
      54      3306608 :             match self.state {
      55              :                 State::WaitingForRecord => {
      56       154745 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
      57            0 :                         return Err(
      58            0 :                             "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
      59            0 :                         );
      60       154745 :                     }
      61       154745 :                     if hdr.xlp_rem_len != 0 {
      62            0 :                         return Err(format!(
      63            0 :                             "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
      64            0 :                             hdr.xlp_rem_len
      65            0 :                         ));
      66       154745 :                     }
      67              :                 }
      68      3151863 :                 State::ReassemblingRecord { contlen, .. } => {
      69      3151863 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
      70            0 :                         return Err(
      71            0 :                             "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
      72            0 :                                 .into(),
      73            0 :                         );
      74      3151863 :                     }
      75      3151863 :                     if hdr.xlp_rem_len != contlen.get() {
      76            0 :                         return Err(format!(
      77            0 :                             "invalid xlog page header: xlp_rem_len={}, expected {}",
      78            0 :                             hdr.xlp_rem_len,
      79            0 :                             contlen.get()
      80            0 :                         ));
      81      3151863 :                     }
      82              :                 }
      83              :                 State::SkippingEverything { .. } => {
      84            0 :                     panic!("Should not be validating page header in the SkippingEverything state");
      85              :                 }
      86              :             };
      87      3306608 :             Ok(())
      88      3306609 :         };
      89      3306609 :         validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
      90      3306609 :     }
      91              : 
      92              :     /// Attempt to decode another WAL record from the input that has been fed to the
      93              :     /// decoder so far.
      94              :     ///
      95              :     /// Returns one of the following:
      96              :     ///     Ok(Some((Lsn, Bytes))): a tuple containing the LSN of the next record, and the record itself
      97              :     ///     Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
      98              :     ///     Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
      99              :     ///
     100    160992493 :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
     101              :         // Run state machine that validates page headers, and reassembles records
     102              :         // that cross page boundaries.
     103    325437123 :         loop {
     104    325437123 :             // Parse and verify page boundaries as we go.
     105    325437123 :             // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
     106    325437123 :             match self.state {
     107              :                 State::WaitingForRecord | State::ReassemblingRecord { .. } => {
     108    167418105 :                     if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     109              :                         // parse long header
     110              : 
     111         2445 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
     112          860 :                             return Ok(None);
     113         1585 :                         }
     114              : 
     115         1585 :                         let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
     116         1585 :                             |e| WalDecodeError {
     117            0 :                                 msg: format!("long header deserialization failed {}", e),
     118            0 :                                 lsn: self.lsn,
     119         1585 :                             },
     120         1585 :                         )?;
     121              : 
     122         1585 :                         self.validate_page_header(&hdr.std)?;
     123              : 
     124         1585 :                         self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     125    167415660 :                     } else if self.lsn.block_offset() == 0 {
     126      3415035 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
     127       110011 :                             return Ok(None);
     128      3305024 :                         }
     129              : 
     130      3305024 :                         let hdr =
     131      3305024 :                             XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
     132            0 :                                 WalDecodeError {
     133            0 :                                     msg: format!("header deserialization failed {}", e),
     134            0 :                                     lsn: self.lsn,
     135            0 :                                 }
     136      3305024 :                             })?;
     137              : 
     138      3305024 :                         self.validate_page_header(&hdr)?;
     139              : 
     140      3305023 :                         self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     141    164000625 :                     }
     142              :                 }
     143    158019018 :                 State::SkippingEverything { .. } => {}
     144              :             }
     145              :             // now read page contents
     146    325326251 :             match &mut self.state {
     147              :                 State::WaitingForRecord => {
     148              :                     // need to have at least the xl_tot_len field
     149    160338833 :                     if self.inputbuf.remaining() < 4 {
     150      2335816 :                         return Ok(None);
     151    158003017 :                     }
     152    158003017 : 
     153    158003017 :                     // peek xl_tot_len at the beginning of the record.
     154    158003017 :                     // FIXME: assumes little-endian
     155    158003017 :                     let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
     156    158003017 :                     if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
     157          223 :                         return Err(WalDecodeError {
     158          223 :                             msg: format!("invalid xl_tot_len {}", xl_tot_len),
     159          223 :                             lsn: self.lsn,
     160          223 :                         });
     161    158002794 :                     }
     162    158002794 :                     // Fast path for the common case that the whole record fits on the page.
     163    158002794 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     164    158002794 :                     if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
     165    154712220 :                         self.lsn += xl_tot_len as u64;
     166    154712220 :                         let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
     167    154712220 :                         return Ok(Some(self.complete_record(recordbuf)?));
     168              :                     } else {
     169              :                         // Need to assemble the record from pieces. Remember the size of the
     170              :                         // record, and loop back. On next iterations, we will reach the branch
     171              :                         // below, and copy the part of the record that was on this or next page(s)
     172              :                         // to 'recordbuf'.  Subsequent iterations will skip page headers, and
     173              :                         // append the continuations from the next pages to 'recordbuf'.
     174      3290574 :                         self.state = State::ReassemblingRecord {
     175      3290574 :                             recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
     176      3290574 :                             contlen: NonZeroU32::new(xl_tot_len).unwrap(),
     177      3290574 :                         }
     178              :                     }
     179              :                 }
     180      6968400 :                 State::ReassemblingRecord { recordbuf, contlen } => {
     181      6968400 :                     // we're continuing a record, possibly from previous page.
     182      6968400 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     183      6968400 : 
     184      6968400 :                     // read the rest of the record, or as much as fits on this page.
     185      6968400 :                     let n = min(contlen.get(), pageleft) as usize;
     186      6968400 : 
     187      6968400 :                     if self.inputbuf.remaining() < n {
     188       525963 :                         return Ok(None);
     189      6442437 :                     }
     190      6442437 : 
     191      6442437 :                     recordbuf.put(self.inputbuf.split_to(n));
     192      6442437 :                     self.lsn += n as u64;
     193      6442437 :                     *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
     194      3151916 :                         Some(x) => x,
     195              :                         None => {
     196              :                             // The record is now complete.
     197      3290521 :                             let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
     198      3290521 :                             return Ok(Some(self.complete_record(recordbuf)?));
     199              :                         }
     200              :                     }
     201              :                 }
     202    158019018 :                 State::SkippingEverything { skip_until_lsn } => {
     203    158019018 :                     assert!(*skip_until_lsn >= self.lsn);
     204    158019018 :                     let n = skip_until_lsn.0 - self.lsn.0;
     205    158019018 :                     if self.inputbuf.remaining() < n as usize {
     206        16878 :                         return Ok(None);
     207    158002140 :                     }
     208    158002140 :                     self.inputbuf.advance(n as usize);
     209    158002140 :                     self.lsn += n;
     210    158002140 :                     self.state = State::WaitingForRecord;
     211              :                 }
     212              :             }
     213              :         }
     214    160992492 :     }
     215              : 
     216    158002741 :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
     217              :         // We now have a record in the 'recordbuf' local variable.
     218    158002741 :         let xlogrec =
     219    158002741 :             XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
     220            0 :                 WalDecodeError {
     221            0 :                     msg: format!("xlog record deserialization failed {}", e),
     222            0 :                     lsn: self.lsn,
     223            0 :                 }
     224    158002741 :             })?;
     225              : 
     226    158002741 :         let mut crc = 0;
     227    158002741 :         crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
     228    158002741 :         crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
     229    158002741 :         if crc != xlogrec.xl_crc {
     230            0 :             return Err(WalDecodeError {
     231            0 :                 msg: "WAL record crc mismatch".into(),
     232            0 :                 lsn: self.lsn,
     233            0 :             });
     234    158002741 :         }
     235              : 
     236              :         // XLOG_SWITCH records are special. If we see one, we need to skip
     237              :         // to the next WAL segment.
     238    158002741 :         let next_lsn = if xlogrec.is_xlog_switch_record() {
     239           16 :             trace!("saw xlog switch record at {}", self.lsn);
     240           15 :             self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
     241              :         } else {
     242              :             // Pad to an 8-byte boundary
     243    158002725 :             self.lsn.align()
     244              :         };
     245    158002740 :         self.state = State::SkippingEverything {
     246    158002740 :             skip_until_lsn: next_lsn,
     247    158002740 :         };
     248    158002740 : 
     249    158002740 :         // We should return the LSN of the next record, not the last byte of this record or
     250    158002740 :         // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
     251    158002740 :         // records; the former "spans" until the next WAL segment (see test_xlog_switch).
     252    158002740 :         Ok((next_lsn, recordbuf))
     253    158002740 :     }
     254              : }
        

Generated by: LCOV version 2.1-beta