Line data Source code
1 : use std::ffi::{CStr, CString};
2 :
3 : use bytes::{Bytes, BytesMut};
4 : use crc32c::crc32c_append;
5 : use utils::lsn::Lsn;
6 :
7 : use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
8 : use super::xlog_utils::{
9 : XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE,
10 : XLP_FIRST_IS_CONTRECORD,
11 : };
12 : use super::XLogRecord;
13 : use crate::pg_constants::{
14 : RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
15 : XLR_BLOCK_ID_DATA_SHORT,
16 : };
17 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
18 :
19 : /// A WAL record payload. Will be prefixed by an XLogRecord header when encoded.
20 : pub struct Record {
21 : pub rmid: RmgrId,
22 : pub info: u8,
23 : pub data: Bytes,
24 : }
25 :
26 : impl Record {
27 : /// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of
28 : /// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres.
29 11927 : pub fn encode(&self, prev_lsn: Lsn) -> Bytes {
30 : // Prefix data with block ID and length.
31 11927 : let data_header = Bytes::from(match self.data.len() {
32 0 : 0 => vec![],
33 11927 : 1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8],
34 600 : 256.. => {
35 600 : let len_bytes = (self.data.len() as u32).to_le_bytes();
36 600 : [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
37 : }
38 : });
39 :
40 : // Construct the WAL record header.
41 11927 : let mut header = XLogRecord {
42 11927 : xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
43 11927 : xl_xid: 0,
44 11927 : xl_prev: prev_lsn.into(),
45 11927 : xl_info: self.info,
46 11927 : xl_rmid: self.rmid,
47 11927 : __bindgen_padding_0: [0; 2],
48 11927 : xl_crc: 0, // see below
49 11927 : };
50 11927 :
51 11927 : // Compute the CRC checksum for the data, and the header up to the CRC field.
52 11927 : let mut crc = 0;
53 11927 : crc = crc32c_append(crc, &data_header);
54 11927 : crc = crc32c_append(crc, &self.data);
55 11927 : crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
56 11927 : header.xl_crc = crc;
57 11927 :
58 11927 : // Encode the final header and record.
59 11927 : let header = header.encode().unwrap();
60 11927 :
61 11927 : [header, data_header, self.data.clone()].concat().into()
62 11927 : }
63 : }
64 :
65 : /// Generates WAL record payloads.
66 : ///
67 : /// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator
68 : /// that creates a table and inserts rows.
69 : pub trait RecordGenerator: Iterator<Item = Record> {}
70 :
71 : impl<I: Iterator<Item = Record>> RecordGenerator for I {}
72 :
73 : /// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs
74 : /// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record,
75 : /// including internal page headers if it spans pages. Concatenating the bytes will yield a
76 : /// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized
77 : /// for performance.
78 : ///
79 : /// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this
80 : /// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`).
81 : ///
82 : /// A WAL is split into 16 MB segments. Each segment is split into 8 KB pages, with headers.
83 : /// Records are arbitrary length, 8-byte aligned, and may span pages. The layout is e.g.:
84 : ///
85 : /// | Segment 1 | Segment 2 | Segment 3 |
86 : /// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 |
87 : /// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 |
88 : #[derive(Default)]
89 : pub struct WalGenerator<R: RecordGenerator> {
90 : /// Generates record payloads for the WAL.
91 : pub record_generator: R,
92 : /// Current LSN to append the next record at.
93 : ///
94 : /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should
95 : /// ensure that the LSN is on a valid record boundary (i.e. we can't start appending in the
96 : /// middle on an existing record or header, or beyond the end of the existing WAL).
97 : pub lsn: Lsn,
98 : /// The starting LSN of the previous record. Used in WAL record headers. The Safekeeper doesn't
99 : /// care about this, unlike Postgres, but we include it for completeness.
100 : pub prev_lsn: Lsn,
101 : }
102 :
103 : impl<R: RecordGenerator> WalGenerator<R> {
104 : // Hardcode the sys and timeline ID. We can make them configurable if we care about them.
105 : const SYS_ID: u64 = 0;
106 : const TIMELINE_ID: u32 = 1;
107 :
108 : /// Creates a new WAL generator with the given record generator.
109 8782 : pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
110 8782 : Self {
111 8782 : record_generator,
112 8782 : lsn: start_lsn,
113 8782 : prev_lsn: start_lsn,
114 8782 : }
115 8782 : }
116 :
117 : /// Appends a record with an arbitrary payload at the current LSN, then increments the LSN.
118 : /// Returns the WAL bytes for the record, including page headers and padding, and the start LSN.
119 11923 : fn append_record(&mut self, record: Record) -> (Lsn, Bytes) {
120 11923 : let record = record.encode(self.prev_lsn);
121 11923 : let record = Self::insert_pages(record, self.lsn);
122 11923 : let record = Self::pad_record(record, self.lsn);
123 11923 : let lsn = self.lsn;
124 11923 : self.prev_lsn = self.lsn;
125 11923 : self.lsn += record.len() as u64;
126 11923 : (lsn, record)
127 11923 : }
128 :
129 : /// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record
130 : /// is to be appended.
131 11923 : fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
132 11923 : // Fast path: record fits in current page, and the page already has a header.
133 11923 : if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 {
134 11243 : return record;
135 680 : }
136 680 :
137 680 : let mut pages = BytesMut::new();
138 680 : let mut remaining = record.clone(); // Bytes::clone() is cheap
139 2037 : while !remaining.is_empty() {
140 : // At new page boundary, inject page header.
141 1357 : if lsn.block_offset() == 0 {
142 686 : let mut page_header = XLogPageHeaderData {
143 686 : xlp_magic: XLOG_PAGE_MAGIC as u16,
144 686 : xlp_info: XLP_BKP_REMOVABLE,
145 686 : xlp_tli: Self::TIMELINE_ID,
146 686 : xlp_pageaddr: lsn.0,
147 686 : xlp_rem_len: 0,
148 686 : __bindgen_padding_0: [0; 4],
149 686 : };
150 686 : // If the record was split across page boundaries, mark as continuation.
151 686 : if remaining.len() < record.len() {
152 677 : page_header.xlp_rem_len = remaining.len() as u32;
153 677 : page_header.xlp_info |= XLP_FIRST_IS_CONTRECORD;
154 677 : }
155 : // At start of segment, use a long page header.
156 686 : let page_header = if lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
157 0 : page_header.xlp_info |= XLP_LONG_HEADER;
158 0 : XLogLongPageHeaderData {
159 0 : std: page_header,
160 0 : xlp_sysid: Self::SYS_ID,
161 0 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
162 0 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
163 0 : }
164 0 : .encode()
165 0 : .unwrap()
166 : } else {
167 686 : page_header.encode().unwrap()
168 : };
169 686 : pages.extend_from_slice(&page_header);
170 686 : lsn += page_header.len() as u64;
171 671 : }
172 :
173 : // Append the record up to the next page boundary, if any.
174 1357 : let page_free = lsn.remaining_in_block() as usize;
175 1357 : let chunk = remaining.split_to(std::cmp::min(page_free, remaining.len()));
176 1357 : pages.extend_from_slice(&chunk);
177 1357 : lsn += chunk.len() as u64;
178 : }
179 680 : pages.freeze()
180 11923 : }
181 :
182 : /// Records must be 8-byte aligned. Take an encoded record (including any injected page
183 : /// boundaries), starting at the given LSN, and add any necessary padding at the end.
184 11923 : fn pad_record(record: Bytes, mut lsn: Lsn) -> Bytes {
185 11923 : lsn += record.len() as u64;
186 11923 : let padding = lsn.calc_padding(8u64) as usize;
187 11923 : if padding == 0 {
188 11323 : return record;
189 600 : }
190 600 : [record, Bytes::from(vec![0; padding])].concat().into()
191 11923 : }
192 : }
193 :
194 : /// Generates WAL records as an iterator.
195 : impl<R: RecordGenerator> Iterator for WalGenerator<R> {
196 : type Item = (Lsn, Bytes);
197 :
198 600 : fn next(&mut self) -> Option<Self::Item> {
199 600 : let record = self.record_generator.next()?;
200 600 : Some(self.append_record(record))
201 600 : }
202 : }
203 :
/// Generates logical message records (effectively noops) with a fixed message.
#[derive(Debug, Clone)]
pub struct LogicalMessageGenerator {
    /// Message prefix. It is encoded including its trailing NUL byte.
    prefix: CString,
    /// Message body, repeated verbatim in every generated record.
    message: Vec<u8>,
}
209 :
210 : impl LogicalMessageGenerator {
211 : const DB_ID: u32 = 0; // hardcoded for now
212 : const RM_ID: RmgrId = RM_LOGICALMSG_ID;
213 : const INFO: u8 = XLOG_LOGICAL_MESSAGE;
214 :
215 : /// Creates a new LogicalMessageGenerator.
216 8786 : pub fn new(prefix: &CStr, message: &[u8]) -> Self {
217 8786 : Self {
218 8786 : prefix: prefix.to_owned(),
219 8786 : message: message.to_owned(),
220 8786 : }
221 8786 : }
222 :
223 : /// Encodes a logical message.
224 11927 : fn encode(prefix: &CStr, message: &[u8]) -> Bytes {
225 11927 : let prefix = prefix.to_bytes_with_nul();
226 11927 : let header = XlLogicalMessage {
227 11927 : db_id: Self::DB_ID,
228 11927 : transactional: 0,
229 11927 : prefix_size: prefix.len() as u64,
230 11927 : message_size: message.len() as u64,
231 11927 : };
232 11927 : [&header.encode(), prefix, message].concat().into()
233 11927 : }
234 :
235 : /// Computes how large a value must be to get a record of the given size. Convenience method to
236 : /// construct records of pre-determined size. Panics if the record size is too small.
237 0 : pub fn make_value_size(record_size: usize, prefix: &CStr) -> usize {
238 0 : let xlog_header_size = XLOG_SIZE_OF_XLOG_RECORD;
239 0 : let lm_header_size = size_of::<XlLogicalMessage>();
240 0 : let prefix_size = prefix.to_bytes_with_nul().len();
241 0 : let data_header_size = match record_size - xlog_header_size - 2 {
242 0 : 0..=255 => 2,
243 0 : 256..=258 => panic!("impossible record_size {record_size}"),
244 0 : 259.. => 5,
245 : };
246 0 : record_size
247 0 : .checked_sub(xlog_header_size + lm_header_size + prefix_size + data_header_size)
248 0 : .expect("record_size too small")
249 0 : }
250 : }
251 :
252 : impl Iterator for LogicalMessageGenerator {
253 : type Item = Record;
254 :
255 604 : fn next(&mut self) -> Option<Self::Item> {
256 604 : Some(Record {
257 604 : rmid: Self::RM_ID,
258 604 : info: Self::INFO,
259 604 : data: Self::encode(&self.prefix, &self.message),
260 604 : })
261 604 : }
262 : }
263 :
264 : impl WalGenerator<LogicalMessageGenerator> {
265 : /// Convenience method for appending a WAL record with an arbitrary logical message at the
266 : /// current WAL LSN position. Returns the start LSN and resulting WAL bytes.
267 11323 : pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) {
268 11323 : let record = Record {
269 11323 : rmid: LogicalMessageGenerator::RM_ID,
270 11323 : info: LogicalMessageGenerator::INFO,
271 11323 : data: LogicalMessageGenerator::encode(prefix, message),
272 11323 : };
273 11323 : self.append_record(record)
274 11323 : }
275 : }
|