LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - xlog_utils.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 66.1 % 333 220
Test Date: 2024-09-25 14:04:07 Functions: 72.1 % 129 93

            Line data    Source code
       1              : //
       2              : // This file contains common utilities for dealing with PostgreSQL WAL files and
       3              : // LSNs.
       4              : //
       5              : // Many of these functions have been copied from PostgreSQL, and rewritten in
       6              : // Rust. That's why they don't follow the usual Rust naming conventions, they
       7              : // have been named the same as the corresponding PostgreSQL functions instead.
       8              : //
       9              : 
      10              : use crc32c::crc32c_append;
      11              : 
      12              : use super::super::waldecoder::WalStreamDecoder;
      13              : use super::bindings::{
      14              :     CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
      15              :     XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
      16              : };
      17              : use super::PG_MAJORVERSION;
      18              : use crate::pg_constants;
      19              : use crate::PG_TLI;
      20              : use crate::{uint32, uint64, Oid};
      21              : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
      22              : 
      23              : use bytes::BytesMut;
      24              : use bytes::{Buf, Bytes};
      25              : 
      26              : use log::*;
      27              : 
      28              : use serde::Serialize;
      29              : use std::ffi::OsStr;
      30              : use std::fs::File;
      31              : use std::io::prelude::*;
      32              : use std::io::ErrorKind;
      33              : use std::io::SeekFrom;
      34              : use std::path::Path;
      35              : use std::time::SystemTime;
      36              : use utils::bin_ser::DeserializeError;
      37              : use utils::bin_ser::SerializeError;
      38              : 
      39              : use utils::lsn::Lsn;
      40              : 
/// Length in characters of a WAL segment file name: three 8-digit hexadecimal
/// fields, as produced by `XLogFileName`.
pub const XLOG_FNAME_LEN: usize = 24;
/// Page header `xlp_info` flag bit.
// NOTE(review): presumably mirrors PostgreSQL's XLP_FIRST_IS_CONTRECORD (page
// starts with the continuation of a record) — confirm against pg_constants.
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
/// Sum of the sizes of the fields assumed to precede `xlp_rem_len` in the page
/// header (2 + 2 + 4 + 8 bytes).
// NOTE(review): presumably xlp_magic, xlp_info, xlp_tli, xlp_pageaddr — confirm
// against the bindgen-generated XLogPageHeaderData layout.
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
/// Number of bytes of an encoded `XLogRecord` header that come before `xl_crc`
/// (4 + 4 + 8 + 1 + 1 + 2); `encode_logical_message` checksums exactly this
/// prefix of the header.
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;

