Line data Source code
1 : use std::ffi::{CStr, CString};
2 :
3 : use bytes::{Bytes, BytesMut};
4 : use crc32c::crc32c_append;
5 : use utils::lsn::Lsn;
6 :
7 : use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
8 : use super::xlog_utils::{
9 : XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE,
10 : XLP_FIRST_IS_CONTRECORD,
11 : };
12 : use super::XLogRecord;
13 : use crate::pg_constants::{
14 : RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
15 : XLR_BLOCK_ID_DATA_SHORT,
16 : };
17 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
18 :
19 : /// A WAL record payload. Will be prefixed by an XLogRecord header when encoded.
20 : pub struct Record {
21 : pub rmid: RmgrId,
22 : pub info: u8,
23 : pub data: Bytes,
24 : }
25 :
26 : impl Record {
27 : /// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of
28 : /// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres.
29 12016 : pub fn encode(&self, prev_lsn: Lsn) -> Bytes {
30 : // Prefix data with block ID and length.
31 12016 : let data_header = Bytes::from(match self.data.len() {
32 0 : 0 => vec![],
33 12016 : 1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8],
34 0 : 256.. => {
35 0 : let len_bytes = (self.data.len() as u32).to_le_bytes();
36 0 : [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
37 : }
38 : });
39 :
40 : // Construct the WAL record header.
41 12016 : let mut header = XLogRecord {
42 12016 : xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
43 12016 : xl_xid: 0,
44 12016 : xl_prev: prev_lsn.into(),
45 12016 : xl_info: self.info,
46 12016 : xl_rmid: self.rmid,
47 12016 : __bindgen_padding_0: [0; 2],
48 12016 : xl_crc: 0, // see below
49 12016 : };
50 12016 :
51 12016 : // Compute the CRC checksum for the data, and the header up to the CRC field.
52 12016 : let mut crc = 0;
53 12016 : crc = crc32c_append(crc, &data_header);
54 12016 : crc = crc32c_append(crc, &self.data);
55 12016 : crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
56 12016 : header.xl_crc = crc;
57 12016 :
58 12016 : // Encode the final header and record.
59 12016 : let header = header.encode().unwrap();
60 12016 :
61 12016 : [header, data_header, self.data.clone()].concat().into()
62 12016 : }
63 : }
64 :
65 : /// Generates WAL record payloads.
66 : ///
67 : /// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator
68 : /// that creates a table and inserts rows.
69 : pub trait RecordGenerator: Iterator<Item = Record> {}
70 :
71 : impl<I: Iterator<Item = Record>> RecordGenerator for I {}
72 :
73 : /// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs
74 : /// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record,
75 : /// including internal page headers if it spans pages. Concatenating the bytes will yield a
76 : /// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized
77 : /// for performance.
78 : ///
79 : /// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this
80 : /// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`).
81 : ///
82 : /// A WAL is split into 16 MB segments. Each segment is split into 8 KB pages, with headers.
83 : /// Records are arbitrary length, 8-byte aligned, and may span pages. The layout is e.g.:
84 : ///
85 : /// | Segment 1 | Segment 2 | Segment 3 |
86 : /// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 |
87 : /// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 |
88 : #[derive(Default)]
89 : pub struct WalGenerator<R: RecordGenerator> {
90 : /// Generates record payloads for the WAL.
91 : pub record_generator: R,
92 : /// Current LSN to append the next record at.
93 : ///
94 : /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should
95 : /// ensure that the LSN is on a valid record boundary (i.e. we can't start appending in the
96 : /// middle on an existing record or header, or beyond the end of the existing WAL).
97 : pub lsn: Lsn,
98 : /// The starting LSN of the previous record. Used in WAL record headers. The Safekeeper doesn't
99 : /// care about this, unlike Postgres, but we include it for completeness.
100 : pub prev_lsn: Lsn,
101 : }
102 :
103 : impl<R: RecordGenerator> WalGenerator<R> {
104 : // Hardcode the sys and timeline ID. We can make them configurable if we care about them.
105 : const SYS_ID: u64 = 0;
106 : const TIMELINE_ID: u32 = 1;
107 :
108 : /// Creates a new WAL generator with the given record generator.
109 9341 : pub fn new(record_generator: R) -> WalGenerator<R> {
110 9341 : Self {
111 9341 : record_generator,
112 9341 : lsn: Lsn(0),
113 9341 : prev_lsn: Lsn(0),
114 9341 : }
115 9341 : }
116 :
117 : /// Appends a record with an arbitrary payload at the current LSN, then increments the LSN.
118 : /// Returns the WAL bytes for the record, including page headers and padding, and the start LSN.
119 12012 : fn append_record(&mut self, record: Record) -> (Lsn, Bytes) {
120 12012 : let record = record.encode(self.prev_lsn);
121 12012 : let record = Self::insert_pages(record, self.lsn);
122 12012 : let record = Self::pad_record(record, self.lsn);
123 12012 : let lsn = self.lsn;
124 12012 : self.prev_lsn = self.lsn;
125 12012 : self.lsn += record.len() as u64;
126 12012 : (lsn, record)
127 12012 : }
128 :
129 : /// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record
130 : /// is to be appended.
131 12012 : fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
132 12012 : // Fast path: record fits in current page, and the page already has a header.
133 12012 : if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 {
134 11932 : return record;
135 80 : }
136 80 :
137 80 : let mut pages = BytesMut::new();
138 80 : let mut remaining = record.clone(); // Bytes::clone() is cheap
139 231 : while !remaining.is_empty() {
140 : // At new page boundary, inject page header.
141 151 : if lsn.block_offset() == 0 {
142 80 : let mut page_header = XLogPageHeaderData {
143 80 : xlp_magic: XLOG_PAGE_MAGIC as u16,
144 80 : xlp_info: XLP_BKP_REMOVABLE,
145 80 : xlp_tli: Self::TIMELINE_ID,
146 80 : xlp_pageaddr: lsn.0,
147 80 : xlp_rem_len: 0,
148 80 : __bindgen_padding_0: [0; 4],
149 80 : };
150 80 : // If the record was split across page boundaries, mark as continuation.
151 80 : if remaining.len() < record.len() {
152 71 : page_header.xlp_rem_len = remaining.len() as u32;
153 71 : page_header.xlp_info |= XLP_FIRST_IS_CONTRECORD;
154 71 : }
155 : // At start of segment, use a long page header.
156 80 : let page_header = if lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
157 0 : page_header.xlp_info |= XLP_LONG_HEADER;
158 0 : XLogLongPageHeaderData {
159 0 : std: page_header,
160 0 : xlp_sysid: Self::SYS_ID,
161 0 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
162 0 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
163 0 : }
164 0 : .encode()
165 0 : .unwrap()
166 : } else {
167 80 : page_header.encode().unwrap()
168 : };
169 80 : pages.extend_from_slice(&page_header);
170 80 : lsn += page_header.len() as u64;
171 71 : }
172 :
173 : // Append the record up to the next page boundary, if any.
174 151 : let page_free = lsn.remaining_in_block() as usize;
175 151 : let chunk = remaining.split_to(std::cmp::min(page_free, remaining.len()));
176 151 : pages.extend_from_slice(&chunk);
177 151 : lsn += chunk.len() as u64;
178 : }
179 80 : pages.freeze()
180 12012 : }
181 :
182 : /// Records must be 8-byte aligned. Take an encoded record (including any injected page
183 : /// boundaries), starting at the given LSN, and add any necessary padding at the end.
184 12012 : fn pad_record(record: Bytes, mut lsn: Lsn) -> Bytes {
185 12012 : lsn += record.len() as u64;
186 12012 : let padding = lsn.calc_padding(8u64) as usize;
187 12012 : if padding == 0 {
188 12012 : return record;
189 0 : }
190 0 : [record, Bytes::from(vec![0; padding])].concat().into()
191 12012 : }
192 : }
193 :
194 : /// Generates WAL records as an iterator.
195 : impl<R: RecordGenerator> Iterator for WalGenerator<R> {
196 : type Item = (Lsn, Bytes);
197 :
198 0 : fn next(&mut self) -> Option<Self::Item> {
199 0 : let record = self.record_generator.next()?;
200 0 : Some(self.append_record(record))
201 0 : }
202 : }
203 :
/// Generates logical message records (effectively noops) with a fixed message.
#[derive(Clone, Debug)]
pub struct LogicalMessageGenerator {
    /// Message prefix; encoded including its NUL terminator.
    prefix: CString,
    /// Message body, repeated verbatim in every generated record.
    message: Vec<u8>,
}
209 :
210 : impl LogicalMessageGenerator {
211 : const DB_ID: u32 = 0; // hardcoded for now
212 : const RM_ID: RmgrId = RM_LOGICALMSG_ID;
213 : const INFO: u8 = XLOG_LOGICAL_MESSAGE;
214 :
215 : /// Creates a new LogicalMessageGenerator.
216 9345 : pub fn new(prefix: &CStr, message: &[u8]) -> Self {
217 9345 : Self {
218 9345 : prefix: prefix.to_owned(),
219 9345 : message: message.to_owned(),
220 9345 : }
221 9345 : }
222 :
223 : /// Encodes a logical message.
224 12016 : fn encode(prefix: &CStr, message: &[u8]) -> Bytes {
225 12016 : let prefix = prefix.to_bytes_with_nul();
226 12016 : let header = XlLogicalMessage {
227 12016 : db_id: Self::DB_ID,
228 12016 : transactional: 0,
229 12016 : prefix_size: prefix.len() as u64,
230 12016 : message_size: message.len() as u64,
231 12016 : };
232 12016 : [&header.encode(), prefix, message].concat().into()
233 12016 : }
234 : }
235 :
236 : impl Iterator for LogicalMessageGenerator {
237 : type Item = Record;
238 :
239 4 : fn next(&mut self) -> Option<Self::Item> {
240 4 : Some(Record {
241 4 : rmid: Self::RM_ID,
242 4 : info: Self::INFO,
243 4 : data: Self::encode(&self.prefix, &self.message),
244 4 : })
245 4 : }
246 : }
247 :
248 : impl WalGenerator<LogicalMessageGenerator> {
249 : /// Convenience method for appending a WAL record with an arbitrary logical message at the
250 : /// current WAL LSN position. Returns the start LSN and resulting WAL bytes.
251 12012 : pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) {
252 12012 : let record = Record {
253 12012 : rmid: LogicalMessageGenerator::RM_ID,
254 12012 : info: LogicalMessageGenerator::INFO,
255 12012 : data: LogicalMessageGenerator::encode(prefix, message),
256 12012 : };
257 12012 : self.append_record(record)
258 12012 : }
259 : }
|