use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};

use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;

/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
///
/// 1. Initial size calculation. That might take a long time, because it requires
///    reading all layers containing relation sizes at `initial_part_end`.
///
/// 2. Collecting an incremental part and adding that to the initial size.
///    Increments are applied as the walreceiver writes new timeline data,
///    which results in increases or decreases of the logical size.
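///
/// A minimal sketch of that flow (illustrative only, not a doctest; `lsn`,
/// `initial_size`, and `metrics_guard` are hypothetical placeholders):
///
/// ```ignore
/// // Stage 2 can start right away: walreceiver increments accumulate while
/// // the (potentially slow) initial calculation is still in progress.
/// let size = LogicalSize::deferred_initial(lsn);
/// size.increment_size(8192);
/// assert!(!size.current_size().is_exact()); // still approximate
///
/// // Stage 1 finishes: the size computed at `initial_part_end` is stored,
/// // and from then on increments apply on top of it.
/// size.initial_logical_size.set((initial_size, metrics_guard)).ok();
/// assert!(size.current_size().is_exact());
/// ```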
pub(super) struct LogicalSize {
    /// Size, potentially slow to compute. Calculating this might require reading multiple
    /// layers, and even the ancestor's layers.
    ///
    /// NOTE: size at a given LSN is constant, but after a restart we will calculate
    /// the initial size at a different LSN.
    pub initial_logical_size: OnceCell<(
        u64,
        crate::metrics::initial_logical_size::FinishedCalculationGuard,
    )>,

    /// Cancellation for the best-effort logical size calculation.
    ///
    /// The token is kept in a once-cell so that we can error out if a higher-priority
    /// request comes in *before* we have started the normal logical size calculation.
    pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore:
        OnceCell<CancellationToken>,

    /// Notified once the initial logical size has been initialized.
    pub(crate) initialized: tokio::sync::Semaphore,

    /// Latest LSN whose size has not yet been calculated; may be absent for freshly created timelines.
    pub initial_part_end: Option<Lsn>,

    /// All other size changes after startup, combined together.
    ///
    /// Size shouldn't ever be negative, but this is signed for two reasons:
    ///
    /// 1. If we initialized the "baseline" size lazily, while we already
    ///    process incoming WAL, the incoming WAL records could decrement the
    ///    variable and temporarily make it negative. (This is just future-proofing;
    ///    the initialization is currently not done lazily.)
    ///
    /// 2. If there is a bug and we e.g. forget to increment it in some cases
    ///    when the size grows, but remember to decrement it when it shrinks again,
    ///    the variable could go negative. In that case, it seems better to at least
    ///    try to keep tracking it, rather than clamp or overflow it. Note that
    ///    get_current_logical_size() will clamp the returned value to zero if it's
    ///    negative, and log an error. We could set it permanently to zero or some
    ///    special value to indicate "broken" instead, but this will do for now.
    ///
    /// Note that we also expose a copy of this value as a Prometheus metric;
    /// see `current_logical_size_gauge`. Use `update_current_logical_size`
    /// to modify this; it also keeps the Prometheus metric in sync.
    pub size_added_after_initial: AtomicI64,

    /// For [`crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE`].
    pub(super) did_return_approximate_to_walreceiver: AtomicBool,
}

/// Normalized current size that the data in the pageserver occupies.
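///
/// A minimal usage sketch (illustrative only, not a doctest; `size` stands for a
/// hypothetical value returned by `LogicalSize::current_size`):
///
/// ```ignore
/// match size {
///     CurrentLogicalSize::Exact(ref exact) => println!("exact: {} bytes", u64::from(exact)),
///     CurrentLogicalSize::Approximate(ref approx) => {
///         println!("approximately {} bytes, initial calculation still pending", u64::from(approx))
///     }
/// }
/// ```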
#[derive(Debug, Clone, Copy)]
pub(crate) enum CurrentLogicalSize {
    /// The size is not yet fully calculated; this is an intermediate result
    /// constructed from walreceiver increments and normalized: incoming WAL
    /// could delete objects, making the increment negative, yet the total
    /// logical size cannot be below 0.
    Approximate(Approximate),
    /// Fully calculated logical size; only future walreceiver increments change it,
    /// and those changes are observable without any further calculation.
    Exact(Exact),
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Accuracy {
    Approximate,
    Exact,
}

#[derive(Debug, Clone, Copy)]
pub(crate) struct Approximate(u64);
#[derive(Debug, Clone, Copy)]
pub(crate) struct Exact(u64);

impl From<&Approximate> for u64 {
    fn from(value: &Approximate) -> Self {
        value.0
    }
}

impl From<&Exact> for u64 {
    fn from(val: &Exact) -> Self {
        val.0
    }
}

impl Approximate {
    /// For use in situations where we don't have a sane logical size value but need
    /// to return something, e.g. in the HTTP API on shards > 0 of a sharded tenant.
    pub(crate) fn zero() -> Self {
        Self(0)
    }
}

impl CurrentLogicalSize {
    pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
        match self {
            Self::Approximate(size) => size.into(),
            Self::Exact(size) => size.into(),
        }
    }

    pub(crate) fn accuracy(&self) -> Accuracy {
        match self {
            Self::Approximate(_) => Accuracy::Approximate,
            Self::Exact(_) => Accuracy::Exact,
        }
    }

    pub(crate) fn is_exact(&self) -> bool {
        matches!(self, Self::Exact(_))
    }
}

impl LogicalSize {
    pub(super) fn empty_initial() -> Self {
        Self {
            initial_logical_size: OnceCell::with_value((0, {
                crate::metrics::initial_logical_size::START_CALCULATION
                    .first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
                    .calculation_result_saved()
            })),
            cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
            initial_part_end: None,
            size_added_after_initial: AtomicI64::new(0),
            did_return_approximate_to_walreceiver: AtomicBool::new(false),
            initialized: tokio::sync::Semaphore::new(0),
        }
    }

    pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
        Self {
            initial_logical_size: OnceCell::new(),
            cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
            initial_part_end: Some(compute_to),
            size_added_after_initial: AtomicI64::new(0),
            did_return_approximate_to_walreceiver: AtomicBool::new(false),
            initialized: tokio::sync::Semaphore::new(0),
        }
    }

    pub(super) fn current_size(&self) -> CurrentLogicalSize {
        let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
        // ^^^ keep this type explicit so that the casts in this function break if
        // we change the type.
        match self.initial_logical_size.get() {
            Some((initial_size, _)) => {
                CurrentLogicalSize::Exact(Exact(initial_size.checked_add_signed(size_increment)
                    .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
                    .unwrap()))
            }
            None => {
                let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
                CurrentLogicalSize::Approximate(Approximate(non_negative_size_increment))
            }
        }
    }

    pub(super) fn increment_size(&self, delta: i64) {
        self.size_added_after_initial
            .fetch_add(delta, AtomicOrdering::SeqCst);
    }

    /// Make the value computed by the initial logical size calculation
    /// available for reuse. This does not include the incremental part.
    pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
        match self.initial_part_end {
            Some(v) if v == lsn => self.initial_logical_size.get().map(|(s, _)| *s),
            _ => None,
        }
    }
}
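
// A minimal test sketch, not part of the original module: it exercises the
// Approximate/Exact distinction and the clamp-to-zero behaviour of
// `current_size`, assuming the metrics machinery used by `empty_initial`
// is available under `cfg(test)`.
#[cfg(test)]
mod logical_size_sketch_tests {
    use super::*;

    #[test]
    fn deferred_initial_is_approximate_and_clamped_at_zero() {
        // Before the initial calculation finishes, only walreceiver increments
        // are known, so the reported size is approximate.
        let size = LogicalSize::deferred_initial(Lsn(0x10));
        size.increment_size(100);
        let current = size.current_size();
        assert_eq!(current.accuracy(), Accuracy::Approximate);
        assert_eq!(current.size_dont_care_about_accuracy(), 100);

        // A net-negative increment cannot be represented as an approximate
        // size, so it is clamped to zero.
        size.increment_size(-300);
        assert_eq!(size.current_size().size_dont_care_about_accuracy(), 0);
    }

    #[test]
    fn empty_initial_is_exact_immediately() {
        // Freshly created timelines start with a known size of zero, so
        // increments yield an exact value right away.
        let size = LogicalSize::empty_initial();
        size.increment_size(42);
        let current = size.current_size();
        assert!(current.is_exact());
        assert_eq!(current.size_dont_care_about_accuracy(), 42);
    }
}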