/// `size_of` the short (regular) WAL page header struct.
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
/// `size_of` the long WAL page header struct (used on the first page of a segment).
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
/// `size_of` the fixed WAL record header struct.
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
/// Size of a "short" main-data header: two one-byte fields (block id and
/// length), matching the two bytes `encode_logical_message` emits.
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
      51              : 
      52              : /// Interval of checkpointing metadata file. We should store metadata file to enforce
      53              : /// predicate that checkpoint.nextXid is larger than any XID in WAL.
      54              : /// But flushing checkpoint file for each transaction seems to be too expensive,
      55              : /// so XID_CHECKPOINT_INTERVAL is used to forward align nextXid and so perform
      56              : /// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
      57              : /// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
      58              : /// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
      59              : const XID_CHECKPOINT_INTERVAL: u32 = 1024;
      60              : 
      61          148 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
      62          148 :     (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
      63          148 : }
      64              : 
      65        34244 : pub fn XLogSegNoOffsetToRecPtr(
      66        34244 :     segno: XLogSegNo,
      67        34244 :     offset: u32,
      68        34244 :     wal_segsz_bytes: usize,
      69        34244 : ) -> XLogRecPtr {
      70        34244 :     segno * (wal_segsz_bytes as u64) + (offset as u64)
      71        34244 : }
      72              : 
      73           46 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
      74           46 :     format!(
      75           46 :         "{:>08X}{:>08X}{:>08X}",
      76           46 :         tli,
      77           46 :         logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
      78           46 :         logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
      79           46 :     )
      80           46 : }
      81              : 
      82           56 : pub fn XLogFromFileName(
      83           56 :     fname: &OsStr,
      84           56 :     wal_seg_size: usize,
      85           56 : ) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
      86           56 :     if let Some(fname_str) = fname.to_str() {
      87           56 :         let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
      88           56 :         let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
      89           56 :         let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
      90           56 :         Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
      91              :     } else {
      92            0 :         anyhow::bail!("non-ut8 filename: {:?}", fname);
      93              :     }
      94           56 : }
      95              : 
      96          131 : pub fn IsXLogFileName(fname: &OsStr) -> bool {
      97          131 :     if let Some(fname) = fname.to_str() {
      98         1824 :         fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
      99              :     } else {
     100            0 :         false
     101              :     }
     102          131 : }
     103              : 
     104            0 : pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
     105            0 :     if let Some(fname) = fname.to_str() {
     106            0 :         fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
     107              :     } else {
     108            0 :         false
     109              :     }
     110            0 : }
     111              : 
     112              : /// If LSN points to the beginning of the page, then shift it to first record,
     113              : /// otherwise align on 8-bytes boundary (required for WAL records)
     114            0 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
     115            0 :     if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
     116            0 :         let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
     117            0 :             XLOG_SIZE_OF_XLOG_LONG_PHD
     118              :         } else {
     119            0 :             XLOG_SIZE_OF_XLOG_SHORT_PHD
     120              :         };
     121            0 :         lsn + hdr_size as u64
     122              :     } else {
     123            0 :         lsn.align()
     124              :     }
     125            0 : }
     126              : 
     127            0 : pub fn generate_pg_control(
     128            0 :     pg_control_bytes: &[u8],
     129            0 :     checkpoint_bytes: &[u8],
     130            0 :     lsn: Lsn,
     131            0 : ) -> anyhow::Result<(Bytes, u64)> {
     132            0 :     let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
     133            0 :     let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
     134              : 
     135              :     // Generate new pg_control needed for bootstrap
     136            0 :     checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
     137            0 : 
     138            0 :     //save new values in pg_control
     139            0 :     pg_control.checkPoint = 0;
     140            0 :     pg_control.checkPointCopy = checkpoint;
     141            0 :     pg_control.state = DBState_DB_SHUTDOWNED;
     142            0 : 
     143            0 :     Ok((pg_control.encode(), pg_control.system_identifier))
     144            0 : }
     145              : 
     146            4 : pub fn get_current_timestamp() -> TimestampTz {
     147            4 :     to_pg_timestamp(SystemTime::now())
     148            4 : }
     149              : 
     150              : // Module to reduce the scope of the constants
     151              : mod timestamp_conversions {
     152              :     use std::time::Duration;
     153              : 
     154              :     use anyhow::Context;
     155              : 
     156              :     use super::*;
     157              : 
     158              :     const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
     159              :     const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
     160              :     const SECS_PER_DAY: u64 = 86400;
     161              :     const USECS_PER_SEC: u64 = 1000000;
     162              :     const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
     163              :         (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
     164              : 
     165           12 :     pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
     166           12 :         match time.duration_since(SystemTime::UNIX_EPOCH) {
     167           12 :             Ok(n) => {
     168           12 :                 ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
     169           12 :                     + n.subsec_micros() as u64) as i64
     170              :             }
     171            0 :             Err(_) => panic!("SystemTime before UNIX EPOCH!"),
     172              :         }
     173           12 :     }
     174              : 
     175           32 :     pub fn try_from_pg_timestamp(time: TimestampTz) -> anyhow::Result<SystemTime> {
     176           32 :         let time: u64 = time
     177           32 :             .try_into()
     178           32 :             .context("timestamp before millenium (postgres epoch)")?;
     179           32 :         let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
     180           32 :         SystemTime::UNIX_EPOCH
     181           32 :             .checked_add(Duration::from_micros(since_unix_epoch))
     182           32 :             .context("SystemTime overflow")
     183           32 :     }
     184              : }
     185              : 
     186              : pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp};
     187              : 
     188              : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
     189              : // start_lsn must point to some previously known record boundary (beginning of
     190              : // the next record). If no valid record after is found, start_lsn is returned
     191              : // back.
     192           32 : pub fn find_end_of_wal(
     193           32 :     data_dir: &Path,
     194           32 :     wal_seg_size: usize,
     195           32 :     start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
     196           32 : ) -> anyhow::Result<Lsn> {
     197           32 :     let mut result = start_lsn;
     198           32 :     let mut curr_lsn = start_lsn;
     199           32 :     let mut buf = [0u8; XLOG_BLCKSZ];
     200           32 :     let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
     201           32 :     debug!("find_end_of_wal PG_VERSION: {}", pg_version);
     202              : 
     203           32 :     let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
     204              : 
     205              :     // loop over segments
     206              :     loop {
     207           40 :         let segno = curr_lsn.segment_number(wal_seg_size);
     208           40 :         let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     209           40 :         let seg_file_path = data_dir.join(seg_file_name);
     210           40 :         match open_wal_segment(&seg_file_path)? {
     211              :             None => {
     212              :                 // no more segments
     213            0 :                 debug!(
     214            0 :                     "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
     215              :                     result, seg_file_path
     216              :                 );
     217            0 :                 return Ok(result);
     218              :             }
     219           40 :             Some(mut segment) => {
     220           40 :                 let seg_offs = curr_lsn.segment_offset(wal_seg_size);
     221           40 :                 segment.seek(SeekFrom::Start(seg_offs as u64))?;
     222              :                 // loop inside segment
     223        16504 :                 while curr_lsn.segment_number(wal_seg_size) == segno {
     224        16496 :                     let bytes_read = segment.read(&mut buf)?;
     225        16496 :                     if bytes_read == 0 {
     226            0 :                         debug!(
     227            0 :                             "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}",
     228            0 :                             result,
     229            0 :                             seg_file_path,
     230            0 :                             curr_lsn.segment_offset(wal_seg_size)
     231              :                         );
     232            0 :                         return Ok(result);
     233        16496 :                     }
     234        16496 :                     curr_lsn += bytes_read as u64;
     235        16496 :                     decoder.feed_bytes(&buf[0..bytes_read]);
     236              : 
     237              :                     // advance result past all completely read records
     238              :                     loop {
     239        16664 :                         match decoder.poll_decode() {
     240          168 :                             Ok(Some(record)) => result = record.0,
     241           32 :                             Err(e) => {
     242           32 :                                 debug!(
     243           32 :                                     "find_end_of_wal reached end at {:?}, decode error: {:?}",
     244              :                                     result, e
     245              :                                 );
     246           32 :                                 return Ok(result);
     247              :                             }
     248        16464 :                             Ok(None) => break, // need more data
     249              :                         }
     250              :                     }
     251              :                 }
     252              :             }
     253              :         }
     254              :     }
     255           32 : }
     256              : 
     257              : // Open .partial or full WAL segment file, if present.
     258           40 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
     259           40 :     let mut partial_path = seg_file_path.to_owned();
     260           40 :     partial_path.set_extension("partial");
     261           40 :     match File::open(partial_path) {
     262           32 :         Ok(file) => Ok(Some(file)),
     263            8 :         Err(e) => match e.kind() {
     264              :             ErrorKind::NotFound => {
     265              :                 // .partial not found, try full
     266            8 :                 match File::open(seg_file_path) {
     267            8 :                     Ok(file) => Ok(Some(file)),
     268            0 :                     Err(e) => match e.kind() {
     269            0 :                         ErrorKind::NotFound => Ok(None),
     270            0 :                         _ => Err(e.into()),
     271              :                     },
     272              :                 }
     273              :             }
     274            0 :             _ => Err(e.into()),
     275              :         },
     276              :     }
     277           40 : }
     278              : 
     279              : impl XLogRecord {
     280       477680 :     pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
     281              :         use utils::bin_ser::LeSer;
     282       477680 :         XLogRecord::des(buf)
     283       477680 :     }
     284              : 
     285       437556 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
     286              :         use utils::bin_ser::LeSer;
     287       437556 :         XLogRecord::des_from(&mut buf.reader())
     288       437556 :     }
     289              : 
     290        34196 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     291              :         use utils::bin_ser::LeSer;
     292        34196 :         Ok(self.ser()?.into())
     293        34196 :     }
     294              : 
     295              :     // Is this record an XLOG_SWITCH record? They need some special processing,
     296       477680 :     pub fn is_xlog_switch_record(&self) -> bool {
     297       477680 :         self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
     298       477680 :     }
     299              : }
     300              : 
     301              : impl XLogPageHeaderData {
     302        25366 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
     303              :         use utils::bin_ser::LeSer;
     304        25366 :         XLogPageHeaderData::des_from(&mut buf.reader())
     305        25366 :     }
     306              : 
     307           96 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     308              :         use utils::bin_ser::LeSer;
     309           96 :         self.ser().map(|b| b.into())
     310           96 :     }
     311              : }
     312              : 
     313              : impl XLogLongPageHeaderData {
     314            8 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
     315              :         use utils::bin_ser::LeSer;
     316            8 :         XLogLongPageHeaderData::des_from(&mut buf.reader())
     317            8 :     }
     318              : 
     319            0 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     320              :         use utils::bin_ser::LeSer;
     321            0 :         self.ser().map(|b| b.into())
     322            0 :     }
     323              : }
     324              : 
