LCOV - code coverage report
Current view: top level - safekeeper/tests/walproposer_sim - walproposer_disk.rs (source / functions) Coverage Total Hit
Test: 465a86b0c1fda0069b3e0f6c1c126e6b635a1f72.info Lines: 88.4 % 216 191
Test Date: 2024-06-25 15:47:26 Functions: 100.0 % 45 45

            Line data    Source code
       1              : use std::{ffi::CString, sync::Arc};
       2              : 
       3              : use byteorder::{LittleEndian, WriteBytesExt};
       4              : use crc32c::crc32c_append;
       5              : use parking_lot::{Mutex, MutexGuard};
       6              : use postgres_ffi::{
       7              :     pg_constants::{
       8              :         RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG,
       9              :         XLR_BLOCK_ID_DATA_SHORT,
      10              :     },
      11              :     v16::{
      12              :         wal_craft_test_export::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC},
      13              :         xlog_utils::{
      14              :             XLogSegNoOffsetToRecPtr, XlLogicalMessage, XLOG_RECORD_CRC_OFFS,
      15              :             XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
      16              :             XLP_FIRST_IS_CONTRECORD,
      17              :         },
      18              :         XLogRecord,
      19              :     },
      20              :     WAL_SEGMENT_SIZE, XLOG_BLCKSZ,
      21              : };
      22              : use utils::lsn::Lsn;
      23              : 
      24              : use super::block_storage::BlockStorage;
      25              : 
      26              : /// Simulation implementation of walproposer WAL storage.
      27              : pub struct DiskWalProposer {
      28              :     state: Mutex<State>,
      29              : }
      30              : 
      31              : impl DiskWalProposer {
      32        73807 :     pub fn new() -> Arc<DiskWalProposer> {
      33        73807 :         Arc::new(DiskWalProposer {
      34        73807 :             state: Mutex::new(State {
      35        73807 :                 internal_available_lsn: Lsn(0),
      36        73807 :                 prev_lsn: Lsn(0),
      37        73807 :                 disk: BlockStorage::new(),
      38        73807 :             }),
      39        73807 :         })
      40        73807 :     }
      41              : 
      42        79731 :     pub fn lock(&self) -> MutexGuard<State> {
      43        79731 :         self.state.lock()
      44        79731 :     }
      45              : }
      46              : 
      47              : pub struct State {
      48              :     // flush_lsn
      49              :     internal_available_lsn: Lsn,
      50              :     // needed for WAL generation
      51              :     prev_lsn: Lsn,
      52              :     // actual WAL storage
      53              :     disk: BlockStorage,
      54              : }
      55              : 
      56              : impl State {
      57        29248 :     pub fn read(&self, pos: u64, buf: &mut [u8]) {
      58        29248 :         self.disk.read(pos, buf);
      59        29248 :         // TODO: fail on reading uninitialized data
      60        29248 :     }
      61              : 
      62       138237 :     pub fn write(&mut self, pos: u64, buf: &[u8]) {
      63       138237 :         self.disk.write(pos, buf);
      64       138237 :     }
      65              : 
      66              :     /// Update the internal available LSN to the given value.
      67         2929 :     pub fn reset_to(&mut self, lsn: Lsn) {
      68         2929 :         self.internal_available_lsn = lsn;
      69         2929 :     }
      70              : 
      71              :     /// Get current LSN.
      72        11750 :     pub fn flush_rec_ptr(&self) -> Lsn {
      73        11750 :         self.internal_available_lsn
      74        11750 :     }
      75              : 
      76              :     /// Generate a new WAL record at the current LSN.
      77        34019 :     pub fn insert_logical_message(&mut self, prefix: &str, msg: &[u8]) -> anyhow::Result<()> {
      78        34019 :         let prefix_cstr = CString::new(prefix)?;
      79        34019 :         let prefix_bytes = prefix_cstr.as_bytes_with_nul();
      80        34019 : 
      81        34019 :         let lm = XlLogicalMessage {
      82        34019 :             db_id: 0,
      83        34019 :             transactional: 0,
      84        34019 :             prefix_size: prefix_bytes.len() as ::std::os::raw::c_ulong,
      85        34019 :             message_size: msg.len() as ::std::os::raw::c_ulong,
      86        34019 :         };
      87        34019 : 
      88        34019 :         let record_bytes = lm.encode();
      89        34019 :         let rdatas: Vec<&[u8]> = vec![&record_bytes, prefix_bytes, msg];
      90        34019 :         insert_wal_record(self, rdatas, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE)
      91        34019 :     }
      92              : }
      93              : 
      94        34019 : fn insert_wal_record(
      95        34019 :     state: &mut State,
      96        34019 :     rdatas: Vec<&[u8]>,
      97        34019 :     rmid: u8,
      98        34019 :     info: u8,
      99        34019 : ) -> anyhow::Result<()> {
     100        34019 :     // bytes right after the header, in the same rdata block
     101        34019 :     let mut scratch = Vec::new();
     102       102057 :     let mainrdata_len: usize = rdatas.iter().map(|rdata| rdata.len()).sum();
     103        34019 : 
     104        34019 :     if mainrdata_len > 0 {
     105        34019 :         if mainrdata_len > 255 {
     106            0 :             scratch.push(XLR_BLOCK_ID_DATA_LONG);
     107            0 :             // TODO: verify endiness
     108            0 :             let _ = scratch.write_u32::<LittleEndian>(mainrdata_len as u32);
     109        34019 :         } else {
     110        34019 :             scratch.push(XLR_BLOCK_ID_DATA_SHORT);
     111        34019 :             scratch.push(mainrdata_len as u8);
     112        34019 :         }
     113            0 :     }
     114              : 
     115        34019 :     let total_len: u32 = (XLOG_SIZE_OF_XLOG_RECORD + scratch.len() + mainrdata_len) as u32;
     116        34019 :     let size = maxalign(total_len);
     117        34019 :     assert!(size as usize > XLOG_SIZE_OF_XLOG_RECORD);
     118              : 
     119        34019 :     let start_bytepos = recptr_to_bytepos(state.internal_available_lsn);
     120        34019 :     let end_bytepos = start_bytepos + size as u64;
     121        34019 : 
     122        34019 :     let start_recptr = bytepos_to_recptr(start_bytepos);
     123        34019 :     let end_recptr = bytepos_to_recptr(end_bytepos);
     124        34019 : 
     125        34019 :     assert!(recptr_to_bytepos(start_recptr) == start_bytepos);
     126        34019 :     assert!(recptr_to_bytepos(end_recptr) == end_bytepos);
     127              : 
     128        34019 :     let mut crc = crc32c_append(0, &scratch);
     129       136076 :     for rdata in &rdatas {
     130       102057 :         crc = crc32c_append(crc, rdata);
     131       102057 :     }
     132              : 
     133        34019 :     let mut header = XLogRecord {
     134        34019 :         xl_tot_len: total_len,
     135        34019 :         xl_xid: 0,
     136        34019 :         xl_prev: state.prev_lsn.0,
     137        34019 :         xl_info: info,
     138        34019 :         xl_rmid: rmid,
     139        34019 :         __bindgen_padding_0: [0u8; 2usize],
     140        34019 :         xl_crc: crc,
     141        34019 :     };
     142              : 
     143              :     // now we have the header and can finish the crc
     144        34019 :     let header_bytes = header.encode()?;
     145        34019 :     let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
     146        34019 :     header.xl_crc = crc;
     147              : 
     148        34019 :     let mut header_bytes = header.encode()?.to_vec();
     149        34019 :     assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_RECORD);
     150              : 
     151        34019 :     header_bytes.extend_from_slice(&scratch);
     152        34019 : 
     153        34019 :     // finish rdatas
     154        34019 :     let mut rdatas = rdatas;
     155        34019 :     rdatas.insert(0, &header_bytes);
     156        34019 : 
     157        34019 :     write_walrecord_to_disk(state, total_len as u64, rdatas, start_recptr, end_recptr)?;
     158              : 
     159        34019 :     state.internal_available_lsn = end_recptr;
     160        34019 :     state.prev_lsn = start_recptr;
     161        34019 :     Ok(())
     162        34019 : }
     163              : 
     164        34019 : fn write_walrecord_to_disk(
     165        34019 :     state: &mut State,
     166        34019 :     total_len: u64,
     167        34019 :     rdatas: Vec<&[u8]>,
     168        34019 :     start: Lsn,
     169        34019 :     end: Lsn,
     170        34019 : ) -> anyhow::Result<()> {
     171        34019 :     let mut curr_ptr = start;
     172        34019 :     let mut freespace = insert_freespace(curr_ptr);
     173        34019 :     let mut written: usize = 0;
     174        34019 : 
     175        34019 :     assert!(freespace >= std::mem::size_of::<u32>());
     176              : 
     177       170095 :     for mut rdata in rdatas {
     178       136264 :         while rdata.len() >= freespace {
     179          188 :             assert!(
     180          188 :                 curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD
     181            0 :                     || freespace == 0
     182              :             );
     183              : 
     184          188 :             state.write(curr_ptr.0, &rdata[..freespace]);
     185          188 :             rdata = &rdata[freespace..];
     186          188 :             written += freespace;
     187          188 :             curr_ptr = Lsn(curr_ptr.0 + freespace as u64);
     188          188 : 
     189          188 :             let mut new_page = XLogPageHeaderData {
     190          188 :                 xlp_magic: XLOG_PAGE_MAGIC as u16,
     191          188 :                 xlp_info: XLP_BKP_REMOVABLE,
     192          188 :                 xlp_tli: 1,
     193          188 :                 xlp_pageaddr: curr_ptr.0,
     194          188 :                 xlp_rem_len: (total_len - written as u64) as u32,
     195          188 :                 ..Default::default() // Put 0 in padding fields.
     196          188 :             };
     197          188 :             if new_page.xlp_rem_len > 0 {
     198          170 :                 new_page.xlp_info |= XLP_FIRST_IS_CONTRECORD;
     199          170 :             }
     200              : 
     201          188 :             if curr_ptr.segment_offset(WAL_SEGMENT_SIZE) == 0 {
     202            0 :                 new_page.xlp_info |= XLP_LONG_HEADER;
     203            0 :                 let long_page = XLogLongPageHeaderData {
     204            0 :                     std: new_page,
     205            0 :                     xlp_sysid: 0,
     206            0 :                     xlp_seg_size: WAL_SEGMENT_SIZE as u32,
     207            0 :                     xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
     208            0 :                 };
     209            0 :                 let header_bytes = long_page.encode()?;
     210            0 :                 assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_LONG_PHD);
     211            0 :                 state.write(curr_ptr.0, &header_bytes);
     212            0 :                 curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64);
     213              :             } else {
     214          188 :                 let header_bytes = new_page.encode()?;
     215          188 :                 assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_SHORT_PHD);
     216          188 :                 state.write(curr_ptr.0, &header_bytes);
     217          188 :                 curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64);
     218              :             }
     219          188 :             freespace = insert_freespace(curr_ptr);
     220              :         }
     221              : 
     222       136076 :         assert!(
     223       136076 :             curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD
     224            0 :                 || rdata.is_empty()
     225              :         );
     226       136076 :         state.write(curr_ptr.0, rdata);
     227       136076 :         curr_ptr = Lsn(curr_ptr.0 + rdata.len() as u64);
     228       136076 :         written += rdata.len();
     229       136076 :         freespace -= rdata.len();
     230              :     }
     231              : 
     232        34019 :     assert!(written == total_len as usize);
     233        34019 :     curr_ptr.0 = maxalign(curr_ptr.0);
     234        34019 :     assert!(curr_ptr == end);
     235        34019 :     Ok(())
     236        34019 : }
     237              : 
     238        68038 : fn maxalign<T>(size: T) -> T
     239        68038 : where
     240        68038 :     T: std::ops::BitAnd<Output = T>
     241        68038 :         + std::ops::Add<Output = T>
     242        68038 :         + std::ops::Not<Output = T>
     243        68038 :         + From<u8>,
     244        68038 : {
     245        68038 :     (size + T::from(7)) & !T::from(7)
     246        68038 : }
     247              : 
     248        34207 : fn insert_freespace(ptr: Lsn) -> usize {
     249        34207 :     if ptr.block_offset() == 0 {
     250            0 :         0
     251              :     } else {
     252        34207 :         (XLOG_BLCKSZ as u64 - ptr.block_offset()) as usize
     253              :     }
     254        34207 : }
     255              : 
     256              : const XLP_BKP_REMOVABLE: u16 = 0x0004;
     257              : const USABLE_BYTES_IN_PAGE: u64 = (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
     258              : const USABLE_BYTES_IN_SEGMENT: u64 = ((WAL_SEGMENT_SIZE / XLOG_BLCKSZ) as u64
     259              :     * USABLE_BYTES_IN_PAGE)
     260              :     - (XLOG_SIZE_OF_XLOG_RECORD - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
     261              : 
     262        68038 : fn bytepos_to_recptr(bytepos: u64) -> Lsn {
     263        68038 :     let fullsegs = bytepos / USABLE_BYTES_IN_SEGMENT;
     264        68038 :     let mut bytesleft = bytepos % USABLE_BYTES_IN_SEGMENT;
     265              : 
     266        68038 :     let seg_offset = if bytesleft < (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64 {
     267              :         // fits on first page of segment
     268            0 :         bytesleft + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
     269              :     } else {
     270              :         // account for the first page on segment with long header
     271        68038 :         bytesleft -= (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64;
     272        68038 :         let fullpages = bytesleft / USABLE_BYTES_IN_PAGE;
     273        68038 :         bytesleft %= USABLE_BYTES_IN_PAGE;
     274        68038 : 
     275        68038 :         XLOG_BLCKSZ as u64
     276        68038 :             + fullpages * XLOG_BLCKSZ as u64
     277        68038 :             + bytesleft
     278        68038 :             + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
     279              :     };
     280              : 
     281        68038 :     Lsn(XLogSegNoOffsetToRecPtr(
     282        68038 :         fullsegs,
     283        68038 :         seg_offset as u32,
     284        68038 :         WAL_SEGMENT_SIZE,
     285        68038 :     ))
     286        68038 : }
     287              : 
     288       102057 : fn recptr_to_bytepos(ptr: Lsn) -> u64 {
     289       102057 :     let fullsegs = ptr.segment_number(WAL_SEGMENT_SIZE);
     290       102057 :     let offset = ptr.segment_offset(WAL_SEGMENT_SIZE) as u64;
     291       102057 : 
     292       102057 :     let fullpages = offset / XLOG_BLCKSZ as u64;
     293       102057 :     let offset = offset % XLOG_BLCKSZ as u64;
     294       102057 : 
     295       102057 :     if fullpages == 0 {
     296            0 :         fullsegs * USABLE_BYTES_IN_SEGMENT
     297            0 :             + if offset > 0 {
     298            0 :                 assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
     299            0 :                 offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
     300              :             } else {
     301            0 :                 0
     302              :             }
     303              :     } else {
     304       102057 :         fullsegs * USABLE_BYTES_IN_SEGMENT
     305       102057 :             + (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64
     306       102057 :             + (fullpages - 1) * USABLE_BYTES_IN_PAGE
     307       102057 :             + if offset > 0 {
     308       102057 :                 assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
     309       102057 :                 offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64
     310              :             } else {
     311            0 :                 0
     312              :             }
     313              :     }
     314       102057 : }
        

Generated by: LCOV version 2.1-beta