TLA Line data Source code
1 : //
2 : // This file contains common utilities for dealing with PostgreSQL WAL files and
3 : // LSNs.
4 : //
5 : // Many of these functions have been copied from PostgreSQL, and rewritten in
6 : // Rust. That's why they don't follow the usual Rust naming conventions, they
7 : // have been named the same as the corresponding PostgreSQL functions instead.
8 : //
9 :
10 : use crc32c::crc32c_append;
11 :
12 : use super::super::waldecoder::WalStreamDecoder;
13 : use super::bindings::{
14 : CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
15 : XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
16 : };
17 : use super::PG_MAJORVERSION;
18 : use crate::pg_constants;
19 : use crate::PG_TLI;
20 : use crate::{uint32, uint64, Oid};
21 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
22 :
23 : use bytes::BytesMut;
24 : use bytes::{Buf, Bytes};
25 :
26 : use log::*;
27 :
28 : use serde::Serialize;
29 : use std::fs::File;
30 : use std::io::prelude::*;
31 : use std::io::ErrorKind;
32 : use std::io::SeekFrom;
33 : use std::path::{Path, PathBuf};
34 : use std::time::SystemTime;
35 : use utils::bin_ser::DeserializeError;
36 : use utils::bin_ser::SerializeError;
37 :
38 : use utils::lsn::Lsn;
39 :
/// Length of a WAL segment file name: three 8-digit hexadecimal fields
/// (see `XLogFileName`).
pub const XLOG_FNAME_LEN: usize = 24;
/// Page header flag bit.
/// NOTE(review): presumably mirrors PostgreSQL's `XLP_FIRST_IS_CONTRECORD` - confirm.
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
/// Sum of a 2-, 2-, 4- and 8-byte field.
/// NOTE(review): presumably the byte offset of `xlp_rem_len` inside the
/// serialized page header (after magic, info, tli and pageaddr) - confirm.
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
/// Number of serialized `XLogRecord` header bytes that precede `xl_crc`;
/// `encode_logical_message` checksums exactly this prefix of the header.
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;

