Line data Source code
1 : #![warn(missing_docs)]
2 :
3 : use serde::{Deserialize, Serialize};
4 : use std::fmt;
5 : use std::ops::{Add, AddAssign};
6 : use std::path::Path;
7 : use std::str::FromStr;
8 : use std::sync::atomic::{AtomicU64, Ordering};
9 :
10 : use crate::seqwait::MonotonicCounter;
11 :
/// Size of one transaction log (WAL) block, in bytes.
pub const XLOG_BLCKSZ: u32 = 8 * 1024;
14 :
15 : /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
16 1331803250 : #[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
17 : #[serde(transparent)]
18 : pub struct Lsn(pub u64);
19 :
20 : /// We tried to parse an LSN from a string, but failed
21 1 : #[derive(Debug, PartialEq, Eq, thiserror::Error)]
22 : #[error("LsnParseError")]
23 : pub struct LsnParseError;
24 :
25 : impl Lsn {
26 : /// Maximum possible value for an LSN
27 : pub const MAX: Lsn = Lsn(u64::MAX);
28 :
29 : /// Invalid value for InvalidXLogRecPtr, as defined in xlogdefs.h
30 : pub const INVALID: Lsn = Lsn(0);
31 :
32 : /// Subtract a number, returning None on overflow.
33 3384700 : pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
34 3384700 : let other: u64 = other.into();
35 3384700 : self.0.checked_sub(other).map(Lsn)
36 3384700 : }
37 :
38 : /// Subtract a number, returning the difference as i128 to avoid overflow.
39 733765 : pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
40 733765 : let other: u64 = other.into();
41 733765 : i128::from(self.0) - i128::from(other)
42 733765 : }
43 :
44 : /// Parse an LSN from a filename in the form `0000000000000000`
45 0 : pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
46 0 : where
47 0 : F: AsRef<Path>,
48 0 : {
49 0 : let filename: &Path = filename.as_ref();
50 0 : let filename = filename.to_str().ok_or(LsnParseError)?;
51 0 : Lsn::from_hex(filename)
52 0 : }
53 :
54 : /// Parse an LSN from a string in the form `0000000000000000`
55 28541 : pub fn from_hex<S>(s: S) -> Result<Self, LsnParseError>
56 28541 : where
57 28541 : S: AsRef<str>,
58 28541 : {
59 28541 : let s: &str = s.as_ref();
60 28541 : let n = u64::from_str_radix(s, 16).or(Err(LsnParseError))?;
61 19351 : Ok(Lsn(n))
62 28541 : }
63 :
64 : /// Compute the offset into a segment
65 : #[inline]
66 191849252 : pub fn segment_offset(self, seg_sz: usize) -> usize {
67 191849252 : (self.0 % seg_sz as u64) as usize
68 191849252 : }
69 :
70 : /// Compute LSN of the segment start.
71 : #[inline]
72 1497 : pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
73 1497 : Lsn(self.0 - (self.0 % seg_sz as u64))
74 1497 : }
75 :
76 : /// Compute the segment number
77 : #[inline]
78 1491816 : pub fn segment_number(self, seg_sz: usize) -> u64 {
79 1491816 : self.0 / seg_sz as u64
80 1491816 : }
81 :
82 : /// Compute the offset into a block
83 : #[inline]
84 188903486 : pub fn block_offset(self) -> u64 {
85 188903486 : const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
86 188903486 : self.0 % BLCKSZ
87 188903486 : }
88 :
89 : /// Compute the block offset of the first byte of this Lsn within this
90 : /// segment
91 : #[inline]
92 658 : pub fn page_lsn(self) -> Lsn {
93 658 : Lsn(self.0 - self.block_offset())
94 658 : }
95 :
96 : /// Compute the block offset of the first byte of this Lsn within this
97 : /// segment
98 : #[inline]
99 658 : pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
100 658 : (self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
101 658 : }
102 :
103 : /// Compute the bytes remaining in this block
104 : ///
105 : /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
106 : #[inline]
107 186786138 : pub fn remaining_in_block(self) -> u64 {
108 186786138 : const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
109 186786138 : BLCKSZ - (self.0 % BLCKSZ)
110 186786138 : }
111 :
112 : /// Compute the bytes remaining to fill a chunk of some size
113 : ///
114 : /// If the LSN is already at the chunk boundary, it will return 0.
115 768 : pub fn calc_padding<T: Into<u64>>(self, sz: T) -> u64 {
116 768 : let sz: u64 = sz.into();
117 768 : // By using wrapping_sub, we can subtract first and then mod second.
118 768 : // If it's done the other way around, then we would return a full
119 768 : // chunk size if we're already at the chunk boundary.
120 768 : // (Regular subtraction will panic on overflow in debug builds.)
121 768 : (sz.wrapping_sub(self.0)) % sz
122 768 : }
123 :
124 : /// Align LSN on 8-byte boundary (alignment of WAL records).
125 410828345 : pub fn align(&self) -> Lsn {
126 410828345 : Lsn((self.0 + 7) & !7)
127 410828345 : }
128 :
129 : /// Align LSN on 8-byte boundary (alignment of WAL records).
130 230361292 : pub fn is_aligned(&self) -> bool {
131 230361292 : *self == self.align()
132 230361292 : }
133 :
134 : /// Return if the LSN is valid
135 : /// mimics postgres XLogRecPtrIsInvalid macro
136 14284070 : pub fn is_valid(self) -> bool {
137 14284070 : self != Lsn::INVALID
138 14284070 : }
139 : }
140 :
141 : impl From<u64> for Lsn {
142 6806419 : fn from(n: u64) -> Self {
143 6806419 : Lsn(n)
144 6806419 : }
145 : }
146 :
147 : impl From<Lsn> for u64 {
148 9391754 : fn from(lsn: Lsn) -> u64 {
149 9391754 : lsn.0
150 9391754 : }
151 : }
152 :
153 : impl FromStr for Lsn {
154 : type Err = LsnParseError;
155 :
156 : /// Parse an LSN from a string in the form `00000000/00000000`
157 : ///
158 : /// If the input string is missing the '/' character, then use `Lsn::from_hex`
159 10492 : fn from_str(s: &str) -> Result<Self, Self::Err> {
160 10492 : let mut splitter = s.trim().split('/');
161 10492 : if let (Some(left), Some(right), None) = (splitter.next(), splitter.next(), splitter.next())
162 : {
163 10492 : let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
164 10489 : let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
165 10487 : Ok(Lsn((left_num as u64) << 32 | right_num as u64))
166 : } else {
167 0 : Err(LsnParseError)
168 : }
169 10492 : }
170 : }
171 :
172 : impl fmt::Display for Lsn {
173 5562072 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174 5562072 : write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
175 5562072 : }
176 : }
177 :
178 : impl fmt::Debug for Lsn {
179 22662 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 22662 : write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
181 22662 : }
182 : }
183 :
184 : impl Add<u64> for Lsn {
185 : type Output = Lsn;
186 :
187 119387519 : fn add(self, other: u64) -> Self::Output {
188 119387519 : // panic if the addition overflows.
189 119387519 : Lsn(self.0.checked_add(other).unwrap())
190 119387519 : }
191 : }
192 :
193 : impl AddAssign<u64> for Lsn {
194 370222435 : fn add_assign(&mut self, other: u64) {
195 370222435 : // panic if the addition overflows.
196 370222435 : self.0 = self.0.checked_add(other).unwrap();
197 370222435 : }
198 : }
199 :
200 : /// An [`Lsn`] that can be accessed atomically.
201 : pub struct AtomicLsn {
202 : inner: AtomicU64,
203 : }
204 :
205 : impl AtomicLsn {
206 : /// Creates a new atomic `Lsn`.
207 3393 : pub fn new(val: u64) -> Self {
208 3393 : AtomicLsn {
209 3393 : inner: AtomicU64::new(val),
210 3393 : }
211 3393 : }
212 :
213 : /// Atomically retrieve the `Lsn` value from memory.
214 1500673 : pub fn load(&self) -> Lsn {
215 1500673 : Lsn(self.inner.load(Ordering::Acquire))
216 1500673 : }
217 :
218 : /// Atomically store a new `Lsn` value to memory.
219 18027 : pub fn store(&self, lsn: Lsn) {
220 18027 : self.inner.store(lsn.0, Ordering::Release);
221 18027 : }
222 :
223 : /// Adds to the current value, returning the previous value.
224 : ///
225 : /// This operation will panic on overflow.
226 1 : pub fn fetch_add(&self, val: u64) -> Lsn {
227 1 : let prev = self.inner.fetch_add(val, Ordering::AcqRel);
228 1 : assert!(prev.checked_add(val).is_some(), "AtomicLsn overflow");
229 1 : Lsn(prev)
230 1 : }
231 :
232 : /// Atomically sets the Lsn to the max of old and new value, returning the old value.
233 745529 : pub fn fetch_max(&self, lsn: Lsn) -> Lsn {
234 745529 : let prev = self.inner.fetch_max(lsn.0, Ordering::AcqRel);
235 745529 : Lsn(prev)
236 745529 : }
237 : }
238 :
239 : impl From<Lsn> for AtomicLsn {
240 604 : fn from(lsn: Lsn) -> Self {
241 604 : Self::new(lsn.0)
242 604 : }
243 : }
244 :
245 : /// Pair of LSN's pointing to the end of the last valid record and previous one
246 0 : #[derive(Debug, Clone, Copy)]
247 : pub struct RecordLsn {
248 : /// LSN at the end of the current record
249 : pub last: Lsn,
250 : /// LSN at the end of the previous record
251 : pub prev: Lsn,
252 : }
253 :
254 : /// Expose `self.last` as counter to be able to use RecordLsn in SeqWait
255 : impl MonotonicCounter<Lsn> for RecordLsn {
256 74269227 : fn cnt_advance(&mut self, lsn: Lsn) {
257 74269227 : assert!(self.last <= lsn);
258 74269227 : let new_prev = self.last;
259 74269227 : self.last = lsn;
260 74269227 : self.prev = new_prev;
261 74269227 : }
262 75564981 : fn cnt_value(&self) -> Lsn {
263 75564981 : self.last
264 75564981 : }
265 : }
266 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Parsing and formatting of the `HIGH/LOW` and plain-hex notations.
    #[test]
    fn test_lsn_strings() {
        assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
        assert_eq!("aaaa/bbbb".parse(), Ok(Lsn(0x0000AAAA0000BBBB)));
        assert_eq!("1/A".parse(), Ok(Lsn(0x000000010000000A)));
        assert_eq!("0/0".parse(), Ok(Lsn(0)));
        "ABCDEFG/12345678".parse::<Lsn>().unwrap_err();
        "123456789/AAAA5555".parse::<Lsn>().unwrap_err();
        "12345678/AAAA55550".parse::<Lsn>().unwrap_err();
        "-1/0".parse::<Lsn>().unwrap_err();
        "1/-1".parse::<Lsn>().unwrap_err();

        assert_eq!(format!("{}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555");
        assert_eq!(format!("{}", Lsn(0x000000010000000A)), "1/A");

        assert_eq!(
            Lsn::from_hex("12345678AAAA5555"),
            Ok(Lsn(0x12345678AAAA5555))
        );
        assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0)));
        assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError));

        let expected_lsn = Lsn(0x3C490F8);
        assert_eq!(" 0/3C490F8".parse(), Ok(expected_lsn));
        assert_eq!("0/3C490F8 ".parse(), Ok(expected_lsn));
        assert_eq!(" 0/3C490F8 ".parse(), Ok(expected_lsn));
    }

    /// Inputs without exactly one '/' separator must be rejected.
    #[test]
    fn test_lsn_strings_wrong_shape() {
        assert_eq!("12345678AAAA5555".parse::<Lsn>(), Err(LsnParseError));
        assert_eq!("1/2/3".parse::<Lsn>(), Err(LsnParseError));
        assert_eq!("".parse::<Lsn>(), Err(LsnParseError));
        assert_eq!("/".parse::<Lsn>(), Err(LsnParseError));
    }

    /// `from_filename` accepts the same plain-hex form as `from_hex`.
    #[test]
    fn test_lsn_from_filename() {
        assert_eq!(
            Lsn::from_filename("00000000016B9D68"),
            Ok(Lsn(0x16B9D68))
        );
        assert_eq!(Lsn::from_filename("not-an-lsn"), Err(LsnParseError));
    }

    /// `Debug` uses the same notation as `Display`.
    #[test]
    fn test_lsn_debug() {
        assert_eq!(format!("{:?}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555");
        assert_eq!(format!("{:?}", Lsn(0x000000010000000A)), "1/A");
    }

    #[test]
    fn test_lsn_math() {
        assert_eq!(Lsn(1234) + 11u64, Lsn(1245));

        assert_eq!(
            {
                let mut lsn = Lsn(1234);
                lsn += 11u64;
                lsn
            },
            Lsn(1245)
        );

        assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
        assert_eq!(Lsn(1234).checked_sub(1235u64), None);

        assert_eq!(Lsn(1235).widening_sub(1234u64), 1);
        assert_eq!(Lsn(1234).widening_sub(1235u64), -1);
        assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
        assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));

        let seg_sz: usize = 16 * 1024 * 1024;
        assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
        assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);

        assert_eq!(Lsn(0x4007).block_offset(), 7u64);
        assert_eq!(Lsn(0x4000).block_offset(), 0u64);
        assert_eq!(Lsn(0x4007).remaining_in_block(), 8185u64);
        assert_eq!(Lsn(0x4000).remaining_in_block(), 8192u64);

        assert_eq!(Lsn(0xffff01).calc_padding(seg_sz as u64), 255u64);
        assert_eq!(Lsn(0x2000000).calc_padding(seg_sz as u64), 0u64);
        assert_eq!(Lsn(0xffff01).calc_padding(8u32), 7u64);
        assert_eq!(Lsn(0xffff00).calc_padding(8u32), 0u64);
    }

    /// Segment- and page-relative helpers.
    #[test]
    fn test_lsn_segments_and_pages() {
        let seg_sz: usize = 16 * 1024 * 1024;
        let lsn = Lsn(0x1004007);

        assert_eq!(lsn.segment_lsn(seg_sz), Lsn(0x1000000));
        assert_eq!(lsn.page_lsn(), Lsn(0x1004000));
        assert_eq!(lsn.page_offset_in_segment(seg_sz), 0x4000u64);

        // An LSN exactly on a segment boundary maps to itself.
        assert_eq!(Lsn(0x2000000).segment_lsn(seg_sz), Lsn(0x2000000));
        assert_eq!(Lsn(0x2000000).page_lsn(), Lsn(0x2000000));
        assert_eq!(Lsn(0x2000000).page_offset_in_segment(seg_sz), 0u64);
    }

    /// 8-byte alignment of WAL records.
    #[test]
    fn test_lsn_alignment() {
        assert_eq!(Lsn(0).align(), Lsn(0));
        assert_eq!(Lsn(1).align(), Lsn(8));
        assert_eq!(Lsn(8).align(), Lsn(8));
        assert_eq!(Lsn(9).align(), Lsn(16));

        assert!(Lsn(0).is_aligned());
        assert!(Lsn(16).is_aligned());
        assert!(!Lsn(17).is_aligned());
    }

    #[test]
    fn test_lsn_validity() {
        assert!(!Lsn::INVALID.is_valid());
        assert!(!Lsn(0).is_valid());
        assert!(Lsn(1).is_valid());
        assert!(Lsn::MAX.is_valid());
    }

    #[test]
    fn test_atomic_lsn() {
        let lsn = AtomicLsn::new(0);
        assert_eq!(lsn.fetch_add(1234), Lsn(0));
        assert_eq!(lsn.load(), Lsn(1234));
        lsn.store(Lsn(5678));
        assert_eq!(lsn.load(), Lsn(5678));

        assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678));
        assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000));

        // The conversion from `Lsn` starts out holding that LSN.
        assert_eq!(AtomicLsn::from(Lsn(42)).load(), Lsn(42));
    }

    /// Advancing a `RecordLsn` shifts the old `last` into `prev`.
    #[test]
    fn test_record_lsn_counter() {
        let mut rec = RecordLsn {
            last: Lsn(10),
            prev: Lsn(5),
        };
        assert_eq!(rec.cnt_value(), Lsn(10));

        rec.cnt_advance(Lsn(20));
        assert_eq!(rec.cnt_value(), Lsn(20));
        assert_eq!(rec.prev, Lsn(10));
    }
}
|