Line data Source code
1 : //!
2 : //! Basic WAL stream decoding.
3 : //!
4 : //! This understands the WAL page and record format, enough to figure out where the WAL record
5 : //! boundaries are, and to reassemble WAL records that cross page boundaries.
6 : //!
7 : //! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
8 : //! to look deeper into the WAL records to also understand which blocks they modify, the code
9 : //! for that is in pageserver/src/walrecord.rs
10 : //!
11 : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
12 : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
13 : use super::xlog_utils::*;
14 : use crate::WAL_SEGMENT_SIZE;
15 : use bytes::{Buf, BufMut, Bytes, BytesMut};
16 : use crc32c::*;
17 : use log::*;
18 : use std::cmp::min;
19 : use std::num::NonZeroU32;
20 : use utils::lsn::Lsn;
21 :
22 : pub trait WalStreamDecoderHandler {
23 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
24 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
25 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
26 : }
27 :
28 : //
29 : // This is a trick to support several postgres versions simultaneously.
30 : //
31 : // Page decoding code depends on postgres bindings, so it is compiled for each version.
32 : // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
33 : // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
34 : // Other methods are internal and are not dispatched.
35 : //
36 : // It is similar to having several impl blocks for the same struct,
37 : // but the impls here are in different modules, so need to use a trait.
38 : //
39 : impl WalStreamDecoderHandler for WalStreamDecoder {
40 19522 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
41 19522 : let validate_impl = || {
42 19522 : if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
43 0 : return Err(format!(
44 0 : "invalid xlog page header: xlp_magic={}, expected {}",
45 0 : hdr.xlp_magic, XLOG_PAGE_MAGIC
46 0 : ));
47 19522 : }
48 19522 : if hdr.xlp_pageaddr != self.lsn.0 {
49 0 : return Err(format!(
50 0 : "invalid xlog page header: xlp_pageaddr={}, expected {}",
51 0 : hdr.xlp_pageaddr, self.lsn
52 0 : ));
53 19522 : }
54 19522 : match self.state {
55 : State::WaitingForRecord => {
56 160 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
57 0 : return Err(
58 0 : "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
59 0 : );
60 160 : }
61 160 : if hdr.xlp_rem_len != 0 {
62 0 : return Err(format!(
63 0 : "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
64 0 : hdr.xlp_rem_len
65 0 : ));
66 160 : }
67 : }
68 19362 : State::ReassemblingRecord { contlen, .. } => {
69 19362 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
70 0 : return Err(
71 0 : "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
72 0 : .into(),
73 0 : );
74 19362 : }
75 19362 : if hdr.xlp_rem_len != contlen.get() {
76 0 : return Err(format!(
77 0 : "invalid xlog page header: xlp_rem_len={}, expected {}",
78 0 : hdr.xlp_rem_len,
79 0 : contlen.get()
80 0 : ));
81 19362 : }
82 : }
83 : State::SkippingEverything { .. } => {
84 0 : panic!("Should not be validating page header in the SkippingEverything state");
85 : }
86 : };
87 19522 : Ok(())
88 19522 : };
89 19522 : validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
90 19522 : }
91 :
92 : /// Attempt to decode another WAL record from the input that has been fed to the
93 : /// decoder so far.
94 : ///
95 : /// Returns one of the following:
96 : /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
97 : /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
98 : /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
99 : ///
100 662781 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
101 : // Run state machine that validates page headers, and reassembles records
102 : // that cross page boundaries.
103 : loop {
104 : // parse and verify page boundaries as we go
105 : // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
106 998431 : match self.state {
107 : State::WaitingForRecord | State::ReassemblingRecord { .. } => {
108 816532 : if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
109 : // parse long header
110 :
111 18 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
112 10 : return Ok(None);
113 8 : }
114 :
115 8 : let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
116 8 : |e| WalDecodeError {
117 0 : msg: format!("long header deserialization failed {}", e),
118 0 : lsn: self.lsn,
119 8 : },
120 8 : )?;
121 :
122 8 : self.validate_page_header(&hdr.std)?;
123 :
124 8 : self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
125 816514 : } else if self.lsn.block_offset() == 0 {
126 26716 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
127 7202 : return Ok(None);
128 19514 : }
129 :
130 19514 : let hdr =
131 19514 : XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
132 0 : WalDecodeError {
133 0 : msg: format!("header deserialization failed {}", e),
134 0 : lsn: self.lsn,
135 0 : }
136 19514 : })?;
137 :
138 19514 : self.validate_page_header(&hdr)?;
139 :
140 19514 : self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
141 789798 : }
142 : }
143 181899 : State::SkippingEverything { .. } => {}
144 : }
145 : // now read page contents
146 991219 : match &mut self.state {
147 : State::WaitingForRecord => {
148 : // need to have at least the xl_tot_len field
149 183261 : if self.inputbuf.remaining() < 4 {
150 12358 : return Ok(None);
151 170903 : }
152 170903 :
153 170903 : // peek xl_tot_len at the beginning of the record.
154 170903 : // FIXME: assumes little-endian
155 170903 : let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
156 170903 : if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
157 656 : return Err(WalDecodeError {
158 656 : msg: format!("invalid xl_tot_len {}", xl_tot_len),
159 656 : lsn: self.lsn,
160 656 : });
161 170247 : }
162 170247 : // Fast path for the common case that the whole record fits on the page.
163 170247 : let pageleft = self.lsn.remaining_in_block() as u32;
164 170247 : if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
165 24204 : self.lsn += xl_tot_len as u64;
166 24204 : let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
167 24204 : return Ok(Some(self.complete_record(recordbuf)?));
168 : } else {
169 : // Need to assemble the record from pieces. Remember the size of the
170 : // record, and loop back. On next iterations, we will reach the branch
171 : // below, and copy the part of the record that was on this or next page(s)
172 : // to 'recordbuf'. Subsequent iterations will skip page headers, and
173 : // append the continuations from the next pages to 'recordbuf'.
174 146043 : self.state = State::ReassemblingRecord {
175 146043 : recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
176 146043 : contlen: NonZeroU32::new(xl_tot_len).unwrap(),
177 146043 : }
178 : }
179 : }
180 626059 : State::ReassemblingRecord { recordbuf, contlen } => {
181 626059 : // we're continuing a record, possibly from previous page.
182 626059 : let pageleft = self.lsn.remaining_in_block() as u32;
183 626059 :
184 626059 : // read the rest of the record, or as much as fits on this page.
185 626059 : let n = min(contlen.get(), pageleft) as usize;
186 626059 :
187 626059 : if self.inputbuf.remaining() < n {
188 460654 : return Ok(None);
189 165405 : }
190 165405 :
191 165405 : recordbuf.put(self.inputbuf.split_to(n));
192 165405 : self.lsn += n as u64;
193 165405 : *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
194 19364 : Some(x) => x,
195 : None => {
196 : // The record is now complete.
197 146041 : let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
198 146041 : return Ok(Some(self.complete_record(recordbuf)?));
199 : }
200 : }
201 : }
202 181899 : State::SkippingEverything { skip_until_lsn } => {
203 181899 : assert!(*skip_until_lsn >= self.lsn);
204 181899 : let n = skip_until_lsn.0 - self.lsn.0;
205 181899 : if self.inputbuf.remaining() < n as usize {
206 11656 : return Ok(None);
207 170243 : }
208 170243 : self.inputbuf.advance(n as usize);
209 170243 : self.lsn += n;
210 170243 : self.state = State::WaitingForRecord;
211 : }
212 : }
213 : }
214 662781 : }
215 :
216 170245 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
217 : // We now have a record in the 'recordbuf' local variable.
218 170245 : let xlogrec =
219 170245 : XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
220 0 : WalDecodeError {
221 0 : msg: format!("xlog record deserialization failed {}", e),
222 0 : lsn: self.lsn,
223 0 : }
224 170245 : })?;
225 :
226 170245 : let mut crc = 0;
227 170245 : crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
228 170245 : crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
229 170245 : if crc != xlogrec.xl_crc {
230 0 : return Err(WalDecodeError {
231 0 : msg: "WAL record crc mismatch".into(),
232 0 : lsn: self.lsn,
233 0 : });
234 170245 : }
235 :
236 : // XLOG_SWITCH records are special. If we see one, we need to skip
237 : // to the next WAL segment.
238 170245 : let next_lsn = if xlogrec.is_xlog_switch_record() {
239 0 : trace!("saw xlog switch record at {}", self.lsn);
240 0 : self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
241 : } else {
242 : // Pad to an 8-byte boundary
243 170245 : self.lsn.align()
244 : };
245 170245 : self.state = State::SkippingEverything {
246 170245 : skip_until_lsn: next_lsn,
247 170245 : };
248 170245 :
249 170245 : // We should return LSN of the next record, not the last byte of this record or
250 170245 : // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
251 170245 : // records, the former "spans" until the next WAL segment (see test_xlog_switch).
252 170245 : Ok((next_lsn, recordbuf))
253 170245 : }
254 : }
|