/// In-memory size of the short (regular) WAL page header.
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
/// In-memory size of the long WAL page header.
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
/// In-memory size of the fixed WAL record header.
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
/// Size of a "short" record data header: two one-byte fields.
/// NOTE(review): presumably the block id byte plus the length byte - confirm.
// Written as `1 * 2` on purpose (hence the lint exception).
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
50 :
/// Interval, in transactions, between checkpoints of the metadata file.
///
/// The metadata file has to be stored to enforce the invariant that
/// `checkpoint.nextXid` is larger than any XID in WAL. Flushing the checkpoint
/// file for every transaction would be too expensive, so `nextXid` is aligned
/// forward to a multiple of `XID_CHECKPOINT_INTERVAL` and the metadata
/// checkpoint is performed only once per `XID_CHECKPOINT_INTERVAL` transactions.
///
/// `XID_CHECKPOINT_INTERVAL` should not be larger than
/// `BLCKSZ * CLOG_XACTS_PER_BYTE`, so that the CLOG_TRUNCATE mechanism can
/// correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
59 :
60 CBC 11194 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
61 11194 : (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
62 11194 : }
63 :
64 719 : pub fn XLogSegNoOffsetToRecPtr(
65 719 : segno: XLogSegNo,
66 719 : offset: u32,
67 719 : wal_segsz_bytes: usize,
68 719 : ) -> XLogRecPtr {
69 719 : segno * (wal_segsz_bytes as u64) + (offset as u64)
70 719 : }
71 :
72 5439 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
73 5439 : format!(
74 5439 : "{:>08X}{:>08X}{:>08X}",
75 5439 : tli,
76 5439 : logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
77 5439 : logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
78 5439 : )
79 5439 : }
80 :
81 316 : pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
82 316 : let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
83 316 : let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
84 316 : let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
85 316 : (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
86 316 : }
87 :
88 1097 : pub fn IsXLogFileName(fname: &str) -> bool {
89 7944 : return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
90 1097 : }
91 :
92 736 : pub fn IsPartialXLogFileName(fname: &str) -> bool {
93 736 : fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
94 736 : }
95 :
96 : /// If LSN points to the beginning of the page, then shift it to first record,
97 : /// otherwise align on 8-bytes boundary (required for WAL records)
98 1317 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
99 1317 : if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
100 10 : let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
101 6 : XLOG_SIZE_OF_XLOG_LONG_PHD
102 : } else {
103 4 : XLOG_SIZE_OF_XLOG_SHORT_PHD
104 : };
105 10 : lsn + hdr_size as u64
106 : } else {
107 1307 : lsn.align()
108 : }
109 1317 : }
110 :
111 638 : pub fn generate_pg_control(
112 638 : pg_control_bytes: &[u8],
113 638 : checkpoint_bytes: &[u8],
114 638 : lsn: Lsn,
115 638 : ) -> anyhow::Result<(Bytes, u64)> {
116 638 : let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
117 638 : let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
118 :
119 : // Generate new pg_control needed for bootstrap
120 638 : checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
121 638 :
122 638 : //reset some fields we don't want to preserve
123 638 : //TODO Check this.
124 638 : //We may need to determine the value from twophase data.
125 638 : checkpoint.oldestActiveXid = 0;
126 638 :
127 638 : //save new values in pg_control
128 638 : pg_control.checkPoint = 0;
129 638 : pg_control.checkPointCopy = checkpoint;
130 638 : pg_control.state = DBState_DB_SHUTDOWNED;
131 638 :
132 638 : Ok((pg_control.encode(), pg_control.system_identifier))
133 638 : }
134 :
135 785583 : pub fn get_current_timestamp() -> TimestampTz {
136 785583 : to_pg_timestamp(SystemTime::now())
137 785583 : }
138 :
139 ECB (822562) : // Module to reduce the scope of the constants
140 (822562) : mod timestamp_conversions {
141 (822562) : use std::time::Duration;
142 (822562) :
143 (822562) : use super::*;
144 (822562) :
145 (822562) : const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
146 (822562) : const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
147 (822562) : const SECS_PER_DAY: u64 = 86400;
148 (822562) : const USECS_PER_SEC: u64 = 1000000;
149 : const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
150 EUB : (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
151 :
152 CBC 785784 : pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
153 GIC 785784 : match time.duration_since(SystemTime::UNIX_EPOCH) {
154 785784 : Ok(n) => {
155 785784 : ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
156 785784 : + n.subsec_micros() as u64) as i64
157 : }
158 LBC (110) : Err(_) => panic!("SystemTime before UNIX EPOCH!"),
159 ECB (110) : }
160 CBC 785784 : }
161 ECB (110) :
162 CBC 16 : pub fn from_pg_timestamp(time: TimestampTz) -> SystemTime {
163 16 : let time: u64 = time
164 16 : .try_into()
165 16 : .expect("timestamp before millenium (postgres epoch)");
166 16 : let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
167 16 : SystemTime::UNIX_EPOCH
168 GIC 16 : .checked_add(Duration::from_micros(since_unix_epoch))
169 CBC 16 : .expect("SystemTime overflow")
170 GIC 16 : }
171 : }
172 ECB (144) :
173 (144) : pub use timestamp_conversions::{from_pg_timestamp, to_pg_timestamp};
174 (144) :
/// Returns the (aligned) end LSN of the last record found in `data_dir`, a
/// directory containing WAL segments.
///
/// `start_lsn` must point to a previously known record boundary (the beginning
/// of the next record). If no valid record is found after it, `start_lsn` is
/// returned unchanged.
///
/// Segments are read one after another, starting with the one that contains
/// `start_lsn`; for each segment the `.partial` file is preferred over the
/// complete one (see `open_wal_segment`). Scanning stops at the first missing
/// segment or the first decode error; both are treated as the normal end of
/// WAL, not as failures.
///
/// # Errors
///
/// Returns an error only for I/O failures other than "file not found"
/// (opening, seeking or reading a segment).
///
/// # Panics
///
/// Panics if characters 1..3 of `PG_MAJORVERSION` do not parse as a number.
pub fn find_end_of_wal(
    data_dir: &Path,
    wal_seg_size: usize,
    start_lsn: Lsn, // start reading WAL at this point; must point at a record start.
) -> anyhow::Result<Lsn> {
    // `result` is the end of the last fully decoded record;
    // `curr_lsn` is the position of the next byte to read from disk.
    let mut result = start_lsn;
    let mut curr_lsn = start_lsn;
    let mut buf = [0u8; XLOG_BLCKSZ];
    // NOTE(review): presumably PG_MAJORVERSION looks like "vNN" - confirm.
    let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
    debug!("find_end_of_wal PG_VERSION: {}", pg_version);

    let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);

    // loop over segments
    loop {
        let segno = curr_lsn.segment_number(wal_seg_size);
        let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
        let seg_file_path = data_dir.join(seg_file_name);
        match open_wal_segment(&seg_file_path)? {
            None => {
                // no more segments
                debug!(
                    "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
                    result, seg_file_path
                );
                return Ok(result);
            }
            Some(mut segment) => {
                // Only the first segment is entered mid-way; later ones start
                // at offset 0 as long as the previous segment was read to its
                // full size.
                // NOTE(review): if a segment file is shorter than
                // `wal_seg_size`, `curr_lsn` stays inside it and the same file
                // appears to be re-opened forever - confirm segments are
                // always full-sized.
                let seg_offs = curr_lsn.segment_offset(wal_seg_size);
                segment.seek(SeekFrom::Start(seg_offs as u64))?;
                // loop inside segment
                loop {
                    let bytes_read = segment.read(&mut buf)?;
                    if bytes_read == 0 {
                        break; // EOF
                    }
                    curr_lsn += bytes_read as u64;
                    decoder.feed_bytes(&buf[0..bytes_read]);

                    // advance result past all completely read records
                    loop {
                        match decoder.poll_decode() {
                            Ok(Some(record)) => result = record.0,
                            Err(e) => {
                                // A decode error marks the end of valid WAL.
                                debug!(
                                    "find_end_of_wal reached end at {:?}, decode error: {:?}",
                                    result, e
                                );
                                return Ok(result);
                            }
                            Ok(None) => break, // need more data
                        }
                    }
                }
            }
        }
    }
}
237 ECB (144) :
238 : // Open .partial or full WAL segment file, if present.
239 GBC 146 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
240 146 : let mut partial_path = seg_file_path.to_owned();
241 146 : partial_path.set_extension("partial");
242 146 : match File::open(partial_path) {
243 104 : Ok(file) => Ok(Some(file)),
244 42 : Err(e) => match e.kind() {
245 : ErrorKind::NotFound => {
246 : // .partial not found, try full
247 CBC 42 : match File::open(seg_file_path) {
248 31 : Ok(file) => Ok(Some(file)),
249 11 : Err(e) => match e.kind() {
250 11 : ErrorKind::NotFound => Ok(None),
251 UIC 0 : _ => Err(e.into()),
252 ECB (68237203) : },
253 (68237203) : }
254 (68237203) : }
255 LBC (68237203) : _ => Err(e.into()),
256 : },
257 ECB (12) : }
258 CBC 146 : }
259 ECB (12) :
260 LBC (12) : pub fn main() {
261 UIC 0 : let mut data_dir = PathBuf::new();
262 0 : data_dir.push(".");
263 LBC (168742557) : let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap();
264 (168742557) : println!("wal_end={:?}", wal_end);
265 (168742557) : }
266 :
267 : impl XLogRecord {
268 GIC 168851349 : pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
269 CBC 168851349 : use utils::bin_ser::LeSer;
270 168851349 : XLogRecord::des(buf)
271 168851349 : }
272 ECB (3098110) :
273 GIC 68588722 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
274 CBC 68588722 : use utils::bin_ser::LeSer;
275 68588722 : XLogRecord::des_from(&mut buf.reader())
276 68588722 : }
277 ECB (635) :
278 GIC 12 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
279 12 : use utils::bin_ser::LeSer;
280 12 : Ok(self.ser()?.into())
281 CBC 12 : }
282 ECB (1481) :
283 (1481) : // Is this record an XLOG_SWITCH record? They need some special processing,
284 CBC 168851349 : pub fn is_xlog_switch_record(&self) -> bool {
285 GIC 168851349 : self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
286 CBC 168851349 : }
287 ECB (638) : }
288 (638) :
289 (638) : impl XLogPageHeaderData {
290 GIC 3096127 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
291 3096127 : use utils::bin_ser::LeSer;
292 3096127 : XLogPageHeaderData::des_from(&mut buf.reader())
293 3096127 : }
294 :
295 CBC 638 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
296 638 : use utils::bin_ser::LeSer;
297 638 : self.ser().map(|b| b.into())
298 638 : }
299 : }
300 ECB (2506) :
301 (2506) : impl XLogLongPageHeaderData {
302 CBC 1473 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
303 1473 : use utils::bin_ser::LeSer;
304 GIC 1473 : XLogLongPageHeaderData::des_from(&mut buf.reader())
305 1473 : }
306 :
307 641 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
308 641 : use utils::bin_ser::LeSer;
309 641 : self.ser().map(|b| b.into())
310 CBC 641 : }
311 ECB (68261690) : }
312 (68261690) :
/// In-memory size of the `CheckPoint` struct, in bytes.
pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
314 (68261690) :
315 (68261690) : impl CheckPoint {
316 CBC 28619 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
317 28619 : use utils::bin_ser::LeSer;
318 28619 : Ok(self.ser()?.into())
319 28619 : }
320 ECB (3621) :
321 CBC 2523 : pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
322 GBC 2523 : use utils::bin_ser::LeSer;
323 2523 : CheckPoint::des(buf)
324 CBC 2523 : }
325 ECB (3621) :
326 (3621) : /// Update next XID based on provided new_xid and stored epoch.
327 (3621) : /// Next XID should be greater than new_xid. This handles 32-bit
328 (3621) : /// XID wraparound correctly.
329 (3621) : ///
330 EUB : /// Returns 'true' if the XID was updated.
331 CBC 68613496 : pub fn update_next_xid(&mut self, xid: u32) -> bool {
332 68613496 : // nextXid should nw greater than any XID in WAL, so increment provided XID and check for wraparround.
333 68613496 : let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
334 GIC 68613496 : // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
335 68613496 : // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
336 68613496 : new_xid =
337 68613496 : new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
338 68613496 : let full_xid = self.nextXid.value;
339 CBC 68613496 : let old_xid = full_xid as u32;
340 68613496 : if new_xid.wrapping_sub(old_xid) as i32 > 0 {
341 3628 : let mut epoch = full_xid >> 32;
342 3628 : if new_xid < old_xid {
343 LBC (638) : // wrap-around
344 (638) : epoch += 1;
345 CBC 3628 : }
346 3628 : let nextXid = (epoch << 32) | new_xid as u64;
347 3628 :
348 3628 : if nextXid != self.nextXid.value {
349 3628 : self.nextXid = FullTransactionId { value: nextXid };
350 GIC 3628 : return true;
351 LBC (635) : }
352 GIC 68609868 : }
353 68609868 : false
354 CBC 68613496 : }
355 ECB (638) : }
356 (638) :
357 (638) : /// Generate new, empty WAL segment, with correct block headers at the first
358 (638) : /// page of the segment and the page that contains the given LSN.
359 (638) : /// We need this segment to start compute node.
360 CBC 641 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
361 641 : let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
362 641 :
363 641 : let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
364 641 :
365 641 : let page_off = lsn.block_offset();
366 641 : let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
367 641 :
368 641 : let first_page_only = seg_off < XLOG_BLCKSZ;
369 GIC 641 : let (shdr_rem_len, infoflags) = if first_page_only {
370 CBC 3 : (seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
371 ECB (638) : } else {
372 CBC 638 : (0, 0)
373 ECB (638) : };
374 (638) :
375 CBC 641 : let hdr = XLogLongPageHeaderData {
376 641 : std: {
377 641 : XLogPageHeaderData {
378 641 : xlp_magic: XLOG_PAGE_MAGIC as u16,
379 641 : xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
380 641 : xlp_tli: PG_TLI,
381 641 : xlp_pageaddr: pageaddr,
382 GIC 641 : xlp_rem_len: shdr_rem_len as u32,
383 CBC 641 : ..Default::default() // Put 0 in padding fields.
384 GIC 641 : }
385 641 : },
386 CBC 641 : xlp_sysid: system_id,
387 641 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
388 641 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
389 GIC 641 : };
390 ECB (1) :
391 GIC 641 : let hdr_bytes = hdr.encode()?;
392 CBC 641 : seg_buf.extend_from_slice(&hdr_bytes);
393 GIC 641 :
394 CBC 641 : //zero out the rest of the file
395 GIC 641 : seg_buf.resize(WAL_SEGMENT_SIZE, 0);
396 CBC 641 :
397 641 : if !first_page_only {
398 GIC 638 : let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
399 CBC 638 : let header = XLogPageHeaderData {
400 638 : xlp_magic: XLOG_PAGE_MAGIC as u16,
401 GIC 638 : xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
402 CBC 637 : pg_constants::XLP_FIRST_IS_CONTRECORD
403 ECB (638) : } else {
404 GIC 1 : 0
405 : },
406 ECB (12) : xlp_tli: PG_TLI,
407 GIC 638 : xlp_pageaddr: lsn.page_lsn().0,
408 638 : xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
409 637 : page_off as u32
410 : } else {
411 1 : 0u32
412 : },
413 638 : ..Default::default() // Put 0 in padding fields.
414 : };
415 CBC 638 : let hdr_bytes = header.encode()?;
416 ECB (6) :
417 CBC 638 : debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
418 638 : debug_assert_ne!(block_offset, 0);
419 :
420 GIC 638 : seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
421 3 : }
422 :
423 641 : Ok(seg_buf.freeze())
424 641 : }
425 :
/// Fixed-size header of a logical message record, as serialized at the start
/// of the record's main data by `encode_logical_message`.
/// NOTE(review): presumably mirrors PostgreSQL's `xl_logical_message` - confirm.
#[repr(C)]
#[derive(Serialize)]
struct XlLogicalMessage {
    // Database OID; `encode_logical_message` always writes 0.
    db_id: Oid,
    transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
    // Length of the prefix in bytes, including its trailing NUL byte.
    prefix_size: uint64,
    // Length of the message payload in bytes.
    message_size: uint64,
}
434 (6) :
435 (6) : impl XlLogicalMessage {
436 CBC 6 : pub fn encode(&self) -> Bytes {
437 6 : use utils::bin_ser::LeSer;
438 6 : self.ser().unwrap().into()
439 6 : }
440 ECB (6) : }
441 (6) :
442 (6) : /// Create new WAL record for non-transactional logical message.
443 (6) : /// Used for creating artificial WAL for tests, as LogicalMessage
444 (6) : /// record is basically no-op.
445 (6) : ///
446 (6) : /// NOTE: This leaves the xl_prev field zero. The safekeeper and
447 (6) : /// pageserver tolerate that, but PostgreSQL does not.
448 CBC 6 : pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
449 6 : let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
450 6 : prefix_bytes.write_all(prefix.as_bytes()).unwrap();
451 6 : prefix_bytes.push(0);
452 6 :
453 6 : let message_bytes = message.as_bytes();
454 6 :
455 6 : let logical_message = XlLogicalMessage {
456 6 : db_id: 0,
457 6 : transactional: 0,
458 6 : prefix_size: prefix_bytes.len() as u64,
459 6 : message_size: message_bytes.len() as u64,
460 6 : };
461 6 :
462 6 : let mainrdata = logical_message.encode();
463 6 : let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
464 6 : // only short mainrdata is supported for now
465 6 : assert!(mainrdata_len <= 255);
466 6 : let mainrdata_len = mainrdata_len as u8;
467 6 :
468 6 : let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
469 6 : data.extend_from_slice(&mainrdata);
470 6 : data.extend_from_slice(&prefix_bytes);
471 6 : data.extend_from_slice(message_bytes);
472 6 :
473 6 : let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
474 6 :
475 6 : let mut header = XLogRecord {
476 6 : xl_tot_len: total_len as u32,
477 6 : xl_xid: 0,
478 GBC 6 : xl_prev: 0,
479 CBC 6 : xl_info: 0,
480 GIC 6 : xl_rmid: 21,
481 CBC 6 : __bindgen_padding_0: [0u8; 2usize],
482 6 : xl_crc: 0, // crc will be calculated later
483 GIC 6 : };
484 6 :
485 6 : let header_bytes = header.encode().expect("failed to encode header");
486 6 : let crc = crc32c_append(0, &data);
487 6 : let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
488 6 : header.xl_crc = crc;
489 6 :
490 6 : let mut wal: Vec<u8> = Vec::new();
491 6 : wal.extend_from_slice(&header.encode().expect("failed to encode header"));
492 6 : wal.extend_from_slice(&data);
493 6 :
494 6 : // WAL start position must be aligned at 8 bytes,
495 6 : // this will add padding for the next WAL record.
496 6 : const PADDING: usize = 8;
497 6 : let padding_rem = wal.len() % PADDING;
498 6 : if padding_rem != 0 {
499 UIC 0 : wal.resize(wal.len() + PADDING - padding_rem, 0);
500 GIC 6 : }
501 :
502 6 : wal
503 6 : }
504 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Converting to a PostgreSQL timestamp and back must be lossless at
    /// microsecond precision, in both directions.
    #[test]
    fn test_ts_conversion() {
        let micros_since_unix_epoch = |t: SystemTime| {
            t.duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_micros()
        };

        // SystemTime -> TimestampTz -> SystemTime
        let sys_now = SystemTime::now();
        let sys_round_trip = from_pg_timestamp(to_pg_timestamp(sys_now));
        assert_eq!(
            micros_since_unix_epoch(sys_now),
            micros_since_unix_epoch(sys_round_trip)
        );

        // TimestampTz -> SystemTime -> TimestampTz
        let pg_now = get_current_timestamp();
        let pg_round_trip = to_pg_timestamp(from_pg_timestamp(pg_now));
        assert_eq!(pg_now, pg_round_trip);
    }

    // If you need to craft WAL and write tests for this module, put it at wal_craft crate.
}
|