Line data Source code
1 : use std::ffi::{CStr, CString};
2 :
3 : use bytes::{Bytes, BytesMut};
4 : use crc32c::crc32c_append;
5 : use utils::lsn::Lsn;
6 :
7 : use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
8 : use super::xlog_utils::{
9 : XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE,
10 : XLP_FIRST_IS_CONTRECORD,
11 : };
12 : use super::XLogRecord;
13 : use crate::pg_constants::{
14 : RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
15 : XLR_BLOCK_ID_DATA_SHORT,
16 : };
17 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
18 :
19 : /// A WAL record payload. Will be prefixed by an XLogRecord header when encoded.
20 : pub struct Record {
21 : pub rmid: RmgrId,
22 : pub info: u8,
23 : pub data: Bytes,
24 : }
25 :
26 : impl Record {
27 : /// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of
28 : /// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres.
29 12259 : pub fn encode(&self, prev_lsn: Lsn) -> Bytes {
30 : // Prefix data with block ID and length.
31 12259 : let data_header = Bytes::from(match self.data.len() {
32 0 : 0 => vec![],
33 12259 : 1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8],
34 0 : 256.. => {
35 0 : let len_bytes = (self.data.len() as u32).to_le_bytes();
36 0 : [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
37 : }
38 : });
39 :
40 : // Construct the WAL record header.
41 12259 : let mut header = XLogRecord {
42 12259 : xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
43 12259 : xl_xid: 0,
44 12259 : xl_prev: prev_lsn.into(),
45 12259 : xl_info: self.info,
46 12259 : xl_rmid: self.rmid,
47 12259 : __bindgen_padding_0: [0; 2],
48 12259 : xl_crc: 0, // see below
49 12259 : };
50 12259 :
51 12259 : // Compute the CRC checksum for the data, and the header up to the CRC field.
52 12259 : let mut crc = 0;
53 12259 : crc = crc32c_append(crc, &data_header);
54 12259 : crc = crc32c_append(crc, &self.data);
55 12259 : crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
56 12259 : header.xl_crc = crc;
57 12259 :
58 12259 : // Encode the final header and record.
59 12259 : let header = header.encode().unwrap();
60 12259 :
61 12259 : [header, data_header, self.data.clone()].concat().into()
62 12259 : }
63 : }
64 :
65 : /// Generates WAL record payloads.
66 : ///
67 : /// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator
68 : /// that creates a table and inserts rows.
69 : pub trait RecordGenerator: Iterator<Item = Record> {}
70 :
71 : impl<I: Iterator<Item = Record>> RecordGenerator for I {}
72 :
73 : /// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs
74 : /// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record,
75 : /// including internal page headers if it spans pages. Concatenating the bytes will yield a
76 : /// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized
77 : /// for performance.
78 : ///
79 : /// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this
80 : /// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`).
81 : ///
82 : /// A WAL is split into 16 MB segments. Each segment is split into 8 KB pages, with headers.
83 : /// Records are arbitrary length, 8-byte aligned, and may span pages. The layout is e.g.:
84 : ///
85 : /// | Segment 1 | Segment 2 | Segment 3 |
86 : /// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 |
87 : /// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 |
88 : #[derive(Default)]
89 : pub struct WalGenerator<R: RecordGenerator> {
90 : /// Generates record payloads for the WAL.
91 : pub record_generator: R,
92 : /// Current LSN to append the next record at.
93 : ///
94 : /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should
95 : /// ensure that the LSN is on a valid record boundary (i.e. we can't start appending in the
96 : /// middle on an existing record or header, or beyond the end of the existing WAL).
97 : pub lsn: Lsn,
98 : /// The starting LSN of the previous record. Used in WAL record headers. The Safekeeper doesn't
99 : /// care about this, unlike Postgres, but we include it for completeness.
100 : pub prev_lsn: Lsn,
101 : }
102 :
103 : impl<R: RecordGenerator> WalGenerator<R> {
104 : // Hardcode the sys and timeline ID. We can make them configurable if we care about them.
105 : const SYS_ID: u64 = 0;
106 : const TIMELINE_ID: u32 = 1;
107 :
108 : /// Creates a new WAL generator with the given record generator.
109 9808 : pub fn new(record_generator: R) -> WalGenerator<R> {
110 9808 : Self {
111 9808 : record_generator,
112 9808 : lsn: Lsn(0),
113 9808 : prev_lsn: Lsn(0),
114 9808 : }
115 9808 : }
116 :
117 : /// Appends a record with an arbitrary payload at the current LSN, then increments the LSN.
118 : /// Returns the WAL bytes for the record, including page headers and padding, and the start LSN.
119 12255 : fn append_record(&mut self, record: Record) -> (Lsn, Bytes) {
120 12255 : let record = record.encode(self.prev_lsn);
121 12255 : let record = Self::insert_pages(record, self.lsn);
122 12255 : let record = Self::pad_record(record, self.lsn);
123 12255 : let lsn = self.lsn;
124 12255 : self.prev_lsn = self.lsn;
125 12255 : self.lsn += record.len() as u64;
126 12255 : (lsn, record)
127 12255 : }
128 :
129 : /// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record
130 : /// is to be appended.
131 12255 : fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
132 12255 : // Fast path: record fits in current page, and the page already has a header.
133 12255 : if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 {
134 12170 : return record;
135 85 : }
136 85 :
137 85 : let mut pages = BytesMut::new();
138 85 : let mut remaining = record.clone(); // Bytes::clone() is cheap
139 246 : while !remaining.is_empty() {
140 : // At new page boundary, inject page header.
141 161 : if lsn.block_offset() == 0 {
142 85 : let mut page_header = XLogPageHeaderData {
143 85 : xlp_magic: XLOG_PAGE_MAGIC as u16,
144 85 : xlp_info: XLP_BKP_REMOVABLE,
145 85 : xlp_tli: Self::TIMELINE_ID,
146 85 : xlp_pageaddr: lsn.0,
147 85 : xlp_rem_len: 0,
148 85 : __bindgen_padding_0: [0; 4],
149 85 : };
150 85 : // If the record was split across page boundaries, mark as continuation.
151 85 : if remaining.len() < record.len() {
152 76 : page_header.xlp_rem_len = remaining.len() as u32;
153 76 : page_header.xlp_info |= XLP_FIRST_IS_CONTRECORD;
154 76 : }
155 : // At start of segment, use a long page header.
156 85 : let page_header = if lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
157 0 : page_header.xlp_info |= XLP_LONG_HEADER;
158 0 : XLogLongPageHeaderData {
159 0 : std: page_header,
160 0 : xlp_sysid: Self::SYS_ID,
161 0 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
162 0 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
163 0 : }
164 0 : .encode()
165 0 : .unwrap()
166 : } else {
167 85 : page_header.encode().unwrap()
168 : };
169 85 : pages.extend_from_slice(&page_header);
170 85 : lsn += page_header.len() as u64;
171 76 : }
172 :
173 : // Append the record up to the next page boundary, if any.
174 161 : let page_free = lsn.remaining_in_block() as usize;
175 161 : let chunk = remaining.split_to(std::cmp::min(page_free, remaining.len()));
176 161 : pages.extend_from_slice(&chunk);
177 161 : lsn += chunk.len() as u64;
178 : }
179 85 : pages.freeze()
180 12255 : }
181 :
182 : /// Records must be 8-byte aligned. Take an encoded record (including any injected page
183 : /// boundaries), starting at the given LSN, and add any necessary padding at the end.
184 12255 : fn pad_record(record: Bytes, mut lsn: Lsn) -> Bytes {
185 12255 : lsn += record.len() as u64;
186 12255 : let padding = lsn.calc_padding(8u64) as usize;
187 12255 : if padding == 0 {
188 12255 : return record;
189 0 : }
190 0 : [record, Bytes::from(vec![0; padding])].concat().into()
191 12255 : }
192 : }
193 :
194 : /// Generates WAL records as an iterator.
195 : impl<R: RecordGenerator> Iterator for WalGenerator<R> {
196 : type Item = (Lsn, Bytes);
197 :
198 0 : fn next(&mut self) -> Option<Self::Item> {
199 0 : let record = self.record_generator.next()?;
200 0 : Some(self.append_record(record))
201 0 : }
202 : }
203 :
/// An endless source of logical message records (effectively noops), all carrying the same
/// prefix and message.
pub struct LogicalMessageGenerator {
    /// Message prefix; encoded including its trailing NUL byte.
    prefix: CString,
    /// Message body, emitted verbatim in every record.
    message: Vec<u8>,
}
209 :
210 : impl LogicalMessageGenerator {
211 : const DB_ID: u32 = 0; // hardcoded for now
212 : const RM_ID: RmgrId = RM_LOGICALMSG_ID;
213 : const INFO: u8 = XLOG_LOGICAL_MESSAGE;
214 :
215 : /// Creates a new LogicalMessageGenerator.
216 9812 : pub fn new(prefix: &CStr, message: &[u8]) -> Self {
217 9812 : Self {
218 9812 : prefix: prefix.to_owned(),
219 9812 : message: message.to_owned(),
220 9812 : }
221 9812 : }
222 :
223 : /// Encodes a logical message.
224 12259 : fn encode(prefix: &CStr, message: &[u8]) -> Bytes {
225 12259 : let prefix = prefix.to_bytes_with_nul();
226 12259 : let header = XlLogicalMessage {
227 12259 : db_id: Self::DB_ID,
228 12259 : transactional: 0,
229 12259 : prefix_size: prefix.len() as u64,
230 12259 : message_size: message.len() as u64,
231 12259 : };
232 12259 : [&header.encode(), prefix, message].concat().into()
233 12259 : }
234 : }
235 :
236 : impl Iterator for LogicalMessageGenerator {
237 : type Item = Record;
238 :
239 4 : fn next(&mut self) -> Option<Self::Item> {
240 4 : Some(Record {
241 4 : rmid: Self::RM_ID,
242 4 : info: Self::INFO,
243 4 : data: Self::encode(&self.prefix, &self.message),
244 4 : })
245 4 : }
246 : }
247 :
248 : impl WalGenerator<LogicalMessageGenerator> {
249 : /// Convenience method for appending a WAL record with an arbitrary logical message at the
250 : /// current WAL LSN position. Returns the start LSN and resulting WAL bytes.
251 12255 : pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) {
252 12255 : let record = Record {
253 12255 : rmid: LogicalMessageGenerator::RM_ID,
254 12255 : info: LogicalMessageGenerator::INFO,
255 12255 : data: LogicalMessageGenerator::encode(prefix, message),
256 12255 : };
257 12255 : self.append_record(record)
258 12255 : }
259 : }
|