LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - logical_size.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 97.7 % 43 42
Test Date: 2023-09-06 10:18:01 Functions: 66.7 % 9 6

            Line data    Source code
       1              : use anyhow::Context;
       2              : use once_cell::sync::OnceCell;
       3              : 
       4              : use tokio::sync::Semaphore;
       5              : use utils::lsn::Lsn;
       6              : 
       7              : use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
       8              : use std::sync::Arc;
       9              : 
      10              : /// Internal structure to hold all data needed for logical size calculation.
      11              : ///
      12              : /// Calculation consists of two stages:
      13              : ///
      14              : /// 1. Initial size calculation. That might take a long time, because it requires
      15              : /// reading all layers containing relation sizes at `initial_part_end`.
      16              : ///
      17              : /// 2. Collecting an incremental part and adding that to the initial size.
      18              : /// Increments are added whenever walreceiver writes new timeline data,
      19              : /// which results in an increase or decrease of the logical size.
      20              : pub(super) struct LogicalSize {
      21              :     /// Size, potentially slow to compute. Calculating this might require reading multiple
      22              :     /// layers, and even ancestor's layers.
      23              :     ///
      24              :     /// NOTE: size at a given LSN is constant, but after a restart we will calculate
      25              :     /// the initial size at a different LSN.
      26              :     pub initial_logical_size: OnceCell<u64>,
      27              : 
      28              :     /// Semaphore to track ongoing calculation of `initial_logical_size`.
      29              :     pub initial_size_computation: Arc<tokio::sync::Semaphore>,
      30              : 
      31              :     /// LSN at which the initial logical size is calculated; absent for freshly created timelines.
      32              :     pub initial_part_end: Option<Lsn>,
      33              : 
      34              :     /// All other size changes after startup, combined together.
      35              :     ///
      36              :     /// Size shouldn't ever be negative, but this is signed for two reasons:
      37              :     ///
      38              :     /// 1. If we initialized the "baseline" size lazily, while we already
      39              :     /// process incoming WAL, the incoming WAL records could decrement the
      40              :     /// variable and temporarily make it negative. (See `deferred_initial`, which
      41              :     /// leaves the initial size unset until it has been computed.)
      42              :     ///
      43              :     /// 2. If there is a bug and we e.g. forget to increment it in some cases
      44              :     /// when size grows, but remember to decrement it when it shrinks again, the
      45              :     /// variable could go negative. In that case, it seems better to at least
      46              :     /// try to keep tracking it, rather than clamp or overflow it. Note that
      47              :     /// get_current_logical_size() will clamp the returned value to zero if it's
      48              :     /// negative, and log an error. Could set it permanently to zero or some
      49              :     /// special value to indicate "broken" instead, but this will do for now.
      50              :     ///
      51              :     /// Note that we also expose a copy of this value as a prometheus metric,
      52              :     /// see `current_logical_size_gauge`. Use `update_current_logical_size`
      53              :     /// to modify this; it also keeps the prometheus metric in sync.
      54              :     pub size_added_after_initial: AtomicI64,
      55              : }
      56              : 
      57              : /// Normalized current size that the data in the pageserver occupies.
      58            0 : #[derive(Debug, Clone, Copy)]
      59              : pub(super) enum CurrentLogicalSize {
      60              :     /// The size is not yet fully calculated; this is an intermediate result
      61              :     /// constructed from walreceiver increments and normalized: the increments could delete some objects and hence sum to a negative value,
      62              :     /// yet the total logical size cannot be below 0.
      63              :     Approximate(u64),
      64              :     /// Fully calculated logical size; only future walreceiver increments change it, and those changes are
      65              :     /// available for observation without any calculations.
      66              :     Exact(u64),
      67              : }
      68              : 
      69              : impl CurrentLogicalSize {
      70       736545 :     pub(super) fn size(&self) -> u64 {
      71       736545 :         *match self {
      72          562 :             Self::Approximate(size) => size,
      73       735983 :             Self::Exact(size) => size,
      74              :         }
      75       736545 :     }
      76              : }
      77              : 
      78              : impl LogicalSize {
      79          684 :     pub(super) fn empty_initial() -> Self {
      80          684 :         Self {
      81          684 :             initial_logical_size: OnceCell::with_value(0),
      82          684 :             // initial_logical_size is already computed, so don't admit any calculations
      83          684 :             initial_size_computation: Arc::new(Semaphore::new(0)),
      84          684 :             initial_part_end: None,
      85          684 :             size_added_after_initial: AtomicI64::new(0),
      86          684 :         }
      87          684 :     }
      88              : 
      89          710 :     pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
      90          710 :         Self {
      91          710 :             initial_logical_size: OnceCell::new(),
      92          710 :             initial_size_computation: Arc::new(Semaphore::new(1)),
      93          710 :             initial_part_end: Some(compute_to),
      94          710 :             size_added_after_initial: AtomicI64::new(0),
      95          710 :         }
      96          710 :     }
      97              : 
      98      1830899 :     pub(super) fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
      99      1830899 :         let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
     100      1830899 :         //                  ^^^ keep this type explicit so that the casts in this function break if
     101      1830899 :         //                  we change the type.
     102      1830899 :         match self.initial_logical_size.get() {
     103      1829208 :             Some(initial_size) => {
     104      1829208 :                 initial_size.checked_add_signed(size_increment)
     105      1829208 :                     .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
     106      1829208 :                     .map(CurrentLogicalSize::Exact)
     107              :             }
     108              :             None => {
     109         1691 :                 let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
     110         1691 :                 Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
     111              :             }
     112              :         }
     113      1830899 :     }
     114              : 
     115      1094354 :     pub(super) fn increment_size(&self, delta: i64) {
     116      1094354 :         self.size_added_after_initial
     117      1094354 :             .fetch_add(delta, AtomicOrdering::SeqCst);
     118      1094354 :     }
     119              : 
     120              :     /// Make the value computed by initial logical size computation
     121              :     /// available for re-use. This doesn't contain the incremental part.
     122              :     pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
     123          394 :         match self.initial_part_end {
     124          394 :             Some(v) if v == lsn => self.initial_logical_size.get().copied(),
     125           33 :             _ => None,
     126              :         }
     127          406 :     }
     128              : }
        

Generated by: LCOV version 2.1-beta