Line data Source code
1 : //
2 : // This file contains common utilities for dealing with PostgreSQL WAL files and
3 : // LSNs.
4 : //
5 : // Many of these functions have been copied from PostgreSQL, and rewritten in
6 : // Rust. That's why they don't follow the usual Rust naming conventions, they
7 : // have been named the same as the corresponding PostgreSQL functions instead.
8 : //
9 :
10 : use super::super::waldecoder::WalStreamDecoder;
11 : use super::bindings::{
12 : CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID,
13 : XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
14 : MY_PGVERSION
15 : };
16 : use postgres_ffi_types::TimestampTz;
17 : use super::wal_generator::LogicalMessageGenerator;
18 : use crate::pg_constants;
19 : use crate::PG_TLI;
20 : use crate::{uint32, uint64, Oid};
21 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
22 :
23 : use bytes::BytesMut;
24 : use bytes::{Buf, Bytes};
25 :
26 : use log::*;
27 :
28 : use serde::Serialize;
29 : use std::ffi::{CString, OsStr};
30 : use std::fs::File;
31 : use std::io::prelude::*;
32 : use std::io::ErrorKind;
33 : use std::io::SeekFrom;
34 : use std::path::Path;
35 : use std::time::SystemTime;
36 : use utils::bin_ser::DeserializeError;
37 : use utils::bin_ser::SerializeError;
38 :
39 : use utils::lsn::Lsn;
40 :
/// Length of a WAL segment file name: three 8-digit hexadecimal fields
/// (timeline, log id, segment within log id), as produced by `XLogFileName`.
pub const XLOG_FNAME_LEN: usize = 24;
/// Page header info flag bit. NOTE(review): presumably mirrors PostgreSQL's
/// XLP_BKP_REMOVABLE -- confirm against the PostgreSQL headers.
pub const XLP_BKP_REMOVABLE: u16 = 0x0004;
/// Page header info flag bit. NOTE(review): presumably set when the page starts
/// with the continuation of a record from the previous page -- confirm.
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
/// Sum of a 2-, 2-, 4- and 8-byte field. NOTE(review): presumably the byte
/// offset of `xlp_rem_len` within the page header -- confirm.
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
/// Sum of the sizes of the fields preceding the CRC. NOTE(review): presumably
/// the byte offset of the CRC field within `XLogRecord` -- confirm.
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;

/// In-memory size of the short (regular) WAL page header struct.
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
/// In-memory size of the long WAL page header struct.
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
/// In-memory size of the WAL record header struct.
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
// The `1 * 2` spelling is kept on purpose, hence the clippy allow.
// NOTE(review): presumably two 1-byte fields -- confirm.
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;

