Line data Source code
1 : //!
2 : //! Basic WAL stream decoding.
3 : //!
4 : //! This understands the WAL page and record format, enough to figure out where the WAL record
5 : //! boundaries are, and to reassemble WAL records that cross page boundaries.
6 : //!
//! This functionality is needed by both the pageserver and the safekeepers. The pageserver needs
//! to look deeper into the WAL records to also understand which blocks they modify; the code
//! for that is in pageserver/src/walrecord.rs
10 : //!
11 : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
12 : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
13 : use super::xlog_utils::*;
14 : use crate::WAL_SEGMENT_SIZE;
15 : use bytes::{Buf, BufMut, Bytes, BytesMut};
16 : use crc32c::*;
17 : use log::*;
18 : use std::cmp::min;
19 : use std::num::NonZeroU32;
20 : use utils::lsn::Lsn;
21 :
22 : pub trait WalStreamDecoderHandler {
23 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
24 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
25 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
26 : }
27 :
//
// This is a trick to support several postgres versions simultaneously.
//
// Page decoding code depends on postgres bindings, so it is compiled for each version.
// Thus WalStreamDecoder implements a separate WalStreamDecoderHandler trait for each version.
// WalStreamDecoder's poll_decode() method dispatches to the right handler based on the postgres version.
// Other methods are internal and are not dispatched.
//
// It is similar to having several impl blocks for the same struct,
// but the impls here are in different modules, so we need to use a trait.
//
39 : impl WalStreamDecoderHandler for WalStreamDecoder {
40 25358 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
41 25358 : let validate_impl = || {
42 25358 : if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
43 0 : return Err(format!(
44 0 : "invalid xlog page header: xlp_magic={}, expected {}",
45 0 : hdr.xlp_magic, XLOG_PAGE_MAGIC
46 0 : ));
47 25358 : }
48 25358 : if hdr.xlp_pageaddr != self.lsn.0 {
49 0 : return Err(format!(
50 0 : "invalid xlog page header: xlp_pageaddr={}, expected {}",
51 0 : hdr.xlp_pageaddr, self.lsn
52 0 : ));
53 25358 : }
54 25358 : match self.state {
55 : State::WaitingForRecord => {
56 444 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
57 0 : return Err(
58 0 : "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
59 0 : );
60 444 : }
61 444 : if hdr.xlp_rem_len != 0 {
62 0 : return Err(format!(
63 0 : "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
64 0 : hdr.xlp_rem_len
65 0 : ));
66 444 : }
67 : }
68 24914 : State::ReassemblingRecord { contlen, .. } => {
69 24914 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
70 0 : return Err(
71 0 : "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
72 0 : .into(),
73 0 : );
74 24914 : }
75 24914 : if hdr.xlp_rem_len != contlen.get() {
76 0 : return Err(format!(
77 0 : "invalid xlog page header: xlp_rem_len={}, expected {}",
78 0 : hdr.xlp_rem_len,
79 0 : contlen.get()
80 0 : ));
81 24914 : }
82 : }
83 : State::SkippingEverything { .. } => {
84 0 : panic!("Should not be validating page header in the SkippingEverything state");
85 : }
86 : };
87 25358 : Ok(())
88 25358 : };
89 25358 : validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
90 25358 : }
91 :
92 : /// Attempt to decode another WAL record from the input that has been fed to the
93 : /// decoder so far.
94 : ///
95 : /// Returns one of the following:
96 : /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
97 : /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
98 : /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
99 : ///
100 1923757 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
101 : // Run state machine that validates page headers, and reassembles records
102 : // that cross page boundaries.
103 : loop {
104 : // parse and verify page boundaries as we go
105 : // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
106 2864950 : match self.state {
107 : State::WaitingForRecord | State::ReassemblingRecord { .. } => {
108 2351488 : if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
109 : // parse long header
110 :
111 22 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
112 14 : return Ok(None);
113 8 : }
114 :
115 8 : let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
116 8 : |e| WalDecodeError {
117 0 : msg: format!("long header deserialization failed {}", e),
118 0 : lsn: self.lsn,
119 8 : },
120 8 : )?;
121 :
122 8 : self.validate_page_header(&hdr.std)?;
123 :
124 8 : self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
125 2351466 : } else if self.lsn.block_offset() == 0 {
126 35332 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
127 9982 : return Ok(None);
128 25350 : }
129 :
130 25350 : let hdr =
131 25350 : XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
132 0 : WalDecodeError {
133 0 : msg: format!("header deserialization failed {}", e),
134 0 : lsn: self.lsn,
135 0 : }
136 25350 : })?;
137 :
138 25350 : self.validate_page_header(&hdr)?;
139 :
140 25350 : self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
141 2316134 : }
142 : }
143 513462 : State::SkippingEverything { .. } => {}
144 : }
145 : // now read page contents
146 2854954 : match &mut self.state {
147 : State::WaitingForRecord => {
148 : // need to have at least the xl_tot_len field
149 518125 : if self.inputbuf.remaining() < 4 {
150 37194 : return Ok(None);
151 480931 : }
152 480931 :
153 480931 : // peek xl_tot_len at the beginning of the record.
154 480931 : // FIXME: assumes little-endian
155 480931 : let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
156 480931 : if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
157 2425 : return Err(WalDecodeError {
158 2425 : msg: format!("invalid xl_tot_len {}", xl_tot_len),
159 2425 : lsn: self.lsn,
160 2425 : });
161 478506 : }
162 478506 : // Fast path for the common case that the whole record fits on the page.
163 478506 : let pageleft = self.lsn.remaining_in_block() as u32;
164 478506 : if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
165 40727 : self.lsn += xl_tot_len as u64;
166 40727 : let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
167 40727 : return Ok(Some(self.complete_record(recordbuf)?));
168 : } else {
169 : // Need to assemble the record from pieces. Remember the size of the
170 : // record, and loop back. On next iterations, we will reach the branch
171 : // below, and copy the part of the record that was on this or next page(s)
172 : // to 'recordbuf'. Subsequent iterations will skip page headers, and
173 : // append the continuations from the next pages to 'recordbuf'.
174 437779 : self.state = State::ReassemblingRecord {
175 437779 : recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
176 437779 : contlen: NonZeroU32::new(xl_tot_len).unwrap(),
177 437779 : }
178 : }
179 : }
180 1823367 : State::ReassemblingRecord { recordbuf, contlen } => {
181 1823367 : // we're continuing a record, possibly from previous page.
182 1823367 : let pageleft = self.lsn.remaining_in_block() as u32;
183 1823367 :
184 1823367 : // read the rest of the record, or as much as fits on this page.
185 1823367 : let n = min(contlen.get(), pageleft) as usize;
186 1823367 :
187 1823367 : if self.inputbuf.remaining() < n {
188 1360674 : return Ok(None);
189 462693 : }
190 462693 :
191 462693 : recordbuf.put(self.inputbuf.split_to(n));
192 462693 : self.lsn += n as u64;
193 462693 : *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
194 24920 : Some(x) => x,
195 : None => {
196 : // The record is now complete.
197 437773 : let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
198 437773 : return Ok(Some(self.complete_record(recordbuf)?));
199 : }
200 : }
201 : }
202 513462 : State::SkippingEverything { skip_until_lsn } => {
203 513462 : assert!(*skip_until_lsn >= self.lsn);
204 513462 : let n = skip_until_lsn.0 - self.lsn.0;
205 513462 : if self.inputbuf.remaining() < n as usize {
206 34968 : return Ok(None);
207 478494 : }
208 478494 : self.inputbuf.advance(n as usize);
209 478494 : self.lsn += n;
210 478494 : self.state = State::WaitingForRecord;
211 : }
212 : }
213 : }
214 1923757 : }
215 :
216 478500 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
217 : // We now have a record in the 'recordbuf' local variable.
218 478500 : let xlogrec =
219 478500 : XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
220 0 : WalDecodeError {
221 0 : msg: format!("xlog record deserialization failed {}", e),
222 0 : lsn: self.lsn,
223 0 : }
224 478500 : })?;
225 :
226 478500 : let mut crc = 0;
227 478500 : crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
228 478500 : crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
229 478500 : if crc != xlogrec.xl_crc {
230 0 : return Err(WalDecodeError {
231 0 : msg: "WAL record crc mismatch".into(),
232 0 : lsn: self.lsn,
233 0 : });
234 478500 : }
235 :
236 : // XLOG_SWITCH records are special. If we see one, we need to skip
237 : // to the next WAL segment.
238 478500 : let next_lsn = if xlogrec.is_xlog_switch_record() {
239 0 : trace!("saw xlog switch record at {}", self.lsn);
240 0 : self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
241 : } else {
242 : // Pad to an 8-byte boundary
243 478500 : self.lsn.align()
244 : };
245 478500 : self.state = State::SkippingEverything {
246 478500 : skip_until_lsn: next_lsn,
247 478500 : };
248 478500 :
249 478500 : // We should return LSN of the next record, not the last byte of this record or
250 478500 : // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
251 478500 : // records, the former "spans" until the next WAL segment (see test_xlog_switch).
252 478500 : Ok((next_lsn, recordbuf))
253 478500 : }
254 : }
|