Line data Source code
1 : //
2 : // This file contains common utilities for dealing with PostgreSQL WAL files and
3 : // LSNs.
4 : //
5 : // Many of these functions have been copied from PostgreSQL, and rewritten in
6 : // Rust. That's why they don't follow the usual Rust naming conventions, they
7 : // have been named the same as the corresponding PostgreSQL functions instead.
8 : //
9 :
10 : use crc32c::crc32c_append;
11 :
12 : use super::super::waldecoder::WalStreamDecoder;
13 : use super::bindings::{
14 : CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
15 : XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
16 : };
17 : use super::PG_MAJORVERSION;
18 : use crate::pg_constants;
19 : use crate::PG_TLI;
20 : use crate::{uint32, uint64, Oid};
21 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
22 :
23 : use bytes::BytesMut;
24 : use bytes::{Buf, Bytes};
25 :
26 : use log::*;
27 :
28 : use serde::Serialize;
29 : use std::fs::File;
30 : use std::io::prelude::*;
31 : use std::io::ErrorKind;
32 : use std::io::SeekFrom;
33 : use std::path::{Path, PathBuf};
34 : use std::time::SystemTime;
35 : use utils::bin_ser::DeserializeError;
36 : use utils::bin_ser::SerializeError;
37 :
38 : use utils::lsn::Lsn;
39 :
40 : pub const XLOG_FNAME_LEN: usize = 24;
41 : pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
42 : pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
43 : pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
44 :
45 : pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
46 : pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
47 : pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
48 : #[allow(clippy::identity_op)]
49 : pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
50 :
/// Granularity, in XIDs, to which `nextXid` is rounded up when it is advanced.
///
/// The metadata (checkpoint) file has to be stored so that
/// `checkpoint.nextXid` stays larger than any XID present in WAL. Flushing
/// that file for every transaction would be too expensive, so `nextXid` is
/// forward-aligned to this interval and the metadata checkpoint is performed
/// only once per `XID_CHECKPOINT_INTERVAL` transactions.
///
/// The value should not be larger than `BLCKSZ * CLOG_XACTS_PER_BYTE`, so that
/// the CLOG_TRUNCATE mechanism can extend CLOG correctly.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
59 :
60 624 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
61 624 : (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
62 624 : }
63 :
64 200030 : pub fn XLogSegNoOffsetToRecPtr(
65 200030 : segno: XLogSegNo,
66 200030 : offset: u32,
67 200030 : wal_segsz_bytes: usize,
68 200030 : ) -> XLogRecPtr {
69 200030 : segno * (wal_segsz_bytes as u64) + (offset as u64)
70 200030 : }
71 :
72 186 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
73 186 : format!(
74 186 : "{:>08X}{:>08X}{:>08X}",
75 186 : tli,
76 186 : logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
77 186 : logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
78 186 : )
79 186 : }
80 :
81 252 : pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
82 252 : let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
83 252 : let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
84 252 : let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
85 252 : (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
86 252 : }
87 :
88 540 : pub fn IsXLogFileName(fname: &str) -> bool {
89 8208 : return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
90 540 : }
91 :
92 0 : pub fn IsPartialXLogFileName(fname: &str) -> bool {
93 0 : fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
94 0 : }
95 :
96 : /// If LSN points to the beginning of the page, then shift it to first record,
97 : /// otherwise align on 8-bytes boundary (required for WAL records)
98 0 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
99 0 : if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
100 0 : let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
101 0 : XLOG_SIZE_OF_XLOG_LONG_PHD
102 : } else {
103 0 : XLOG_SIZE_OF_XLOG_SHORT_PHD
104 : };
105 0 : lsn + hdr_size as u64
106 : } else {
107 0 : lsn.align()
108 : }
109 0 : }
110 :
111 0 : pub fn generate_pg_control(
112 0 : pg_control_bytes: &[u8],
113 0 : checkpoint_bytes: &[u8],
114 0 : lsn: Lsn,
115 0 : ) -> anyhow::Result<(Bytes, u64)> {
116 0 : let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
117 0 : let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
118 :
119 : // Generate new pg_control needed for bootstrap
120 0 : checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
121 0 :
122 0 : //save new values in pg_control
123 0 : pg_control.checkPoint = 0;
124 0 : pg_control.checkPointCopy = checkpoint;
125 0 : pg_control.state = DBState_DB_SHUTDOWNED;
126 0 :
127 0 : Ok((pg_control.encode(), pg_control.system_identifier))
128 0 : }
129 :
130 18 : pub fn get_current_timestamp() -> TimestampTz {
131 18 : to_pg_timestamp(SystemTime::now())
132 18 : }
133 :
134 : // Module to reduce the scope of the constants
135 : mod timestamp_conversions {
136 : use std::time::Duration;
137 :
138 : use super::*;
139 :
140 : const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
141 : const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
142 : const SECS_PER_DAY: u64 = 86400;
143 : const USECS_PER_SEC: u64 = 1000000;
144 : const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
145 : (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
146 :
147 54 : pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
148 54 : match time.duration_since(SystemTime::UNIX_EPOCH) {
149 54 : Ok(n) => {
150 54 : ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
151 54 : + n.subsec_micros() as u64) as i64
152 : }
153 0 : Err(_) => panic!("SystemTime before UNIX EPOCH!"),
154 : }
155 54 : }
156 :
157 36 : pub fn from_pg_timestamp(time: TimestampTz) -> SystemTime {
158 36 : let time: u64 = time
159 36 : .try_into()
160 36 : .expect("timestamp before millenium (postgres epoch)");
161 36 : let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
162 36 : SystemTime::UNIX_EPOCH
163 36 : .checked_add(Duration::from_micros(since_unix_epoch))
164 36 : .expect("SystemTime overflow")
165 36 : }
166 : }
167 :
168 : pub use timestamp_conversions::{from_pg_timestamp, to_pg_timestamp};
169 :
170 : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
171 : // start_lsn must point to some previously known record boundary (beginning of
172 : // the next record). If no valid record after is found, start_lsn is returned
173 : // back.
174 144 : pub fn find_end_of_wal(
175 144 : data_dir: &Path,
176 144 : wal_seg_size: usize,
177 144 : start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
178 144 : ) -> anyhow::Result<Lsn> {
179 144 : let mut result = start_lsn;
180 144 : let mut curr_lsn = start_lsn;
181 144 : let mut buf = [0u8; XLOG_BLCKSZ];
182 144 : let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
183 144 : debug!("find_end_of_wal PG_VERSION: {}", pg_version);
184 :
185 144 : let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
186 :
187 : // loop over segments
188 180 : loop {
189 180 : let segno = curr_lsn.segment_number(wal_seg_size);
190 180 : let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
191 180 : let seg_file_path = data_dir.join(seg_file_name);
192 180 : match open_wal_segment(&seg_file_path)? {
193 : None => {
194 : // no more segments
195 0 : debug!(
196 0 : "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
197 : result, seg_file_path
198 : );
199 0 : return Ok(result);
200 : }
201 180 : Some(mut segment) => {
202 180 : let seg_offs = curr_lsn.segment_offset(wal_seg_size);
203 180 : segment.seek(SeekFrom::Start(seg_offs as u64))?;
204 : // loop inside segment
205 74268 : while curr_lsn.segment_number(wal_seg_size) == segno {
206 74232 : let bytes_read = segment.read(&mut buf)?;
207 74232 : if bytes_read == 0 {
208 0 : debug!(
209 0 : "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}",
210 0 : result,
211 0 : seg_file_path,
212 0 : curr_lsn.segment_offset(wal_seg_size)
213 : );
214 0 : return Ok(result);
215 74232 : }
216 74232 : curr_lsn += bytes_read as u64;
217 74232 : decoder.feed_bytes(&buf[0..bytes_read]);
218 :
219 : // advance result past all completely read records
220 : loop {
221 74988 : match decoder.poll_decode() {
222 756 : Ok(Some(record)) => result = record.0,
223 144 : Err(e) => {
224 144 : debug!(
225 144 : "find_end_of_wal reached end at {:?}, decode error: {:?}",
226 : result, e
227 : );
228 144 : return Ok(result);
229 : }
230 74088 : Ok(None) => break, // need more data
231 : }
232 : }
233 : }
234 : }
235 : }
236 : }
237 144 : }
238 :
239 : // Open .partial or full WAL segment file, if present.
240 180 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
241 180 : let mut partial_path = seg_file_path.to_owned();
242 180 : partial_path.set_extension("partial");
243 180 : match File::open(partial_path) {
244 144 : Ok(file) => Ok(Some(file)),
245 36 : Err(e) => match e.kind() {
246 : ErrorKind::NotFound => {
247 : // .partial not found, try full
248 36 : match File::open(seg_file_path) {
249 36 : Ok(file) => Ok(Some(file)),
250 0 : Err(e) => match e.kind() {
251 0 : ErrorKind::NotFound => Ok(None),
252 0 : _ => Err(e.into()),
253 : },
254 : }
255 : }
256 0 : _ => Err(e.into()),
257 : },
258 : }
259 180 : }
260 :
261 0 : pub fn main() {
262 0 : let mut data_dir = PathBuf::new();
263 0 : data_dir.push(".");
264 0 : let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap();
265 0 : println!("wal_end={:?}", wal_end);
266 0 : }
267 :
268 : impl XLogRecord {
269 675659 : pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
270 675659 : use utils::bin_ser::LeSer;
271 675659 : XLogRecord::des(buf)
272 675659 : }
273 :
274 437556 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
275 437556 : use utils::bin_ser::LeSer;
276 437556 : XLogRecord::des_from(&mut buf.reader())
277 437556 : }
278 :
279 199814 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
280 199814 : use utils::bin_ser::LeSer;
281 199814 : Ok(self.ser()?.into())
282 199814 : }
283 :
284 : // Is this record an XLOG_SWITCH record? They need some special processing,
285 675659 : pub fn is_xlog_switch_record(&self) -> bool {
286 675659 : self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
287 675659 : }
288 : }
289 :
290 : impl XLogPageHeaderData {
291 83926 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
292 83926 : use utils::bin_ser::LeSer;
293 83926 : XLogPageHeaderData::des_from(&mut buf.reader())
294 83926 : }
295 :
296 549 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
297 549 : use utils::bin_ser::LeSer;
298 549 : self.ser().map(|b| b.into())
299 549 : }
300 : }
301 :
302 : impl XLogLongPageHeaderData {
303 36 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
304 36 : use utils::bin_ser::LeSer;
305 36 : XLogLongPageHeaderData::des_from(&mut buf.reader())
306 36 : }
307 :
308 0 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
309 0 : use utils::bin_ser::LeSer;
310 0 : self.ser().map(|b| b.into())
311 0 : }
312 : }
313 :
314 : pub const SIZEOF_CHECKPOINT: usize = size_of::<CheckPoint>();
315 :
316 : impl CheckPoint {
317 24 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
318 24 : use utils::bin_ser::LeSer;
319 24 : Ok(self.ser()?.into())
320 24 : }
321 :
322 78 : pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
323 78 : use utils::bin_ser::LeSer;
324 78 : CheckPoint::des(buf)
325 78 : }
326 :
327 : /// Update next XID based on provided new_xid and stored epoch.
328 : /// Next XID should be greater than new_xid. This handles 32-bit
329 : /// XID wraparound correctly.
330 : ///
331 : /// Returns 'true' if the XID was updated.
332 437574 : pub fn update_next_xid(&mut self, xid: u32) -> bool {
333 437574 : // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
334 437574 : let mut new_xid = std::cmp::max(
335 437574 : xid.wrapping_add(1),
336 437574 : pg_constants::FIRST_NORMAL_TRANSACTION_ID,
337 437574 : );
338 437574 : // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
339 437574 : // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
340 437574 : new_xid =
341 437574 : new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
342 437574 : let full_xid = self.nextXid.value;
343 437574 : let old_xid = full_xid as u32;
344 437574 : if new_xid.wrapping_sub(old_xid) as i32 > 0 {
345 42 : let mut epoch = full_xid >> 32;
346 42 : if new_xid < old_xid {
347 0 : // wrap-around
348 0 : epoch += 1;
349 42 : }
350 42 : let nextXid = (epoch << 32) | new_xid as u64;
351 42 :
352 42 : if nextXid != self.nextXid.value {
353 42 : self.nextXid = FullTransactionId { value: nextXid };
354 42 : return true;
355 0 : }
356 437532 : }
357 437532 : false
358 437574 : }
359 :
360 : /// Advance next multi-XID/offset to those given in arguments.
361 : ///
362 : /// It's important that this handles wraparound correctly. This should match the
363 : /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
364 : ///
365 : /// Returns 'true' if the Checkpoint was updated.
366 108 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
367 108 : let mut modified = false;
368 108 :
369 108 : if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
370 72 : self.nextMulti = multi_xid;
371 72 : modified = true;
372 72 : }
373 :
374 108 : if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
375 90 : self.nextMultiOffset = multi_offset;
376 90 : modified = true;
377 90 : }
378 :
379 108 : modified
380 108 : }
381 : }
382 :
383 : /// Generate new, empty WAL segment, with correct block headers at the first
384 : /// page of the segment and the page that contains the given LSN.
385 : /// We need this segment to start compute node.
386 0 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
387 0 : let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
388 0 :
389 0 : let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
390 0 :
391 0 : let page_off = lsn.block_offset();
392 0 : let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
393 0 :
394 0 : let first_page_only = seg_off < XLOG_BLCKSZ;
395 : // If first records starts in the middle of the page, pretend in page header
396 : // there is a fake record which ends where first real record starts. This
397 : // makes pg_waldump etc happy.
398 0 : let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
399 0 : assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
400 : // xlp_rem_len doesn't include page header, hence the subtraction.
401 0 : (
402 0 : seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
403 0 : pg_constants::XLP_FIRST_IS_CONTRECORD,
404 0 : )
405 : } else {
406 0 : (0, 0)
407 : };
408 :
409 0 : let hdr = XLogLongPageHeaderData {
410 0 : std: {
411 0 : XLogPageHeaderData {
412 0 : xlp_magic: XLOG_PAGE_MAGIC as u16,
413 0 : xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
414 0 : xlp_tli: PG_TLI,
415 0 : xlp_pageaddr: pageaddr,
416 0 : xlp_rem_len: shdr_rem_len as u32,
417 0 : ..Default::default() // Put 0 in padding fields.
418 0 : }
419 0 : },
420 0 : xlp_sysid: system_id,
421 0 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
422 0 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
423 0 : };
424 :
425 0 : let hdr_bytes = hdr.encode()?;
426 0 : seg_buf.extend_from_slice(&hdr_bytes);
427 0 :
428 0 : //zero out the rest of the file
429 0 : seg_buf.resize(WAL_SEGMENT_SIZE, 0);
430 0 :
431 0 : if !first_page_only {
432 0 : let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
433 : // see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
434 0 : let (xlp_rem_len, xlp_info) = if page_off > 0 {
435 0 : assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
436 0 : (
437 0 : (page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
438 0 : pg_constants::XLP_FIRST_IS_CONTRECORD,
439 0 : )
440 : } else {
441 0 : (0, 0)
442 : };
443 0 : let header = XLogPageHeaderData {
444 0 : xlp_magic: XLOG_PAGE_MAGIC as u16,
445 0 : xlp_info,
446 0 : xlp_tli: PG_TLI,
447 0 : xlp_pageaddr: lsn.page_lsn().0,
448 0 : xlp_rem_len,
449 0 : ..Default::default() // Put 0 in padding fields.
450 0 : };
451 0 : let hdr_bytes = header.encode()?;
452 :
453 0 : debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
454 0 : debug_assert_ne!(block_offset, 0);
455 :
456 0 : seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
457 0 : }
458 :
459 0 : Ok(seg_buf.freeze())
460 0 : }
461 :
462 : #[repr(C)]
463 : #[derive(Serialize)]
464 : pub struct XlLogicalMessage {
465 : pub db_id: Oid,
466 : pub transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
467 : pub prefix_size: uint64,
468 : pub message_size: uint64,
469 : }
470 :
471 : impl XlLogicalMessage {
472 99907 : pub fn encode(&self) -> Bytes {
473 99907 : use utils::bin_ser::LeSer;
474 99907 : self.ser().unwrap().into()
475 99907 : }
476 : }
477 :
478 : /// Create new WAL record for non-transactional logical message.
479 : /// Used for creating artificial WAL for tests, as LogicalMessage
480 : /// record is basically no-op.
481 : ///
482 : /// NOTE: This leaves the xl_prev field zero. The safekeeper and
483 : /// pageserver tolerate that, but PostgreSQL does not.
484 18 : pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
485 18 : let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
486 18 : prefix_bytes.write_all(prefix.as_bytes()).unwrap();
487 18 : prefix_bytes.push(0);
488 18 :
489 18 : let message_bytes = message.as_bytes();
490 18 :
491 18 : let logical_message = XlLogicalMessage {
492 18 : db_id: 0,
493 18 : transactional: 0,
494 18 : prefix_size: prefix_bytes.len() as u64,
495 18 : message_size: message_bytes.len() as u64,
496 18 : };
497 18 :
498 18 : let mainrdata = logical_message.encode();
499 18 : let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
500 18 : // only short mainrdata is supported for now
501 18 : assert!(mainrdata_len <= 255);
502 18 : let mainrdata_len = mainrdata_len as u8;
503 18 :
504 18 : let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
505 18 : data.extend_from_slice(&mainrdata);
506 18 : data.extend_from_slice(&prefix_bytes);
507 18 : data.extend_from_slice(message_bytes);
508 18 :
509 18 : let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
510 18 :
511 18 : let mut header = XLogRecord {
512 18 : xl_tot_len: total_len as u32,
513 18 : xl_xid: 0,
514 18 : xl_prev: 0,
515 18 : xl_info: 0,
516 18 : xl_rmid: 21,
517 18 : __bindgen_padding_0: [0u8; 2usize],
518 18 : xl_crc: 0, // crc will be calculated later
519 18 : };
520 18 :
521 18 : let header_bytes = header.encode().expect("failed to encode header");
522 18 : let crc = crc32c_append(0, &data);
523 18 : let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
524 18 : header.xl_crc = crc;
525 18 :
526 18 : let mut wal: Vec<u8> = Vec::new();
527 18 : wal.extend_from_slice(&header.encode().expect("failed to encode header"));
528 18 : wal.extend_from_slice(&data);
529 18 :
530 18 : // WAL start position must be aligned at 8 bytes,
531 18 : // this will add padding for the next WAL record.
532 18 : const PADDING: usize = 8;
533 18 : let padding_rem = wal.len() % PADDING;
534 18 : if padding_rem != 0 {
535 0 : wal.resize(wal.len() + PADDING - padding_rem, 0);
536 18 : }
537 :
538 18 : wal
539 18 : }
540 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Microseconds elapsed between the Unix epoch and `t`.
    fn micros_since_unix_epoch(t: SystemTime) -> u128 {
        t.duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros()
    }

    #[test]
    fn test_ts_conversion() {
        // SystemTime -> pg -> SystemTime keeps microsecond precision.
        let now = SystemTime::now();
        let round_trip = from_pg_timestamp(to_pg_timestamp(now));
        assert_eq!(
            micros_since_unix_epoch(now),
            micros_since_unix_epoch(round_trip)
        );

        // pg -> SystemTime -> pg is lossless.
        let now_pg = get_current_timestamp();
        let round_trip_pg = to_pg_timestamp(from_pg_timestamp(now_pg));
        assert_eq!(now_pg, round_trip_pg);
    }

    // If you need to craft WAL and write tests for this module, put it at wal_craft crate.
}
|