/// Interval of checkpointing the metadata file. We should store the metadata file to
/// enforce the predicate that checkpoint.nextXid is larger than any XID in WAL.
/// But flushing the checkpoint file for each transaction seems to be too expensive,
/// so XID_CHECKPOINT_INTERVAL is used to forward-align nextXid and so perform a
/// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
/// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
/// in order to let the CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
61 :
62 209 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
63 209 : (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
64 209 : }
65 :
66 61 : pub fn XLogSegNoOffsetToRecPtr(
67 61 : segno: XLogSegNo,
68 61 : offset: u32,
69 61 : wal_segsz_bytes: usize,
70 61 : ) -> XLogRecPtr {
71 61 : segno * (wal_segsz_bytes as u64) + (offset as u64)
72 61 : }
73 :
74 74 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
75 74 : format!(
76 74 : "{:>08X}{:>08X}{:>08X}",
77 : tli,
78 74 : logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
79 74 : logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
80 : )
81 74 : }
82 :
83 61 : pub fn XLogFromFileName(
84 61 : fname: &OsStr,
85 61 : wal_seg_size: usize,
86 61 : ) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
87 61 : if let Some(fname_str) = fname.to_str() {
88 61 : let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
89 61 : let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
90 61 : let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
91 61 : Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
92 : } else {
93 0 : anyhow::bail!("non-ut8 filename: {:?}", fname);
94 : }
95 61 : }
96 :
97 146 : pub fn IsXLogFileName(fname: &OsStr) -> bool {
98 146 : if let Some(fname) = fname.to_str() {
99 1944 : fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
100 : } else {
101 0 : false
102 : }
103 146 : }
104 :
105 10 : pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
106 10 : if let Some(fname) = fname.to_str() {
107 10 : fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
108 : } else {
109 0 : false
110 : }
111 10 : }
112 :
113 : /// If LSN points to the beginning of the page, then shift it to first record,
114 : /// otherwise align on 8-bytes boundary (required for WAL records)
115 7 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
116 7 : if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
117 0 : let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
118 0 : XLOG_SIZE_OF_XLOG_LONG_PHD
119 : } else {
120 0 : XLOG_SIZE_OF_XLOG_SHORT_PHD
121 : };
122 0 : lsn + hdr_size as u64
123 : } else {
124 7 : lsn.align()
125 : }
126 7 : }
127 :
128 : /// Generate a pg_control file, for a basebackup for starting up Postgres at the given LSN
129 : ///
130 : /// 'pg_control_bytes' and 'checkpoint_bytes' are the contents of those keys persisted in
131 : /// the pageserver. They use the same format as the PostgreSQL control file and the
132 : /// checkpoint record, but see walingest.rs for how exactly they are kept up to date.
133 : /// 'lsn' is the LSN at which we're starting up.
134 : ///
135 : /// Returns:
136 : /// - pg_control file contents
137 : /// - system_identifier, extracted from the persisted information
138 : /// - true, if we're starting up from a "clean shutdown", i.e. if there was a shutdown
139 : /// checkpoint at the given LSN
140 0 : pub fn generate_pg_control(
141 0 : pg_control_bytes: &[u8],
142 0 : checkpoint_bytes: &[u8],
143 0 : lsn: Lsn,
144 0 : ) -> anyhow::Result<(Bytes, u64, bool)> {
145 0 : let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
146 0 : let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
147 :
148 : // Generate new pg_control needed for bootstrap
149 : //
150 : // NB: In the checkpoint struct that we persist in the pageserver, we have a different
151 : // convention for the 'redo' field than in PostgreSQL: On a shutdown checkpoint,
152 : // 'redo' points the *end* of the checkpoint WAL record. On PostgreSQL, it points to
153 : // the beginning. Furthermore, on an online checkpoint, 'redo' is set to 0.
154 : //
155 : // We didn't always have this convention however, and old persisted records will have
156 : // old REDO values that point to some old LSN.
157 : //
158 : // The upshot is that if 'redo' is equal to the "current" LSN, there was a shutdown
159 : // checkpoint record at that point in WAL, with no new WAL records after it. That case
160 : // can be treated as starting from a clean shutdown. All other cases are treated as
161 : // non-clean shutdown. In Neon, we don't do WAL replay at startup in either case, so
162 : // that distinction doesn't matter very much. As of this writing, it only affects
163 : // whether the persisted pg_stats information can be used or not.
164 : //
165 : // In the Checkpoint struct in the returned pg_control file, the redo pointer is
166 : // always set to the LSN we're starting at, to hint that no WAL replay is required.
167 : // (There's some neon-specific code in Postgres startup to make that work, though.
168 : // Just setting the redo pointer is not sufficient.)
169 0 : let was_shutdown = Lsn(checkpoint.redo) == lsn;
170 0 : checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
171 :
172 : // We use DBState_DB_SHUTDOWNED even if it was not a clean shutdown. The
173 : // neon-specific code at postgres startup ignores the state stored in the control
174 : // file, similar to archive recovery in standalone PostgreSQL. Similarly, the
175 : // checkPoint pointer is ignored, so just set it to 0.
176 0 : pg_control.checkPoint = 0;
177 0 : pg_control.checkPointCopy = checkpoint;
178 0 : pg_control.state = DBState_DB_SHUTDOWNED;
179 :
180 0 : Ok((pg_control.encode(), pg_control.system_identifier, was_shutdown))
181 0 : }
182 :
183 4 : pub fn get_current_timestamp() -> TimestampTz {
184 4 : to_pg_timestamp(SystemTime::now())
185 4 : }
186 :
187 : // Module to reduce the scope of the constants
188 : mod timestamp_conversions {
189 : use std::time::Duration;
190 :
191 : use anyhow::Context;
192 :
193 : use super::*;
194 :
195 : const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
196 : const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
197 : const SECS_PER_DAY: u64 = 86400;
198 : const USECS_PER_SEC: u64 = 1000000;
199 : const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
200 : (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
201 :
202 12 : pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
203 12 : match time.duration_since(SystemTime::UNIX_EPOCH) {
204 12 : Ok(n) => {
205 12 : ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
206 12 : + n.subsec_micros() as u64) as i64
207 : }
208 0 : Err(_) => panic!("SystemTime before UNIX EPOCH!"),
209 : }
210 12 : }
211 :
212 12 : pub fn try_from_pg_timestamp(time: TimestampTz) -> anyhow::Result<SystemTime> {
213 12 : let time: u64 = time
214 12 : .try_into()
215 12 : .context("timestamp before millenium (postgres epoch)")?;
216 12 : let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
217 12 : SystemTime::UNIX_EPOCH
218 12 : .checked_add(Duration::from_micros(since_unix_epoch))
219 12 : .context("SystemTime overflow")
220 12 : }
221 : }
222 :
223 : pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp};
224 :
225 : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
226 : // start_lsn must point to some previously known record boundary (beginning of
227 : // the next record). If no valid record after is found, start_lsn is returned
228 : // back.
229 32 : pub fn find_end_of_wal(
230 32 : data_dir: &Path,
231 32 : wal_seg_size: usize,
232 32 : start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
233 32 : ) -> anyhow::Result<Lsn> {
234 32 : let mut result = start_lsn;
235 32 : let mut curr_lsn = start_lsn;
236 32 : let mut buf = [0u8; XLOG_BLCKSZ];
237 32 : let pg_version = MY_PGVERSION;
238 32 : debug!("find_end_of_wal PG_VERSION: {}", pg_version);
239 :
240 32 : let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
241 :
242 : // loop over segments
243 : loop {
244 40 : let segno = curr_lsn.segment_number(wal_seg_size);
245 40 : let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
246 40 : let seg_file_path = data_dir.join(seg_file_name);
247 40 : match open_wal_segment(&seg_file_path)? {
248 : None => {
249 : // no more segments
250 0 : debug!(
251 0 : "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
252 : result, seg_file_path
253 : );
254 0 : return Ok(result);
255 : }
256 40 : Some(mut segment) => {
257 40 : let seg_offs = curr_lsn.segment_offset(wal_seg_size);
258 40 : segment.seek(SeekFrom::Start(seg_offs as u64))?;
259 : // loop inside segment
260 16504 : while curr_lsn.segment_number(wal_seg_size) == segno {
261 16496 : let bytes_read = segment.read(&mut buf)?;
262 16496 : if bytes_read == 0 {
263 0 : debug!(
264 0 : "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}",
265 : result,
266 : seg_file_path,
267 0 : curr_lsn.segment_offset(wal_seg_size)
268 : );
269 0 : return Ok(result);
270 16496 : }
271 16496 : curr_lsn += bytes_read as u64;
272 16496 : decoder.feed_bytes(&buf[0..bytes_read]);
273 :
274 : // advance result past all completely read records
275 : loop {
276 16664 : match decoder.poll_decode() {
277 168 : Ok(Some(record)) => result = record.0,
278 32 : Err(e) => {
279 32 : debug!(
280 32 : "find_end_of_wal reached end at {:?}, decode error: {:?}",
281 : result, e
282 : );
283 32 : return Ok(result);
284 : }
285 16464 : Ok(None) => break, // need more data
286 : }
287 : }
288 : }
289 : }
290 : }
291 : }
292 32 : }
293 :
294 : // Open .partial or full WAL segment file, if present.
295 40 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
296 40 : let mut partial_path = seg_file_path.to_owned();
297 40 : partial_path.set_extension("partial");
298 40 : match File::open(partial_path) {
299 32 : Ok(file) => Ok(Some(file)),
300 8 : Err(e) => match e.kind() {
301 : ErrorKind::NotFound => {
302 : // .partial not found, try full
303 8 : match File::open(seg_file_path) {
304 8 : Ok(file) => Ok(Some(file)),
305 0 : Err(e) => match e.kind() {
306 0 : ErrorKind::NotFound => Ok(None),
307 0 : _ => Err(e.into()),
308 : },
309 : }
310 : }
311 0 : _ => Err(e.into()),
312 : },
313 : }
314 40 : }
315 :
316 : impl XLogRecord {
317 97978 : pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
318 : use utils::bin_ser::LeSer;
319 97978 : XLogRecord::des(buf)
320 97978 : }
321 :
322 73532 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
323 : use utils::bin_ser::LeSer;
324 73532 : XLogRecord::des_from(&mut buf.reader())
325 73532 : }
326 :
327 23916 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
328 : use utils::bin_ser::LeSer;
329 23916 : Ok(self.ser()?.into())
330 23916 : }
331 :
332 : // Is this record an XLOG_SWITCH record? They need some special processing,
333 97978 : pub fn is_xlog_switch_record(&self) -> bool {
334 97978 : self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
335 97978 : }
336 : }
337 :
338 : impl XLogPageHeaderData {
339 19382 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
340 : use utils::bin_ser::LeSer;
341 19382 : XLogPageHeaderData::des_from(&mut buf.reader())
342 19382 : }
343 :
344 783 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
345 : use utils::bin_ser::LeSer;
346 783 : self.ser().map(|b| b.into())
347 783 : }
348 : }
349 :
350 : impl XLogLongPageHeaderData {
351 8 : pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
352 : use utils::bin_ser::LeSer;
353 8 : XLogLongPageHeaderData::des_from(&mut buf.reader())
354 8 : }
355 :
356 5 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
357 : use utils::bin_ser::LeSer;
358 5 : self.ser().map(|b| b.into())
359 5 : }
360 : }
361 :
362 : pub const SIZEOF_CHECKPOINT: usize = size_of::<CheckPoint>();
363 :
364 : impl CheckPoint {
365 4 : pub fn encode(&self) -> Result<Bytes, SerializeError> {
366 : use utils::bin_ser::LeSer;
367 4 : Ok(self.ser()?.into())
368 4 : }
369 :
370 19 : pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
371 : use utils::bin_ser::LeSer;
372 19 : CheckPoint::des(buf)
373 19 : }
374 :
375 : /// Update next XID based on provided new_xid and stored epoch.
376 : /// Next XID should be greater than new_xid. This handles 32-bit
377 : /// XID wraparound correctly.
378 : ///
379 : /// Returns 'true' if the XID was updated.
380 72933 : pub fn update_next_xid(&mut self, xid: u32) -> bool {
381 : // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
382 72933 : let mut new_xid = std::cmp::max(
383 72933 : xid.wrapping_add(1),
384 : pg_constants::FIRST_NORMAL_TRANSACTION_ID,
385 : );
386 : // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
387 : // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
388 72933 : new_xid =
389 72933 : new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
390 72933 : let full_xid = self.nextXid.value;
391 72933 : let old_xid = full_xid as u32;
392 72933 : if new_xid.wrapping_sub(old_xid) as i32 > 0 {
393 9 : let mut epoch = full_xid >> 32;
394 9 : if new_xid < old_xid {
395 0 : // wrap-around
396 0 : epoch += 1;
397 9 : }
398 9 : let nextXid = (epoch << 32) | new_xid as u64;
399 :
400 9 : if nextXid != self.nextXid.value {
401 9 : self.nextXid = FullTransactionId { value: nextXid };
402 9 : return true;
403 0 : }
404 72924 : }
405 72924 : false
406 72933 : }
407 :
408 : /// Advance next multi-XID/offset to those given in arguments.
409 : ///
410 : /// It's important that this handles wraparound correctly. This should match the
411 : /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
412 : ///
413 : /// Returns 'true' if the Checkpoint was updated.
414 24 : pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
415 24 : let mut modified = false;
416 :
417 24 : if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
418 16 : self.nextMulti = multi_xid;
419 16 : modified = true;
420 16 : }
421 :
422 24 : if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
423 20 : self.nextMultiOffset = multi_offset;
424 20 : modified = true;
425 20 : }
426 :
427 24 : modified
428 24 : }
429 : }
430 :
431 : /// Generate new, empty WAL segment, with correct block headers at the first
432 : /// page of the segment and the page that contains the given LSN.
433 : /// We need this segment to start compute node.
434 5 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
435 5 : let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
436 :
437 5 : let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
438 :
439 5 : let page_off = lsn.block_offset();
440 5 : let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
441 :
442 5 : let first_page_only = seg_off < XLOG_BLCKSZ;
443 : // If first records starts in the middle of the page, pretend in page header
444 : // there is a fake record which ends where first real record starts. This
445 : // makes pg_waldump etc happy.
446 5 : let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
447 0 : assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
448 : // xlp_rem_len doesn't include page header, hence the subtraction.
449 0 : (
450 0 : seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
451 0 : pg_constants::XLP_FIRST_IS_CONTRECORD,
452 0 : )
453 : } else {
454 5 : (0, 0)
455 : };
456 :
457 5 : let hdr = XLogLongPageHeaderData {
458 5 : std: {
459 5 : XLogPageHeaderData {
460 5 : xlp_magic: XLOG_PAGE_MAGIC as u16,
461 5 : xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
462 5 : xlp_tli: PG_TLI,
463 5 : xlp_pageaddr: pageaddr,
464 5 : xlp_rem_len: shdr_rem_len as u32,
465 5 : ..Default::default() // Put 0 in padding fields.
466 5 : }
467 5 : },
468 5 : xlp_sysid: system_id,
469 5 : xlp_seg_size: WAL_SEGMENT_SIZE as u32,
470 5 : xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
471 5 : };
472 :
473 5 : let hdr_bytes = hdr.encode()?;
474 5 : seg_buf.extend_from_slice(&hdr_bytes);
475 :
476 : //zero out the rest of the file
477 5 : seg_buf.resize(WAL_SEGMENT_SIZE, 0);
478 :
479 5 : if !first_page_only {
480 5 : let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
481 : // see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
482 5 : let (xlp_rem_len, xlp_info) = if page_off > 0 {
483 5 : assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
484 5 : (
485 5 : (page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
486 5 : pg_constants::XLP_FIRST_IS_CONTRECORD,
487 5 : )
488 : } else {
489 0 : (0, 0)
490 : };
491 5 : let header = XLogPageHeaderData {
492 5 : xlp_magic: XLOG_PAGE_MAGIC as u16,
493 5 : xlp_info,
494 5 : xlp_tli: PG_TLI,
495 5 : xlp_pageaddr: lsn.page_lsn().0,
496 5 : xlp_rem_len,
497 5 : ..Default::default() // Put 0 in padding fields.
498 5 : };
499 5 : let hdr_bytes = header.encode()?;
500 :
501 5 : debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
502 5 : debug_assert_ne!(block_offset, 0);
503 :
504 5 : seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
505 0 : }
506 :
507 5 : Ok(seg_buf.freeze())
508 5 : }
509 :
/// Fixed-size header of a logical message WAL record, serialized by `encode`.
///
/// NOTE(review): presumably mirrors PostgreSQL's `xl_logical_message` struct
/// layout -- confirm against the PostgreSQL headers.
#[repr(C)]
#[derive(Serialize)]
pub struct XlLogicalMessage {
    pub db_id: Oid,
    pub transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
    pub prefix_size: uint64,
    pub message_size: uint64,
}
518 :
519 : impl XlLogicalMessage {
520 11958 : pub fn encode(&self) -> Bytes {
521 : use utils::bin_ser::LeSer;
522 11958 : self.ser().unwrap().into()
523 11958 : }
524 : }
525 :
526 : /// Create new WAL record for non-transactional logical message.
527 : /// Used for creating artificial WAL for tests, as LogicalMessage
528 : /// record is basically no-op.
529 4 : pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes {
530 : // This function can take untrusted input, so discard any NUL bytes in the prefix string.
531 4 : let prefix = CString::new(prefix.replace('\0', "")).expect("no NULs");
532 4 : let message = message.as_bytes();
533 4 : LogicalMessageGenerator::new(&prefix, message)
534 4 : .next()
535 4 : .unwrap()
536 4 : .encode(Lsn(0))
537 4 : }
538 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamps must survive a round trip in both directions.
    #[test]
    fn test_ts_conversion() {
        let micros_since_unix = |t: SystemTime| {
            t.duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_micros()
        };

        // SystemTime -> pg timestamp -> SystemTime, compared at microsecond precision.
        let now = SystemTime::now();
        let pg_now = to_pg_timestamp(now);
        let back = try_from_pg_timestamp(pg_now).unwrap();
        assert_eq!(micros_since_unix(now), micros_since_unix(back));

        // pg timestamp -> SystemTime -> pg timestamp.
        let now_pg = get_current_timestamp();
        let as_system_time = try_from_pg_timestamp(now_pg).unwrap();
        let round_trip_pg = to_pg_timestamp(as_system_time);

        assert_eq!(now_pg, round_trip_pg);
    }

    // If you need to craft WAL and write tests for this module, put them in the wal_craft crate.
}
|