LCOV - differential code coverage report
Current view: top level - libs/postgres_ffi/src - xlog_utils.rs (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC CBC EUB ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 95.6 % 321 307 10 4 10 97 200 4 80
Current Date: 2023-10-19 02:04:12 Functions: 75.2 % 109 82 21 6 2 58 22 23 47
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //
       2                 : // This file contains common utilities for dealing with PostgreSQL WAL files and
       3                 : // LSNs.
       4                 : //
       5                 : // Many of these functions have been copied from PostgreSQL, and rewritten in
       6                 : // Rust. That's why they don't follow the usual Rust naming conventions, they
       7                 : // have been named the same as the corresponding PostgreSQL functions instead.
       8                 : //
       9                 : 
      10                 : use crc32c::crc32c_append;
      11                 : 
      12                 : use super::super::waldecoder::WalStreamDecoder;
      13                 : use super::bindings::{
      14                 :     CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
      15                 :     XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
      16                 : };
      17                 : use super::PG_MAJORVERSION;
      18                 : use crate::pg_constants;
      19                 : use crate::PG_TLI;
      20                 : use crate::{uint32, uint64, Oid};
      21                 : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
      22                 : 
      23                 : use bytes::BytesMut;
      24                 : use bytes::{Buf, Bytes};
      25                 : 
      26                 : use log::*;
      27                 : 
      28                 : use serde::Serialize;
      29                 : use std::fs::File;
      30                 : use std::io::prelude::*;
      31                 : use std::io::ErrorKind;
      32                 : use std::io::SeekFrom;
      33                 : use std::path::{Path, PathBuf};
      34                 : use std::time::SystemTime;
      35                 : use utils::bin_ser::DeserializeError;
      36                 : use utils::bin_ser::SerializeError;
      37                 : 
      38                 : use utils::lsn::Lsn;
      39                 : 
      40                 : pub const XLOG_FNAME_LEN: usize = 24;
      41                 : pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
      42                 : pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
      43                 : pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
      44                 : 
      45                 : pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
      46                 : pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
      47                 : pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
      48                 : #[allow(clippy::identity_op)]
      49                 : pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
      50                 : 
      51                 : /// Interval between checkpoints of the metadata file. The metadata file must be stored
      52                 : /// to enforce the invariant that checkpoint.nextXid is larger than any XID in WAL.
      53                 : /// Flushing the checkpoint file for every transaction would be too expensive,
      54                 : /// so XID_CHECKPOINT_INTERVAL is used to forward-align nextXid and thus perform a
      55                 : /// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
      56                 : /// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
      57                 : /// so that the CLOG_TRUNCATE mechanism can correctly extend CLOG.
      58                 : const XID_CHECKPOINT_INTERVAL: u32 = 1024;
      59                 : 
      60 CBC       11194 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
      61           11194 :     (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
      62           11194 : }
      63                 : 
      64             719 : pub fn XLogSegNoOffsetToRecPtr(
      65             719 :     segno: XLogSegNo,
      66             719 :     offset: u32,
      67             719 :     wal_segsz_bytes: usize,
      68             719 : ) -> XLogRecPtr {
      69             719 :     segno * (wal_segsz_bytes as u64) + (offset as u64)
      70             719 : }
      71                 : 
      72            5439 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
      73            5439 :     format!(
      74            5439 :         "{:>08X}{:>08X}{:>08X}",
      75            5439 :         tli,
      76            5439 :         logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
      77            5439 :         logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
      78            5439 :     )
      79            5439 : }
      80                 : 
      81             316 : pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
      82             316 :     let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
      83             316 :     let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
      84             316 :     let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
      85             316 :     (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
      86             316 : }
      87                 : 
      88            1097 : pub fn IsXLogFileName(fname: &str) -> bool {
      89            7944 :     return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
      90            1097 : }
      91                 : 
      92             736 : pub fn IsPartialXLogFileName(fname: &str) -> bool {
      93             736 :     fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
      94             736 : }
      95                 : 
      96                 : /// If the LSN points to the beginning of a page, shift it to the first record;
      97                 : /// otherwise align it on an 8-byte boundary (required for WAL records).
      98            1317 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
      99            1317 :     if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
     100              10 :         let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
     101               6 :             XLOG_SIZE_OF_XLOG_LONG_PHD
     102                 :         } else {
     103               4 :             XLOG_SIZE_OF_XLOG_SHORT_PHD
     104                 :         };
     105              10 :         lsn + hdr_size as u64
     106                 :     } else {
     107            1307 :         lsn.align()
     108                 :     }
     109            1317 : }
     110                 : 
     111             638 : pub fn generate_pg_control(
     112             638 :     pg_control_bytes: &[u8],
     113             638 :     checkpoint_bytes: &[u8],
     114             638 :     lsn: Lsn,
     115             638 : ) -> anyhow::Result<(Bytes, u64)> {
     116             638 :     let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
     117             638 :     let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
     118                 : 
     119                 :     // Generate new pg_control needed for bootstrap
     120             638 :     checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
     121             638 : 
     122             638 :     //reset some fields we don't want to preserve
     123             638 :     //TODO Check this.
     124             638 :     //We may need to determine the value from twophase data.
     125             638 :     checkpoint.oldestActiveXid = 0;
     126             638 : 
     127             638 :     //save new values in pg_control
     128             638 :     pg_control.checkPoint = 0;
     129             638 :     pg_control.checkPointCopy = checkpoint;
     130             638 :     pg_control.state = DBState_DB_SHUTDOWNED;
     131             638 : 
     132             638 :     Ok((pg_control.encode(), pg_control.system_identifier))
     133             638 : }
     134                 : 
     135          785583 : pub fn get_current_timestamp() -> TimestampTz {
     136          785583 :     to_pg_timestamp(SystemTime::now())
     137          785583 : }
     138                 : 
     139 ECB    (822562) : // Module to reduce the scope of the constants
     140        (822562) : mod timestamp_conversions {
     141        (822562) :     use std::time::Duration;
     142        (822562) : 
     143        (822562) :     use super::*;
     144        (822562) : 
     145        (822562) :     const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
     146        (822562) :     const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
     147        (822562) :     const SECS_PER_DAY: u64 = 86400;
     148        (822562) :     const USECS_PER_SEC: u64 = 1000000;
     149                 :     const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
     150 EUB             :         (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
     151                 : 
     152 CBC      785784 :     pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
     153 GIC      785784 :         match time.duration_since(SystemTime::UNIX_EPOCH) {
     154          785784 :             Ok(n) => {
     155          785784 :                 ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
     156          785784 :                     + n.subsec_micros() as u64) as i64
     157                 :             }
     158 LBC       (110) :             Err(_) => panic!("SystemTime before UNIX EPOCH!"),
     159 ECB       (110) :         }
     160 CBC      785784 :     }
     161 ECB       (110) : 
     162 CBC          16 :     pub fn from_pg_timestamp(time: TimestampTz) -> SystemTime {
     163              16 :         let time: u64 = time
     164              16 :             .try_into()
     165              16 :             .expect("timestamp before millenium (postgres epoch)");
     166              16 :         let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
     167              16 :         SystemTime::UNIX_EPOCH
     168 GIC          16 :             .checked_add(Duration::from_micros(since_unix_epoch))
     169 CBC          16 :             .expect("SystemTime overflow")
     170 GIC          16 :     }
     171                 : }
     172 ECB       (144) : 
     173           (144) : pub use timestamp_conversions::{from_pg_timestamp, to_pg_timestamp};
     174           (144) : 
     175           (144) : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
     176           (144) : // start_lsn must point to some previously known record boundary (beginning of
     177                 : // the next record). If no valid record is found after it, start_lsn is
     178                 : // returned unchanged.
     179 CBC         115 : pub fn find_end_of_wal(
     180 GBC         115 :     data_dir: &Path,
     181 GIC         115 :     wal_seg_size: usize,
     182             115 :     start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
     183 CBC         115 : ) -> anyhow::Result<Lsn> {
     184 GIC         115 :     let mut result = start_lsn;
     185 CBC         115 :     let mut curr_lsn = start_lsn;
     186             115 :     let mut buf = [0u8; XLOG_BLCKSZ];
     187             115 :     let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
     188 GIC         115 :     debug!("find_end_of_wal PG_VERSION: {}", pg_version);
     189                 : 
     190 CBC         115 :     let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
     191 ECB     (61204) : 
     192            (34) :     // loop over segments
     193 CBC         146 :     loop {
     194             146 :         let segno = curr_lsn.segment_number(wal_seg_size);
     195             146 :         let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     196 GIC         146 :         let seg_file_path = data_dir.join(seg_file_name);
     197             146 :         match open_wal_segment(&seg_file_path)? {
     198                 :             None => {
     199 ECB   (4735729) :                 // no more segments
     200 CBC          11 :                 debug!(
     201 LBC        (99) :                     "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
     202 ECB        (99) :                     result, seg_file_path
     203            (21) :                 );
     204 GIC          11 :                 return Ok(result);
     205                 :             }
     206 CBC         135 :             Some(mut segment) => {
     207 GIC         135 :                 let seg_offs = curr_lsn.segment_offset(wal_seg_size);
     208 CBC         135 :                 segment.seek(SeekFrom::Start(seg_offs as u64))?;
     209                 :                 // loop inside segment
     210                 :                 loop {
     211 GIC       57049 :                     let bytes_read = segment.read(&mut buf)?;
     212           57049 :                     if bytes_read == 0 {
     213              31 :                         break; // EOF
     214           57018 :                     }
     215 CBC       57018 :                     curr_lsn += bytes_read as u64;
     216 GIC       57018 :                     decoder.feed_bytes(&buf[0..bytes_read]);
     217                 : 
     218 ECB       (144) :                     // advance result past all completely read records
     219           (144) :                     loop {
     220 CBC     4350098 :                         match decoder.poll_decode() {
     221         4293080 :                             Ok(Some(record)) => result = record.0,
     222             104 :                             Err(e) => {
     223             104 :                                 debug!(
     224 GIC          21 :                                     "find_end_of_wal reached end at {:?}, decode error: {:?}",
     225                 :                                     result, e
     226 ECB        (45) :                                 );
     227 CBC         104 :                                 return Ok(result);
     228 ECB        (11) :                             }
     229 CBC       56914 :                             Ok(None) => break, // need more data
     230 EUB             :                         }
     231                 :                     }
     232                 :                 }
     233                 :             }
     234                 :         }
     235                 :     }
     236 GIC         115 : }
     237 ECB       (144) : 
     238                 : // Open .partial or full WAL segment file, if present.
     239 GBC         146 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
     240             146 :     let mut partial_path = seg_file_path.to_owned();
     241             146 :     partial_path.set_extension("partial");
     242             146 :     match File::open(partial_path) {
     243             104 :         Ok(file) => Ok(Some(file)),
     244              42 :         Err(e) => match e.kind() {
     245                 :             ErrorKind::NotFound => {
     246                 :                 // .partial not found, try full
     247 CBC          42 :                 match File::open(seg_file_path) {
     248              31 :                     Ok(file) => Ok(Some(file)),
     249              11 :                     Err(e) => match e.kind() {
     250              11 :                         ErrorKind::NotFound => Ok(None),
     251 UIC           0 :                         _ => Err(e.into()),
     252 ECB  (68237203) :                     },
     253      (68237203) :                 }
     254      (68237203) :             }
     255 LBC  (68237203) :             _ => Err(e.into()),
     256                 :         },
     257 ECB        (12) :     }
     258 CBC         146 : }
     259 ECB        (12) : 
     260 LBC        (12) : pub fn main() {
     261 UIC           0 :     let mut data_dir = PathBuf::new();
     262               0 :     data_dir.push(".");
     263 LBC (168742557) :     let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap();
     264     (168742557) :     println!("wal_end={:?}", wal_end);
     265     (168742557) : }
     266                 : 
     267                 : impl XLogRecord {
     268 GIC   168851349 :     pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
     269 CBC   168851349 :         use utils::bin_ser::LeSer;
     270       168851349 :         XLogRecord::des(buf)
     271       168851349 :     }
     272 ECB   (3098110) : 
     273 GIC    68588722 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
     274 CBC    68588722 :         use utils::bin_ser::LeSer;
     275        68588722 :         XLogRecord::des_from(&mut buf.reader())
     276        68588722 :     }
     277 ECB       (635) : 
     278 GIC          12 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     279              12 :         use utils::bin_ser::LeSer;
     280              12 :         Ok(self.ser()?.into())
     281 CBC          12 :     }
     282 ECB      (1481) : 
     283          (1481) :     // Is this record an XLOG_SWITCH record? They need some special processing.
     284 CBC   168851349 :     pub fn is_xlog_switch_record(&self) -> bool {
     285 GIC   168851349 :         self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
     286 CBC   168851349 :     }
     287 ECB       (638) : }
     288           (638) : 
     289           (638) : impl XLogPageHeaderData {
     290 GIC     3096127 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
     291         3096127 :         use utils::bin_ser::LeSer;
     292         3096127 :         XLogPageHeaderData::des_from(&mut buf.reader())
     293         3096127 :     }
     294                 : 
     295 CBC         638 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     296             638 :         use utils::bin_ser::LeSer;
     297             638 :         self.ser().map(|b| b.into())
     298             638 :     }
     299                 : }
     300 ECB      (2506) : 
     301          (2506) : impl XLogLongPageHeaderData {
     302 CBC        1473 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
     303            1473 :         use utils::bin_ser::LeSer;
     304 GIC        1473 :         XLogLongPageHeaderData::des_from(&mut buf.reader())
     305            1473 :     }
     306                 : 
     307             641 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     308             641 :         use utils::bin_ser::LeSer;
     309             641 :         self.ser().map(|b| b.into())
     310 CBC         641 :     }
     311 ECB  (68261690) : }
     312      (68261690) : 
     313      (68261690) : pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
     314      (68261690) : 
     315      (68261690) : impl CheckPoint {
     316 CBC       28619 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     317           28619 :         use utils::bin_ser::LeSer;
     318           28619 :         Ok(self.ser()?.into())
     319           28619 :     }
     320 ECB      (3621) : 
     321 CBC        2523 :     pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
     322 GBC        2523 :         use utils::bin_ser::LeSer;
     323            2523 :         CheckPoint::des(buf)
     324 CBC        2523 :     }
     325 ECB      (3621) : 
     326          (3621) :     /// Update next XID based on provided new_xid and stored epoch.
     327          (3621) :     /// Next XID should be greater than new_xid. This handles 32-bit
     328          (3621) :     /// XID wraparound correctly.
     329          (3621) :     ///
     330 EUB             :     /// Returns 'true' if the XID was updated.
     331 CBC    68613496 :     pub fn update_next_xid(&mut self, xid: u32) -> bool {
     332        68613496 :         // nextXid should be greater than any XID in WAL, so increment the provided XID and check for wraparound.
     333        68613496 :         let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
     334 GIC    68613496 :         // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
     335        68613496 :         // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
     336        68613496 :         new_xid =
     337        68613496 :             new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
     338        68613496 :         let full_xid = self.nextXid.value;
     339 CBC    68613496 :         let old_xid = full_xid as u32;
     340        68613496 :         if new_xid.wrapping_sub(old_xid) as i32 > 0 {
     341            3628 :             let mut epoch = full_xid >> 32;
     342            3628 :             if new_xid < old_xid {
     343 LBC       (638) :                 // wrap-around
     344           (638) :                 epoch += 1;
     345 CBC        3628 :             }
     346            3628 :             let nextXid = (epoch << 32) | new_xid as u64;
     347            3628 : 
     348            3628 :             if nextXid != self.nextXid.value {
     349            3628 :                 self.nextXid = FullTransactionId { value: nextXid };
     350 GIC        3628 :                 return true;
     351 LBC       (635) :             }
     352 GIC    68609868 :         }
     353        68609868 :         false
     354 CBC    68613496 :     }
     355 ECB       (638) : }
     356           (638) : 
     357           (638) : /// Generate new, empty WAL segment, with correct block headers at the first
     358           (638) : /// page of the segment and the page that contains the given LSN.
     359           (638) : /// We need this segment to start compute node.
     360 CBC         641 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
     361             641 :     let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
     362             641 : 
     363             641 :     let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
     364             641 : 
     365             641 :     let page_off = lsn.block_offset();
     366             641 :     let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
     367             641 : 
     368             641 :     let first_page_only = seg_off < XLOG_BLCKSZ;
     369 GIC         641 :     let (shdr_rem_len, infoflags) = if first_page_only {
     370 CBC           3 :         (seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
     371 ECB       (638) :     } else {
     372 CBC         638 :         (0, 0)
     373 ECB       (638) :     };
     374           (638) : 
     375 CBC         641 :     let hdr = XLogLongPageHeaderData {
     376             641 :         std: {
     377             641 :             XLogPageHeaderData {
     378             641 :                 xlp_magic: XLOG_PAGE_MAGIC as u16,
     379             641 :                 xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
     380             641 :                 xlp_tli: PG_TLI,
     381             641 :                 xlp_pageaddr: pageaddr,
     382 GIC         641 :                 xlp_rem_len: shdr_rem_len as u32,
     383 CBC         641 :                 ..Default::default() // Put 0 in padding fields.
     384 GIC         641 :             }
     385             641 :         },
     386 CBC         641 :         xlp_sysid: system_id,
     387             641 :         xlp_seg_size: WAL_SEGMENT_SIZE as u32,
     388             641 :         xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
     389 GIC         641 :     };
     390 ECB         (1) : 
     391 GIC         641 :     let hdr_bytes = hdr.encode()?;
     392 CBC         641 :     seg_buf.extend_from_slice(&hdr_bytes);
     393 GIC         641 : 
     394 CBC         641 :     //zero out the rest of the file
     395 GIC         641 :     seg_buf.resize(WAL_SEGMENT_SIZE, 0);
     396 CBC         641 : 
     397             641 :     if !first_page_only {
     398 GIC         638 :         let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
     399 CBC         638 :         let header = XLogPageHeaderData {
     400             638 :             xlp_magic: XLOG_PAGE_MAGIC as u16,
     401 GIC         638 :             xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
     402 CBC         637 :                 pg_constants::XLP_FIRST_IS_CONTRECORD
     403 ECB       (638) :             } else {
     404 GIC           1 :                 0
     405                 :             },
     406 ECB        (12) :             xlp_tli: PG_TLI,
     407 GIC         638 :             xlp_pageaddr: lsn.page_lsn().0,
     408             638 :             xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
     409             637 :                 page_off as u32
     410                 :             } else {
     411               1 :                 0u32
     412                 :             },
     413             638 :             ..Default::default() // Put 0 in padding fields.
     414                 :         };
     415 CBC         638 :         let hdr_bytes = header.encode()?;
     416 ECB         (6) : 
     417 CBC         638 :         debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
     418             638 :         debug_assert_ne!(block_offset, 0);
     419                 : 
     420 GIC         638 :         seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
     421               3 :     }
     422                 : 
     423             641 :     Ok(seg_buf.freeze())
     424             641 : }
     425                 : 
     426                 : #[repr(C)]
     427 CBC          12 : #[derive(Serialize)]
     428 ECB         (6) : struct XlLogicalMessage {
     429             (6) :     db_id: Oid,
     430             (6) :     transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
     431             (6) :     prefix_size: uint64,
     432             (6) :     message_size: uint64,
     433             (6) : }
     434             (6) : 
     435             (6) : impl XlLogicalMessage {
     436 CBC           6 :     pub fn encode(&self) -> Bytes {
     437               6 :         use utils::bin_ser::LeSer;
     438               6 :         self.ser().unwrap().into()
     439               6 :     }
     440 ECB         (6) : }
     441             (6) : 
     442             (6) : /// Create new WAL record for non-transactional logical message.
     443             (6) : /// Used for creating artificial WAL for tests, as LogicalMessage
     444             (6) : /// record is basically no-op.
     445             (6) : ///
     446             (6) : /// NOTE: This leaves the xl_prev field zero. The safekeeper and
     447             (6) : /// pageserver tolerate that, but PostgreSQL does not.
     448 CBC           6 : pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
     449               6 :     let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
     450               6 :     prefix_bytes.write_all(prefix.as_bytes()).unwrap();
     451               6 :     prefix_bytes.push(0);
     452               6 : 
     453               6 :     let message_bytes = message.as_bytes();
     454               6 : 
     455               6 :     let logical_message = XlLogicalMessage {
     456               6 :         db_id: 0,
     457               6 :         transactional: 0,
     458               6 :         prefix_size: prefix_bytes.len() as u64,
     459               6 :         message_size: message_bytes.len() as u64,
     460               6 :     };
     461               6 : 
     462               6 :     let mainrdata = logical_message.encode();
     463               6 :     let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
     464               6 :     // only short mainrdata is supported for now
     465               6 :     assert!(mainrdata_len <= 255);
     466               6 :     let mainrdata_len = mainrdata_len as u8;
     467               6 : 
     468               6 :     let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
     469               6 :     data.extend_from_slice(&mainrdata);
     470               6 :     data.extend_from_slice(&prefix_bytes);
     471               6 :     data.extend_from_slice(message_bytes);
     472               6 : 
     473               6 :     let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
     474               6 : 
     475               6 :     let mut header = XLogRecord {
     476               6 :         xl_tot_len: total_len as u32,
     477               6 :         xl_xid: 0,
     478 GBC           6 :         xl_prev: 0,
     479 CBC           6 :         xl_info: 0,
     480 GIC           6 :         xl_rmid: 21,
     481 CBC           6 :         __bindgen_padding_0: [0u8; 2usize],
     482               6 :         xl_crc: 0, // crc will be calculated later
     483 GIC           6 :     };
     484               6 : 
     485               6 :     let header_bytes = header.encode().expect("failed to encode header");
     486               6 :     let crc = crc32c_append(0, &data);
     487               6 :     let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
     488               6 :     header.xl_crc = crc;
     489               6 : 
     490               6 :     let mut wal: Vec<u8> = Vec::new();
     491               6 :     wal.extend_from_slice(&header.encode().expect("failed to encode header"));
     492               6 :     wal.extend_from_slice(&data);
     493               6 : 
     494               6 :     // WAL start position must be aligned at 8 bytes,
     495               6 :     // this will add padding for the next WAL record.
     496               6 :     const PADDING: usize = 8;
     497               6 :     let padding_rem = wal.len() % PADDING;
     498               6 :     if padding_rem != 0 {
     499 UIC           0 :         wal.resize(wal.len() + PADDING - padding_rem, 0);
     500 GIC           6 :     }
     501                 : 
     502               6 :     wal
     503               6 : }
     504                 : 
     505                 : #[cfg(test)]
     506                 : mod tests {
     507                 :     use super::*;
     508                 : 
     509               3 :     #[test]
     510               3 :     fn test_ts_conversion() {
     511               3 :         let now = SystemTime::now();
     512               3 :         let round_trip = from_pg_timestamp(to_pg_timestamp(now));
     513               3 : 
     514               3 :         let now_since = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     515               3 :         let round_trip_since = round_trip.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     516               3 :         assert_eq!(now_since.as_micros(), round_trip_since.as_micros());
     517                 : 
     518               3 :         let now_pg = get_current_timestamp();
     519               3 :         let round_trip_pg = to_pg_timestamp(from_pg_timestamp(now_pg));
     520               3 : 
     521               3 :         assert_eq!(now_pg, round_trip_pg);
     522               3 :     }
     523                 : 
     524                 :     // If you need to craft WAL and write tests for this module, put them in the wal_craft crate.
     525                 : }
        

Generated by: LCOV version 2.1-beta