Line data Source code
1 : use anyhow::Context;
2 :
3 : use once_cell::sync::OnceCell;
4 : use tokio_util::sync::CancellationToken;
5 : use utils::lsn::Lsn;
6 :
7 : use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
8 :
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
///
/// 1. Initial size calculation. That might take a long time, because it requires
///    reading all layers containing relation sizes at `initial_part_end`.
///
/// 2. Collecting an incremental part and adding that to the initial size.
///    Increments are appended on walreceiver writing new timeline data,
///    which results in an increase or decrease of the logical size.
pub(super) struct LogicalSize {
    /// Size, potentially slow to compute. Calculating this might require reading multiple
    /// layers, and even ancestor's layers.
    ///
    /// The cell stores the computed size together with a `FinishedCalculationGuard` from the
    /// metrics module. `empty_initial` pre-fills the cell with a size of `0`, whereas
    /// `deferred_initial` leaves it empty.
    ///
    /// NOTE: size at a given LSN is constant, but after a restart we will calculate
    /// the initial size at a different LSN.
    pub initial_logical_size: OnceCell<(
        u64,
        crate::metrics::initial_logical_size::FinishedCalculationGuard,
    )>,

    /// Cancellation for the best-effort logical size calculation.
    ///
    /// The token is kept in a once-cell so that we can error out if a higher priority
    /// request comes in *before* we have started the normal logical size calculation.
    ///
    /// Both constructors of this struct leave the cell empty.
    pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore:
        OnceCell<CancellationToken>,

    /// Once the initial logical size is initialized, this is notified.
    ///
    /// Both constructors of this struct create the semaphore with zero permits.
    pub(crate) initialized: tokio::sync::Semaphore,

    /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
    ///
    /// `empty_initial` sets this to `None`; `deferred_initial` sets it to the LSN it was given.
    pub initial_part_end: Option<Lsn>,

    /// All other size changes after startup, combined together.
    ///
    /// Size shouldn't ever be negative, but this is signed for two reasons:
    ///
    /// 1. If we initialized the "baseline" size lazily, while we already
    ///    process incoming WAL, the incoming WAL records could decrement the
    ///    variable and temporarily make it negative. (This is just future-proofing;
    ///    the initialization is currently not done lazily.)
    ///
    ///    NOTE(review): `deferred_initial` leaves `initial_logical_size` empty, which looks
    ///    like lazy initialization; the parenthetical above may be outdated. Confirm.
    ///
    /// 2. If there is a bug and we e.g. forget to increment it in some cases
    ///    when size grows, but remember to decrement it when it shrinks again, the
    ///    variable could go negative. In that case, it seems better to at least
    ///    try to keep tracking it, rather than clamp or overflow it. Note that
    ///    get_current_logical_size() will clamp the returned value to zero if it's
    ///    negative, and log an error. Could set it permanently to zero or some
    ///    special value to indicate "broken" instead, but this will do for now.
    ///
    ///    NOTE(review): within this file, `current_size` clamps a negative increment to zero
    ///    only while the initial size is still unset; once it is set, a sum that does not fit
    ///    in `u64` panics instead. Whether `get_current_logical_size()` (defined elsewhere)
    ///    adds further clamping or logging cannot be seen from here.
    ///
    /// Note that we also expose a copy of this value as a prometheus metric,
    /// see `current_logical_size_gauge`. Use the `update_current_logical_size`
    /// to modify this, it will also keep the prometheus metric in sync.
    ///
    /// NOTE(review): the method in this file that modifies the counter is `increment_size`;
    /// `update_current_logical_size` is presumably a wrapper defined elsewhere. Confirm.
    pub size_added_after_initial: AtomicI64,

