|             Line data    Source code 
       1              : use std::ffi::{CStr, CString};
       2              : 
       3              : use bytes::{Bytes, BytesMut};
       4              : use crc32c::crc32c_append;
       5              : use utils::lsn::Lsn;
       6              : 
       7              : use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
       8              : use super::xlog_utils::{
       9              :     XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE,
      10              :     XLP_FIRST_IS_CONTRECORD,
      11              : };
      12              : use super::XLogRecord;
      13              : use crate::pg_constants::{
      14              :     RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
      15              :     XLR_BLOCK_ID_DATA_SHORT,
      16              : };
      17              : use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
      18              : 
      19              : /// A WAL record payload. Will be prefixed by an XLogRecord header when encoded.
      20              : pub struct Record {
      21              :     pub rmid: RmgrId,
      22              :     pub info: u8,
      23              :     pub data: Bytes,
      24              : }
      25              : 
      26              : impl Record {
      27              :     /// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of
      28              :     /// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres.
      29        12964 :     pub fn encode(&self, prev_lsn: Lsn) -> Bytes {
      30              :         // Prefix data with block ID and length.
      31        12964 :         let data_header = Bytes::from(match self.data.len() {
      32            0 :             0 => vec![],
      33        12964 :             1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8],
      34          620 :             256.. => {
      35          620 :                 let len_bytes = (self.data.len() as u32).to_le_bytes();
      36          620 :                 [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
      37              :             }
      38              :         });
      39              : 
      40              :         // Construct the WAL record header.
      41        12964 :         let mut header = XLogRecord {
      42        12964 :             xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
      43        12964 :             xl_xid: 0,
      44        12964 :             xl_prev: prev_lsn.into(),
      45        12964 :             xl_info: self.info,
      46        12964 :             xl_rmid: self.rmid,
      47        12964 :             __bindgen_padding_0: [0; 2],
      48        12964 :             xl_crc: 0, // see below
      49        12964 :         };
      50              : 
      51              :         // Compute the CRC checksum for the data, and the header up to the CRC field.
      52        12964 :         let mut crc = 0;
      53        12964 :         crc = crc32c_append(crc, &data_header);
      54        12964 :         crc = crc32c_append(crc, &self.data);
      55        12964 :         crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
      56        12964 :         header.xl_crc = crc;
      57              : 
      58              :         // Encode the final header and record.
      59        12964 :         let header = header.encode().unwrap();
      60              : 
      61        12964 :         [header, data_header, self.data.clone()].concat().into()
      62        12964 :     }
      63              : }
      64              : 
      65              : /// Generates WAL record payloads.
      66              : ///
      67              : /// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator
      68              : /// that creates a table and inserts rows.
      69              : pub trait RecordGenerator: Iterator<Item = Record> {}
      70              : 
      71              : impl<I: Iterator<Item = Record>> RecordGenerator for I {}
      72              : 
      73              : /// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs
      74              : /// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record,
      75              : /// including internal page headers if it spans pages. Concatenating the bytes will yield a
      76              : /// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized
      77              : /// for performance.
      78              : ///
      79              : /// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this
      80              : /// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`).
      81              : ///
      82              : /// A WAL is split into 16 MB segments. Each segment is split into 8 KB pages, with headers.
      83              : /// Records are arbitrary length, 8-byte aligned, and may span pages. The layout is e.g.:
      84              : ///
      85              : /// |        Segment 1         |        Segment 2         |        Segment 3         |
      86              : /// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 |
      87              : /// | R1 |   R2  |R3|  R4  | R5  |  R6  |                 R7            | R8  |
      88              : #[derive(Default)]
      89              : pub struct WalGenerator<R: RecordGenerator> {
      90              :     /// Generates record payloads for the WAL.
      91              :     pub record_generator: R,
      92              :     /// Current LSN to append the next record at.
      93              :     ///
      94              :     /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should
      95              :     /// ensure that the LSN is on a valid record boundary (i.e. we can't start appending in the
      96              :     /// middle on an existing record or header, or beyond the end of the existing WAL).
      97              :     pub lsn: Lsn,
      98              :     /// The starting LSN of the previous record. Used in WAL record headers. The Safekeeper doesn't
      99              :     /// care about this, unlike Postgres, but we include it for completeness.
     100              :     pub prev_lsn: Lsn,
     101              : }
     102              : 
     103              : impl<R: RecordGenerator> WalGenerator<R> {
     104              :     // Hardcode the sys and timeline ID. We can make them configurable if we care about them.
     105              :     const SYS_ID: u64 = 0;
     106              :     const TIMELINE_ID: u32 = 1;
     107              : 
     108              :     /// Creates a new WAL generator with the given record generator.
     109         9317 :     pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
     110         9317 :         Self {
     111         9317 :             record_generator,
     112         9317 :             lsn: start_lsn,
     113         9317 :             prev_lsn: start_lsn,
     114         9317 :         }
     115            0 :     }
     116              : 
     117              :     /// Appends a record with an arbitrary payload at the current LSN, then increments the LSN.
     118              :     /// Returns the WAL bytes for the record, including page headers and padding, and the start LSN.
     119        12960 :     fn append_record(&mut self, record: Record) -> (Lsn, Bytes) {
     120        12960 :         let record = record.encode(self.prev_lsn);
     121        12960 :         let record = Self::insert_pages(record, self.lsn);
     122        12960 :         let record = Self::pad_record(record, self.lsn);
     123        12960 :         let lsn = self.lsn;
     124        12960 :         self.prev_lsn = self.lsn;
     125        12960 :         self.lsn += record.len() as u64;
     126        12960 :         (lsn, record)
     127        12960 :     }
     128              : 
     129              :     /// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record
     130              :     /// is to be appended.
     131        12960 :     fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
     132              :         // Fast path: record fits in current page, and the page already has a header.
     133        12960 :         if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 {
     134        12255 :             return record;
     135          705 :         }
     136              : 
     137          705 :         let mut pages = BytesMut::new();
     138          705 :         let mut remaining = record.clone(); // Bytes::clone() is cheap
     139         2183 :         while !remaining.is_empty() {
     140              :             // At new page boundary, inject page header.
     141         1478 :             if lsn.block_offset() == 0 {
     142          783 :                 let mut page_header = XLogPageHeaderData {
     143          783 :                     xlp_magic: XLOG_PAGE_MAGIC as u16,
     144          783 :                     xlp_info: XLP_BKP_REMOVABLE,
     145          783 :                     xlp_tli: Self::TIMELINE_ID,
     146          783 :                     xlp_pageaddr: lsn.0,
     147          783 :                     xlp_rem_len: 0,
     148          783 :                     __bindgen_padding_0: [0; 4],
     149          783 :                 };
     150              :                 // If the record was split across page boundaries, mark as continuation.
     151          783 :                 if remaining.len() < record.len() {
     152          773 :                     page_header.xlp_rem_len = remaining.len() as u32;
     153          773 :                     page_header.xlp_info |= XLP_FIRST_IS_CONTRECORD;
     154          773 :                 }
     155              :                 // At start of segment, use a long page header.
     156          783 :                 let page_header = if lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     157            0 :                     page_header.xlp_info |= XLP_LONG_HEADER;
     158            0 :                     XLogLongPageHeaderData {
     159            0 :                         std: page_header,
     160            0 :                         xlp_sysid: Self::SYS_ID,
     161            0 :                         xlp_seg_size: WAL_SEGMENT_SIZE as u32,
     162            0 :                         xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
     163            0 :                     }
     164            0 :                     .encode()
     165            0 :                     .unwrap()
     166              :                 } else {
     167          783 :                     page_header.encode().unwrap()
     168              :                 };
     169          783 :                 pages.extend_from_slice(&page_header);
     170          783 :                 lsn += page_header.len() as u64;
     171          695 :             }
     172              : 
     173              :             // Append the record up to the next page boundary, if any.
     174         1478 :             let page_free = lsn.remaining_in_block() as usize;
     175         1478 :             let chunk = remaining.split_to(std::cmp::min(page_free, remaining.len()));
     176         1478 :             pages.extend_from_slice(&chunk);
     177         1478 :             lsn += chunk.len() as u64;
     178              :         }
     179          705 :         pages.freeze()
     180        12960 :     }
     181              : 
     182              :     /// Records must be 8-byte aligned. Take an encoded record (including any injected page
     183              :     /// boundaries), starting at the given LSN, and add any necessary padding at the end.
     184        12960 :     fn pad_record(record: Bytes, mut lsn: Lsn) -> Bytes {
     185        12960 :         lsn += record.len() as u64;
     186        12960 :         let padding = lsn.calc_padding(8u64) as usize;
     187        12960 :         if padding == 0 {
     188        12340 :             return record;
     189          620 :         }
     190          620 :         [record, Bytes::from(vec![0; padding])].concat().into()
     191        12960 :     }
     192              : }
     193              : 
     194              : /// Generates WAL records as an iterator.
     195              : impl<R: RecordGenerator> Iterator for WalGenerator<R> {
     196              :     type Item = (Lsn, Bytes);
     197              : 
     198          620 :     fn next(&mut self) -> Option<Self::Item> {
     199          620 :         let record = self.record_generator.next()?;
     200          620 :         Some(self.append_record(record))
     201            0 :     }
     202              : }
     203              : 
     204              : /// Generates logical message records (effectively noops) with a fixed message.
     205              : pub struct LogicalMessageGenerator {
     206              :     prefix: CString,
     207              :     message: Vec<u8>,
     208              : }
     209              : 
     210              : impl LogicalMessageGenerator {
     211              :     const DB_ID: u32 = 0; // hardcoded for now
     212              :     const RM_ID: RmgrId = RM_LOGICALMSG_ID;
     213              :     const INFO: u8 = XLOG_LOGICAL_MESSAGE;
     214              : 
     215              :     /// Creates a new LogicalMessageGenerator.
     216         9321 :     pub fn new(prefix: &CStr, message: &[u8]) -> Self {
     217         9321 :         Self {
     218         9321 :             prefix: prefix.to_owned(),
     219         9321 :             message: message.to_owned(),
     220         9321 :         }
     221         9321 :     }
     222              : 
     223              :     /// Encodes a logical message.
     224        12964 :     fn encode(prefix: &CStr, message: &[u8]) -> Bytes {
     225        12964 :         let prefix = prefix.to_bytes_with_nul();
     226        12964 :         let header = XlLogicalMessage {
     227        12964 :             db_id: Self::DB_ID,
     228        12964 :             transactional: 0,
     229        12964 :             prefix_size: prefix.len() as u64,
     230        12964 :             message_size: message.len() as u64,
     231        12964 :         };
     232        12964 :         [&header.encode(), prefix, message].concat().into()
     233        12964 :     }
     234              : 
     235              :     /// Computes how large a value must be to get a record of the given size. Convenience method to
     236              :     /// construct records of pre-determined size. Panics if the record size is too small.
     237            0 :     pub fn make_value_size(record_size: usize, prefix: &CStr) -> usize {
     238            0 :         let xlog_header_size = XLOG_SIZE_OF_XLOG_RECORD;
     239            0 :         let lm_header_size = size_of::<XlLogicalMessage>();
     240            0 :         let prefix_size = prefix.to_bytes_with_nul().len();
     241            0 :         let data_header_size = match record_size - xlog_header_size - 2 {
     242            0 :             0..=255 => 2,
     243            0 :             256..=258 => panic!("impossible record_size {record_size}"),
     244            0 :             259.. => 5,
     245              :         };
     246            0 :         record_size
     247            0 :             .checked_sub(xlog_header_size + lm_header_size + prefix_size + data_header_size)
     248            0 :             .expect("record_size too small")
     249            0 :     }
     250              : }
     251              : 
     252              : impl Iterator for LogicalMessageGenerator {
     253              :     type Item = Record;
     254              : 
     255          624 :     fn next(&mut self) -> Option<Self::Item> {
     256          624 :         Some(Record {
     257          624 :             rmid: Self::RM_ID,
     258          624 :             info: Self::INFO,
     259          624 :             data: Self::encode(&self.prefix, &self.message),
     260          624 :         })
     261          624 :     }
     262              : }
     263              : 
     264              : impl WalGenerator<LogicalMessageGenerator> {
     265              :     /// Convenience method for appending a WAL record with an arbitrary logical message at the
     266              :     /// current WAL LSN position. Returns the start LSN and resulting WAL bytes.
     267        12340 :     pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) {
     268        12340 :         let record = Record {
     269        12340 :             rmid: LogicalMessageGenerator::RM_ID,
     270        12340 :             info: LogicalMessageGenerator::INFO,
     271        12340 :             data: LogicalMessageGenerator::encode(prefix, message),
     272        12340 :         };
     273        12340 :         self.append_record(record)
     274        12340 :     }
     275              : }
         |