LCOV - code coverage report
Current view: top level - libs/postgres_ffi/src - wal_generator.rs (source / functions) Coverage Total Hit
Test: f5f94ec0366b63fd2cbbe02edc2087dbd893d04d.info Lines: 85.5 % 131 112
Test Date: 2024-11-20 05:34:23 Functions: 51.2 % 41 21

            Line data    Source code
       1              : use std::ffi::{CStr, CString};
       2              : 
       3              : use bytes::{Bytes, BytesMut};
       4              : use crc32c::crc32c_append;
       5              : use utils::lsn::Lsn;
       6              : 
       7              : use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
       8              : use super::xlog_utils::{
       9              :     XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE,
      10              :     XLP_FIRST_IS_CONTRECORD,
      11              : };
      12              : use super::XLogRecord;
      13              : use crate::pg_constants::{
      14              :     RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
      15              :     XLR_BLOCK_ID_DATA_SHORT,
      16              : };
      17              : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
      18              : 
      19              : /// A WAL record payload. Will be prefixed by an XLogRecord header when encoded.
      20              : pub struct Record {
      21              :     pub rmid: RmgrId,
      22              :     pub info: u8,
      23              :     pub data: Bytes,
      24              : }
      25              : 
      26              : impl Record {
      27              :     /// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of
      28              :     /// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres.
      29        11381 :     pub fn encode(&self, prev_lsn: Lsn) -> Bytes {
      30              :         // Prefix data with block ID and length.
      31        11381 :         let data_header = Bytes::from(match self.data.len() {
      32            0 :             0 => vec![],
      33        11381 :             1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8],
      34            0 :             256.. => {
      35            0 :                 let len_bytes = (self.data.len() as u32).to_le_bytes();
      36            0 :                 [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
      37              :             }
      38              :         });
      39              : 
      40              :         // Construct the WAL record header.
      41        11381 :         let mut header = XLogRecord {
      42        11381 :             xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
      43        11381 :             xl_xid: 0,
      44        11381 :             xl_prev: prev_lsn.into(),
      45        11381 :             xl_info: self.info,
      46        11381 :             xl_rmid: self.rmid,
      47        11381 :             __bindgen_padding_0: [0; 2],
      48        11381 :             xl_crc: 0, // see below
      49        11381 :         };
      50        11381 : 
      51        11381 :         // Compute the CRC checksum for the data, and the header up to the CRC field.
      52        11381 :         let mut crc = 0;
      53        11381 :         crc = crc32c_append(crc, &data_header);
      54        11381 :         crc = crc32c_append(crc, &self.data);
      55        11381 :         crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
      56        11381 :         header.xl_crc = crc;
      57        11381 : 
      58        11381 :         // Encode the final header and record.
      59        11381 :         let header = header.encode().unwrap();
      60        11381 : 
      61        11381 :         [header, data_header, self.data.clone()].concat().into()
      62        11381 :     }
      63              : }
      64              : 
      65              : /// Generates WAL record payloads.
      66              : ///
      67              : /// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator
      68              : /// that creates a table and inserts rows.
      69              : pub trait RecordGenerator: Iterator<Item = Record> {}
      70              : 
      71              : impl<I: Iterator<Item = Record>> RecordGenerator for I {}
      72              : 
      73              : /// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs
      74              : /// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record,
      75              : /// including internal page headers if it spans pages. Concatenating the bytes will yield a
      76              : /// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized
      77              : /// for performance.
      78              : ///
      79              : /// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this
      80              : /// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`).
      81              : ///
      82              : /// A WAL is split into 16 MB segments. Each segment is split into 8 KB pages, with headers.
      83              : /// Records are arbitrary length, 8-byte aligned, and may span pages. The layout is e.g.:
      84              : ///
      85              : /// |        Segment 1         |        Segment 2         |        Segment 3         |
      86              : /// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 |
      87              : /// | R1 |   R2  |R3|  R4  | R5  |  R6  |                 R7            | R8  |
      88              : #[derive(Default)]
      89              : pub struct WalGenerator<R: RecordGenerator> {
      90              :     /// Generates record payloads for the WAL.
      91              :     pub record_generator: R,
      92              :     /// Current LSN to append the next record at.
      93              :     ///
      94              :     /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should
      95              :     /// ensure that the LSN is on a valid record boundary (i.e. we can't start appending in the
      96              :     /// middle on an existing record or header, or beyond the end of the existing WAL).
      97              :     pub lsn: Lsn,
      98              :     /// The starting LSN of the previous record. Used in WAL record headers. The Safekeeper doesn't
      99              :     /// care about this, unlike Postgres, but we include it for completeness.
     100              :     pub prev_lsn: Lsn,
     101              : }
     102              : 
     103              : impl<R: RecordGenerator> WalGenerator<R> {
     104              :     // Hardcode the sys and timeline ID. We can make them configurable if we care about them.
     105              :     const SYS_ID: u64 = 0;
     106              :     const TIMELINE_ID: u32 = 1;
     107              : 
     108              :     /// Creates a new WAL generator with the given record generator.
     109         9265 :     pub fn new(record_generator: R) -> WalGenerator<R> {
     110         9265 :         Self {
     111         9265 :             record_generator,
     112         9265 :             lsn: Lsn(0),
     113         9265 :             prev_lsn: Lsn(0),
     114         9265 :         }
     115         9265 :     }
     116              : 
     117              :     /// Appends a record with an arbitrary payload at the current LSN, then increments the LSN.
     118              :     /// Returns the WAL bytes for the record, including page headers and padding, and the start LSN.
     119        11377 :     fn append_record(&mut self, record: Record) -> (Lsn, Bytes) {
     120        11377 :         let record = record.encode(self.prev_lsn);
     121        11377 :         let record = Self::insert_pages(record, self.lsn);
     122        11377 :         let record = Self::pad_record(record, self.lsn);
     123        11377 :         let lsn = self.lsn;
     124        11377 :         self.prev_lsn = self.lsn;
     125        11377 :         self.lsn += record.len() as u64;
     126        11377 :         (lsn, record)
     127        11377 :     }
     128              : 
     129              :     /// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record
     130              :     /// is to be appended.
     131        11377 :     fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
     132        11377 :         // Fast path: record fits in current page, and the page already has a header.
     133        11377 :         if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 {
     134        11294 :             return record;
     135           83 :         }
     136           83 : 
     137           83 :         let mut pages = BytesMut::new();
     138           83 :         let mut remaining = record.clone(); // Bytes::clone() is cheap
     139          240 :         while !remaining.is_empty() {
     140              :             // At new page boundary, inject page header.
     141          157 :             if lsn.block_offset() == 0 {
     142           83 :                 let mut page_header = XLogPageHeaderData {
     143           83 :                     xlp_magic: XLOG_PAGE_MAGIC as u16,
     144           83 :                     xlp_info: XLP_BKP_REMOVABLE,
     145           83 :                     xlp_tli: Self::TIMELINE_ID,
     146           83 :                     xlp_pageaddr: lsn.0,
     147           83 :                     xlp_rem_len: 0,
     148           83 :                     __bindgen_padding_0: [0; 4],
     149           83 :                 };
     150           83 :                 // If the record was split across page boundaries, mark as continuation.
     151           83 :                 if remaining.len() < record.len() {
     152           74 :                     page_header.xlp_rem_len = remaining.len() as u32;
     153           74 :                     page_header.xlp_info |= XLP_FIRST_IS_CONTRECORD;
     154           74 :                 }
     155              :                 // At start of segment, use a long page header.
     156           83 :                 let page_header = if lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     157            0 :                     page_header.xlp_info |= XLP_LONG_HEADER;
     158            0 :                     XLogLongPageHeaderData {
     159            0 :                         std: page_header,
     160            0 :                         xlp_sysid: Self::SYS_ID,
     161            0 :                         xlp_seg_size: WAL_SEGMENT_SIZE as u32,
     162            0 :                         xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
     163            0 :                     }
     164            0 :                     .encode()
     165            0 :                     .unwrap()
     166              :                 } else {
     167           83 :                     page_header.encode().unwrap()
     168              :                 };
     169           83 :                 pages.extend_from_slice(&page_header);
     170           83 :                 lsn += page_header.len() as u64;
     171           74 :             }
     172              : 
     173              :             // Append the record up to the next page boundary, if any.
     174          157 :             let page_free = lsn.remaining_in_block() as usize;
     175          157 :             let chunk = remaining.split_to(std::cmp::min(page_free, remaining.len()));
     176          157 :             pages.extend_from_slice(&chunk);
     177          157 :             lsn += chunk.len() as u64;
     178              :         }
     179           83 :         pages.freeze()
     180        11377 :     }
     181              : 
     182              :     /// Records must be 8-byte aligned. Take an encoded record (including any injected page
     183              :     /// boundaries), starting at the given LSN, and add any necessary padding at the end.
     184        11377 :     fn pad_record(record: Bytes, mut lsn: Lsn) -> Bytes {
     185        11377 :         lsn += record.len() as u64;
     186        11377 :         let padding = lsn.calc_padding(8u64) as usize;
     187        11377 :         if padding == 0 {
     188        11377 :             return record;
     189            0 :         }
     190            0 :         [record, Bytes::from(vec![0; padding])].concat().into()
     191        11377 :     }
     192              : }
     193              : 
     194              : /// Generates WAL records as an iterator.
     195              : impl<R: RecordGenerator> Iterator for WalGenerator<R> {
     196              :     type Item = (Lsn, Bytes);
     197              : 
     198            0 :     fn next(&mut self) -> Option<Self::Item> {
     199            0 :         let record = self.record_generator.next()?;
     200            0 :         Some(self.append_record(record))
     201            0 :     }
     202              : }
     203              : 
     204              : /// Generates logical message records (effectively noops) with a fixed message.
     205              : pub struct LogicalMessageGenerator {
     206              :     prefix: CString,
     207              :     message: Vec<u8>,
     208              : }
     209              : 
     210              : impl LogicalMessageGenerator {
     211              :     const DB_ID: u32 = 0; // hardcoded for now
     212              :     const RM_ID: RmgrId = RM_LOGICALMSG_ID;
     213              :     const INFO: u8 = XLOG_LOGICAL_MESSAGE;
     214              : 
     215              :     /// Creates a new LogicalMessageGenerator.
     216         9269 :     pub fn new(prefix: &CStr, message: &[u8]) -> Self {
     217         9269 :         Self {
     218         9269 :             prefix: prefix.to_owned(),
     219         9269 :             message: message.to_owned(),
     220         9269 :         }
     221         9269 :     }
     222              : 
     223              :     /// Encodes a logical message.
     224        11381 :     fn encode(prefix: &CStr, message: &[u8]) -> Bytes {
     225        11381 :         let prefix = prefix.to_bytes_with_nul();
     226        11381 :         let header = XlLogicalMessage {
     227        11381 :             db_id: Self::DB_ID,
     228        11381 :             transactional: 0,
     229        11381 :             prefix_size: prefix.len() as u64,
     230        11381 :             message_size: message.len() as u64,
     231        11381 :         };
     232        11381 :         [&header.encode(), prefix, message].concat().into()
     233        11381 :     }
     234              : }
     235              : 
     236              : impl Iterator for LogicalMessageGenerator {
     237              :     type Item = Record;
     238              : 
     239            4 :     fn next(&mut self) -> Option<Self::Item> {
     240            4 :         Some(Record {
     241            4 :             rmid: Self::RM_ID,
     242            4 :             info: Self::INFO,
     243            4 :             data: Self::encode(&self.prefix, &self.message),
     244            4 :         })
     245            4 :     }
     246              : }
     247              : 
     248              : impl WalGenerator<LogicalMessageGenerator> {
     249              :     /// Convenience method for appending a WAL record with an arbitrary logical message at the
     250              :     /// current WAL LSN position. Returns the start LSN and resulting WAL bytes.
     251        11377 :     pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) {
     252        11377 :         let record = Record {
     253        11377 :             rmid: LogicalMessageGenerator::RM_ID,
     254        11377 :             info: LogicalMessageGenerator::INFO,
     255        11377 :             data: LogicalMessageGenerator::encode(prefix, message),
     256        11377 :         };
     257        11377 :         self.append_record(record)
     258        11377 :     }
     259              : }
        

Generated by: LCOV version 2.1-beta