Line data Source code
1 : //!
2 : //! Basic WAL stream decoding.
3 : //!
4 : //! This understands the WAL page and record format, enough to figure out where the WAL record
5 : //! boundaries are, and to reassemble WAL records that cross page boundaries.
6 : //!
7 : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
8 : //! to look deeper into the WAL records to also understand which blocks they modify; the code
9 : //! for that is in pageserver/src/walrecord.rs.
10 : //!
11 : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
12 : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
13 : use super::xlog_utils::*;
14 : use crate::WAL_SEGMENT_SIZE;
15 : use bytes::{Buf, BufMut, Bytes, BytesMut};
16 : use crc32c::*;
17 : use log::*;
18 : use std::cmp::min;
19 : use std::num::NonZeroU32;
20 : use utils::lsn::Lsn;
21 :
22 : pub trait WalStreamDecoderHandler {
23 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
24 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
25 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
26 : }
27 :
28 : //
29 : // This is a trick to support several postgres versions simultaneously.
30 : //
31 : // Page decoding code depends on postgres bindings, so it is compiled for each version.
32 : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
33 : // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
34 : // Other methods are internal and are not dispatched.
35 : //
36 : // It is similar to having several impl blocks for the same struct,
37 : // but the impls here live in different modules, so they need to use a trait.
38 : //
39 : impl WalStreamDecoderHandler for WalStreamDecoder {
40 3275619 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
41 3275619 : let validate_impl = || {
42 3275619 : if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
43 1 : return Err(format!(
44 1 : "invalid xlog page header: xlp_magic={}, expected {}",
45 1 : hdr.xlp_magic, XLOG_PAGE_MAGIC
46 1 : ));
47 3275618 : }
48 3275618 : if hdr.xlp_pageaddr != self.lsn.0 {
49 0 : return Err(format!(
50 0 : "invalid xlog page header: xlp_pageaddr={}, expected {}",
51 0 : hdr.xlp_pageaddr, self.lsn
52 0 : ));
53 3275618 : }
54 3275618 : match self.state {
55 : State::WaitingForRecord => {
56 155493 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
57 0 : return Err(
58 0 : "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
59 0 : );
60 155493 : }
61 155493 : if hdr.xlp_rem_len != 0 {
62 0 : return Err(format!(
63 0 : "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
64 0 : hdr.xlp_rem_len
65 0 : ));
66 155493 : }
67 : }
68 3120125 : State::ReassemblingRecord { contlen, .. } => {
69 3120125 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
70 0 : return Err(
71 0 : "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
72 0 : .into(),
73 0 : );
74 3120125 : }
75 3120125 : if hdr.xlp_rem_len != contlen.get() {
76 0 : return Err(format!(
77 0 : "invalid xlog page header: xlp_rem_len={}, expected {}",
78 0 : hdr.xlp_rem_len,
79 0 : contlen.get()
80 0 : ));
81 3120125 : }
82 : }
83 : State::SkippingEverything { .. } => {
84 0 : panic!("Should not be validating page header in the SkippingEverything state");
85 : }
86 : };
87 3275618 : Ok(())
88 3275619 : };
89 3275619 : validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
90 3275619 : }
91 :
92 : /// Attempt to decode another WAL record from the input that has been fed to the
93 : /// decoder so far.
94 : ///
95 : /// Returns one of the following:
96 : /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
97 : /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
98 : /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
99 : ///
100 161428818 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
101 : // Run state machine that validates page headers, and reassembles records
102 : // that cross page boundaries.
103 326198012 : loop {
104 326198012 : // parse and verify page boundaries as we go
105 326198012 : // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
106 326198012 : match self.state {
107 : State::WaitingForRecord | State::ReassemblingRecord { .. } => {
108 167790614 : if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
109 : // parse long header
110 :
111 2438 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
112 865 : return Ok(None);
113 1573 : }
114 :
115 1573 : let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
116 1573 : |e| WalDecodeError {
117 0 : msg: format!("long header deserialization failed {}", e),
118 0 : lsn: self.lsn,
119 1573 : },
120 1573 : )?;
121 :
122 1573 : self.validate_page_header(&hdr.std)?;
123 :
124 1573 : self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
125 167788176 : } else if self.lsn.block_offset() == 0 {
126 3387683 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
127 113637 : return Ok(None);
128 3274046 : }
129 :
130 3274046 : let hdr =
131 3274046 : XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
132 0 : WalDecodeError {
133 0 : msg: format!("header deserialization failed {}", e),
134 0 : lsn: self.lsn,
135 0 : }
136 3274046 : })?;
137 :
138 3274046 : self.validate_page_header(&hdr)?;
139 :
140 3274045 : self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
141 164400493 : }
142 : }
143 158407398 : State::SkippingEverything { .. } => {}
144 : }
145 : // now read page contents
146 326083509 : match &mut self.state {
147 : State::WaitingForRecord => {
148 : // need to have at least the xl_tot_len field
149 160773995 : if self.inputbuf.remaining() < 4 {
150 2382612 : return Ok(None);
151 158391383 : }
152 158391383 :
153 158391383 : // peek xl_tot_len at the beginning of the record.
154 158391383 : // FIXME: assumes little-endian
155 158391383 : let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
156 158391383 : if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
157 220 : return Err(WalDecodeError {
158 220 : msg: format!("invalid xl_tot_len {}", xl_tot_len),
159 220 : lsn: self.lsn,
160 220 : });
161 158391163 : }
162 158391163 : // Fast path for the common case that the whole record fits on the page.
163 158391163 : let pageleft = self.lsn.remaining_in_block() as u32;
164 158391163 : if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
165 155132654 : self.lsn += xl_tot_len as u64;
166 155132654 : let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
167 155132654 : return Ok(Some(self.complete_record(recordbuf)?));
168 : } else {
169 : // Need to assemble the record from pieces. Remember the size of the
170 : // record, and loop back. On next iterations, we will reach the branch
171 : // below, and copy the part of the record that was on this or next page(s)
172 : // to 'recordbuf'. Subsequent iterations will skip page headers, and
173 : // append the continuations from the next pages to 'recordbuf'.
174 3258509 : self.state = State::ReassemblingRecord {
175 3258509 : recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
176 3258509 : contlen: NonZeroU32::new(xl_tot_len).unwrap(),
177 3258509 : }
178 : }
179 : }
180 6902116 : State::ReassemblingRecord { recordbuf, contlen } => {
181 6902116 : // we're continuing a record, possibly from previous page.
182 6902116 : let pageleft = self.lsn.remaining_in_block() as u32;
183 6902116 :
184 6902116 : // read the rest of the record, or as much as fits on this page.
185 6902116 : let n = min(contlen.get(), pageleft) as usize;
186 6902116 :
187 6902116 : if self.inputbuf.remaining() < n {
188 523482 : return Ok(None);
189 6378634 : }
190 6378634 :
191 6378634 : recordbuf.put(self.inputbuf.split_to(n));
192 6378634 : self.lsn += n as u64;
193 6378634 : *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
194 3120166 : Some(x) => x,
195 : None => {
196 : // The record is now complete.
197 3258468 : let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
198 3258468 : return Ok(Some(self.complete_record(recordbuf)?));
199 : }
200 : }
201 : }
202 158407398 : State::SkippingEverything { skip_until_lsn } => {
203 158407398 : assert!(*skip_until_lsn >= self.lsn);
204 158407398 : let n = skip_until_lsn.0 - self.lsn.0;
205 158407398 : if self.inputbuf.remaining() < n as usize {
206 16879 : return Ok(None);
207 158390519 : }
208 158390519 : self.inputbuf.advance(n as usize);
209 158390519 : self.lsn += n;
210 158390519 : self.state = State::WaitingForRecord;
211 : }
212 : }
213 : }
214 161428818 : }
215 :
216 158391122 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
217 : // We now have a record in the 'recordbuf' local variable.
218 158391122 : let xlogrec =
219 158391122 : XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
220 0 : WalDecodeError {
221 0 : msg: format!("xlog record deserialization failed {}", e),
222 0 : lsn: self.lsn,
223 0 : }
224 158391122 : })?;
225 :
226 158391122 : let mut crc = 0;
227 158391122 : crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
228 158391122 : crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
229 158391122 : if crc != xlogrec.xl_crc {
230 0 : return Err(WalDecodeError {
231 0 : msg: "WAL record crc mismatch".into(),
232 0 : lsn: self.lsn,
233 0 : });
234 158391122 : }
235 :
236 : // XLOG_SWITCH records are special. If we see one, we need to skip
237 : // to the next WAL segment.
238 158391122 : let next_lsn = if xlogrec.is_xlog_switch_record() {
239 14 : trace!("saw xlog switch record at {}", self.lsn);
240 14 : self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
241 : } else {
242 : // Pad to an 8-byte boundary
243 158391108 : self.lsn.align()
244 : };
245 158391122 : self.state = State::SkippingEverything {
246 158391122 : skip_until_lsn: next_lsn,
247 158391122 : };
248 158391122 :
249 158391122 : // We should return LSN of the next record, not the last byte of this record or
250 158391122 : // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
251 158391122 : // records, the former "spans" until the next WAL segment (see test_xlog_switch).
252 158391122 : Ok((next_lsn, recordbuf))
253 158391122 : }
254 : }
|