LCOV - differential code coverage report
Current view: top level - libs/postgres_ffi/src - waldecoder_handler.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 75.3 % 158 119 39 119
Current Date: 2023-10-19 02:04:12 Functions: 50.0 % 24 12 12 12
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Basic WAL stream decoding.
       3                 : //!
       4                 : //! This understands the WAL page and record format, enough to figure out where the WAL record
       5                 : //! boundaries are, and to reassemble WAL records that cross page boundaries.
       6                 : //!
       7                 : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
       8                 : //! to look deeper into the WAL records to also understand which blocks they modify; the code
       9                 : //! for that is in pageserver/src/walrecord.rs.
      10                 : //!
      11                 : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
      12                 : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
      13                 : use super::xlog_utils::*;
      14                 : use crate::WAL_SEGMENT_SIZE;
      15                 : use bytes::{Buf, BufMut, Bytes, BytesMut};
      16                 : use crc32c::*;
      17                 : use log::*;
      18                 : use std::cmp::min;
      19                 : use std::num::NonZeroU32;
      20                 : use utils::lsn::Lsn;
      21                 : 
      22                 : pub trait WalStreamDecoderHandler {
      23                 :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
      24                 :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
      25                 :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
      26                 : }
      27                 : 
      28                 : //
      29                 : // This is a trick to support several postgres versions simultaneously.
      30                 : //
      31                 : // Page decoding code depends on postgres bindings, so it is compiled for each version.
      32                 : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
      33                 : // WalStreamDecoder's poll_decode() method dispatches to the right handler based on the postgres version.
      34                 : // Other methods are internal and are not dispatched.
      35                 : //
      36                 : // It is similar to having several impl blocks for the same struct,
      37                 : // but the impls here are in different modules, so we need to use a trait.
      38                 : //
      39                 : impl WalStreamDecoderHandler for WalStreamDecoder {
      40 CBC     3097600 :     fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
      41         3097600 :         let validate_impl = || {
      42         3097600 :             if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
      43 UBC           0 :                 return Err(format!(
      44               0 :                     "invalid xlog page header: xlp_magic={}, expected {}",
      45               0 :                     hdr.xlp_magic, XLOG_PAGE_MAGIC
      46               0 :                 ));
      47 CBC     3097600 :             }
      48         3097600 :             if hdr.xlp_pageaddr != self.lsn.0 {
      49 UBC           0 :                 return Err(format!(
      50               0 :                     "invalid xlog page header: xlp_pageaddr={}, expected {}",
      51               0 :                     hdr.xlp_pageaddr, self.lsn
      52               0 :                 ));
      53 CBC     3097600 :             }
      54         3097600 :             match self.state {
      55                 :                 State::WaitingForRecord => {
      56          165261 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
      57 UBC           0 :                         return Err(
      58               0 :                             "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
      59               0 :                         );
      60 CBC      165261 :                     }
      61          165261 :                     if hdr.xlp_rem_len != 0 {
      62 UBC           0 :                         return Err(format!(
      63               0 :                             "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
      64               0 :                             hdr.xlp_rem_len
      65               0 :                         ));
      66 CBC      165261 :                     }
      67                 :                 }
      68         2932339 :                 State::ReassemblingRecord { contlen, .. } => {
      69         2932339 :                     if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
      70 UBC           0 :                         return Err(
      71               0 :                             "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
      72               0 :                                 .into(),
      73               0 :                         );
      74 CBC     2932339 :                     }
      75         2932339 :                     if hdr.xlp_rem_len != contlen.get() {
      76 UBC           0 :                         return Err(format!(
      77               0 :                             "invalid xlog page header: xlp_rem_len={}, expected {}",
      78               0 :                             hdr.xlp_rem_len,
      79               0 :                             contlen.get()
      80               0 :                         ));
      81 CBC     2932339 :                     }
      82                 :                 }
      83                 :                 State::SkippingEverything { .. } => {
      84 UBC           0 :                     panic!("Should not be validating page header in the SkippingEverything state");
      85                 :                 }
      86                 :             };
      87 CBC     3097600 :             Ok(())
      88         3097600 :         };
      89         3097600 :         validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
      90         3097600 :     }
      91                 : 
      92                 :     /// Attempt to decode another WAL record from the input that has been fed to the
      93                 :     /// decoder so far.
      94                 :     ///
      95                 :     /// Returns one of the following:
      96                 :     ///     Ok(Some((Lsn, Bytes))): a tuple containing the LSN of the next record, and the record itself
      97                 :     ///     Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
      98                 :     ///     Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
      99                 :     ///
     100       171280692 :     fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
     101                 :         // Run state machine that validates page headers, and reassembles records
     102                 :         // that cross page boundaries.
     103       346051217 :         loop {
     104       346051217 :             // parse and verify page boundaries as we go
     105       346051217 :             // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
     106       346051217 :             match self.state {
     107                 :                 State::WaitingForRecord | State::ReassemblingRecord { .. } => {
     108       177195224 :                     if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     109                 :                         // parse long header
     110                 : 
     111            2221 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
     112             748 :                             return Ok(None);
     113            1473 :                         }
     114                 : 
     115            1473 :                         let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
     116            1473 :                             |e| WalDecodeError {
     117 UBC           0 :                                 msg: format!("long header deserialization failed {}", e),
     118               0 :                                 lsn: self.lsn,
     119 CBC        1473 :                             },
     120            1473 :                         )?;
     121                 : 
     122            1473 :                         self.validate_page_header(&hdr.std)?;
     123                 : 
     124            1473 :                         self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
     125       177193003 :                     } else if self.lsn.block_offset() == 0 {
     126         3166657 :                         if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
     127           70530 :                             return Ok(None);
     128         3096127 :                         }
     129                 : 
     130         3096127 :                         let hdr =
     131         3096127 :                             XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
     132 UBC           0 :                                 WalDecodeError {
     133               0 :                                     msg: format!("header deserialization failed {}", e),
     134               0 :                                     lsn: self.lsn,
     135               0 :                                 }
     136 CBC     3096127 :                             })?;
     137                 : 
     138         3096127 :                         self.validate_page_header(&hdr)?;
     139                 : 
     140         3096127 :                         self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
     141       174026346 :                     }
     142                 :                 }
     143       168855993 :                 State::SkippingEverything { .. } => {}
     144                 :             }
     145                 :             // now read page contents
     146       345979939 :             match &mut self.state {
     147                 :                 State::WaitingForRecord => {
     148                 :                     // need to have at least the xl_tot_len field
     149       171086986 :                     if self.inputbuf.remaining() < 4 {
     150         2235487 :                         return Ok(None);
     151       168851499 :                     }
     152       168851499 : 
     153       168851499 :                     // peek xl_tot_len at the beginning of the record.
     154       168851499 :                     // FIXME: assumes little-endian
     155       168851499 :                     let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
     156       168851499 :                     if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
     157             104 :                         return Err(WalDecodeError {
     158             104 :                             msg: format!("invalid xl_tot_len {}", xl_tot_len),
     159             104 :                             lsn: self.lsn,
     160             104 :                         });
     161       168851395 :                     }
     162       168851395 :                     // Fast path for the common case that the whole record fits on the page.
     163       168851395 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     164       168851395 :                     if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
     165       165864002 :                         self.lsn += xl_tot_len as u64;
     166       165864002 :                         let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
     167       165864002 :                         return Ok(Some(self.complete_record(recordbuf)?));
     168                 :                     } else {
     169                 :                         // Need to assemble the record from pieces. Remember the size of the
     170                 :                         // record, and loop back. On next iterations, we will reach the branch
     171                 :                         // below, and copy the part of the record that was on this or next page(s)
     172                 :                         // to 'recordbuf'.  Subsequent iterations will skip page headers, and
     173                 :                         // append the continuations from the next pages to 'recordbuf'.
     174         2987393 :                         self.state = State::ReassemblingRecord {
     175         2987393 :                             recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
     176         2987393 :                             contlen: NonZeroU32::new(xl_tot_len).unwrap(),
     177         2987393 :                         }
     178                 :                     }
     179                 :                 }
     180         6036960 :                 State::ReassemblingRecord { recordbuf, contlen } => {
     181         6036960 :                     // we're continuing a record, possibly from previous page.
     182         6036960 :                     let pageleft = self.lsn.remaining_in_block() as u32;
     183         6036960 : 
     184         6036960 :                     // read the rest of the record, or as much as fits on this page.
     185         6036960 :                     let n = min(contlen.get(), pageleft) as usize;
     186         6036960 : 
     187         6036960 :                     if self.inputbuf.remaining() < n {
     188          117244 :                         return Ok(None);
     189         5919716 :                     }
     190         5919716 : 
     191         5919716 :                     recordbuf.put(self.inputbuf.split_to(n));
     192         5919716 :                     self.lsn += n as u64;
     193         5919716 :                     *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
     194         2932365 :                         Some(x) => x,
     195                 :                         None => {
     196                 :                             // The record is now complete.
     197         2987351 :                             let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
     198         2987351 :                             return Ok(Some(self.complete_record(recordbuf)?));
     199                 :                         }
     200                 :                     }
     201                 :                 }
     202       168855993 :                 State::SkippingEverything { skip_until_lsn } => {
     203       168855993 :                     assert!(*skip_until_lsn >= self.lsn);
     204       168855993 :                     let n = skip_until_lsn.0 - self.lsn.0;
     205       168855993 :                     if self.inputbuf.remaining() < n as usize {
     206            5226 :                         return Ok(None);
     207       168850767 :                     }
     208       168850767 :                     self.inputbuf.advance(n as usize);
     209       168850767 :                     self.lsn += n;
     210       168850767 :                     self.state = State::WaitingForRecord;
     211                 :                 }
     212                 :             }
     213                 :         }
     214       171280691 :     }
     215                 : 
     216       168851353 :     fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
     217                 :         // We now have a complete record in the 'recordbuf' argument.
     218       168851353 :         let xlogrec =
     219       168851353 :             XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
     220 UBC           0 :                 WalDecodeError {
     221               0 :                     msg: format!("xlog record deserialization failed {}", e),
     222               0 :                     lsn: self.lsn,
     223               0 :                 }
     224 CBC   168851353 :             })?;
     225                 : 
     226       168851353 :         let mut crc = 0;
     227       168851353 :         crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
     228       168851353 :         crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
     229       168851353 :         if crc != xlogrec.xl_crc {
     230 UBC           0 :             return Err(WalDecodeError {
     231               0 :                 msg: "WAL record crc mismatch".into(),
     232               0 :                 lsn: self.lsn,
     233               0 :             });
     234 CBC   168851353 :         }
     235                 : 
     236                 :         // XLOG_SWITCH records are special. If we see one, we need to skip
     237                 :         // to the next WAL segment.
     238       168851353 :         let next_lsn = if xlogrec.is_xlog_switch_record() {
     239              15 :             trace!("saw xlog switch record at {}", self.lsn);
     240              14 :             self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
     241                 :         } else {
     242                 :             // Pad to an 8-byte boundary
     243       168851338 :             self.lsn.align()
     244                 :         };
     245       168851352 :         self.state = State::SkippingEverything {
     246       168851352 :             skip_until_lsn: next_lsn,
     247       168851352 :         };
     248       168851352 : 
     249       168851352 :         // We should return LSN of the next record, not the last byte of this record or
     250       168851352 :         // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
     251       168851352 :         // records, the former "spans" until the next WAL segment (see test_xlog_switch).
     252       168851352 :         Ok((next_lsn, recordbuf))
     253       168851352 :     }
     254                 : }
        

Generated by: LCOV version 2.1-beta