Line data Source code
1 : //!
2 : //! Basic WAL stream decoding.
3 : //!
4 : //! This understands the WAL page and record format, enough to figure out where the WAL record
5 : //! boundaries are, and to reassemble WAL records that cross page boundaries.
6 : //!
7 : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
8 : //! to look deeper into the WAL records to also understand which blocks they modify, the code
9 : //! for that is in pageserver/src/walrecord.rs
10 : //!
11 : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
12 : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
13 : use super::xlog_utils::*;
14 : use crate::WAL_SEGMENT_SIZE;
15 : use bytes::{Buf, BufMut, Bytes, BytesMut};
16 : use crc32c::*;
17 : use log::*;
18 : use std::cmp::min;
19 : use std::num::NonZeroU32;
20 : use utils::lsn::Lsn;
21 :
22 : pub trait WalStreamDecoderHandler {
23 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
24 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
25 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
26 : }
27 :
28 : //
29 : // This is a trick to support several postgres versions simultaneously.
30 : //
31 : // Page decoding code depends on postgres bindings, so it is compiled for each version.
32 : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
33 : // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
34 : // Other methods are internal and are not dispatched.
35 : //
36 : // It is similar to having several impl blocks for the same struct,
37 : // but the impls here are in different modules, so need to use a trait.
38 : //
39 : impl WalStreamDecoderHandler for WalStreamDecoder {
40 23752 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
41 23752 : let validate_impl = || {
42 23752 : if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
43 0 : return Err(format!(
44 0 : "invalid xlog page header: xlp_magic={}, expected {}",
45 0 : hdr.xlp_magic, XLOG_PAGE_MAGIC
46 0 : ));
47 23752 : }
48 23752 : if hdr.xlp_pageaddr != self.lsn.0 {
49 0 : return Err(format!(
50 0 : "invalid xlog page header: xlp_pageaddr={}, expected {}",
51 0 : hdr.xlp_pageaddr, self.lsn
52 0 : ));
53 23752 : }
54 23752 : match self.state {
55 : State::WaitingForRecord => {
56 304 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
57 0 : return Err(
58 0 : "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
59 0 : );
60 304 : }
61 304 : if hdr.xlp_rem_len != 0 {
62 0 : return Err(format!(
63 0 : "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
64 0 : hdr.xlp_rem_len
65 0 : ));
66 304 : }
67 : }
68 23448 : State::ReassemblingRecord { contlen, .. } => {
69 23448 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
70 0 : return Err(
71 0 : "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
72 0 : .into(),
73 0 : );
74 23448 : }
75 23448 : if hdr.xlp_rem_len != contlen.get() {
76 0 : return Err(format!(
77 0 : "invalid xlog page header: xlp_rem_len={}, expected {}",
78 0 : hdr.xlp_rem_len,
79 0 : contlen.get()
80 0 : ));
81 23448 : }
82 : }
83 : State::SkippingEverything { .. } => {
84 0 : panic!("Should not be validating page header in the SkippingEverything state");
85 : }
86 : };
87 23752 : Ok(())
88 23752 : };
89 23752 : validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
90 23752 : }
91 :
92 : /// Attempt to decode another WAL record from the input that has been fed to the
93 : /// decoder so far.
94 : ///
95 : /// Returns one of the following:
96 : /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
97 : /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
98 : /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
99 : ///
100 1285508 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
101 : // Run state machine that validates page headers, and reassembles records
102 : // that cross page boundaries.
103 : loop {
104 : // parse and verify page boundaries as we go
105 : // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
106 1919580 : match self.state {
107 : State::WaitingForRecord | State::ReassemblingRecord { .. } => {
108 1578783 : if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
109 : // parse long header
110 :
111 20 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
112 12 : return Ok(None);
113 8 : }
114 :
115 8 : let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
116 8 : |e| WalDecodeError {
117 0 : msg: format!("long header deserialization failed {}", e),
118 0 : lsn: self.lsn,
119 8 : },
120 8 : )?;
121 :
122 8 : self.validate_page_header(&hdr.std)?;
123 :
124 8 : self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
125 1578763 : } else if self.lsn.block_offset() == 0 {
126 32383 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
127 8639 : return Ok(None);
128 23744 : }
129 :
130 23744 : let hdr =
131 23744 : XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
132 0 : WalDecodeError {
133 0 : msg: format!("header deserialization failed {}", e),
134 0 : lsn: self.lsn,
135 0 : }
136 23744 : })?;
137 :
138 23744 : self.validate_page_header(&hdr)?;
139 :
140 23744 : self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
141 1546380 : }
142 : }
143 340797 : State::SkippingEverything { .. } => {}
144 : }
145 : // now read page contents
146 1910929 : match &mut self.state {
147 : State::WaitingForRecord => {
148 : // need to have at least the xl_tot_len field
149 342887 : if self.inputbuf.remaining() < 4 {
150 24748 : return Ok(None);
151 318139 : }
152 318139 :
153 318139 : // peek xl_tot_len at the beginning of the record.
154 318139 : // FIXME: assumes little-endian
155 318139 : let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
156 318139 : if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
157 644 : return Err(WalDecodeError {
158 644 : msg: format!("invalid xl_tot_len {}", xl_tot_len),
159 644 : lsn: self.lsn,
160 644 : });
161 317495 : }
162 317495 : // Fast path for the common case that the whole record fits on the page.
163 317495 : let pageleft = self.lsn.remaining_in_block() as u32;
164 317495 : if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
165 24362 : self.lsn += xl_tot_len as u64;
166 24362 : let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
167 24362 : return Ok(Some(self.complete_record(recordbuf)?));
168 : } else {
169 : // Need to assemble the record from pieces. Remember the size of the
170 : // record, and loop back. On next iterations, we will reach the branch
171 : // below, and copy the part of the record that was on this or next page(s)
172 : // to 'recordbuf'. Subsequent iterations will skip page headers, and
173 : // append the continuations from the next pages to 'recordbuf'.
174 293133 : self.state = State::ReassemblingRecord {
175 293133 : recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
176 293133 : contlen: NonZeroU32::new(xl_tot_len).unwrap(),
177 293133 : }
178 : }
179 : }
180 1227245 : State::ReassemblingRecord { recordbuf, contlen } => {
181 1227245 : // we're continuing a record, possibly from previous page.
182 1227245 : let pageleft = self.lsn.remaining_in_block() as u32;
183 1227245 :
184 1227245 : // read the rest of the record, or as much as fits on this page.
185 1227245 : let n = min(contlen.get(), pageleft) as usize;
186 1227245 :
187 1227245 : if self.inputbuf.remaining() < n {
188 910664 : return Ok(None);
189 316581 : }
190 316581 :
191 316581 : recordbuf.put(self.inputbuf.split_to(n));
192 316581 : self.lsn += n as u64;
193 316581 : *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
194 23454 : Some(x) => x,
195 : None => {
196 : // The record is now complete.
197 293127 : let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
198 293127 : return Ok(Some(self.complete_record(recordbuf)?));
199 : }
200 : }
201 : }
202 340797 : State::SkippingEverything { skip_until_lsn } => {
203 340797 : assert!(*skip_until_lsn >= self.lsn);
204 340797 : let n = skip_until_lsn.0 - self.lsn.0;
205 340797 : if self.inputbuf.remaining() < n as usize {
206 23312 : return Ok(None);
207 317485 : }
208 317485 : self.inputbuf.advance(n as usize);
209 317485 : self.lsn += n;
210 317485 : self.state = State::WaitingForRecord;
211 : }
212 : }
213 : }
214 1285508 : }
215 :
216 317489 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
217 : // We now have a record in the 'recordbuf' local variable.
218 317489 : let xlogrec =
219 317489 : XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
220 0 : WalDecodeError {
221 0 : msg: format!("xlog record deserialization failed {}", e),
222 0 : lsn: self.lsn,
223 0 : }
224 317489 : })?;
225 :
226 317489 : let mut crc = 0;
227 317489 : crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
228 317489 : crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
229 317489 : if crc != xlogrec.xl_crc {
230 0 : return Err(WalDecodeError {
231 0 : msg: "WAL record crc mismatch".into(),
232 0 : lsn: self.lsn,
233 0 : });
234 317489 : }
235 :
236 : // XLOG_SWITCH records are special. If we see one, we need to skip
237 : // to the next WAL segment.
238 317489 : let next_lsn = if xlogrec.is_xlog_switch_record() {
239 0 : trace!("saw xlog switch record at {}", self.lsn);
240 0 : self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
241 : } else {
242 : // Pad to an 8-byte boundary
243 317489 : self.lsn.align()
244 : };
245 317489 : self.state = State::SkippingEverything {
246 317489 : skip_until_lsn: next_lsn,
247 317489 : };
248 317489 :
249 317489 : // We should return LSN of the next record, not the last byte of this record or
250 317489 : // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
251 317489 : // records, the former "spans" until the next WAL segment (see test_xlog_switch).
252 317489 : Ok((next_lsn, recordbuf))
253 317489 : }
254 : }
|