LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - xlog_utils.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 93.9 % 327 307
Test Date: 2024-02-14 18:05:35 Functions: 77.1 % 109 84

            Line data    Source code
       1              : //
       2              : // This file contains common utilities for dealing with PostgreSQL WAL files and
       3              : // LSNs.
       4              : //
       5              : // Many of these functions have been copied from PostgreSQL, and rewritten in
       6              : // Rust. That's why they don't follow the usual Rust naming conventions, they
       7              : // have been named the same as the corresponding PostgreSQL functions instead.
       8              : //
       9              : 
      10              : use crc32c::crc32c_append;
      11              : 
      12              : use super::super::waldecoder::WalStreamDecoder;
      13              : use super::bindings::{
      14              :     CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
      15              :     XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
      16              : };
      17              : use super::PG_MAJORVERSION;
      18              : use crate::pg_constants;
      19              : use crate::PG_TLI;
      20              : use crate::{uint32, uint64, Oid};
      21              : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
      22              : 
      23              : use bytes::BytesMut;
      24              : use bytes::{Buf, Bytes};
      25              : 
      26              : use log::*;
      27              : 
      28              : use serde::Serialize;
      29              : use std::fs::File;
      30              : use std::io::prelude::*;
      31              : use std::io::ErrorKind;
      32              : use std::io::SeekFrom;
      33              : use std::path::{Path, PathBuf};
      34              : use std::time::SystemTime;
      35              : use utils::bin_ser::DeserializeError;
      36              : use utils::bin_ser::SerializeError;
      37              : 
      38              : use utils::lsn::Lsn;
      39              : 
      40              : pub const XLOG_FNAME_LEN: usize = 24;
      41              : pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
      42              : pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
      43              : pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
      44              : 
      45              : pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
      46              : pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
      47              : pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
      48              : #[allow(clippy::identity_op)]
      49              : pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
      50              : 
      51              : /// Interval of checkpointing the metadata file. We should store the metadata file to enforce
      52              : /// the predicate that checkpoint.nextXid is larger than any XID in WAL.
      53              : /// But flushing checkpoint file for each transaction seems to be too expensive,
      54              : /// so XID_CHECKPOINT_INTERVAL is used to forward align nextXid and so perform
      55              : /// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
      56              : /// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
      57              : /// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
      58              : const XID_CHECKPOINT_INTERVAL: u32 = 1024;
      59              : 
      60        12011 : pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
      61        12011 :     (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
      62        12011 : }
      63              : 
      64        66307 : pub fn XLogSegNoOffsetToRecPtr(
      65        66307 :     segno: XLogSegNo,
      66        66307 :     offset: u32,
      67        66307 :     wal_segsz_bytes: usize,
      68        66307 : ) -> XLogRecPtr {
      69        66307 :     segno * (wal_segsz_bytes as u64) + (offset as u64)
      70        66307 : }
      71              : 
      72         5851 : pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
      73         5851 :     format!(
      74         5851 :         "{:>08X}{:>08X}{:>08X}",
      75         5851 :         tli,
      76         5851 :         logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
      77         5851 :         logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
      78         5851 :     )
      79         5851 : }
      80              : 
      81          309 : pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
      82          309 :     let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
      83          309 :     let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
      84          309 :     let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
      85          309 :     (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
      86          309 : }
      87              : 
      88         1108 : pub fn IsXLogFileName(fname: &str) -> bool {
      89         8136 :     return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
      90         1108 : }
      91              : 
      92          709 : pub fn IsPartialXLogFileName(fname: &str) -> bool {
      93          709 :     fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
      94          709 : }
      95              : 
      96              : /// If LSN points to the beginning of the page, then shift it to first record,
      97              : /// otherwise align on 8-bytes boundary (required for WAL records)
      98         1356 : pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
      99         1356 :     if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
     100           14 :         let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
     101            7 :             XLOG_SIZE_OF_XLOG_LONG_PHD
     102              :         } else {
     103            7 :             XLOG_SIZE_OF_XLOG_SHORT_PHD
     104              :         };
     105           14 :         lsn + hdr_size as u64
     106              :     } else {
     107         1342 :         lsn.align()
     108              :     }
     109         1356 : }
     110              : 
     111          595 : pub fn generate_pg_control(
     112          595 :     pg_control_bytes: &[u8],
     113          595 :     checkpoint_bytes: &[u8],
     114          595 :     lsn: Lsn,
     115          595 : ) -> anyhow::Result<(Bytes, u64)> {
     116          595 :     let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
     117          595 :     let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
     118              : 
     119              :     // Generate new pg_control needed for bootstrap
     120          595 :     checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
     121          595 : 
     122          595 :     //reset some fields we don't want to preserve
     123          595 :     //TODO Check this.
     124          595 :     //We may need to determine the value from twophase data.
     125          595 :     checkpoint.oldestActiveXid = 0;
     126          595 : 
     127          595 :     //save new values in pg_control
     128          595 :     pg_control.checkPoint = 0;
     129          595 :     pg_control.checkPointCopy = checkpoint;
     130          595 :     pg_control.state = DBState_DB_SHUTDOWNED;
     131          595 : 
     132          595 :     Ok((pg_control.encode(), pg_control.system_identifier))
     133          595 : }
     134              : 
     135       758126 : pub fn get_current_timestamp() -> TimestampTz {
     136       758126 :     to_pg_timestamp(SystemTime::now())
     137       758126 : }
     138              : 
     139              : // Module to reduce the scope of the constants
     140              : mod timestamp_conversions {
     141              :     use std::time::Duration;
     142              : 
     143              :     use super::*;
     144              : 
     145              :     const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
     146              :     const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
     147              :     const SECS_PER_DAY: u64 = 86400;
     148              :     const USECS_PER_SEC: u64 = 1000000;
     149              :     const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
     150              :         (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
     151              : 
     152       758319 :     pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
     153       758319 :         match time.duration_since(SystemTime::UNIX_EPOCH) {
     154       758319 :             Ok(n) => {
     155       758319 :                 ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
     156       758319 :                     + n.subsec_micros() as u64) as i64
     157              :             }
     158            0 :             Err(_) => panic!("SystemTime before UNIX EPOCH!"),
     159              :         }
     160       758319 :     }
     161              : 
     162           22 :     pub fn from_pg_timestamp(time: TimestampTz) -> SystemTime {
     163           22 :         let time: u64 = time
     164           22 :             .try_into()
     165           22 :             .expect("timestamp before millenium (postgres epoch)");
     166           22 :         let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
     167           22 :         SystemTime::UNIX_EPOCH
     168           22 :             .checked_add(Duration::from_micros(since_unix_epoch))
     169           22 :             .expect("SystemTime overflow")
     170           22 :     }
     171              : }
     172              : 
     173              : pub use timestamp_conversions::{from_pg_timestamp, to_pg_timestamp};
     174              : 
     175              : // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
     176              : // start_lsn must point to some previously known record boundary (beginning of
     177              : // the next record). If no valid record is found after it, start_lsn is
     178              : // returned unchanged.
     179          228 : pub fn find_end_of_wal(
     180          228 :     data_dir: &Path,
     181          228 :     wal_seg_size: usize,
     182          228 :     start_lsn: Lsn, // start reading WAL at this point; must point at the start of a record.
     183          228 : ) -> anyhow::Result<Lsn> {
     184          228 :     let mut result = start_lsn;
     185          228 :     let mut curr_lsn = start_lsn;
     186          228 :     let mut buf = [0u8; XLOG_BLCKSZ];
     187          228 :     let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
     188          228 :     debug!("find_end_of_wal PG_VERSION: {}", pg_version);
     189              : 
     190          228 :     let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
     191              : 
     192              :     // loop over segments
     193          255 :     loop {
     194          255 :         let segno = curr_lsn.segment_number(wal_seg_size);
     195          255 :         let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     196          255 :         let seg_file_path = data_dir.join(seg_file_name);
     197          255 :         match open_wal_segment(&seg_file_path)? {
     198              :             None => {
     199              :                 // no more segments
     200            5 :                 debug!(
     201            0 :                     "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
     202              :                     result, seg_file_path
     203              :                 );
     204            5 :                 return Ok(result);
     205              :             }
     206          250 :             Some(mut segment) => {
     207          250 :                 let seg_offs = curr_lsn.segment_offset(wal_seg_size);
     208          250 :                 segment.seek(SeekFrom::Start(seg_offs as u64))?;
     209              :                 // loop inside segment
     210        51553 :                 while curr_lsn.segment_number(wal_seg_size) == segno {
     211        51526 :                     let bytes_read = segment.read(&mut buf)?;
     212        51526 :                     if bytes_read == 0 {
     213            0 :                         debug!(
     214            0 :                             "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}",
     215            0 :                             result,
     216            0 :                             seg_file_path,
     217            0 :                             curr_lsn.segment_offset(wal_seg_size)
     218              :                         );
     219            0 :                         return Ok(result);
     220        51526 :                     }
     221        51526 :                     curr_lsn += bytes_read as u64;
     222        51526 :                     decoder.feed_bytes(&buf[0..bytes_read]);
     223              : 
     224              :                     // advance result past all completely read records
     225              :                     loop {
     226      2368139 :                         match decoder.poll_decode() {
     227      2316613 :                             Ok(Some(record)) => result = record.0,
     228          223 :                             Err(e) => {
     229          223 :                                 debug!(
     230           42 :                                     "find_end_of_wal reached end at {:?}, decode error: {:?}",
     231              :                                     result, e
     232              :                                 );
     233          223 :                                 return Ok(result);
     234              :                             }
     235        51303 :                             Ok(None) => break, // need more data
     236              :                         }
     237              :                     }
     238              :                 }
     239              :             }
     240              :         }
     241              :     }
     242          228 : }
     243              : 
     244              : // Open .partial or full WAL segment file, if present.
     245          255 : fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
     246          255 :     let mut partial_path = seg_file_path.to_owned();
     247          255 :     partial_path.set_extension("partial");
     248          255 :     match File::open(partial_path) {
     249          223 :         Ok(file) => Ok(Some(file)),
     250           32 :         Err(e) => match e.kind() {
     251              :             ErrorKind::NotFound => {
     252              :                 // .partial not found, try full
     253           32 :                 match File::open(seg_file_path) {
     254           27 :                     Ok(file) => Ok(Some(file)),
     255            5 :                     Err(e) => match e.kind() {
     256            5 :                         ErrorKind::NotFound => Ok(None),
     257            0 :                         _ => Err(e.into()),
     258              :                     },
     259              :                 }
     260              :             }
     261            0 :             _ => Err(e.into()),
     262              :         },
     263              :     }
     264          255 : }
     265              : 
     266            0 : pub fn main() {
     267            0 :     let mut data_dir = PathBuf::new();
     268            0 :     data_dir.push(".");
     269            0 :     let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap();
     270            0 :     println!("wal_end={:?}", wal_end);
     271            0 : }
     272              : 
     273              : impl XLogRecord {
     274    156326246 :     pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
     275    156326246 :         use utils::bin_ser::LeSer;
     276    156326246 :         XLogRecord::des(buf)
     277    156326246 :     }
     278              : 
     279     72693843 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
     280     72693843 :         use utils::bin_ser::LeSer;
     281     72693843 :         XLogRecord::des_from(&mut buf.reader())
     282     72693843 :     }
     283              : 
     284        65630 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     285        65630 :         use utils::bin_ser::LeSer;
     286        65630 :         Ok(self.ser()?.into())
     287        65630 :     }
     288              : 
     289              :     // Is this record an XLOG_SWITCH record? They need some special processing.
     290    156326246 :     pub fn is_xlog_switch_record(&self) -> bool {
     291    156326246 :         self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
     292    156326246 :     }
     293              : }
     294              : 
     295              : impl XLogPageHeaderData {
     296      3223639 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
     297      3223639 :         use utils::bin_ser::LeSer;
     298      3223639 :         XLogPageHeaderData::des_from(&mut buf.reader())
     299      3223639 :     }
     300              : 
     301          780 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     302          780 :         use utils::bin_ser::LeSer;
     303          780 :         self.ser().map(|b| b.into())
     304          780 :     }
     305              : }
     306              : 
     307              : impl XLogLongPageHeaderData {
     308         1547 :     pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
     309         1547 :         use utils::bin_ser::LeSer;
     310         1547 :         XLogLongPageHeaderData::des_from(&mut buf.reader())
     311         1547 :     }
     312              : 
     313          601 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     314          601 :         use utils::bin_ser::LeSer;
     315          601 :         self.ser().map(|b| b.into())
     316          601 :     }
     317              : }
     318              : 
     319              : pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
     320              : 
     321              : impl CheckPoint {
     322        34914 :     pub fn encode(&self) -> Result<Bytes, SerializeError> {
     323        34914 :         use utils::bin_ser::LeSer;
     324        34914 :         Ok(self.ser()?.into())
     325        34914 :     }
     326              : 
     327         2650 :     pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
     328         2650 :         use utils::bin_ser::LeSer;
     329         2650 :         CheckPoint::des(buf)
     330         2650 :     }
     331              : 
     332              :     /// Update next XID based on provided new_xid and stored epoch.
     333              :     /// Next XID should be greater than new_xid. This handles 32-bit
     334              :     /// XID wraparound correctly.
     335              :     ///
     336              :     /// Returns 'true' if the XID was updated.
     337     71483956 :     pub fn update_next_xid(&mut self, xid: u32) -> bool {
     338     71483956 :         // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparound.
     339     71483956 :         let mut new_xid = std::cmp::max(xid.wrapping_add(1), pg_constants::FIRST_NORMAL_TRANSACTION_ID);
     340     71483956 :         // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
     341     71483956 :         // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
     342     71483956 :         new_xid =
     343     71483956 :             new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
     344     71483956 :         let full_xid = self.nextXid.value;
     345     71483956 :         let old_xid = full_xid as u32;
     346     71483956 :         if new_xid.wrapping_sub(old_xid) as i32 > 0 {
     347         8380 :             let mut epoch = full_xid >> 32;
     348         8380 :             if new_xid < old_xid {
     349            0 :                 // wrap-around
     350            0 :                 epoch += 1;
     351         8380 :             }
     352         8380 :             let nextXid = (epoch << 32) | new_xid as u64;
     353         8380 : 
     354         8380 :             if nextXid != self.nextXid.value {
     355         8380 :                 self.nextXid = FullTransactionId { value: nextXid };
     356         8380 :                 return true;
     357            0 :             }
     358     71475576 :         }
     359     71475576 :         false
     360     71483956 :     }
     361              : }
     362              : 
     363              : /// Generate new, empty WAL segment, with correct block headers at the first
     364              : /// page of the segment and the page that contains the given LSN.
     365              : /// We need this segment to start compute node.
     366          601 : pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
     367          601 :     let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
     368          601 : 
     369          601 :     let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
     370          601 : 
     371          601 :     let page_off = lsn.block_offset();
     372          601 :     let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
     373          601 : 
     374          601 :     let first_page_only = seg_off < XLOG_BLCKSZ;
     375          601 :     let (shdr_rem_len, infoflags) = if first_page_only {
     376            4 :         (seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
     377              :     } else {
     378          597 :         (0, 0)
     379              :     };
     380              : 
     381          601 :     let hdr = XLogLongPageHeaderData {
     382          601 :         std: {
     383          601 :             XLogPageHeaderData {
     384          601 :                 xlp_magic: XLOG_PAGE_MAGIC as u16,
     385          601 :                 xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
     386          601 :                 xlp_tli: PG_TLI,
     387          601 :                 xlp_pageaddr: pageaddr,
     388          601 :                 xlp_rem_len: shdr_rem_len as u32,
     389          601 :                 ..Default::default() // Put 0 in padding fields.
     390          601 :             }
     391          601 :         },
     392          601 :         xlp_sysid: system_id,
     393          601 :         xlp_seg_size: WAL_SEGMENT_SIZE as u32,
     394          601 :         xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
     395          601 :     };
     396              : 
     397          601 :     let hdr_bytes = hdr.encode()?;
     398          601 :     seg_buf.extend_from_slice(&hdr_bytes);
     399          601 : 
     400          601 :     //zero out the rest of the file
     401          601 :     seg_buf.resize(WAL_SEGMENT_SIZE, 0);
     402          601 : 
     403          601 :     if !first_page_only {
     404          597 :         let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
     405          597 :         let header = XLogPageHeaderData {
     406          597 :             xlp_magic: XLOG_PAGE_MAGIC as u16,
     407          597 :             xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
     408          596 :                 pg_constants::XLP_FIRST_IS_CONTRECORD
     409              :             } else {
     410            1 :                 0
     411              :             },
     412              :             xlp_tli: PG_TLI,
     413          597 :             xlp_pageaddr: lsn.page_lsn().0,
     414          597 :             xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
     415          596 :                 page_off as u32
     416              :             } else {
     417            1 :                 0u32
     418              :             },
     419          597 :             ..Default::default() // Put 0 in padding fields.
     420              :         };
     421          597 :         let hdr_bytes = header.encode()?;
     422              : 
     423          597 :         debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
     424          597 :         debug_assert_ne!(block_offset, 0);
     425              : 
     426          597 :         seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
     427            4 :     }
     428              : 
     429          601 :     Ok(seg_buf.freeze())
     430          601 : }
     431              : 
     432              : #[repr(C)]
     433        65630 : #[derive(Serialize)]
     434              : pub struct XlLogicalMessage {
     435              :     pub db_id: Oid,
     436              :     pub transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
     437              :     pub prefix_size: uint64,
     438              :     pub message_size: uint64,
     439              : }
     440              : 
     441              : impl XlLogicalMessage {
     442        32815 :     pub fn encode(&self) -> Bytes {
     443        32815 :         use utils::bin_ser::LeSer;
     444        32815 :         self.ser().unwrap().into()
     445        32815 :     }
     446              : }
     447              : 
     448              : /// Create new WAL record for non-transactional logical message.
     449              : /// Used for creating artificial WAL for tests, as LogicalMessage
     450              : /// record is basically no-op.
     451              : ///
     452              : /// NOTE: This leaves the xl_prev field zero. The safekeeper and
     453              : /// pageserver tolerate that, but PostgreSQL does not.
     454            9 : pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
     455            9 :     let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
     456            9 :     prefix_bytes.write_all(prefix.as_bytes()).unwrap();
     457            9 :     prefix_bytes.push(0);
     458            9 : 
     459            9 :     let message_bytes = message.as_bytes();
     460            9 : 
     461            9 :     let logical_message = XlLogicalMessage {
     462            9 :         db_id: 0,
     463            9 :         transactional: 0,
     464            9 :         prefix_size: prefix_bytes.len() as u64,
     465            9 :         message_size: message_bytes.len() as u64,
     466            9 :     };
     467            9 : 
     468            9 :     let mainrdata = logical_message.encode();
     469            9 :     let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
     470            9 :     // only short mainrdata is supported for now
     471            9 :     assert!(mainrdata_len <= 255);
     472            9 :     let mainrdata_len = mainrdata_len as u8;
     473            9 : 
     474            9 :     let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
     475            9 :     data.extend_from_slice(&mainrdata);
     476            9 :     data.extend_from_slice(&prefix_bytes);
     477            9 :     data.extend_from_slice(message_bytes);
     478            9 : 
     479            9 :     let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
     480            9 : 
     481            9 :     let mut header = XLogRecord {
     482            9 :         xl_tot_len: total_len as u32,
     483            9 :         xl_xid: 0,
     484            9 :         xl_prev: 0,
     485            9 :         xl_info: 0,
     486            9 :         xl_rmid: 21,
     487            9 :         __bindgen_padding_0: [0u8; 2usize],
     488            9 :         xl_crc: 0, // crc will be calculated later
     489            9 :     };
     490            9 : 
     491            9 :     let header_bytes = header.encode().expect("failed to encode header");
     492            9 :     let crc = crc32c_append(0, &data);
     493            9 :     let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
     494            9 :     header.xl_crc = crc;
     495            9 : 
     496            9 :     let mut wal: Vec<u8> = Vec::new();
     497            9 :     wal.extend_from_slice(&header.encode().expect("failed to encode header"));
     498            9 :     wal.extend_from_slice(&data);
     499            9 : 
     500            9 :     // WAL start position must be aligned at 8 bytes,
     501            9 :     // this will add padding for the next WAL record.
     502            9 :     const PADDING: usize = 8;
     503            9 :     let padding_rem = wal.len() % PADDING;
     504            9 :     if padding_rem != 0 {
     505            0 :         wal.resize(wal.len() + PADDING - padding_rem, 0);
     506            9 :     }
     507              : 
     508            9 :     wal
     509            9 : }
     510              : 
     511              : #[cfg(test)]
     512              : mod tests {
     513              :     use super::*;
     514              : 
     515            6 :     #[test]
     516            6 :     fn test_ts_conversion() {
     517            6 :         let now = SystemTime::now();
     518            6 :         let round_trip = from_pg_timestamp(to_pg_timestamp(now));
     519            6 : 
     520            6 :         let now_since = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     521            6 :         let round_trip_since = round_trip.duration_since(SystemTime::UNIX_EPOCH).unwrap();
     522            6 :         assert_eq!(now_since.as_micros(), round_trip_since.as_micros());
     523              : 
     524            6 :         let now_pg = get_current_timestamp();
     525            6 :         let round_trip_pg = to_pg_timestamp(from_pg_timestamp(now_pg));
     526            6 : 
     527            6 :         assert_eq!(now_pg, round_trip_pg);
     528            6 :     }
     529              : 
     530              :     // If you need to craft WAL and write tests for this module, put them in the wal_craft crate.
     531              : }
        

Generated by: LCOV version 2.1-beta