LCOV - code coverage report
Current view: top level - libs/utils/src - lsn.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 94.8 % 191 181
Test Date: 2023-09-06 10:18:01 Functions: 75.9 % 83 63

            Line data    Source code
       1              : #![warn(missing_docs)]
       2              : 
       3              : use serde::{Deserialize, Serialize};
       4              : use std::fmt;
       5              : use std::ops::{Add, AddAssign};
       6              : use std::path::Path;
       7              : use std::str::FromStr;
       8              : use std::sync::atomic::{AtomicU64, Ordering};
       9              : 
      10              : use crate::seqwait::MonotonicCounter;
      11              : 
      12              : /// Transaction log block size in bytes
      13              : pub const XLOG_BLCKSZ: u32 = 8192;
      14              : 
      15              : /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
      16   1331803250 : #[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
      17              : #[serde(transparent)]
      18              : pub struct Lsn(pub u64);
      19              : 
      20              : /// We tried to parse an LSN from a string, but failed
      21            1 : #[derive(Debug, PartialEq, Eq, thiserror::Error)]
      22              : #[error("LsnParseError")]
      23              : pub struct LsnParseError;
      24              : 
      25              : impl Lsn {
      26              :     /// Maximum possible value for an LSN
      27              :     pub const MAX: Lsn = Lsn(u64::MAX);
      28              : 
      29              :     /// The invalid LSN value (`InvalidXLogRecPtr`), as defined in xlogdefs.h
      30              :     pub const INVALID: Lsn = Lsn(0);
      31              : 
      32              :     /// Subtract a number, returning None on overflow.
      33      3384700 :     pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
      34      3384700 :         let other: u64 = other.into();
      35      3384700 :         self.0.checked_sub(other).map(Lsn)
      36      3384700 :     }
      37              : 
      38              :     /// Subtract a number, returning the difference as i128 to avoid overflow.
      39       733765 :     pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
      40       733765 :         let other: u64 = other.into();
      41       733765 :         i128::from(self.0) - i128::from(other)
      42       733765 :     }
      43              : 
      44              :     /// Parse an LSN from a filename in the form `0000000000000000`
      45            0 :     pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
      46            0 :     where
      47            0 :         F: AsRef<Path>,
      48            0 :     {
      49            0 :         let filename: &Path = filename.as_ref();
      50            0 :         let filename = filename.to_str().ok_or(LsnParseError)?;
      51            0 :         Lsn::from_hex(filename)
      52            0 :     }
      53              : 
      54              :     /// Parse an LSN from a string in the form `0000000000000000`
      55        28541 :     pub fn from_hex<S>(s: S) -> Result<Self, LsnParseError>
      56        28541 :     where
      57        28541 :         S: AsRef<str>,
      58        28541 :     {
      59        28541 :         let s: &str = s.as_ref();
      60        28541 :         let n = u64::from_str_radix(s, 16).or(Err(LsnParseError))?;
      61        19351 :         Ok(Lsn(n))
      62        28541 :     }
      63              : 
      64              :     /// Compute the offset into a segment
      65              :     #[inline]
      66    191849252 :     pub fn segment_offset(self, seg_sz: usize) -> usize {
      67    191849252 :         (self.0 % seg_sz as u64) as usize
      68    191849252 :     }
      69              : 
      70              :     /// Compute LSN of the segment start.
      71              :     #[inline]
      72         1497 :     pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
      73         1497 :         Lsn(self.0 - (self.0 % seg_sz as u64))
      74         1497 :     }
      75              : 
      76              :     /// Compute the segment number
      77              :     #[inline]
      78      1491816 :     pub fn segment_number(self, seg_sz: usize) -> u64 {
      79      1491816 :         self.0 / seg_sz as u64
      80      1491816 :     }
      81              : 
      82              :     /// Compute the offset into a block
      83              :     #[inline]
      84    188903486 :     pub fn block_offset(self) -> u64 {
      85    188903486 :         const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
      86    188903486 :         self.0 % BLCKSZ
      87    188903486 :     }
      88              : 
      89              :     /// Compute the LSN of the start of the block (page) that contains this
      90              :     /// Lsn
      91              :     #[inline]
      92          658 :     pub fn page_lsn(self) -> Lsn {
      93          658 :         Lsn(self.0 - self.block_offset())
      94          658 :     }
      95              : 
      96              :     /// Compute the offset, from the start of the segment, of the block (page)
      97              :     /// that contains this Lsn
      98              :     #[inline]
      99          658 :     pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
     100          658 :         (self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
     101          658 :     }
     102              : 
     103              :     /// Compute the bytes remaining in this block
     104              :     ///
     105              :     /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
     106              :     #[inline]
     107    186786138 :     pub fn remaining_in_block(self) -> u64 {
     108    186786138 :         const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
     109    186786138 :         BLCKSZ - (self.0 % BLCKSZ)
     110    186786138 :     }
     111              : 
     112              :     /// Compute the bytes remaining to fill a chunk of some size
     113              :     ///
     114              :     /// If the LSN is already at the chunk boundary, it will return 0.
     115          768 :     pub fn calc_padding<T: Into<u64>>(self, sz: T) -> u64 {
     116          768 :         let sz: u64 = sz.into();
     117          768 :         // By using wrapping_sub, we can subtract first and then mod second.
     118          768 :         // If it's done the other way around, then we would return a full
     119          768 :         // chunk size if we're already at the chunk boundary.
     120          768 :         // (Regular subtraction will panic on overflow in debug builds.)
     121          768 :         (sz.wrapping_sub(self.0)) % sz
     122          768 :     }
     123              : 
     124              :     /// Align LSN on 8-byte boundary (alignment of WAL records).
     125    410828345 :     pub fn align(&self) -> Lsn {
     126    410828345 :         Lsn((self.0 + 7) & !7)
     127    410828345 :     }
     128              : 
     129              :     /// Check whether the LSN is aligned on an 8-byte boundary (alignment of WAL records).
     130    230361292 :     pub fn is_aligned(&self) -> bool {
     131    230361292 :         *self == self.align()
     132    230361292 :     }
     133              : 
     134              :     /// Return true if the LSN is valid, i.e. not equal to `Lsn::INVALID`
     135              :     /// (the negation of the postgres XLogRecPtrIsInvalid macro)
     136     14284070 :     pub fn is_valid(self) -> bool {
     137     14284070 :         self != Lsn::INVALID
     138     14284070 :     }
     139              : }
     140              : 
     141              : impl From<u64> for Lsn {
     142      6806419 :     fn from(n: u64) -> Self {
     143      6806419 :         Lsn(n)
     144      6806419 :     }
     145              : }
     146              : 
     147              : impl From<Lsn> for u64 {
     148      9391754 :     fn from(lsn: Lsn) -> u64 {
     149      9391754 :         lsn.0
     150      9391754 :     }
     151              : }
     152              : 
     153              : impl FromStr for Lsn {
     154              :     type Err = LsnParseError;
     155              : 
     156              :     /// Parse an LSN from a string in the form `00000000/00000000`
     157              :     ///
     158              :     /// Parsing fails if the '/' character is missing; use `Lsn::from_hex` for that form
     159        10492 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     160        10492 :         let mut splitter = s.trim().split('/');
     161        10492 :         if let (Some(left), Some(right), None) = (splitter.next(), splitter.next(), splitter.next())
     162              :         {
     163        10492 :             let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
     164        10489 :             let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
     165        10487 :             Ok(Lsn((left_num as u64) << 32 | right_num as u64))
     166              :         } else {
     167            0 :             Err(LsnParseError)
     168              :         }
     169        10492 :     }
     170              : }
     171              : 
     172              : impl fmt::Display for Lsn {
     173      5562072 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     174      5562072 :         write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
     175      5562072 :     }
     176              : }
     177              : 
     178              : impl fmt::Debug for Lsn {
     179        22662 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     180        22662 :         write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
     181        22662 :     }
     182              : }
     183              : 
     184              : impl Add<u64> for Lsn {
     185              :     type Output = Lsn;
     186              : 
     187    119387519 :     fn add(self, other: u64) -> Self::Output {
     188    119387519 :         // panic if the addition overflows.
     189    119387519 :         Lsn(self.0.checked_add(other).unwrap())
     190    119387519 :     }
     191              : }
     192              : 
     193              : impl AddAssign<u64> for Lsn {
     194    370222435 :     fn add_assign(&mut self, other: u64) {
     195    370222435 :         // panic if the addition overflows.
     196    370222435 :         self.0 = self.0.checked_add(other).unwrap();
     197    370222435 :     }
     198              : }
     199              : 
     200              : /// An [`Lsn`] that can be accessed atomically.
     201              : pub struct AtomicLsn {
     202              :     inner: AtomicU64,
     203              : }
     204              : 
     205              : impl AtomicLsn {
     206              :     /// Creates a new atomic `Lsn`.
     207         3393 :     pub fn new(val: u64) -> Self {
     208         3393 :         AtomicLsn {
     209         3393 :             inner: AtomicU64::new(val),
     210         3393 :         }
     211         3393 :     }
     212              : 
     213              :     /// Atomically retrieve the `Lsn` value from memory.
     214      1500673 :     pub fn load(&self) -> Lsn {
     215      1500673 :         Lsn(self.inner.load(Ordering::Acquire))
     216      1500673 :     }
     217              : 
     218              :     /// Atomically store a new `Lsn` value to memory.
     219        18027 :     pub fn store(&self, lsn: Lsn) {
     220        18027 :         self.inner.store(lsn.0, Ordering::Release);
     221        18027 :     }
     222              : 
     223              :     /// Adds to the current value, returning the previous value.
     224              :     ///
     225              :     /// This operation will panic on overflow.
     226            1 :     pub fn fetch_add(&self, val: u64) -> Lsn {
     227            1 :         let prev = self.inner.fetch_add(val, Ordering::AcqRel);
     228            1 :         assert!(prev.checked_add(val).is_some(), "AtomicLsn overflow");
     229            1 :         Lsn(prev)
     230            1 :     }
     231              : 
     232              :     /// Atomically sets the Lsn to the max of old and new value, returning the old value.
     233       745529 :     pub fn fetch_max(&self, lsn: Lsn) -> Lsn {
     234       745529 :         let prev = self.inner.fetch_max(lsn.0, Ordering::AcqRel);
     235       745529 :         Lsn(prev)
     236       745529 :     }
     237              : }
     238              : 
     239              : impl From<Lsn> for AtomicLsn {
     240          604 :     fn from(lsn: Lsn) -> Self {
     241          604 :         Self::new(lsn.0)
     242          604 :     }
     243              : }
     244              : 
     245              : /// Pair of LSNs pointing to the end of the last valid record and the previous one
     246            0 : #[derive(Debug, Clone, Copy)]
     247              : pub struct RecordLsn {
     248              :     /// LSN at the end of the current record
     249              :     pub last: Lsn,
     250              :     /// LSN at the end of the previous record
     251              :     pub prev: Lsn,
     252              : }
     253              : 
     254              : /// Expose `self.last` as counter to be able to use RecordLsn in SeqWait
     255              : impl MonotonicCounter<Lsn> for RecordLsn {
     256     74269227 :     fn cnt_advance(&mut self, lsn: Lsn) {
     257     74269227 :         assert!(self.last <= lsn);
     258     74269227 :         let new_prev = self.last;
     259     74269227 :         self.last = lsn;
     260     74269227 :         self.prev = new_prev;
     261     74269227 :     }
     262     75564981 :     fn cnt_value(&self) -> Lsn {
     263     75564981 :         self.last
     264     75564981 :     }
     265              : }
     266              : 
     267              : #[cfg(test)]
     268              : mod tests {
     269              :     use super::*;
     270              : 
     271            1 :     #[test]
     272            1 :     fn test_lsn_strings() {
     273            1 :         assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
     274            1 :         assert_eq!("aaaa/bbbb".parse(), Ok(Lsn(0x0000AAAA0000BBBB)));
     275            1 :         assert_eq!("1/A".parse(), Ok(Lsn(0x000000010000000A)));
     276            1 :         assert_eq!("0/0".parse(), Ok(Lsn(0)));
     277            1 :         "ABCDEFG/12345678".parse::<Lsn>().unwrap_err();
     278            1 :         "123456789/AAAA5555".parse::<Lsn>().unwrap_err();
     279            1 :         "12345678/AAAA55550".parse::<Lsn>().unwrap_err();
     280            1 :         "-1/0".parse::<Lsn>().unwrap_err();
     281            1 :         "1/-1".parse::<Lsn>().unwrap_err();
     282            1 : 
     283            1 :         assert_eq!(format!("{}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555");
     284            1 :         assert_eq!(format!("{}", Lsn(0x000000010000000A)), "1/A");
     285              : 
     286            1 :         assert_eq!(
     287            1 :             Lsn::from_hex("12345678AAAA5555"),
     288            1 :             Ok(Lsn(0x12345678AAAA5555))
     289            1 :         );
     290            1 :         assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0)));
     291            1 :         assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError));
     292              : 
     293            1 :         let expected_lsn = Lsn(0x3C490F8);
     294            1 :         assert_eq!(" 0/3C490F8".parse(), Ok(expected_lsn));
     295            1 :         assert_eq!("0/3C490F8 ".parse(), Ok(expected_lsn));
     296            1 :         assert_eq!(" 0/3C490F8 ".parse(), Ok(expected_lsn));
     297            1 :     }
     298              : 
     299            1 :     #[test]
     300            1 :     fn test_lsn_math() {
     301            1 :         assert_eq!(Lsn(1234) + 11u64, Lsn(1245));
     302              : 
     303            1 :         assert_eq!(
     304            1 :             {
     305            1 :                 let mut lsn = Lsn(1234);
     306            1 :                 lsn += 11u64;
     307            1 :                 lsn
     308            1 :             },
     309            1 :             Lsn(1245)
     310            1 :         );
     311              : 
     312            1 :         assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
     313            1 :         assert_eq!(Lsn(1234).checked_sub(1235u64), None);
     314              : 
     315            1 :         assert_eq!(Lsn(1235).widening_sub(1234u64), 1);
     316            1 :         assert_eq!(Lsn(1234).widening_sub(1235u64), -1);
     317            1 :         assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
     318            1 :         assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
     319              : 
     320            1 :         let seg_sz: usize = 16 * 1024 * 1024;
     321            1 :         assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
     322            1 :         assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);
     323              : 
     324            1 :         assert_eq!(Lsn(0x4007).block_offset(), 7u64);
     325            1 :         assert_eq!(Lsn(0x4000).block_offset(), 0u64);
     326            1 :         assert_eq!(Lsn(0x4007).remaining_in_block(), 8185u64);
     327            1 :         assert_eq!(Lsn(0x4000).remaining_in_block(), 8192u64);
     328              : 
     329            1 :         assert_eq!(Lsn(0xffff01).calc_padding(seg_sz as u64), 255u64);
     330            1 :         assert_eq!(Lsn(0x2000000).calc_padding(seg_sz as u64), 0u64);
     331            1 :         assert_eq!(Lsn(0xffff01).calc_padding(8u32), 7u64);
     332            1 :         assert_eq!(Lsn(0xffff00).calc_padding(8u32), 0u64);
     333            1 :     }
     334              : 
     335            1 :     #[test]
     336            1 :     fn test_atomic_lsn() {
     337            1 :         let lsn = AtomicLsn::new(0);
     338            1 :         assert_eq!(lsn.fetch_add(1234), Lsn(0));
     339            1 :         assert_eq!(lsn.load(), Lsn(1234));
     340            1 :         lsn.store(Lsn(5678));
     341            1 :         assert_eq!(lsn.load(), Lsn(5678));
     342              : 
     343            1 :         assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678));
     344            1 :         assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000));
     345            1 :     }
     346              : }
        

Generated by: LCOV version 2.1-beta