TLA Line data Source code
1 : #![warn(missing_docs)]
2 :
3 : use camino::Utf8Path;
4 : use serde::{Deserialize, Serialize};
5 : use std::fmt;
6 : use std::ops::{Add, AddAssign};
7 : use std::str::FromStr;
8 : use std::sync::atomic::{AtomicU64, Ordering};
9 :
10 : use crate::seqwait::MonotonicCounter;
11 :
12 : /// Transaction log block size in bytes
13 : pub const XLOG_BLCKSZ: u32 = 8192;
14 :
15 : /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
16 CBC 1089316426 : #[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
17 : #[serde(transparent)]
18 : pub struct Lsn(pub u64);
19 :
20 : /// We tried to parse an LSN from a string, but failed
21 1 : #[derive(Debug, PartialEq, Eq, thiserror::Error)]
22 : #[error("LsnParseError")]
23 : pub struct LsnParseError;
24 :
25 : impl Lsn {
26 : /// Maximum possible value for an LSN
27 : pub const MAX: Lsn = Lsn(u64::MAX);
28 :
29 : /// Invalid value for InvalidXLogRecPtr, as defined in xlogdefs.h
30 : pub const INVALID: Lsn = Lsn(0);
31 :
32 : /// Subtract a number, returning None on overflow.
33 3697257 : pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
34 3697257 : let other: u64 = other.into();
35 3697257 : self.0.checked_sub(other).map(Lsn)
36 3697257 : }
37 :
38 : /// Subtract a number, returning the difference as i128 to avoid overflow.
39 776350 : pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
40 776350 : let other: u64 = other.into();
41 776350 : i128::from(self.0) - i128::from(other)
42 776350 : }
43 :
44 : /// Parse an LSN from a filename in the form `0000000000000000`
45 UBC 0 : pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
46 0 : where
47 0 : F: AsRef<Utf8Path>,
48 0 : {
49 0 : Lsn::from_hex(filename.as_ref().as_str())
50 0 : }
51 :
52 : /// Parse an LSN from a string in the form `0000000000000000`
53 CBC 29784 : pub fn from_hex<S>(s: S) -> Result<Self, LsnParseError>
54 29784 : where
55 29784 : S: AsRef<str>,
56 29784 : {
57 29784 : let s: &str = s.as_ref();
58 29784 : let n = u64::from_str_radix(s, 16).or(Err(LsnParseError))?;
59 21613 : Ok(Lsn(n))
60 29784 : }
61 :
62 : /// Compute the offset into a segment
63 : #[inline]
64 180363886 : pub fn segment_offset(self, seg_sz: usize) -> usize {
65 180363886 : (self.0 % seg_sz as u64) as usize
66 180363886 : }
67 :
68 : /// Compute LSN of the segment start.
69 : #[inline]
70 1430 : pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
71 1430 : Lsn(self.0 - (self.0 % seg_sz as u64))
72 1430 : }
73 :
74 : /// Compute the segment number
75 : #[inline]
76 1627810 : pub fn segment_number(self, seg_sz: usize) -> u64 {
77 1627810 : self.0 / seg_sz as u64
78 1627810 : }
79 :
80 : /// Compute the offset into a block
81 : #[inline]
82 177194933 : pub fn block_offset(self) -> u64 {
83 177194933 : const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
84 177194933 : self.0 % BLCKSZ
85 177194933 : }
86 :
87 : /// Compute the block offset of the first byte of this Lsn within this
88 : /// segment
89 : #[inline]
90 638 : pub fn page_lsn(self) -> Lsn {
91 638 : Lsn(self.0 - self.block_offset())
92 638 : }
93 :
94 : /// Compute the block offset of the first byte of this Lsn within this
95 : /// segment
96 : #[inline]
97 638 : pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
98 638 : (self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
99 638 : }
100 :
101 : /// Compute the bytes remaining in this block
102 : ///
103 : /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
104 : #[inline]
105 174888368 : pub fn remaining_in_block(self) -> u64 {
106 174888368 : const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
107 174888368 : BLCKSZ - (self.0 % BLCKSZ)
108 174888368 : }
109 :
110 : /// Compute the bytes remaining to fill a chunk of some size
111 : ///
112 : /// If the LSN is already at the chunk boundary, it will return 0.
113 697 : pub fn calc_padding<T: Into<u64>>(self, sz: T) -> u64 {
114 697 : let sz: u64 = sz.into();
115 697 : // By using wrapping_sub, we can subtract first and then mod second.
116 697 : // If it's done the other way around, then we would return a full
117 697 : // chunk size if we're already at the chunk boundary.
118 697 : // (Regular subtraction will panic on overflow in debug builds.)
119 697 : (sz.wrapping_sub(self.0)) % sz
120 697 : }
121 :
122 : /// Align LSN on 8-byte boundary (alignment of WAL records).
123 383414846 : pub fn align(&self) -> Lsn {
124 383414846 : Lsn((self.0 + 7) & !7)
125 383414846 : }
126 :
127 : /// Align LSN on 8-byte boundary (alignment of WAL records).
128 214561566 : pub fn is_aligned(&self) -> bool {
129 214561566 : *self == self.align()
130 214561566 : }
131 :
132 : /// Return if the LSN is valid
133 : /// mimics postgres XLogRecPtrIsInvalid macro
134 13870016 : pub fn is_valid(self) -> bool {
135 13870016 : self != Lsn::INVALID
136 13870016 : }
137 : }
138 :
139 : impl From<u64> for Lsn {
140 6115845 : fn from(n: u64) -> Self {
141 6115845 : Lsn(n)
142 6115845 : }
143 : }
144 :
145 : impl From<Lsn> for u64 {
146 10480005 : fn from(lsn: Lsn) -> u64 {
147 10480005 : lsn.0
148 10480005 : }
149 : }
150 :
151 : impl FromStr for Lsn {
152 : type Err = LsnParseError;
153 :
154 : /// Parse an LSN from a string in the form `00000000/00000000`
155 : ///
156 : /// If the input string is missing the '/' character, then use `Lsn::from_hex`
157 10410 : fn from_str(s: &str) -> Result<Self, Self::Err> {
158 10410 : let mut splitter = s.trim().split('/');
159 10410 : if let (Some(left), Some(right), None) = (splitter.next(), splitter.next(), splitter.next())
160 : {
161 10410 : let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
162 10407 : let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
163 10405 : Ok(Lsn((left_num as u64) << 32 | right_num as u64))
164 : } else {
165 UBC 0 : Err(LsnParseError)
166 : }
167 CBC 10410 : }
168 : }
169 :
170 : impl fmt::Display for Lsn {
171 4750025 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 4750025 : write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
173 4750025 : }
174 : }
175 :
176 : impl fmt::Debug for Lsn {
177 24154 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 24154 : write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
179 24154 : }
180 : }
181 :
182 : impl Add<u64> for Lsn {
183 : type Output = Lsn;
184 :
185 73280397 : fn add(self, other: u64) -> Self::Output {
186 73280397 : // panic if the addition overflows.
187 73280397 : Lsn(self.0.checked_add(other).unwrap())
188 73280397 : }
189 : }
190 :
191 : impl AddAssign<u64> for Lsn {
192 346955183 : fn add_assign(&mut self, other: u64) {
193 346955183 : // panic if the addition overflows.
194 346955183 : self.0 = self.0.checked_add(other).unwrap();
195 346955183 : }
196 : }
197 :
198 : /// An [`Lsn`] that can be accessed atomically.
199 : pub struct AtomicLsn {
200 : inner: AtomicU64,
201 : }
202 :
203 : impl AtomicLsn {
204 : /// Creates a new atomic `Lsn`.
205 4472 : pub fn new(val: u64) -> Self {
206 4472 : AtomicLsn {
207 4472 : inner: AtomicU64::new(val),
208 4472 : }
209 4472 : }
210 :
211 : /// Atomically retrieve the `Lsn` value from memory.
212 2365381 : pub fn load(&self) -> Lsn {
213 2365381 : Lsn(self.inner.load(Ordering::Acquire))
214 2365381 : }
215 :
216 : /// Atomically store a new `Lsn` value to memory.
217 20264 : pub fn store(&self, lsn: Lsn) {
218 20264 : self.inner.store(lsn.0, Ordering::Release);
219 20264 : }
220 :
221 : /// Adds to the current value, returning the previous value.
222 : ///
223 : /// This operation will panic on overflow.
224 1 : pub fn fetch_add(&self, val: u64) -> Lsn {
225 1 : let prev = self.inner.fetch_add(val, Ordering::AcqRel);
226 1 : assert!(prev.checked_add(val).is_some(), "AtomicLsn overflow");
227 1 : Lsn(prev)
228 1 : }
229 :
230 : /// Atomically sets the Lsn to the max of old and new value, returning the old value.
231 789185 : pub fn fetch_max(&self, lsn: Lsn) -> Lsn {
232 789185 : let prev = self.inner.fetch_max(lsn.0, Ordering::AcqRel);
233 789185 : Lsn(prev)
234 789185 : }
235 : }
236 :
237 : impl From<Lsn> for AtomicLsn {
238 900 : fn from(lsn: Lsn) -> Self {
239 900 : Self::new(lsn.0)
240 900 : }
241 : }
242 :
243 : /// Pair of LSN's pointing to the end of the last valid record and previous one
244 UBC 0 : #[derive(Debug, Clone, Copy)]
245 : pub struct RecordLsn {
246 : /// LSN at the end of the current record
247 : pub last: Lsn,
248 : /// LSN at the end of the previous record
249 : pub prev: Lsn,
250 : }
251 :
252 : /// Expose `self.last` as counter to be able to use RecordLsn in SeqWait
253 : impl MonotonicCounter<Lsn> for RecordLsn {
254 CBC 69331227 : fn cnt_advance(&mut self, lsn: Lsn) {
255 69331227 : assert!(self.last <= lsn);
256 69331227 : let new_prev = self.last;
257 69331227 : self.last = lsn;
258 69331227 : self.prev = new_prev;
259 69331227 : }
260 70609848 : fn cnt_value(&self) -> Lsn {
261 70609848 : self.last
262 70609848 : }
263 : }
264 :
265 : #[cfg(test)]
266 : mod tests {
267 : use super::*;
268 :
269 1 : #[test]
270 1 : fn test_lsn_strings() {
271 1 : assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
272 1 : assert_eq!("aaaa/bbbb".parse(), Ok(Lsn(0x0000AAAA0000BBBB)));
273 1 : assert_eq!("1/A".parse(), Ok(Lsn(0x000000010000000A)));
274 1 : assert_eq!("0/0".parse(), Ok(Lsn(0)));
275 1 : "ABCDEFG/12345678".parse::<Lsn>().unwrap_err();
276 1 : "123456789/AAAA5555".parse::<Lsn>().unwrap_err();
277 1 : "12345678/AAAA55550".parse::<Lsn>().unwrap_err();
278 1 : "-1/0".parse::<Lsn>().unwrap_err();
279 1 : "1/-1".parse::<Lsn>().unwrap_err();
280 1 :
281 1 : assert_eq!(format!("{}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555");
282 1 : assert_eq!(format!("{}", Lsn(0x000000010000000A)), "1/A");
283 :
284 1 : assert_eq!(
285 1 : Lsn::from_hex("12345678AAAA5555"),
286 1 : Ok(Lsn(0x12345678AAAA5555))
287 1 : );
288 1 : assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0)));
289 1 : assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError));
290 :
291 1 : let expected_lsn = Lsn(0x3C490F8);
292 1 : assert_eq!(" 0/3C490F8".parse(), Ok(expected_lsn));
293 1 : assert_eq!("0/3C490F8 ".parse(), Ok(expected_lsn));
294 1 : assert_eq!(" 0/3C490F8 ".parse(), Ok(expected_lsn));
295 1 : }
296 :
297 1 : #[test]
298 1 : fn test_lsn_math() {
299 1 : assert_eq!(Lsn(1234) + 11u64, Lsn(1245));
300 :
301 1 : assert_eq!(
302 1 : {
303 1 : let mut lsn = Lsn(1234);
304 1 : lsn += 11u64;
305 1 : lsn
306 1 : },
307 1 : Lsn(1245)
308 1 : );
309 :
310 1 : assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
311 1 : assert_eq!(Lsn(1234).checked_sub(1235u64), None);
312 :
313 1 : assert_eq!(Lsn(1235).widening_sub(1234u64), 1);
314 1 : assert_eq!(Lsn(1234).widening_sub(1235u64), -1);
315 1 : assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
316 1 : assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
317 :
318 1 : let seg_sz: usize = 16 * 1024 * 1024;
319 1 : assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
320 1 : assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);
321 :
322 1 : assert_eq!(Lsn(0x4007).block_offset(), 7u64);
323 1 : assert_eq!(Lsn(0x4000).block_offset(), 0u64);
324 1 : assert_eq!(Lsn(0x4007).remaining_in_block(), 8185u64);
325 1 : assert_eq!(Lsn(0x4000).remaining_in_block(), 8192u64);
326 :
327 1 : assert_eq!(Lsn(0xffff01).calc_padding(seg_sz as u64), 255u64);
328 1 : assert_eq!(Lsn(0x2000000).calc_padding(seg_sz as u64), 0u64);
329 1 : assert_eq!(Lsn(0xffff01).calc_padding(8u32), 7u64);
330 1 : assert_eq!(Lsn(0xffff00).calc_padding(8u32), 0u64);
331 1 : }
332 :
333 1 : #[test]
334 1 : fn test_atomic_lsn() {
335 1 : let lsn = AtomicLsn::new(0);
336 1 : assert_eq!(lsn.fetch_add(1234), Lsn(0));
337 1 : assert_eq!(lsn.load(), Lsn(1234));
338 1 : lsn.store(Lsn(5678));
339 1 : assert_eq!(lsn.load(), Lsn(5678));
340 :
341 1 : assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678));
342 1 : assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000));
343 1 : }
344 : }
|