Line data Source code
1 : //!
2 : //! Basic WAL stream decoding.
3 : //!
4 : //! This understands the WAL page and record format, enough to figure out where the WAL record
5 : //! boundaries are, and to reassemble WAL records that cross page boundaries.
6 : //!
//! This functionality is needed by both the pageserver and the safekeepers. The pageserver also
//! needs to look deeper into the WAL records to understand which blocks they modify; the code
//! for that is in pageserver/src/walrecord.rs
10 : //!
11 : use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
12 : use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
13 : use super::xlog_utils::*;
14 : use crate::WAL_SEGMENT_SIZE;
15 : use bytes::{Buf, BufMut, Bytes, BytesMut};
16 : use crc32c::*;
17 : use log::*;
18 : use std::cmp::min;
19 : use std::num::NonZeroU32;
20 : use utils::lsn::Lsn;
21 :
22 : pub trait WalStreamDecoderHandler {
23 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
24 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
25 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
26 : }
27 :
//
// This is a trick to support several Postgres versions simultaneously.
//
// The page decoding code depends on the Postgres bindings, so it is compiled once per version,
// and WalStreamDecoder therefore implements several WalStreamDecoderHandler traits (one per
// version module). WalStreamDecoder's poll_decode() method dispatches to the right handler
// based on the Postgres version; the other methods are internal and are not dispatched.
//
// This is similar to having several impl blocks for the same struct, but because the impls
// live in different modules, a trait is needed.
//
39 : impl WalStreamDecoderHandler for WalStreamDecoder {
40 3306609 : fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
41 3306609 : let validate_impl = || {
42 3306609 : if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
43 1 : return Err(format!(
44 1 : "invalid xlog page header: xlp_magic={}, expected {}",
45 1 : hdr.xlp_magic, XLOG_PAGE_MAGIC
46 1 : ));
47 3306608 : }
48 3306608 : if hdr.xlp_pageaddr != self.lsn.0 {
49 0 : return Err(format!(
50 0 : "invalid xlog page header: xlp_pageaddr={}, expected {}",
51 0 : hdr.xlp_pageaddr, self.lsn
52 0 : ));
53 3306608 : }
54 3306608 : match self.state {
55 : State::WaitingForRecord => {
56 154745 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
57 0 : return Err(
58 0 : "invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
59 0 : );
60 154745 : }
61 154745 : if hdr.xlp_rem_len != 0 {
62 0 : return Err(format!(
63 0 : "invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
64 0 : hdr.xlp_rem_len
65 0 : ));
66 154745 : }
67 : }
68 3151863 : State::ReassemblingRecord { contlen, .. } => {
69 3151863 : if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
70 0 : return Err(
71 0 : "invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
72 0 : .into(),
73 0 : );
74 3151863 : }
75 3151863 : if hdr.xlp_rem_len != contlen.get() {
76 0 : return Err(format!(
77 0 : "invalid xlog page header: xlp_rem_len={}, expected {}",
78 0 : hdr.xlp_rem_len,
79 0 : contlen.get()
80 0 : ));
81 3151863 : }
82 : }
83 : State::SkippingEverything { .. } => {
84 0 : panic!("Should not be validating page header in the SkippingEverything state");
85 : }
86 : };
87 3306608 : Ok(())
88 3306609 : };
89 3306609 : validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
90 3306609 : }
91 :
92 : /// Attempt to decode another WAL record from the input that has been fed to the
93 : /// decoder so far.
94 : ///
95 : /// Returns one of the following:
96 : /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
97 : /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
98 : /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
99 : ///
100 160992493 : fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
101 : // Run state machine that validates page headers, and reassembles records
102 : // that cross page boundaries.
103 325437123 : loop {
104 325437123 : // parse and verify page boundaries as we go
105 325437123 : // However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
106 325437123 : match self.state {
107 : State::WaitingForRecord | State::ReassemblingRecord { .. } => {
108 167418105 : if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
109 : // parse long header
110 :
111 2445 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
112 860 : return Ok(None);
113 1585 : }
114 :
115 1585 : let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
116 1585 : |e| WalDecodeError {
117 0 : msg: format!("long header deserialization failed {}", e),
118 0 : lsn: self.lsn,
119 1585 : },
120 1585 : )?;
121 :
122 1585 : self.validate_page_header(&hdr.std)?;
123 :
124 1585 : self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
125 167415660 : } else if self.lsn.block_offset() == 0 {
126 3415035 : if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
127 110011 : return Ok(None);
128 3305024 : }
129 :
130 3305024 : let hdr =
131 3305024 : XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
132 0 : WalDecodeError {
133 0 : msg: format!("header deserialization failed {}", e),
134 0 : lsn: self.lsn,
135 0 : }
136 3305024 : })?;
137 :
138 3305024 : self.validate_page_header(&hdr)?;
139 :
140 3305023 : self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
141 164000625 : }
142 : }
143 158019018 : State::SkippingEverything { .. } => {}
144 : }
145 : // now read page contents
146 325326251 : match &mut self.state {
147 : State::WaitingForRecord => {
148 : // need to have at least the xl_tot_len field
149 160338833 : if self.inputbuf.remaining() < 4 {
150 2335816 : return Ok(None);
151 158003017 : }
152 158003017 :
153 158003017 : // peek xl_tot_len at the beginning of the record.
154 158003017 : // FIXME: assumes little-endian
155 158003017 : let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
156 158003017 : if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
157 223 : return Err(WalDecodeError {
158 223 : msg: format!("invalid xl_tot_len {}", xl_tot_len),
159 223 : lsn: self.lsn,
160 223 : });
161 158002794 : }
162 158002794 : // Fast path for the common case that the whole record fits on the page.
163 158002794 : let pageleft = self.lsn.remaining_in_block() as u32;
164 158002794 : if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
165 154712220 : self.lsn += xl_tot_len as u64;
166 154712220 : let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
167 154712220 : return Ok(Some(self.complete_record(recordbuf)?));
168 : } else {
169 : // Need to assemble the record from pieces. Remember the size of the
170 : // record, and loop back. On next iterations, we will reach the branch
171 : // below, and copy the part of the record that was on this or next page(s)
172 : // to 'recordbuf'. Subsequent iterations will skip page headers, and
173 : // append the continuations from the next pages to 'recordbuf'.
174 3290574 : self.state = State::ReassemblingRecord {
175 3290574 : recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
176 3290574 : contlen: NonZeroU32::new(xl_tot_len).unwrap(),
177 3290574 : }
178 : }
179 : }
180 6968400 : State::ReassemblingRecord { recordbuf, contlen } => {
181 6968400 : // we're continuing a record, possibly from previous page.
182 6968400 : let pageleft = self.lsn.remaining_in_block() as u32;
183 6968400 :
184 6968400 : // read the rest of the record, or as much as fits on this page.
185 6968400 : let n = min(contlen.get(), pageleft) as usize;
186 6968400 :
187 6968400 : if self.inputbuf.remaining() < n {
188 525963 : return Ok(None);
189 6442437 : }
190 6442437 :
191 6442437 : recordbuf.put(self.inputbuf.split_to(n));
192 6442437 : self.lsn += n as u64;
193 6442437 : *contlen = match NonZeroU32::new(contlen.get() - n as u32) {
194 3151916 : Some(x) => x,
195 : None => {
196 : // The record is now complete.
197 3290521 : let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
198 3290521 : return Ok(Some(self.complete_record(recordbuf)?));
199 : }
200 : }
201 : }
202 158019018 : State::SkippingEverything { skip_until_lsn } => {
203 158019018 : assert!(*skip_until_lsn >= self.lsn);
204 158019018 : let n = skip_until_lsn.0 - self.lsn.0;
205 158019018 : if self.inputbuf.remaining() < n as usize {
206 16878 : return Ok(None);
207 158002140 : }
208 158002140 : self.inputbuf.advance(n as usize);
209 158002140 : self.lsn += n;
210 158002140 : self.state = State::WaitingForRecord;
211 : }
212 : }
213 : }
214 160992492 : }
215 :
216 158002741 : fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
217 : // We now have a record in the 'recordbuf' local variable.
218 158002741 : let xlogrec =
219 158002741 : XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
220 0 : WalDecodeError {
221 0 : msg: format!("xlog record deserialization failed {}", e),
222 0 : lsn: self.lsn,
223 0 : }
224 158002741 : })?;
225 :
226 158002741 : let mut crc = 0;
227 158002741 : crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
228 158002741 : crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
229 158002741 : if crc != xlogrec.xl_crc {
230 0 : return Err(WalDecodeError {
231 0 : msg: "WAL record crc mismatch".into(),
232 0 : lsn: self.lsn,
233 0 : });
234 158002741 : }
235 :
236 : // XLOG_SWITCH records are special. If we see one, we need to skip
237 : // to the next WAL segment.
238 158002741 : let next_lsn = if xlogrec.is_xlog_switch_record() {
239 16 : trace!("saw xlog switch record at {}", self.lsn);
240 15 : self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
241 : } else {
242 : // Pad to an 8-byte boundary
243 158002725 : self.lsn.align()
244 : };
245 158002740 : self.state = State::SkippingEverything {
246 158002740 : skip_until_lsn: next_lsn,
247 158002740 : };
248 158002740 :
249 158002740 : // We should return LSN of the next record, not the last byte of this record or
250 158002740 : // the byte immediately after. Note that this handles both XLOG_SWITCH and usual
251 158002740 : // records, the former "spans" until the next WAL segment (see test_xlog_switch).
252 158002740 : Ok((next_lsn, recordbuf))
253 158002740 : }
254 : }
|