|             Line data    Source code 
       1              : use anyhow::Context;
       2              : 
       3              : use once_cell::sync::OnceCell;
       4              : use tokio_util::sync::CancellationToken;
       5              : use utils::lsn::Lsn;
       6              : 
       7              : use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
       8              : 
       9              : /// Internal structure to hold all data needed for logical size calculation.
      10              : ///
      11              : /// Calculation consists of two stages:
      12              : ///
      13              : /// 1. Initial size calculation. That might take a long time, because it requires
      14              : /// reading all layers containing relation sizes at `initial_part_end`.
      15              : ///
      16              : /// 2. Collecting an incremental part and adding that to the initial size.
      17              : /// Increments are appended on walreceiver writing new timeline data,
      18              : /// which result in increase or decrease of the logical size.
      19              : pub(super) struct LogicalSize {
      20              :     /// Size, potentially slow to compute. Calculating this might require reading multiple
      21              :     /// layers, and even ancestor's layers.
      22              :     ///
      23              :     /// NOTE: size at a given LSN is constant, but after a restart we will calculate
      24              :     /// the initial size at a different LSN.
      25              :     pub initial_logical_size: OnceCell<(
      26              :         u64,
      27              :         crate::metrics::initial_logical_size::FinishedCalculationGuard,
      28              :     )>,
      29              : 
      30              :     /// Cancellation for the best-effort logical size calculation.
      31              :     ///
      32              :     /// The token is kept in a once-cell so that we can error out if a higher priority
      33              :     /// request comes in *before* we have started the normal logical size calculation.
      34              :     pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore:
      35              :         OnceCell<CancellationToken>,
      36              : 
      37              :     /// Once the initial logical size is initialized, this is notified.
      38              :     pub(crate) initialized: tokio::sync::Semaphore,
      39              : 
      40              :     /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
      41              :     pub initial_part_end: Option<Lsn>,
      42              : 
      43              :     /// All other size changes after startup, combined together.
      44              :     ///
      45              :     /// Size shouldn't ever be negative, but this is signed for two reasons:
      46              :     ///
      47              :     /// 1. If we initialized the "baseline" size lazily, while we already
      48              :     /// process incoming WAL, the incoming WAL records could decrement the
      49              :     /// variable and temporarily make it negative. (This is just future-proofing;
      50              :     /// the initialization is currently not done lazily.)
      51              :     ///
      52              :     /// 2. If there is a bug and we e.g. forget to increment it in some cases
      53              :     /// when size grows, but remember to decrement it when it shrinks again, the
      54              :     /// variable could go negative. In that case, it seems better to at least
      55              :     /// try to keep tracking it, rather than clamp or overflow it. Note that
      56              :     /// get_current_logical_size() will clamp the returned value to zero if it's
      57              :     /// negative, and log an error. Could set it permanently to zero or some
      58              :     /// special value to indicate "broken" instead, but this will do for now.
      59              :     ///
      60              :     /// Note that we also expose a copy of this value as a prometheus metric,
      61              :     /// see `current_logical_size_gauge`. Use the `update_current_logical_size`
      62              :     /// to modify this, it will also keep the prometheus metric in sync.
      63              :     pub size_added_after_initial: AtomicI64,
      64              : 
      65              :     /// For [`crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE`].
      66              :     pub(super) did_return_approximate_to_walreceiver: AtomicBool,
      67              : }
      68              : 
      69              : /// Normalized current size, that the data in pageserver occupies.
      70              : #[derive(Debug, Clone, Copy)]
      71              : pub(crate) enum CurrentLogicalSize {
      72              :     /// The size is not yet calculated to the end, this is an intermediate result,
      73              :     /// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
      74              :     /// yet total logical size cannot be below 0.
      75              :     Approximate(Approximate),
      76              :     // Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
      77              :     // available for observation without any calculations.
      78              :     Exact(Exact),
      79              : }
      80              : 
      81              : #[derive(Debug, Copy, Clone, PartialEq, Eq)]
      82              : pub(crate) enum Accuracy {
      83              :     Approximate,
      84              :     Exact,
      85              : }
      86              : 
      87              : #[derive(Debug, Clone, Copy)]
      88              : pub(crate) struct Approximate(u64);
      89              : #[derive(Debug, Clone, Copy)]
      90              : pub(crate) struct Exact(u64);
      91              : 
      92              : impl From<&Approximate> for u64 {
      93            0 :     fn from(value: &Approximate) -> Self {
      94            0 :         value.0
      95            0 :     }
      96              : }
      97              : 
      98              : impl From<&Exact> for u64 {
      99       270570 :     fn from(val: &Exact) -> Self {
     100       270570 :         val.0
     101       270570 :     }
     102              : }
     103              : 
     104              : impl Approximate {
     105              :     /// For use in situations where we don't have a sane logical size value but need
     106              :     /// to return something, e.g. in HTTP API on shard >0 of a sharded tenant.
     107            0 :     pub(crate) fn zero() -> Self {
     108            0 :         Self(0)
     109            0 :     }
     110              : }
     111              : 
     112              : impl CurrentLogicalSize {
     113            0 :     pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
     114            0 :         match self {
     115            0 :             Self::Approximate(size) => size.into(),
     116            0 :             Self::Exact(size) => size.into(),
     117              :         }
     118            0 :     }
     119            0 :     pub(crate) fn accuracy(&self) -> Accuracy {
     120            0 :         match self {
     121            0 :             Self::Approximate(_) => Accuracy::Approximate,
     122            0 :             Self::Exact(_) => Accuracy::Exact,
     123              :         }
     124            0 :     }
     125              : }
     126              : 
     127              : impl LogicalSize {
     128          150 :     pub(super) fn empty_initial() -> Self {
     129          150 :         Self {
     130          150 :             initial_logical_size: OnceCell::with_value((0, {
     131          150 :                 crate::metrics::initial_logical_size::START_CALCULATION
     132          150 :                     .first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
     133          150 :                     .calculation_result_saved()
     134          150 :             })),
     135          150 :             cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
     136          150 :             initial_part_end: None,
     137          150 :             size_added_after_initial: AtomicI64::new(0),
     138          150 :             did_return_approximate_to_walreceiver: AtomicBool::new(false),
     139          150 :             initialized: tokio::sync::Semaphore::new(0),
     140          150 :         }
     141          150 :     }
     142              : 
     143          230 :     pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
     144          230 :         Self {
     145          230 :             initial_logical_size: OnceCell::new(),
     146          230 :             cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
     147          230 :             initial_part_end: Some(compute_to),
     148          230 :             size_added_after_initial: AtomicI64::new(0),
     149          230 :             did_return_approximate_to_walreceiver: AtomicBool::new(false),
     150          230 :             initialized: tokio::sync::Semaphore::new(0),
     151          230 :         }
     152          230 :     }
     153              : 
     154       270570 :     pub(super) fn current_size(&self) -> CurrentLogicalSize {
     155       270570 :         let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
     156       270570 :         //                  ^^^ keep this type explicit so that the casts in this function break if
     157       270570 :         //                  we change the type.
     158       270570 :         match self.initial_logical_size.get() {
     159       270570 :             Some((initial_size, _)) => {
     160       270570 :                 CurrentLogicalSize::Exact(Exact(initial_size.checked_add_signed(size_increment)
     161       270570 :                     .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
     162       270570 :                     .unwrap()))
     163              :             }
     164              :             None => {
     165              : 
     166            0 :                 let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
     167            0 :                 CurrentLogicalSize::Approximate(Approximate(non_negative_size_increment))
     168              :             }
     169              :         }
     170       270570 :     }
     171              : 
     172       270570 :     pub(super) fn increment_size(&self, delta: i64) {
     173       270570 :         self.size_added_after_initial
     174       270570 :             .fetch_add(delta, AtomicOrdering::SeqCst);
     175       270570 :     }
     176              : 
     177              :     /// Make the value computed by initial logical size computation
     178              :     /// available for re-use. This doesn't contain the incremental part.
     179            0 :     pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
     180            0 :         match self.initial_part_end {
     181            0 :             Some(v) if v == lsn => self.initial_logical_size.get().map(|(s, _)| *s),
     182            0 :             _ => None,
     183              :         }
     184            0 :     }
     185              : }
         |