/// `size_of` the `CheckPoint` struct, in bytes.
pub const SIZEOF_CHECKPOINT: usize = size_of::<CheckPoint>();
     326              : 
     327              : impl CheckPoint {
     328           24 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     329              :         use utils::bin_ser::LeSer;
     330           24 :         Ok(self.ser()?.into())
     331           24 :     }
     332              : 
     333           68 :     pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
     334              :         use utils::bin_ser::LeSer;
     335           68 :         CheckPoint::des(buf)
     336           68 :     }
     337              : 
     338              :     /// Update next XID based on provided new_xid and stored epoch.
     339              :     /// Next XID should be greater than new_xid. This handles 32-bit
     340              :     /// XID wraparound correctly.
     341              :     ///
     342              :     /// Returns 'true' if the XID was updated.
     343       437518 :     pub fn update_next_xid(&mut self, xid: u32) -> bool {
     344       437518 :         // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
     345       437518 :         let mut new_xid = std::cmp::max(
     346       437518 :             xid.wrapping_add(1),
     347       437518 :             pg_constants::FIRST_NORMAL_TRANSACTION_ID,
     348       437518 :         );
     349       437518 :         // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
     350       437518 :         // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
     351       437518 :         new_xid =
     352       437518 :             new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
     353       437518 :         let full_xid = self.nextXid.value;
     354       437518 :         let old_xid = full_xid as u32;
     355       437518 :         if new_xid.wrapping_sub(old_xid) as i32 > 0 {
     356           14 :             let mut epoch = full_xid >> 32;
     357           14 :             if new_xid < old_xid {
     358            0 :                 // wrap-around
     359            0 :                 epoch += 1;
     360           14 :             }
     361           14 :             let nextXid = (epoch << 32) | new_xid as u64;
     362           14 : 
     363           14 :             if nextXid != self.nextXid.value {
     364           14 :                 self.nextXid = FullTransactionId { value: nextXid };
     365           14 :                 return true;
     366            0 :             }
     367       437504 :         }
     368       437504 :         false
     369       437518 :     }
     370              : 
     371              :     /// Advance next multi-XID/offset to those given in arguments.
     372              :     ///
     373              :     /// It's important that this handles wraparound correctly. This should match the
     374              :     /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
     375              :     ///
     376              :     /// Returns 'true' if the Checkpoint was updated.
     377           24 :     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
     378           24 :         let mut modified = false;
     379           24 : 
     380           24 :         if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
     381           16 :             self.nextMulti = multi_xid;
     382           16 :             modified = true;
     383           16 :         }
     384              : 
     385           24 :         if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
     386           20 :             self.nextMultiOffset = multi_offset;
     387           20 :             modified = true;
     388           20 :         }
     389              : 
     390           24 :         modified
     391           24 :     }
     392              : }
     393              : 
     394              : /// Generate new, empty WAL segment, with correct block headers at the first
     395              : /// page of the segment and the page that contains the given LSN.
     396              : /// We need this segment to start compute node.
     397            0 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
     398            0 :     let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
     399            0 : 
     400            0 :     let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
     401            0 : 
     402            0 :     let page_off = lsn.block_offset();
     403            0 :     let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
     404            0 : 
     405            0 :     let first_page_only = seg_off < XLOG_BLCKSZ;
     406              :     // If first records starts in the middle of the page, pretend in page header
     407              :     // there is a fake record which ends where first real record starts. This
     408              :     // makes pg_waldump etc happy.
     409            0 :     let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
     410            0 :         assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
     411              :         // xlp_rem_len doesn't include page header, hence the subtraction.
     412            0 :         (
     413            0 :             seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
     414            0 :             pg_constants::XLP_FIRST_IS_CONTRECORD,
     415            0 :         )
     416              :     } else {
     417            0 :         (0, 0)
     418              :     };
     419              : 
     420            0 :     let hdr = XLogLongPageHeaderData {
     421            0 :         std: {
     422            0 :             XLogPageHeaderData {
     423            0 :                 xlp_magic: XLOG_PAGE_MAGIC as u16,
     424            0 :                 xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
     425            0 :                 xlp_tli: PG_TLI,
     426            0 :                 xlp_pageaddr: pageaddr,
     427            0 :                 xlp_rem_len: shdr_rem_len as u32,
     428            0 :                 ..Default::default() // Put 0 in padding fields.
     429            0 :             }
     430            0 :         },
     431            0 :         xlp_sysid: system_id,
     432            0 :         xlp_seg_size: WAL_SEGMENT_SIZE as u32,
     433            0 :         xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
     434            0 :     };
     435              : 
     436            0 :     let hdr_bytes = hdr.encode()?;
     437            0 :     seg_buf.extend_from_slice(&hdr_bytes);
     438            0 : 
     439            0 :     //zero out the rest of the file
     440            0 :     seg_buf.resize(WAL_SEGMENT_SIZE, 0);
     441            0 : 
     442            0 :     if !first_page_only {
     443            0 :         let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
     444              :         // see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
     445            0 :         let (xlp_rem_len, xlp_info) = if page_off > 0 {
     446            0 :             assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
     447            0 :             (
     448            0 :                 (page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
     449            0 :                 pg_constants::XLP_FIRST_IS_CONTRECORD,
     450            0 :             )
     451              :         } else {
     452            0 :             (0, 0)
     453              :         };
     454            0 :         let header = XLogPageHeaderData {
     455            0 :             xlp_magic: XLOG_PAGE_MAGIC as u16,
     456            0 :             xlp_info,
     457            0 :             xlp_tli: PG_TLI,
     458            0 :             xlp_pageaddr: lsn.page_lsn().0,
     459            0 :             xlp_rem_len,
     460            0 :             ..Default::default() // Put 0 in padding fields.
     461            0 :         };
     462            0 :         let hdr_bytes = header.encode()?;
     463              : 
     464            0 :         debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
     465            0 :         debug_assert_ne!(block_offset, 0);
     466              : 
     467            0 :         seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
     468            0 :     }
     469              : 
     470            0 :     Ok(seg_buf.freeze())
     471            0 : }
     472              : 
     473              : #[repr(C)]
     474              : #[derive(Serialize)]
     475              : pub struct XlLogicalMessage {
     476              :     pub db_id: Oid,
     477              :     pub transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
     478              :     pub prefix_size: uint64,
     479              :     pub message_size: uint64,
     480              : }
     481              : 
     482              : impl XlLogicalMessage {
     483        17098 :     pub fn encode(&self) -> Bytes {
     484              :         use utils::bin_ser::LeSer;
     485        17098 :         self.ser().unwrap().into()
     486        17098 :     }
     487              : }
     488              : 
     489              : /// Create new WAL record for non-transactional logical message.
     490              : /// Used for creating artificial WAL for tests, as LogicalMessage
     491              : /// record is basically no-op.
     492              : ///
     493              : /// NOTE: This leaves the xl_prev field zero. The safekeeper and
     494              : /// pageserver tolerate that, but PostgreSQL does not.
     495            4 : pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
     496            4 :     let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
     497            4 :     prefix_bytes.write_all(prefix.as_bytes()).unwrap();
     498            4 :     prefix_bytes.push(0);
     499            4 : 
     500            4 :     let message_bytes = message.as_bytes();
     501            4 : 
     502            4 :     let logical_message = XlLogicalMessage {
     503            4 :         db_id: 0,
     504            4 :         transactional: 0,
     505            4 :         prefix_size: prefix_bytes.len() as u64,
     506            4 :         message_size: message_bytes.len() as u64,
     507            4 :     };
     508            4 : 
     509            4 :     let mainrdata = logical_message.encode();
     510            4 :     let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
     511            4 :     // only short mainrdata is supported for now
     512            4 :     assert!(mainrdata_len <= 255);
     513            4 :     let mainrdata_len = mainrdata_len as u8;
     514            4 : 
     515            4 :     let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
     516            4 :     data.extend_from_slice(&mainrdata);
     517            4 :     data.extend_from_slice(&prefix_bytes);
     518            4 :     data.extend_from_slice(message_bytes);
     519            4 : 
     520            4 :     let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
     521            4 : 
     522            4 :     let mut header = XLogRecord {
     523            4 :         xl_tot_len: total_len as u32,
     524            4 :         xl_xid: 0,
     525            4 :         xl_prev: 0,
     526            4 :         xl_info: 0,
     527            4 :         xl_rmid: 21,
     528            4 :         __bindgen_padding_0: [0u8; 2usize],
     529            4 :         xl_crc: 0, // crc will be calculated later
     530            4 :     };
     531            4 : 
     532            4 :     let header_bytes = header.encode().expect("failed to encode header");
     533            4 :     let crc = crc32c_append(0, &data);
     534            4 :     let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
     535            4 :     header.xl_crc = crc;
     536            4 : 
     537            4 :     let mut wal: Vec<u8> = Vec::new();
     538            4 :     wal.extend_from_slice(&header.encode().expect("failed to encode header"));
     539            4 :     wal.extend_from_slice(&data);
     540              : 
     541              :     // WAL start position must be aligned at 8 bytes,
     542              :     // this will add padding for the next WAL record.
     543              :     const PADDING: usize = 8;
     544            4 :     let padding_rem = wal.len() % PADDING;
     545            4 :     if padding_rem != 0 {
     546            0 :         wal.resize(wal.len() + PADDING - padding_rem, 0);
     547            4 :     }
     548              : 
     549            4 :     wal
     550            4 : }
     551              : 
     552              : #[cfg(test)]
     553              : mod tests {
     554              :     use super::*;
     555              : 
     556              :     #[test]
     557            4 :     fn test_ts_conversion() {
     558            4 :         let now = SystemTime::now();
     559            4 :         let round_trip = try_from_pg_timestamp(to_pg_timestamp(now)).unwrap();
     560            4 : 
     561            4 :         let now_since = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     562            4 :         let round_trip_since = round_trip.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     563            4 :         assert_eq!(now_since.as_micros(), round_trip_since.as_micros());
     564              : 
     565            4 :         let now_pg = get_current_timestamp();
     566            4 :         let round_trip_pg = to_pg_timestamp(try_from_pg_timestamp(now_pg).unwrap());
     567            4 : 
     568            4 :         assert_eq!(now_pg, round_trip_pg);
     569            4 :     }
     570              : 
     571              :     // If you need to craft WAL and write tests for this module, put it at wal_craft crate.
     572              : }
        

Generated by: LCOV version 2.1-beta