LCOV - differential code coverage report
Current view: top level - libs/utils/src - lsn.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 95.8 % 189 181 8 181
Current Date: 2023-10-19 02:04:12 Functions: 75.9 % 83 63 20 63
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : #![warn(missing_docs)]
       2                 : 
       3                 : use camino::Utf8Path;
       4                 : use serde::{Deserialize, Serialize};
       5                 : use std::fmt;
       6                 : use std::ops::{Add, AddAssign};
       7                 : use std::str::FromStr;
       8                 : use std::sync::atomic::{AtomicU64, Ordering};
       9                 : 
      10                 : use crate::seqwait::MonotonicCounter;
      11                 : 
      12                 : /// Transaction log block size in bytes
      13                 : pub const XLOG_BLCKSZ: u32 = 8192;
      14                 : 
      15                 : /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
      16 CBC  1089316426 : #[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
      17                 : #[serde(transparent)]
      18                 : pub struct Lsn(pub u64);
      19                 : 
      20                 : /// We tried to parse an LSN from a string, but failed
      21               1 : #[derive(Debug, PartialEq, Eq, thiserror::Error)]
      22                 : #[error("LsnParseError")]
      23                 : pub struct LsnParseError;
      24                 : 
      25                 : impl Lsn {
      26                 :     /// Maximum possible value for an LSN
      27                 :     pub const MAX: Lsn = Lsn(u64::MAX);
      28                 : 
      29                 :     /// Invalid value for InvalidXLogRecPtr, as defined in xlogdefs.h
      30                 :     pub const INVALID: Lsn = Lsn(0);
      31                 : 
      32                 :     /// Subtract a number, returning None on overflow.
      33         3697257 :     pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
      34         3697257 :         let other: u64 = other.into();
      35         3697257 :         self.0.checked_sub(other).map(Lsn)
      36         3697257 :     }
      37                 : 
      38                 :     /// Subtract a number, returning the difference as i128 to avoid overflow.
      39          776350 :     pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
      40          776350 :         let other: u64 = other.into();
      41          776350 :         i128::from(self.0) - i128::from(other)
      42          776350 :     }
      43                 : 
      44                 :     /// Parse an LSN from a filename in the form `0000000000000000`
      45 UBC           0 :     pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
      46               0 :     where
      47               0 :         F: AsRef<Utf8Path>,
      48               0 :     {
      49               0 :         Lsn::from_hex(filename.as_ref().as_str())
      50               0 :     }
      51                 : 
      52                 :     /// Parse an LSN from a string in the form `0000000000000000`
      53 CBC       29784 :     pub fn from_hex<S>(s: S) -> Result<Self, LsnParseError>
      54           29784 :     where
      55           29784 :         S: AsRef<str>,
      56           29784 :     {
      57           29784 :         let s: &str = s.as_ref();
      58           29784 :         let n = u64::from_str_radix(s, 16).or(Err(LsnParseError))?;
      59           21613 :         Ok(Lsn(n))
      60           29784 :     }
      61                 : 
      62                 :     /// Compute the offset into a segment
      63                 :     #[inline]
      64       180363886 :     pub fn segment_offset(self, seg_sz: usize) -> usize {
      65       180363886 :         (self.0 % seg_sz as u64) as usize
      66       180363886 :     }
      67                 : 
      68                 :     /// Compute LSN of the segment start.
      69                 :     #[inline]
      70            1430 :     pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
      71            1430 :         Lsn(self.0 - (self.0 % seg_sz as u64))
      72            1430 :     }
      73                 : 
      74                 :     /// Compute the segment number
      75                 :     #[inline]
      76         1627810 :     pub fn segment_number(self, seg_sz: usize) -> u64 {
      77         1627810 :         self.0 / seg_sz as u64
      78         1627810 :     }
      79                 : 
      80                 :     /// Compute the offset into a block
      81                 :     #[inline]
      82       177194933 :     pub fn block_offset(self) -> u64 {
      83       177194933 :         const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
      84       177194933 :         self.0 % BLCKSZ
      85       177194933 :     }
      86                 : 
      87                 :     /// Compute the block offset of the first byte of this Lsn within this
      88                 :     /// segment
      89                 :     #[inline]
      90             638 :     pub fn page_lsn(self) -> Lsn {
      91             638 :         Lsn(self.0 - self.block_offset())
      92             638 :     }
      93                 : 
      94                 :     /// Compute the block offset of the first byte of this Lsn within this
      95                 :     /// segment
      96                 :     #[inline]
      97             638 :     pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
      98             638 :         (self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
      99             638 :     }
     100                 : 
     101                 :     /// Compute the bytes remaining in this block
     102                 :     ///
     103                 :     /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
     104                 :     #[inline]
     105       174888368 :     pub fn remaining_in_block(self) -> u64 {
     106       174888368 :         const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
     107       174888368 :         BLCKSZ - (self.0 % BLCKSZ)
     108       174888368 :     }
     109                 : 
     110                 :     /// Compute the bytes remaining to fill a chunk of some size
     111                 :     ///
     112                 :     /// If the LSN is already at the chunk boundary, it will return 0.
     113             697 :     pub fn calc_padding<T: Into<u64>>(self, sz: T) -> u64 {
     114             697 :         let sz: u64 = sz.into();
     115             697 :         // By using wrapping_sub, we can subtract first and then mod second.
     116             697 :         // If it's done the other way around, then we would return a full
     117             697 :         // chunk size if we're already at the chunk boundary.
     118             697 :         // (Regular subtraction will panic on overflow in debug builds.)
     119             697 :         (sz.wrapping_sub(self.0)) % sz
     120             697 :     }
     121                 : 
     122                 :     /// Align LSN on 8-byte boundary (alignment of WAL records).
     123       383414846 :     pub fn align(&self) -> Lsn {
     124       383414846 :         Lsn((self.0 + 7) & !7)
     125       383414846 :     }
     126                 : 
     127                 :     /// Align LSN on 8-byte boundary (alignment of WAL records).
     128       214561566 :     pub fn is_aligned(&self) -> bool {
     129       214561566 :         *self == self.align()
     130       214561566 :     }
     131                 : 
     132                 :     /// Return if the LSN is valid
     133                 :     /// mimics postgres XLogRecPtrIsInvalid macro
     134        13870016 :     pub fn is_valid(self) -> bool {
     135        13870016 :         self != Lsn::INVALID
     136        13870016 :     }
     137                 : }
     138                 : 
     139                 : impl From<u64> for Lsn {
     140         6115845 :     fn from(n: u64) -> Self {
     141         6115845 :         Lsn(n)
     142         6115845 :     }
     143                 : }
     144                 : 
     145                 : impl From<Lsn> for u64 {
     146        10480005 :     fn from(lsn: Lsn) -> u64 {
     147        10480005 :         lsn.0
     148        10480005 :     }
     149                 : }
     150                 : 
     151                 : impl FromStr for Lsn {
     152                 :     type Err = LsnParseError;
     153                 : 
     154                 :     /// Parse an LSN from a string in the form `00000000/00000000`
     155                 :     ///
     156                 :     /// If the input string is missing the '/' character, then use `Lsn::from_hex`
     157           10410 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     158           10410 :         let mut splitter = s.trim().split('/');
     159           10410 :         if let (Some(left), Some(right), None) = (splitter.next(), splitter.next(), splitter.next())
     160                 :         {
     161           10410 :             let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
     162           10407 :             let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
     163           10405 :             Ok(Lsn((left_num as u64) << 32 | right_num as u64))
     164                 :         } else {
     165 UBC           0 :             Err(LsnParseError)
     166                 :         }
     167 CBC       10410 :     }
     168                 : }
     169                 : 
     170                 : impl fmt::Display for Lsn {
     171         4750025 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     172         4750025 :         write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
     173         4750025 :     }
     174                 : }
     175                 : 
     176                 : impl fmt::Debug for Lsn {
     177           24154 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     178           24154 :         write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
     179           24154 :     }
     180                 : }
     181                 : 
     182                 : impl Add<u64> for Lsn {
     183                 :     type Output = Lsn;
     184                 : 
     185        73280397 :     fn add(self, other: u64) -> Self::Output {
     186        73280397 :         // panic if the addition overflows.
     187        73280397 :         Lsn(self.0.checked_add(other).unwrap())
     188        73280397 :     }
     189                 : }
     190                 : 
     191                 : impl AddAssign<u64> for Lsn {
     192       346955183 :     fn add_assign(&mut self, other: u64) {
     193       346955183 :         // panic if the addition overflows.
     194       346955183 :         self.0 = self.0.checked_add(other).unwrap();
     195       346955183 :     }
     196                 : }
     197                 : 
     198                 : /// An [`Lsn`] that can be accessed atomically.
     199                 : pub struct AtomicLsn {
     200                 :     inner: AtomicU64,
     201                 : }
     202                 : 
     203                 : impl AtomicLsn {
     204                 :     /// Creates a new atomic `Lsn`.
     205            4472 :     pub fn new(val: u64) -> Self {
     206            4472 :         AtomicLsn {
     207            4472 :             inner: AtomicU64::new(val),
     208            4472 :         }
     209            4472 :     }
     210                 : 
     211                 :     /// Atomically retrieve the `Lsn` value from memory.
     212         2365381 :     pub fn load(&self) -> Lsn {
     213         2365381 :         Lsn(self.inner.load(Ordering::Acquire))
     214         2365381 :     }
     215                 : 
     216                 :     /// Atomically store a new `Lsn` value to memory.
     217           20264 :     pub fn store(&self, lsn: Lsn) {
     218           20264 :         self.inner.store(lsn.0, Ordering::Release);
     219           20264 :     }
     220                 : 
     221                 :     /// Adds to the current value, returning the previous value.
     222                 :     ///
     223                 :     /// This operation will panic on overflow.
     224               1 :     pub fn fetch_add(&self, val: u64) -> Lsn {
     225               1 :         let prev = self.inner.fetch_add(val, Ordering::AcqRel);
     226               1 :         assert!(prev.checked_add(val).is_some(), "AtomicLsn overflow");
     227               1 :         Lsn(prev)
     228               1 :     }
     229                 : 
     230                 :     /// Atomically sets the Lsn to the max of old and new value, returning the old value.
     231          789185 :     pub fn fetch_max(&self, lsn: Lsn) -> Lsn {
     232          789185 :         let prev = self.inner.fetch_max(lsn.0, Ordering::AcqRel);
     233          789185 :         Lsn(prev)
     234          789185 :     }
     235                 : }
     236                 : 
     237                 : impl From<Lsn> for AtomicLsn {
     238             900 :     fn from(lsn: Lsn) -> Self {
     239             900 :         Self::new(lsn.0)
     240             900 :     }
     241                 : }
     242                 : 
     243                 : /// Pair of LSN's pointing to the end of the last valid record and previous one
     244 UBC           0 : #[derive(Debug, Clone, Copy)]
     245                 : pub struct RecordLsn {
     246                 :     /// LSN at the end of the current record
     247                 :     pub last: Lsn,
     248                 :     /// LSN at the end of the previous record
     249                 :     pub prev: Lsn,
     250                 : }
     251                 : 
     252                 : /// Expose `self.last` as counter to be able to use RecordLsn in SeqWait
     253                 : impl MonotonicCounter<Lsn> for RecordLsn {
     254 CBC    69331227 :     fn cnt_advance(&mut self, lsn: Lsn) {
     255        69331227 :         assert!(self.last <= lsn);
     256        69331227 :         let new_prev = self.last;
     257        69331227 :         self.last = lsn;
     258        69331227 :         self.prev = new_prev;
     259        69331227 :     }
     260        70609848 :     fn cnt_value(&self) -> Lsn {
     261        70609848 :         self.last
     262        70609848 :     }
     263                 : }
     264                 : 
     265                 : #[cfg(test)]
     266                 : mod tests {
     267                 :     use super::*;
     268                 : 
     269               1 :     #[test]
     270               1 :     fn test_lsn_strings() {
     271               1 :         assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
     272               1 :         assert_eq!("aaaa/bbbb".parse(), Ok(Lsn(0x0000AAAA0000BBBB)));
     273               1 :         assert_eq!("1/A".parse(), Ok(Lsn(0x000000010000000A)));
     274               1 :         assert_eq!("0/0".parse(), Ok(Lsn(0)));
     275               1 :         "ABCDEFG/12345678".parse::<Lsn>().unwrap_err();
     276               1 :         "123456789/AAAA5555".parse::<Lsn>().unwrap_err();
     277               1 :         "12345678/AAAA55550".parse::<Lsn>().unwrap_err();
     278               1 :         "-1/0".parse::<Lsn>().unwrap_err();
     279               1 :         "1/-1".parse::<Lsn>().unwrap_err();
     280               1 : 
     281               1 :         assert_eq!(format!("{}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555");
     282               1 :         assert_eq!(format!("{}", Lsn(0x000000010000000A)), "1/A");
     283                 : 
     284               1 :         assert_eq!(
     285               1 :             Lsn::from_hex("12345678AAAA5555"),
     286               1 :             Ok(Lsn(0x12345678AAAA5555))
     287               1 :         );
     288               1 :         assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0)));
     289               1 :         assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError));
     290                 : 
     291               1 :         let expected_lsn = Lsn(0x3C490F8);
     292               1 :         assert_eq!(" 0/3C490F8".parse(), Ok(expected_lsn));
     293               1 :         assert_eq!("0/3C490F8 ".parse(), Ok(expected_lsn));
     294               1 :         assert_eq!(" 0/3C490F8 ".parse(), Ok(expected_lsn));
     295               1 :     }
     296                 : 
     297               1 :     #[test]
     298               1 :     fn test_lsn_math() {
     299               1 :         assert_eq!(Lsn(1234) + 11u64, Lsn(1245));
     300                 : 
     301               1 :         assert_eq!(
     302               1 :             {
     303               1 :                 let mut lsn = Lsn(1234);
     304               1 :                 lsn += 11u64;
     305               1 :                 lsn
     306               1 :             },
     307               1 :             Lsn(1245)
     308               1 :         );
     309                 : 
     310               1 :         assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
     311               1 :         assert_eq!(Lsn(1234).checked_sub(1235u64), None);
     312                 : 
     313               1 :         assert_eq!(Lsn(1235).widening_sub(1234u64), 1);
     314               1 :         assert_eq!(Lsn(1234).widening_sub(1235u64), -1);
     315               1 :         assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
     316               1 :         assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
     317                 : 
     318               1 :         let seg_sz: usize = 16 * 1024 * 1024;
     319               1 :         assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
     320               1 :         assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);
     321                 : 
     322               1 :         assert_eq!(Lsn(0x4007).block_offset(), 7u64);
     323               1 :         assert_eq!(Lsn(0x4000).block_offset(), 0u64);
     324               1 :         assert_eq!(Lsn(0x4007).remaining_in_block(), 8185u64);
     325               1 :         assert_eq!(Lsn(0x4000).remaining_in_block(), 8192u64);
     326                 : 
     327               1 :         assert_eq!(Lsn(0xffff01).calc_padding(seg_sz as u64), 255u64);
     328               1 :         assert_eq!(Lsn(0x2000000).calc_padding(seg_sz as u64), 0u64);
     329               1 :         assert_eq!(Lsn(0xffff01).calc_padding(8u32), 7u64);
     330               1 :         assert_eq!(Lsn(0xffff00).calc_padding(8u32), 0u64);
     331               1 :     }
     332                 : 
     333               1 :     #[test]
     334               1 :     fn test_atomic_lsn() {
     335               1 :         let lsn = AtomicLsn::new(0);
     336               1 :         assert_eq!(lsn.fetch_add(1234), Lsn(0));
     337               1 :         assert_eq!(lsn.load(), Lsn(1234));
     338               1 :         lsn.store(Lsn(5678));
     339               1 :         assert_eq!(lsn.load(), Lsn(5678));
     340                 : 
     341               1 :         assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678));
     342               1 :         assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000));
     343               1 :     }
     344                 : }
        

Generated by: LCOV version 2.1-beta