LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - xlog_utils.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 84.0 % 269 226
Test Date: 2025-07-16 12:29:03 Functions: 78.1 % 128 100

            Line data    Source code
       1              : //
       2              : // This file contains common utilities for dealing with PostgreSQL WAL files and
       3              : // LSNs.
       4              : //
       5              : // Many of these functions have been copied from PostgreSQL, and rewritten in
       6              : // Rust. That's why they don't follow the usual Rust naming conventions, they
       7              : // have been named the same as the corresponding PostgreSQL functions instead.
       8              : //
       9              : 
      10              : use super::super::waldecoder::WalStreamDecoder;
      11              : use super::bindings::{
      12              :     CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID,
      13              :     XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
      14              :     MY_PGVERSION
      15              : };
      16              : use postgres_ffi_types::TimestampTz;
      17              : use super::wal_generator::LogicalMessageGenerator;
      18              : use crate::pg_constants;
      19              : use crate::PG_TLI;
      20              : use crate::{uint32, uint64, Oid};
      21              : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
      22              : 
      23              : use bytes::BytesMut;
      24              : use bytes::{Buf, Bytes};
      25              : 
      26              : use log::*;
      27              : 
      28              : use serde::Serialize;
      29              : use std::ffi::{CString, OsStr};
      30              : use std::fs::File;
      31              : use std::io::prelude::*;
      32              : use std::io::ErrorKind;
      33              : use std::io::SeekFrom;
      34              : use std::path::Path;
      35              : use std::time::SystemTime;
      36              : use utils::bin_ser::DeserializeError;
      37              : use utils::bin_ser::SerializeError;
      38              : 
      39              : use utils::lsn::Lsn;
      40              : 
      41              : pub const XLOG_FNAME_LEN: usize = 24;
      42              : pub const XLP_BKP_REMOVABLE: u16 = 0x0004;
      43              : pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
      44              : pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
      45              : pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
      46              : 
      47              : pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
      48              : pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
      49              : pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
      50              : #[allow(clippy::identity_op)]
      51              : pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
      52              : 
      53              : /// Interval of checkpointing metadata file. We should store metadata file to enforce
      54              : /// predicate that checkpoint.nextXid is larger than any XID in WAL.
      55              : /// But flushing checkpoint file for each transaction seems to be too expensive,
      56              : /// so XID_CHECKPOINT_INTERVAL is used to forward align nextXid and so perform
      57              : /// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
      58              : /// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
      59              : /// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
      60              : const XID_CHECKPOINT_INTERVAL: u32 = 1024;
      61              : 
      62          209 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
      63          209 :     (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
      64          209 : }
      65              : 
      66           61 : pub fn XLogSegNoOffsetToRecPtr(
      67           61 :     segno: XLogSegNo,
      68           61 :     offset: u32,
      69           61 :     wal_segsz_bytes: usize,
      70           61 : ) -> XLogRecPtr {
      71           61 :     segno * (wal_segsz_bytes as u64) + (offset as u64)
      72           61 : }
      73              : 
      74           74 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
      75           74 :     format!(
      76           74 :         "{:>08X}{:>08X}{:>08X}",
      77              :         tli,
      78           74 :         logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
      79           74 :         logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
      80              :     )
      81           74 : }
      82              : 
      83           61 : pub fn XLogFromFileName(
      84           61 :     fname: &OsStr,
      85           61 :     wal_seg_size: usize,
      86           61 : ) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
      87           61 :     if let Some(fname_str) = fname.to_str() {
      88           61 :         let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
      89           61 :         let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
      90           61 :         let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
      91           61 :         Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
      92              :     } else {
      93            0 :         anyhow::bail!("non-ut8 filename: {:?}", fname);
      94              :     }
      95           61 : }
      96              : 
      97          146 : pub fn IsXLogFileName(fname: &OsStr) -> bool {
      98          146 :     if let Some(fname) = fname.to_str() {
      99         1944 :         fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
     100              :     } else {
     101            0 :         false
     102              :     }
     103          146 : }
     104              : 
     105           10 : pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
     106           10 :     if let Some(fname) = fname.to_str() {
     107           10 :         fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
     108              :     } else {
     109            0 :         false
     110              :     }
     111           10 : }
     112              : 
     113              : /// If LSN points to the beginning of the page, then shift it to first record,
     114              : /// otherwise align on 8-bytes boundary (required for WAL records)
     115            7 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
     116            7 :     if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
     117            0 :         let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
     118            0 :             XLOG_SIZE_OF_XLOG_LONG_PHD
     119              :         } else {
     120            0 :             XLOG_SIZE_OF_XLOG_SHORT_PHD
     121              :         };
     122            0 :         lsn + hdr_size as u64
     123              :     } else {
     124            7 :         lsn.align()
     125              :     }
     126            7 : }
     127              : 
     128              : /// Generate a pg_control file, for a basebackup for starting up Postgres at the given LSN
     129              : ///
     130              : /// 'pg_control_bytes' and 'checkpoint_bytes' are the contents of those keys persisted in
     131              : /// the pageserver. They use the same format as the PostgreSQL control file and the
     132              : /// checkpoint record, but see walingest.rs for how exactly they are kept up to date.
     133              : /// 'lsn' is the LSN at which we're starting up.
     134              : ///
     135              : /// Returns:
     136              : /// - pg_control file contents
     137              : /// - system_identifier, extracted from the persisted information
     138              : /// - true, if we're starting up from a "clean shutdown", i.e. if there was a shutdown
     139              : ///   checkpoint at the given LSN
     140            0 : pub fn generate_pg_control(
     141            0 :     pg_control_bytes: &[u8],
     142            0 :     checkpoint_bytes: &[u8],
     143            0 :     lsn: Lsn,
     144            0 : ) -> anyhow::Result<(Bytes, u64, bool)> {
     145            0 :     let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
     146            0 :     let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
     147              : 
     148              :     // Generate new pg_control needed for bootstrap
     149              :     //
     150              :     // NB: In the checkpoint struct that we persist in the pageserver, we have a different
     151              :     // convention for the 'redo' field than in PostgreSQL: On a shutdown checkpoint,
     152              :     // 'redo' points the *end* of the checkpoint WAL record. On PostgreSQL, it points to
     153              :     // the beginning. Furthermore, on an online checkpoint, 'redo' is set to 0.
     154              :     //
     155              :     // We didn't always have this convention however, and old persisted records will have
     156              :     // old REDO values that point to some old LSN.
     157              :     //
     158              :     // The upshot is that if 'redo' is equal to the "current" LSN, there was a shutdown
     159              :     // checkpoint record at that point in WAL, with no new WAL records after it. That case
     160              :     // can be treated as starting from a clean shutdown. All other cases are treated as
     161              :     // non-clean shutdown. In Neon, we don't do WAL replay at startup in either case, so
     162              :     // that distinction doesn't matter very much. As of this writing, it only affects
     163              :     // whether the persisted pg_stats information can be used or not.
     164              :     //
     165              :     // In the Checkpoint struct in the returned pg_control file, the redo pointer is
     166              :     // always set to the LSN we're starting at, to hint that no WAL replay is required.
     167              :     // (There's some neon-specific code in Postgres startup to make that work, though.
     168              :     // Just setting the redo pointer is not sufficient.)
     169            0 :     let was_shutdown = Lsn(checkpoint.redo) == lsn;
     170            0 :     checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
     171              : 
     172              :     // We use DBState_DB_SHUTDOWNED even if it was not a clean shutdown.  The
     173              :     // neon-specific code at postgres startup ignores the state stored in the control
     174              :     // file, similar to archive recovery in standalone PostgreSQL. Similarly, the
     175              :     // checkPoint pointer is ignored, so just set it to 0.
     176            0 :     pg_control.checkPoint = 0;
     177            0 :     pg_control.checkPointCopy = checkpoint;
     178            0 :     pg_control.state = DBState_DB_SHUTDOWNED;
     179              : 
     180            0 :     Ok((pg_control.encode(), pg_control.system_identifier, was_shutdown))
     181            0 : }
     182              : 
     183            4 : pub fn get_current_timestamp() -> TimestampTz {
     184            4 :     to_pg_timestamp(SystemTime::now())
     185            4 : }
     186              : 
     187              : // Module to reduce the scope of the constants
     188              : mod timestamp_conversions {
     189              :     use std::time::Duration;
     190              : 
     191              :     use anyhow::Context;
     192              : 
     193              :     use super::*;
     194              : 
     195              :     const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
     196              :     const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
     197              :     const SECS_PER_DAY: u64 = 86400;
     198              :     const USECS_PER_SEC: u64 = 1000000;
     199              :     const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
     200              :         (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
     201              : 
     202           12 :     pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
     203           12 :         match time.duration_since(SystemTime::UNIX_EPOCH) {
     204           12 :             Ok(n) => {
     205           12 :                 ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
     206           12 :                     + n.subsec_micros() as u64) as i64
     207              :             }
     208            0 :             Err(_) => panic!("SystemTime before UNIX EPOCH!"),
     209              :         }
     210           12 :     }
     211              : 
     212           12 :     pub fn try_from_pg_timestamp(time: TimestampTz) -> anyhow::Result<SystemTime> {
     213           12 :         let time: u64 = time
     214           12 :             .try_into()
     215           12 :             .context("timestamp before millenium (postgres epoch)")?;
     216           12 :         let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
     217           12 :         SystemTime::UNIX_EPOCH
     218           12 :             .checked_add(Duration::from_micros(since_unix_epoch))
     219           12 :             .context("SystemTime overflow")
     220           12 :     }
     221              : }
     222              : 
     223              : pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp};
     224              : 
     225              : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
     226              : // start_lsn must point to some previously known record boundary (beginning of
     227              : // the next record). If no valid record after is found, start_lsn is returned
     228              : // back.
     229           32 : pub fn find_end_of_wal(
     230           32 :     data_dir: &Path,
     231           32 :     wal_seg_size: usize,
     232           32 :     start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
     233           32 : ) -> anyhow::Result<Lsn> {
     234           32 :     let mut result = start_lsn;
     235           32 :     let mut curr_lsn = start_lsn;
     236           32 :     let mut buf = [0u8; XLOG_BLCKSZ];
     237           32 :     let pg_version = MY_PGVERSION;
     238           32 :     debug!("find_end_of_wal PG_VERSION: {}", pg_version);
     239              : 
     240           32 :     let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
     241              : 
     242              :     // loop over segments
     243              :     loop {
     244           40 :         let segno = curr_lsn.segment_number(wal_seg_size);
     245           40 :         let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     246           40 :         let seg_file_path = data_dir.join(seg_file_name);
     247           40 :         match open_wal_segment(&seg_file_path)? {
     248              :             None => {
     249              :                 // no more segments
     250            0 :                 debug!(
     251            0 :                     "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
     252              :                     result, seg_file_path
     253              :                 );
     254            0 :                 return Ok(result);
     255              :             }
     256           40 :             Some(mut segment) => {
     257           40 :                 let seg_offs = curr_lsn.segment_offset(wal_seg_size);
     258           40 :                 segment.seek(SeekFrom::Start(seg_offs as u64))?;
     259              :                 // loop inside segment
     260        16504 :                 while curr_lsn.segment_number(wal_seg_size) == segno {
     261        16496 :                     let bytes_read = segment.read(&mut buf)?;
     262        16496 :                     if bytes_read == 0 {
     263            0 :                         debug!(
     264            0 :                             "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}",
     265              :                             result,
     266              :                             seg_file_path,
     267            0 :                             curr_lsn.segment_offset(wal_seg_size)
     268              :                         );
     269            0 :                         return Ok(result);
     270        16496 :                     }
     271        16496 :                     curr_lsn += bytes_read as u64;
     272        16496 :                     decoder.feed_bytes(&buf[0..bytes_read]);
     273              : 
     274              :                     // advance result past all completely read records
     275              :                     loop {
     276        16664 :                         match decoder.poll_decode() {
     277          168 :                             Ok(Some(record)) => result = record.0,
     278           32 :                             Err(e) => {
     279           32 :                                 debug!(
     280           32 :                                     "find_end_of_wal reached end at {:?}, decode error: {:?}",
     281              :                                     result, e
     282              :                                 );
     283           32 :                                 return Ok(result);
     284              :                             }
     285        16464 :                             Ok(None) => break, // need more data
     286              :                         }
     287              :                     }
     288              :                 }
     289              :             }
     290              :         }
     291              :     }
     292           32 : }
     293              : 
     294              : // Open .partial or full WAL segment file, if present.
     295           40 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
     296           40 :     let mut partial_path = seg_file_path.to_owned();
     297           40 :     partial_path.set_extension("partial");
     298           40 :     match File::open(partial_path) {
     299           32 :         Ok(file) => Ok(Some(file)),
     300            8 :         Err(e) => match e.kind() {
     301              :             ErrorKind::NotFound => {
     302              :                 // .partial not found, try full
     303            8 :                 match File::open(seg_file_path) {
     304            8 :                     Ok(file) => Ok(Some(file)),
     305            0 :                     Err(e) => match e.kind() {
     306            0 :                         ErrorKind::NotFound => Ok(None),
     307            0 :                         _ => Err(e.into()),
     308              :                     },
     309              :                 }
     310              :             }
     311            0 :             _ => Err(e.into()),
     312              :         },
     313              :     }
     314           40 : }
     315              : 
     316              : impl XLogRecord {
     317        97978 :     pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
     318              :         use utils::bin_ser::LeSer;
     319        97978 :         XLogRecord::des(buf)
     320        97978 :     }
     321              : 
     322        73532 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
     323              :         use utils::bin_ser::LeSer;
     324        73532 :         XLogRecord::des_from(&mut buf.reader())
     325        73532 :     }
     326              : 
     327        23916 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     328              :         use utils::bin_ser::LeSer;
     329        23916 :         Ok(self.ser()?.into())
     330        23916 :     }
     331              : 
     332              :     // Is this record an XLOG_SWITCH record? They need some special processing,
     333        97978 :     pub fn is_xlog_switch_record(&self) -> bool {
     334        97978 :         self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
     335        97978 :     }
     336              : }
     337              : 
     338              : impl XLogPageHeaderData {
     339        19382 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
     340              :         use utils::bin_ser::LeSer;
     341        19382 :         XLogPageHeaderData::des_from(&mut buf.reader())
     342        19382 :     }
     343              : 
     344          783 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     345              :         use utils::bin_ser::LeSer;
     346          783 :         self.ser().map(|b| b.into())
     347          783 :     }
     348              : }
     349              : 
     350              : impl XLogLongPageHeaderData {
     351            8 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
     352              :         use utils::bin_ser::LeSer;
     353            8 :         XLogLongPageHeaderData::des_from(&mut buf.reader())
     354            8 :     }
     355              : 
     356            5 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     357              :         use utils::bin_ser::LeSer;
     358            5 :         self.ser().map(|b| b.into())
     359            5 :     }
     360              : }
     361              : 
     362              : pub const SIZEOF_CHECKPOINT: usize = size_of::<CheckPoint>();
     363              : 
     364              : impl CheckPoint {
     365            4 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     366              :         use utils::bin_ser::LeSer;
     367            4 :         Ok(self.ser()?.into())
     368            4 :     }
     369              : 
     370           19 :     pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
     371              :         use utils::bin_ser::LeSer;
     372           19 :         CheckPoint::des(buf)
     373           19 :     }
     374              : 
     375              :     /// Update next XID based on provided new_xid and stored epoch.
     376              :     /// Next XID should be greater than new_xid. This handles 32-bit
     377              :     /// XID wraparound correctly.
     378              :     ///
     379              :     /// Returns 'true' if the XID was updated.
     380        72933 :     pub fn update_next_xid(&mut self, xid: u32) -> bool {
     381              :         // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
     382        72933 :         let mut new_xid = std::cmp::max(
     383        72933 :             xid.wrapping_add(1),
     384              :             pg_constants::FIRST_NORMAL_TRANSACTION_ID,
     385              :         );
     386              :         // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
     387              :         // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
     388        72933 :         new_xid =
     389        72933 :             new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
     390        72933 :         let full_xid = self.nextXid.value;
     391        72933 :         let old_xid = full_xid as u32;
     392        72933 :         if new_xid.wrapping_sub(old_xid) as i32 > 0 {
     393            9 :             let mut epoch = full_xid >> 32;
     394            9 :             if new_xid < old_xid {
     395            0 :                 // wrap-around
     396            0 :                 epoch += 1;
     397            9 :             }
     398            9 :             let nextXid = (epoch << 32) | new_xid as u64;
     399              : 
     400            9 :             if nextXid != self.nextXid.value {
     401            9 :                 self.nextXid = FullTransactionId { value: nextXid };
     402            9 :                 return true;
     403            0 :             }
     404        72924 :         }
     405        72924 :         false
     406        72933 :     }
     407              : 
     408              :     /// Advance next multi-XID/offset to those given in arguments.
     409              :     ///
     410              :     /// It's important that this handles wraparound correctly. This should match the
     411              :     /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
     412              :     ///
     413              :     /// Returns 'true' if the Checkpoint was updated.
     414           24 :     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
     415           24 :         let mut modified = false;
     416              : 
     417           24 :         if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
     418           16 :             self.nextMulti = multi_xid;
     419           16 :             modified = true;
     420           16 :         }
     421              : 
     422           24 :         if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
     423           20 :             self.nextMultiOffset = multi_offset;
     424           20 :             modified = true;
     425           20 :         }
     426              : 
     427           24 :         modified
     428           24 :     }
     429              : }
     430              : 
     431              : /// Generate new, empty WAL segment, with correct block headers at the first
     432              : /// page of the segment and the page that contains the given LSN.
     433              : /// We need this segment to start compute node.
     434            5 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
     435            5 :     let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
     436              : 
     437            5 :     let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
     438              : 
     439            5 :     let page_off = lsn.block_offset();
     440            5 :     let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
     441              : 
     442            5 :     let first_page_only = seg_off < XLOG_BLCKSZ;
     443              :     // If first records starts in the middle of the page, pretend in page header
     444              :     // there is a fake record which ends where first real record starts. This
     445              :     // makes pg_waldump etc happy.
     446            5 :     let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
     447            0 :         assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
     448              :         // xlp_rem_len doesn't include page header, hence the subtraction.
     449            0 :         (
     450            0 :             seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
     451            0 :             pg_constants::XLP_FIRST_IS_CONTRECORD,
     452            0 :         )
     453              :     } else {
     454            5 :         (0, 0)
     455              :     };
     456              : 
     457            5 :     let hdr = XLogLongPageHeaderData {
     458            5 :         std: {
     459            5 :             XLogPageHeaderData {
     460            5 :                 xlp_magic: XLOG_PAGE_MAGIC as u16,
     461            5 :                 xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
     462            5 :                 xlp_tli: PG_TLI,
     463            5 :                 xlp_pageaddr: pageaddr,
     464            5 :                 xlp_rem_len: shdr_rem_len as u32,
     465            5 :                 ..Default::default() // Put 0 in padding fields.
     466            5 :             }
     467            5 :         },
     468            5 :         xlp_sysid: system_id,
     469            5 :         xlp_seg_size: WAL_SEGMENT_SIZE as u32,
     470            5 :         xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
     471            5 :     };
     472              : 
     473            5 :     let hdr_bytes = hdr.encode()?;
     474            5 :     seg_buf.extend_from_slice(&hdr_bytes);
     475              : 
     476              :     //zero out the rest of the file
     477            5 :     seg_buf.resize(WAL_SEGMENT_SIZE, 0);
     478              : 
     479            5 :     if !first_page_only {
     480            5 :         let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
     481              :         // see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
     482            5 :         let (xlp_rem_len, xlp_info) = if page_off > 0 {
     483            5 :             assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
     484            5 :             (
     485            5 :                 (page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
     486            5 :                 pg_constants::XLP_FIRST_IS_CONTRECORD,
     487            5 :             )
     488              :         } else {
     489            0 :             (0, 0)
     490              :         };
     491            5 :         let header = XLogPageHeaderData {
     492            5 :             xlp_magic: XLOG_PAGE_MAGIC as u16,
     493            5 :             xlp_info,
     494            5 :             xlp_tli: PG_TLI,
     495            5 :             xlp_pageaddr: lsn.page_lsn().0,
     496            5 :             xlp_rem_len,
     497            5 :             ..Default::default() // Put 0 in padding fields.
     498            5 :         };
     499            5 :         let hdr_bytes = header.encode()?;
     500              : 
     501            5 :         debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
     502            5 :         debug_assert_ne!(block_offset, 0);
     503              : 
     504            5 :         seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
     505            0 :     }
     506              : 
     507            5 :     Ok(seg_buf.freeze())
     508            5 : }
     509              : 
     510              : #[repr(C)]
     511              : #[derive(Serialize)]
     512              : pub struct XlLogicalMessage {
     513              :     pub db_id: Oid,
     514              :     pub transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
     515              :     pub prefix_size: uint64,
     516              :     pub message_size: uint64,
     517              : }
     518              : 
     519              : impl XlLogicalMessage {
     520        11958 :     pub fn encode(&self) -> Bytes {
     521              :         use utils::bin_ser::LeSer;
     522        11958 :         self.ser().unwrap().into()
     523        11958 :     }
     524              : }
     525              : 
     526              : /// Create new WAL record for non-transactional logical message.
     527              : /// Used for creating artificial WAL for tests, as LogicalMessage
     528              : /// record is basically no-op.
     529            4 : pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes {
     530              :     // This function can take untrusted input, so discard any NUL bytes in the prefix string.
     531            4 :     let prefix = CString::new(prefix.replace('\0', "")).expect("no NULs");
     532            4 :     let message = message.as_bytes();
     533            4 :     LogicalMessageGenerator::new(&prefix, message)
     534            4 :         .next()
     535            4 :         .unwrap()
     536            4 :         .encode(Lsn(0))
     537            4 : }
     538              : 
     539              : #[cfg(test)]
     540              : mod tests {
     541              :     use super::*;
     542              : 
     543              :     #[test]
     544            4 :     fn test_ts_conversion() {
     545            4 :         let now = SystemTime::now();
     546            4 :         let round_trip = try_from_pg_timestamp(to_pg_timestamp(now)).unwrap();
     547              : 
     548            4 :         let now_since = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     549            4 :         let round_trip_since = round_trip.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     550            4 :         assert_eq!(now_since.as_micros(), round_trip_since.as_micros());
     551              : 
     552            4 :         let now_pg = get_current_timestamp();
     553            4 :         let round_trip_pg = to_pg_timestamp(try_from_pg_timestamp(now_pg).unwrap());
     554              : 
     555            4 :         assert_eq!(now_pg, round_trip_pg);
     556            4 :     }
     557              : 
     558              :     // If you need to craft WAL and write tests for this module, put it at wal_craft crate.
     559              : }
        

Generated by: LCOV version 2.1-beta