Line data Source code
1 : //
2 : // This file contains common utilities for dealing with PostgreSQL WAL files and
3 : // LSNs.
4 : //
5 : // Many of these functions have been copied from PostgreSQL, and rewritten in
6 : // Rust. That's why they don't follow the usual Rust naming conventions, they
7 : // have been named the same as the corresponding PostgreSQL functions instead.
8 : //
9 :
10 : use crc32c::crc32c_append;
11 :
12 : use super::super::waldecoder::WalStreamDecoder;
13 : use super::bindings::{
14 : CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
15 : XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
16 : };
17 : use super::PG_MAJORVERSION;
18 : use crate::pg_constants;
19 : use crate::PG_TLI;
20 : use crate::{uint32, uint64, Oid};
21 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
22 :
23 : use bytes::BytesMut;
24 : use bytes::{Buf, Bytes};
25 :
26 : use log::*;
27 :
28 : use serde::Serialize;
29 : use std::ffi::OsStr;
30 : use std::fs::File;
31 : use std::io::prelude::*;
32 : use std::io::ErrorKind;
33 : use std::io::SeekFrom;
34 : use std::path::Path;
35 : use std::time::SystemTime;
36 : use utils::bin_ser::DeserializeError;
37 : use utils::bin_ser::SerializeError;
38 :
39 : use utils::lsn::Lsn;
40 :
41 : pub const XLOG_FNAME_LEN: usize = 24;
42 : pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
43 : pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
44 : pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
45 :
46 : pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
47 : pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
48 : pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
49 : #[allow(clippy::identity_op)]
50 : pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
51 :
52 : /// Interval of checkpointing metadata file. We should store metadata file to enforce
53 : /// predicate that checkpoint.nextXid is larger than any XID in WAL.
54 : /// But flushing checkpoint file for each transaction seems to be too expensive,
55 : /// so XID_CHECKPOINT_INTERVAL is used to forward align nextXid and so perform
56 : /// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
57 : /// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
58 : /// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
59 : const XID_CHECKPOINT_INTERVAL: u32 = 1024;
60 :
61 148 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
62 148 : (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
63 148 : }
64 :
65 33852 : pub fn XLogSegNoOffsetToRecPtr(
66 33852 : segno: XLogSegNo,
67 33852 : offset: u32,
68 33852 : wal_segsz_bytes: usize,
69 33852 : ) -> XLogRecPtr {
70 33852 : segno * (wal_segsz_bytes as u64) + (offset as u64)
71 33852 : }
72 :
73 46 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
74 46 : format!(
75 46 : "{:>08X}{:>08X}{:>08X}",
76 46 : tli,
77 46 : logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
78 46 : logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
79 46 : )
80 46 : }
81 :
82 56 : pub fn XLogFromFileName(
83 56 : fname: &OsStr,
84 56 : wal_seg_size: usize,
85 56 : ) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
86 56 : if let Some(fname_str) = fname.to_str() {
87 56 : let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
88 56 : let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
89 56 : let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
90 56 : Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
91 : } else {
92 0 : anyhow::bail!("non-ut8 filename: {:?}", fname);
93 : }
94 56 : }
95 :
96 131 : pub fn IsXLogFileName(fname: &OsStr) -> bool {
97 131 : if let Some(fname) = fname.to_str() {
98 1824 : fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
99 : } else {
100 0 : false
101 : }
102 131 : }
103 :
104 0 : pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
105 0 : if let Some(fname) = fname.to_str() {
106 0 : fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
107 : } else {
108 0 : false
109 : }
110 0 : }
111 :
112 : /// If LSN points to the beginning of the page, then shift it to first record,
113 : /// otherwise align on 8-bytes boundary (required for WAL records)
114 0 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
115 0 : if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
116 0 : let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
117 0 : XLOG_SIZE_OF_XLOG_LONG_PHD
118 : } else {
119 0 : XLOG_SIZE_OF_XLOG_SHORT_PHD
120 : };
121 0 : lsn + hdr_size as u64
122 : } else {
123 0 : lsn.align()
124 : }
125 0 : }
126 :
127 0 : pub fn generate_pg_control(
128 0 : pg_control_bytes: &[u8],
129 0 : checkpoint_bytes: &[u8],
130 0 : lsn: Lsn,
131 0 : ) -> anyhow::Result<(Bytes, u64)> {
132 0 : let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
133 0 : let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
134 :
135 : // Generate new pg_control needed for bootstrap
136 0 : checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
137 0 :
138 0 : //save new values in pg_control
139 0 : pg_control.checkPoint = 0;
140 0 : pg_control.checkPointCopy = checkpoint;
141 0 : pg_control.state = DBState_DB_SHUTDOWNED;
142 0 :
143 0 : Ok((pg_control.encode(), pg_control.system_identifier))
144 0 : }
145 :
146 4 : pub fn get_current_timestamp() -> TimestampTz {
147 4 : to_pg_timestamp(SystemTime::now())
148 4 : }
149 :
150 : // Module to reduce the scope of the constants
151 : mod timestamp_conversions {
152 : use std::time::Duration;
153 :
154 : use anyhow::Context;
155 :
156 : use super::*;
157 :
158 : const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
159 : const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
160 : const SECS_PER_DAY: u64 = 86400;
161 : const USECS_PER_SEC: u64 = 1000000;
162 : const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
163 : (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
164 :
165 12 : pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
166 12 : match time.duration_since(SystemTime::UNIX_EPOCH) {
167 12 : Ok(n) => {
168 12 : ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
169 12 : + n.subsec_micros() as u64) as i64
170 : }
171 0 : Err(_) => panic!("SystemTime before UNIX EPOCH!"),
172 : }
173 12 : }
174 :
175 32 : pub fn try_from_pg_timestamp(time: TimestampTz) -> anyhow::Result<SystemTime> {
176 32 : let time: u64 = time
177 32 : .try_into()
178 32 : .context("timestamp before millenium (postgres epoch)")?;
179 32 : let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
180 32 : SystemTime::UNIX_EPOCH
181 32 : .checked_add(Duration::from_micros(since_unix_epoch))
182 32 : .context("SystemTime overflow")
183 32 : }
184 : }
185 :
186 : pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp};
187 :
188 : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
189 : // start_lsn must point to some previously known record boundary (beginning of
190 : // the next record). If no valid record after is found, start_lsn is returned
191 : // back.
192 32 : pub fn find_end_of_wal(
193 32 : data_dir: &Path,
194 32 : wal_seg_size: usize,
195 32 : start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
196 32 : ) -> anyhow::Result<Lsn> {
197 32 : let mut result = start_lsn;
198 32 : let mut curr_lsn = start_lsn;
199 32 : let mut buf = [0u8; XLOG_BLCKSZ];
200 32 : let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
201 32 : debug!("find_end_of_wal PG_VERSION: {}", pg_version);
202 :
203 32 : let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
204 :
205 : // loop over segments
206 : loop {
207 40 : let segno = curr_lsn.segment_number(wal_seg_size);
208 40 : let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
209 40 : let seg_file_path = data_dir.join(seg_file_name);
210 40 : match open_wal_segment(&seg_file_path)? {
211 : None => {
212 : // no more segments
213 0 : debug!(
214 0 : "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
215 : result, seg_file_path
216 : );
217 0 : return Ok(result);
218 : }
219 40 : Some(mut segment) => {
220 40 : let seg_offs = curr_lsn.segment_offset(wal_seg_size);
221 40 : segment.seek(SeekFrom::Start(seg_offs as u64))?;
222 : // loop inside segment
223 16504 : while curr_lsn.segment_number(wal_seg_size) == segno {
224 16496 : let bytes_read = segment.read(&mut buf)?;
225 16496 : if bytes_read == 0 {
226 0 : debug!(
227 0 : "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}",
228 0 : result,
229 0 : seg_file_path,
230 0 : curr_lsn.segment_offset(wal_seg_size)
231 : );
232 0 : return Ok(result);
233 16496 : }
234 16496 : curr_lsn += bytes_read as u64;
235 16496 : decoder.feed_bytes(&buf[0..bytes_read]);
236 :
237 : // advance result past all completely read records
238 : loop {
239 16664 : match decoder.poll_decode() {
240 168 : Ok(Some(record)) => result = record.0,
241 32 : Err(e) => {
242 32 : debug!(
243 32 : "find_end_of_wal reached end at {:?}, decode error: {:?}",
244 : result, e
245 : );
246 32 : return Ok(result);
247 : }
248 16464 : Ok(None) => break, // need more data
249 : }
250 : }
251 : }
252 : }
253 : }
254 : }
255 32 : }
256 :
257 : // Open .partial or full WAL segment file, if present.
258 40 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
259 40 : let mut partial_path = seg_file_path.to_owned();
260 40 : partial_path.set_extension("partial");
261 40 : match File::open(partial_path) {
262 32 : Ok(file) => Ok(Some(file)),
263 8 : Err(e) => match e.kind() {
264 : ErrorKind::NotFound => {
265 : // .partial not found, try full
266 8 : match File::open(seg_file_path) {
267 8 : Ok(file) => Ok(Some(file)),
268 0 : Err(e) => match e.kind() {
269 0 : ErrorKind::NotFound => Ok(None),
270 0 : _ => Err(e.into()),
271 : },
272 : }
273 : }
274 0 : _ => Err(e.into()),
275 : },
276 : }
277 40 : }
278 :
279 : impl XLogRecord {
280 475950 : pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
281 : use utils::bin_ser::LeSer;
282 475950 : XLogRecord::des(buf)
283 475950 : }
284 :
285 437556 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
286 : use utils::bin_ser::LeSer;
287 437556 : XLogRecord::des_from(&mut buf.reader())
288 437556 : }
289 :
290 33804 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
291 : use utils::bin_ser::LeSer;
292 33804 : Ok(self.ser()?.into())
293 33804 : }
294 :
295 : // Is this record an XLOG_SWITCH record? They need some special processing,
296 475950 : pub fn is_xlog_switch_record(&self) -> bool {
297 475950 : self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
298 475950 : }
299 : }
300 :
301 : impl XLogPageHeaderData {
302 25326 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
303 : use utils::bin_ser::LeSer;
304 25326 : XLogPageHeaderData::des_from(&mut buf.reader())
305 25326 : }
306 :
307 86 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
308 : use utils::bin_ser::LeSer;
309 86 : self.ser().map(|b| b.into())
310 86 : }
311 : }
312 :
313 : impl XLogLongPageHeaderData {
314 8 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
315 : use utils::bin_ser::LeSer;
316 8 : XLogLongPageHeaderData::des_from(&mut buf.reader())
317 8 : }
318 :
319 0 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
320 : use utils::bin_ser::LeSer;
321 0 : self.ser().map(|b| b.into())
322 0 : }
323 : }
324 :
325 : pub const SIZEOF_CHECKPOINT: usize = size_of::<CheckPoint>();
326 :
327 : impl CheckPoint {
328 24 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
329 : use utils::bin_ser::LeSer;
330 24 : Ok(self.ser()?.into())
331 24 : }
332 :
333 68 : pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
334 : use utils::bin_ser::LeSer;
335 68 : CheckPoint::des(buf)
336 68 : }
337 :
338 : /// Update next XID based on provided new_xid and stored epoch.
339 : /// Next XID should be greater than new_xid. This handles 32-bit
340 : /// XID wraparound correctly.
341 : ///
342 : /// Returns 'true' if the XID was updated.
343 437518 : pub fn update_next_xid(&mut self, xid: u32) -> bool {
344 437518 : // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
345 437518 : let mut new_xid = std::cmp::max(
346 437518 : xid.wrapping_add(1),
347 437518 : pg_constants::FIRST_NORMAL_TRANSACTION_ID,
348 437518 : );
349 437518 : // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
350 437518 : // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
351 437518 : new_xid =
352 437518 : new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
353 437518 : let full_xid = self.nextXid.value;
354 437518 : let old_xid = full_xid as u32;
355 437518 : if new_xid.wrapping_sub(old_xid) as i32 > 0 {
356 14 : let mut epoch = full_xid >> 32;
357 14 : if new_xid < old_xid {
358 0 : // wrap-around
359 0 : epoch += 1;
360 14 : }
361 14 : let nextXid = (epoch << 32) | new_xid as u64;
362 14 :
363 14 : if nextXid != self.nextXid.value {
364 14 : self.nextXid = FullTransactionId { value: nextXid };
365 14 : return true;
366 0 : }
367 437504 : }
368 437504 : false
369 437518 : }
370 :
371 : /// Advance next multi-XID/offset to those given in arguments.
372 : ///
373 : /// It's important that this handles wraparound correctly. This should match the
374 : /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
375 : ///
376 : /// Returns 'true' if the Checkpoint was updated.
377 24 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
378 24 : let mut modified = false;
379 24 :
380 24 : if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
381 16 : self.nextMulti = multi_xid;
382 16 : modified = true;
383 16 : }
384 :
385 24 : if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
386 20 : self.nextMultiOffset = multi_offset;
387 20 : modified = true;
388 20 : }
389 :
390 24 : modified
391 24 : }
392 : }
393 :
394 : /// Generate new, empty WAL segment, with correct block headers at the first
395 : /// page of the segment and the page that contains the given LSN.
396 : /// We need this segment to start compute node.
397 0 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
398 0 : let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
399 0 :
400 0 : let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
401 0 :
402 0 : let page_off = lsn.block_offset();
403 0 : let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
404 0 :
405 0 : let first_page_only = seg_off < XLOG_BLCKSZ;
406 : // If first records starts in the middle of the page, pretend in page header
407 : // there is a fake record which ends where first real record starts. This
408 : // makes pg_waldump etc happy.
409 0 : let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
410 0 : assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
411 : // xlp_rem_len doesn't include page header, hence the subtraction.
412 0 : (
413 0 : seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
414 0 : pg_constants::XLP_FIRST_IS_CONTRECORD,
415 0 : )
416 : } else {
417 0 : (0, 0)
418 : };
419 :
420 0 : let hdr = XLogLongPageHeaderData {
421 0 : std: {
422 0 : XLogPageHeaderData {
423 0 : xlp_magic: XLOG_PAGE_MAGIC as u16,
424 0 : xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
425 0 : xlp_tli: PG_TLI,
426 0 : xlp_pageaddr: pageaddr,
427 0 : xlp_rem_len: shdr_rem_len as u32,
428 0 : ..Default::default() // Put 0 in padding fields.
429 0 : }
430 0 : },
431 0 : xlp_sysid: system_id,
432 0 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
433 0 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
434 0 : };
435 :
436 0 : let hdr_bytes = hdr.encode()?;
437 0 : seg_buf.extend_from_slice(&hdr_bytes);
438 0 :
439 0 : //zero out the rest of the file
440 0 : seg_buf.resize(WAL_SEGMENT_SIZE, 0);
441 0 :
442 0 : if !first_page_only {
443 0 : let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
444 : // see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
445 0 : let (xlp_rem_len, xlp_info) = if page_off > 0 {
446 0 : assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
447 0 : (
448 0 : (page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
449 0 : pg_constants::XLP_FIRST_IS_CONTRECORD,
450 0 : )
451 : } else {
452 0 : (0, 0)
453 : };
454 0 : let header = XLogPageHeaderData {
455 0 : xlp_magic: XLOG_PAGE_MAGIC as u16,
456 0 : xlp_info,
457 0 : xlp_tli: PG_TLI,
458 0 : xlp_pageaddr: lsn.page_lsn().0,
459 0 : xlp_rem_len,
460 0 : ..Default::default() // Put 0 in padding fields.
461 0 : };
462 0 : let hdr_bytes = header.encode()?;
463 :
464 0 : debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
465 0 : debug_assert_ne!(block_offset, 0);
466 :
467 0 : seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
468 0 : }
469 :
470 0 : Ok(seg_buf.freeze())
471 0 : }
472 :
473 : #[repr(C)]
474 : #[derive(Serialize)]
475 : pub struct XlLogicalMessage {
476 : pub db_id: Oid,
477 : pub transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
478 : pub prefix_size: uint64,
479 : pub message_size: uint64,
480 : }
481 :
482 : impl XlLogicalMessage {
483 16902 : pub fn encode(&self) -> Bytes {
484 : use utils::bin_ser::LeSer;
485 16902 : self.ser().unwrap().into()
486 16902 : }
487 : }
488 :
489 : /// Create new WAL record for non-transactional logical message.
490 : /// Used for creating artificial WAL for tests, as LogicalMessage
491 : /// record is basically no-op.
492 : ///
493 : /// NOTE: This leaves the xl_prev field zero. The safekeeper and
494 : /// pageserver tolerate that, but PostgreSQL does not.
495 4 : pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
496 4 : let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
497 4 : prefix_bytes.write_all(prefix.as_bytes()).unwrap();
498 4 : prefix_bytes.push(0);
499 4 :
500 4 : let message_bytes = message.as_bytes();
501 4 :
502 4 : let logical_message = XlLogicalMessage {
503 4 : db_id: 0,
504 4 : transactional: 0,
505 4 : prefix_size: prefix_bytes.len() as u64,
506 4 : message_size: message_bytes.len() as u64,
507 4 : };
508 4 :
509 4 : let mainrdata = logical_message.encode();
510 4 : let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
511 4 : // only short mainrdata is supported for now
512 4 : assert!(mainrdata_len <= 255);
513 4 : let mainrdata_len = mainrdata_len as u8;
514 4 :
515 4 : let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
516 4 : data.extend_from_slice(&mainrdata);
517 4 : data.extend_from_slice(&prefix_bytes);
518 4 : data.extend_from_slice(message_bytes);
519 4 :
520 4 : let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
521 4 :
522 4 : let mut header = XLogRecord {
523 4 : xl_tot_len: total_len as u32,
524 4 : xl_xid: 0,
525 4 : xl_prev: 0,
526 4 : xl_info: 0,
527 4 : xl_rmid: 21,
528 4 : __bindgen_padding_0: [0u8; 2usize],
529 4 : xl_crc: 0, // crc will be calculated later
530 4 : };
531 4 :
532 4 : let header_bytes = header.encode().expect("failed to encode header");
533 4 : let crc = crc32c_append(0, &data);
534 4 : let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
535 4 : header.xl_crc = crc;
536 4 :
537 4 : let mut wal: Vec<u8> = Vec::new();
538 4 : wal.extend_from_slice(&header.encode().expect("failed to encode header"));
539 4 : wal.extend_from_slice(&data);
540 :
541 : // WAL start position must be aligned at 8 bytes,
542 : // this will add padding for the next WAL record.
543 : const PADDING: usize = 8;
544 4 : let padding_rem = wal.len() % PADDING;
545 4 : if padding_rem != 0 {
546 0 : wal.resize(wal.len() + PADDING - padding_rem, 0);
547 4 : }
548 :
549 4 : wal
550 4 : }
551 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Whole microseconds between the Unix epoch and `time`.
    fn micros_since_unix_epoch(time: SystemTime) -> u128 {
        time.duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros()
    }

    #[test]
    fn test_ts_conversion() {
        // SystemTime -> pg timestamp -> SystemTime keeps microsecond precision.
        let now = SystemTime::now();
        let round_trip = try_from_pg_timestamp(to_pg_timestamp(now)).unwrap();
        assert_eq!(
            micros_since_unix_epoch(now),
            micros_since_unix_epoch(round_trip)
        );

        // pg timestamp -> SystemTime -> pg timestamp is lossless.
        let now_pg = get_current_timestamp();
        let round_trip_pg = to_pg_timestamp(try_from_pg_timestamp(now_pg).unwrap());

        assert_eq!(now_pg, round_trip_pg);
    }

    // If you need to craft WAL and write tests for this module, put them in the wal_craft crate.
}
|