Line data Source code
1 : //!
2 : //! Basic WAL stream decoding.
3 : //!
4 : //! This understands the WAL page and record format, enough to figure out where the WAL record
5 : //! boundaries are, and to reassemble WAL records that cross page boundaries.
6 : //!
7 : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
8 : //! to look deeper into the WAL records to also understand which blocks they modify, the code
9 : //! for that is in pageserver/src/walrecord.rs
10 : //!
11 : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
12 : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
13 : use super::xlog_utils::*;
14 : use crate::WAL_SEGMENT_SIZE;
15 : use bytes::{Buf, BufMut, Bytes, BytesMut};
16 : use crc32c::*;
17 : use std::cmp::min;
18 : use std::num::NonZeroU32;
19 : use utils::lsn::Lsn;
20 :
21 : pub trait WalStreamDecoderHandler {
22 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
23 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
24 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
25 : }
26 :
27 : //
28 : // This is a trick to support several postgres versions simultaneously.
29 : //
30 : // Page decoding code depends on postgres bindings, so it is compiled for each version.
// Thus WalStreamDecoder implements several WalStreamDecoderHandler traits, one per version module.
32 : // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
33 : // Other methods are internal and are not dispatched.
34 : //
// It is similar to having several impl blocks for the same struct,
// but the impls here are in different modules, so we need to use a trait.
37 : //
38 : impl WalStreamDecoderHandler for WalStreamDecoder {
39 19470 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
40 19470 : let validate_impl = || {
41 19470 : if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
42 0 : return Err(format!(
43 0 : "invalid xlog page header: xlp_magic={}, expected {}",
44 0 : hdr.xlp_magic, XLOG_PAGE_MAGIC
45 0 : ));
46 19470 : }
47 19470 : if hdr.xlp_pageaddr != self.lsn.0 {
48 0 : return Err(format!(
49 0 : "invalid xlog page header: xlp_pageaddr={}, expected {}",
50 0 : hdr.xlp_pageaddr, self.lsn
51 0 : ));
52 19470 : }
53 19470 : match self.state {
54 : State::WaitingForRecord => {
55 100 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
56 0 : return Err(
57 0 : "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
58 0 : );
59 100 : }
60 100 : if hdr.xlp_rem_len != 0 {
61 0 : return Err(format!(
62 0 : "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
63 0 : hdr.xlp_rem_len
64 0 : ));
65 100 : }
66 : }
67 19370 : State::ReassemblingRecord { contlen, .. } => {
68 19370 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
69 0 : return Err(
70 0 : "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
71 0 : .into(),
72 0 : );
73 19370 : }
74 19370 : if hdr.xlp_rem_len != contlen.get() {
75 0 : return Err(format!(
76 0 : "invalid xlog page header: xlp_rem_len={}, expected {}",
77 0 : hdr.xlp_rem_len,
78 0 : contlen.get()
79 0 : ));
80 19370 : }
81 : }
82 : State::SkippingEverything { .. } => {
83 0 : panic!("Should not be validating page header in the SkippingEverything state");
84 : }
85 : };
86 19470 : Ok(())
87 19470 : };
88 19470 : validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
89 19470 : }
90 :
91 : /// Attempt to decode another WAL record from the input that has been fed to the
92 : /// decoder so far.
93 : ///
94 : /// Returns one of the following:
95 : /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
96 : /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
97 : /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
98 : ///
99 364293 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
100 : // Run state machine that validates page headers, and reassembles records
101 : // that cross page boundaries.
102 : loop {
103 : // parse and verify page boundaries as we go
104 : // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
105 566280 : match self.state {
106 : State::WaitingForRecord | State::ReassemblingRecord { .. } => {
107 452254 : if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
108 : // parse long header
109 :
110 17 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
111 9 : return Ok(None);
112 8 : }
113 :
114 8 : let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
115 : |e| WalDecodeError {
116 0 : msg: format!("long header deserialization failed {e}"),
117 0 : lsn: self.lsn,
118 0 : },
119 0 : )?;
120 :
121 8 : self.validate_page_header(&hdr.std)?;
122 :
123 8 : self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
124 452237 : } else if self.lsn.block_offset() == 0 {
125 26017 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
126 6555 : return Ok(None);
127 19462 : }
128 :
129 19462 : let hdr =
130 19462 : XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
131 0 : WalDecodeError {
132 0 : msg: format!("header deserialization failed {e}"),
133 0 : lsn: self.lsn,
134 0 : }
135 0 : })?;
136 :
137 19462 : self.validate_page_header(&hdr)?;
138 :
139 19462 : self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
140 426220 : }
141 : }
142 114026 : State::SkippingEverything { .. } => {}
143 : }
144 : // now read page contents
145 559716 : match &mut self.state {
146 : State::WaitingForRecord => {
147 : // need to have at least the xl_tot_len field
148 116255 : if self.inputbuf.remaining() < 4 {
149 7396 : return Ok(None);
150 108859 : }
151 :
152 : // peek xl_tot_len at the beginning of the record.
153 : // FIXME: assumes little-endian
154 108859 : let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
155 108859 : if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
156 657 : return Err(WalDecodeError {
157 657 : msg: format!("invalid xl_tot_len {xl_tot_len}"),
158 657 : lsn: self.lsn,
159 657 : });
160 108202 : }
161 : // Fast path for the common case that the whole record fits on the page.
162 108202 : let pageleft = self.lsn.remaining_in_block() as u32;
163 108202 : if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
164 33786 : self.lsn += xl_tot_len as u64;
165 33786 : let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
166 33786 : return Ok(Some(self.complete_record(recordbuf)?));
167 : } else {
168 : // Need to assemble the record from pieces. Remember the size of the
169 : // record, and loop back. On next iterations, we will reach the branch
170 : // below, and copy the part of the record that was on this or next page(s)
171 : // to 'recordbuf'. Subsequent iterations will skip page headers, and
172 : // append the continuations from the next pages to 'recordbuf'.
173 74416 : self.state = State::ReassemblingRecord {
174 74416 : recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
175 74416 : contlen: NonZeroU32::new(xl_tot_len).unwrap(),
176 74416 : }
177 : }
178 : }
179 329435 : State::ReassemblingRecord { recordbuf, contlen } => {
180 : // we're continuing a record, possibly from previous page.
181 329435 : let pageleft = self.lsn.remaining_in_block() as u32;
182 :
183 : // read the rest of the record, or as much as fits on this page.
184 329435 : let n = min(contlen.get(), pageleft) as usize;
185 :
186 329435 : if self.inputbuf.remaining() < n {
187 235649 : return Ok(None);
188 93786 : }
189 :
190 93786 : recordbuf.put(self.inputbuf.split_to(n));
191 93786 : self.lsn += n as u64;
192 93786 : *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
193 19373 : Some(x) => x,
194 : None => {
195 : // The record is now complete.
196 74413 : let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
197 74413 : return Ok(Some(self.complete_record(recordbuf)?));
198 : }
199 : }
200 : }
201 114026 : State::SkippingEverything { skip_until_lsn } => {
202 114026 : assert!(*skip_until_lsn >= self.lsn);
203 114026 : let n = skip_until_lsn.0 - self.lsn.0;
204 114026 : if self.inputbuf.remaining() < n as usize {
205 5828 : return Ok(None);
206 108198 : }
207 108198 : self.inputbuf.advance(n as usize);
208 108198 : self.lsn += n;
209 108198 : self.state = State::WaitingForRecord;
210 : }
211 : }
212 : }
213 364293 : }
214 :
215 108199 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
216 : // We now have a record in the 'recordbuf' local variable.
217 108199 : let xlogrec =
218 108199 : XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
219 0 : WalDecodeError {
220 0 : msg: format!("xlog record deserialization failed {e}"),
221 0 : lsn: self.lsn,
222 0 : }
223 0 : })?;
224 :
225 108199 : let mut crc = 0;
226 108199 : crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
227 108199 : crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
228 108199 : if crc != xlogrec.xl_crc {
229 0 : return Err(WalDecodeError {
230 0 : msg: "WAL record crc mismatch".into(),
231 0 : lsn: self.lsn,
232 0 : });
233 108199 : }
234 :
235 : // XLOG_SWITCH records are special. If we see one, we need to skip
236 : // to the next WAL segment.
237 108199 : let next_lsn = if xlogrec.is_xlog_switch_record() {
238 0 : tracing::trace!("saw xlog switch record at {}", self.lsn);
239 0 : self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
240 : } else {
241 : // Pad to an 8-byte boundary
242 108199 : self.lsn.align()
243 : };
244 108199 : self.state = State::SkippingEverything {
245 108199 : skip_until_lsn: next_lsn,
246 108199 : };
247 :
248 : // We should return LSN of the next record, not the last byte of this record or
249 : // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
250 : // records, the former "spans" until the next WAL segment (see test_xlog_switch).
251 108199 : Ok((next_lsn, recordbuf))
252 108199 : }
253 : }
|