    /// For [`crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE`].
    ///
    /// Starts out as `false` in both constructors of this struct.
    pub(super) did_return_approximate_to_walreceiver: AtomicBool,
}
68 :
/// Normalized current size that the data in pageserver occupies.
#[derive(Debug, Clone, Copy)]
pub(crate) enum CurrentLogicalSize {
    /// The size is not yet calculated to the end; this is an intermediate result
    /// constructed from walreceiver increments and normalized: logical data could delete
    /// some objects, hence the increment could be negative, yet the total logical size
    /// cannot be below 0.
    Approximate(Approximate),
    /// Fully calculated logical size; only future walreceiver increments change it, and
    /// those changes are available for observation without any calculations.
    Exact(Exact),
}
80 :
/// Tells whether a [`CurrentLogicalSize`] is an intermediate or a fully calculated value,
/// without carrying the size itself.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Accuracy {
    /// Corresponds to [`CurrentLogicalSize::Approximate`].
    Approximate,
    /// Corresponds to [`CurrentLogicalSize::Exact`].
    Exact,
}
86 :
/// Payload of [`CurrentLogicalSize::Approximate`]: a size reported while the initial
/// logical size has not been set yet.
///
/// The inner field is private; read it through `u64::from(&approximate)`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct Approximate(u64);
/// Payload of [`CurrentLogicalSize::Exact`]: the initial logical size combined with the
/// increments recorded afterwards.
///
/// The inner field is private; read it through `u64::from(&exact)`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct Exact(u64);
91 :
92 : impl From<&Approximate> for u64 {
93 0 : fn from(value: &Approximate) -> Self {
94 0 : value.0
95 0 : }
96 : }
97 :
98 : impl From<&Exact> for u64 {
99 271184 : fn from(val: &Exact) -> Self {
100 271184 : val.0
101 271184 : }
102 : }
103 :
104 : impl Approximate {
105 : /// For use in situations where we don't have a sane logical size value but need
106 : /// to return something, e.g. in HTTP API on shard >0 of a sharded tenant.
107 0 : pub(crate) fn zero() -> Self {
108 0 : Self(0)
109 0 : }
110 : }
111 :
112 : impl CurrentLogicalSize {
113 0 : pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
114 0 : match self {
115 0 : Self::Approximate(size) => size.into(),
116 0 : Self::Exact(size) => size.into(),
117 : }
118 0 : }
119 0 : pub(crate) fn accuracy(&self) -> Accuracy {
120 0 : match self {
121 0 : Self::Approximate(_) => Accuracy::Approximate,
122 0 : Self::Exact(_) => Accuracy::Exact,
123 : }
124 0 : }
125 :
126 0 : pub(crate) fn is_exact(&self) -> bool {
127 0 : matches!(self, Self::Exact(_))
128 0 : }
129 : }
130 :
131 : impl LogicalSize {
132 184 : pub(super) fn empty_initial() -> Self {
133 184 : Self {
134 184 : initial_logical_size: OnceCell::with_value((0, {
135 184 : crate::metrics::initial_logical_size::START_CALCULATION
136 184 : .first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
137 184 : .calculation_result_saved()
138 184 : })),
139 184 : cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
140 184 : initial_part_end: None,
141 184 : size_added_after_initial: AtomicI64::new(0),
142 184 : did_return_approximate_to_walreceiver: AtomicBool::new(false),
143 184 : initialized: tokio::sync::Semaphore::new(0),
144 184 : }
145 184 : }
146 :
147 234 : pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
148 234 : Self {
149 234 : initial_logical_size: OnceCell::new(),
150 234 : cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
151 234 : initial_part_end: Some(compute_to),
152 234 : size_added_after_initial: AtomicI64::new(0),
153 234 : did_return_approximate_to_walreceiver: AtomicBool::new(false),
154 234 : initialized: tokio::sync::Semaphore::new(0),
155 234 : }
156 234 : }
157 :
158 271286 : pub(super) fn current_size(&self) -> CurrentLogicalSize {
159 271286 : let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
160 271286 : // ^^^ keep this type explicit so that the casts in this function break if
161 271286 : // we change the type.
162 271286 : match self.initial_logical_size.get() {
163 271184 : Some((initial_size, _)) => {
164 271184 : CurrentLogicalSize::Exact(Exact(initial_size.checked_add_signed(size_increment)
165 271184 : .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
166 271184 : .unwrap()))
167 : }
168 : None => {
169 :
170 102 : let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
171 102 : CurrentLogicalSize::Approximate(Approximate(non_negative_size_increment))
172 : }
173 : }
174 271286 : }
175 :
176 270570 : pub(super) fn increment_size(&self, delta: i64) {
177 270570 : self.size_added_after_initial
178 270570 : .fetch_add(delta, AtomicOrdering::SeqCst);
179 270570 : }
180 :
181 : /// Make the value computed by initial logical size computation
182 : /// available for re-use. This doesn't contain the incremental part.
183 0 : pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
184 0 : match self.initial_part_end {
185 0 : Some(v) if v == lsn => self.initial_logical_size.get().map(|(s, _)| *s),
186 0 : _ => None,
187 : }
188 0 : }
189 : }
|