LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline - logical_size.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 97.7 % 43 42 1 42
Current Date: 2023-10-19 02:04:12 Functions: 66.7 % 9 6 3 6
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use anyhow::Context;
       2                 : use once_cell::sync::OnceCell;
       3                 : 
       4                 : use tokio::sync::Semaphore;
       5                 : use utils::lsn::Lsn;
       6                 : 
       7                 : use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
       8                 : use std::sync::Arc;
       9                 : 
      10                 : /// Internal structure to hold all data needed for logical size calculation.
      11                 : ///
      12                 : /// Calculation consists of two stages:
      13                 : ///
      14                 : /// 1. Initial size calculation. That might take a long time, because it requires
      15                 : /// reading all layers containing relation sizes at `initial_part_end`.
      16                 : ///
      17                 : /// 2. Collecting an incremental part and adding that to the initial size.
      18                 : /// Increments are appended when walreceiver writes new timeline data,
      19                 : /// which results in an increase or decrease of the logical size.
      20                 : pub(super) struct LogicalSize {
      21                 :     /// Size, potentially slow to compute. Calculating this might require reading multiple
      22                 :     /// layers, and even ancestor's layers.
      23                 :     ///
      24                 :     /// NOTE: size at a given LSN is constant, but after a restart we will calculate
      25                 :     /// the initial size at a different LSN.
      26                 :     pub initial_logical_size: OnceCell<u64>,
      27                 : 
      28                 :     /// Semaphore to track ongoing calculation of `initial_logical_size`.
      29                 :     pub initial_size_computation: Arc<tokio::sync::Semaphore>,
      30                 : 
      31                 :     /// LSN at which the initial logical size is to be calculated; absent (`None`) for freshly created timelines.
      32                 :     pub initial_part_end: Option<Lsn>,
      33                 : 
      34                 :     /// All other size changes after startup, combined together.
      35                 :     ///
      36                 :     /// Size shouldn't ever be negative, but this is signed for two reasons:
      37                 :     ///
      38                 :     /// 1. If we initialized the "baseline" size lazily, while we already
      39                 :     /// process incoming WAL, the incoming WAL records could decrement the
      40                 :     /// variable and temporarily make it negative. (This is just future-proofing;
      41                 :     /// the initialization is currently not done lazily.)
      42                 :     ///
      43                 :     /// 2. If there is a bug and we e.g. forget to increment it in some cases
      44                 :     /// when size grows, but remember to decrement it when it shrinks again, the
      45                 :     /// variable could go negative. In that case, it seems better to at least
      46                 :     /// try to keep tracking it, rather than clamp or overflow it. Note that
      47                 :     /// get_current_logical_size() will clamp the returned value to zero if it's
      48                 :     /// negative, and log an error. Could set it permanently to zero or some
      49                 :     /// special value to indicate "broken" instead, but this will do for now.
      50                 :     ///
      51                 :     /// Note that we also expose a copy of this value as a prometheus metric,
      52                 :     /// see `current_logical_size_gauge`. Use `update_current_logical_size`
      53                 :     /// to modify this; it will also keep the prometheus metric in sync.
      54                 :     pub size_added_after_initial: AtomicI64,
      55                 : }
      56                 : 
      57                 : /// Normalized current size that the data in the pageserver occupies.
      58 UBC           0 : #[derive(Debug, Clone, Copy)]
      59                 : pub(super) enum CurrentLogicalSize {
      60                 :     /// The size is not yet fully calculated; this is an intermediate result
      61                 :     /// constructed from walreceiver increments and normalized: the increments may delete objects and hence sum to a negative value,
      62                 :     /// yet the total logical size cannot be below 0.
      63                 :     Approximate(u64),
      64                 :     /// Fully calculated logical size; only future walreceiver increments change it, and those changes are
      65                 :     /// available for observation without any further calculation.
      66                 :     Exact(u64),
      67                 : }
      68                 : 
      69                 : impl CurrentLogicalSize {
      70 CBC      779585 :     pub(super) fn size(&self) -> u64 {
      71          779585 :         *match self {
      72             717 :             Self::Approximate(size) => size,
      73          778868 :             Self::Exact(size) => size,
      74                 :         }
      75          779585 :     }
      76                 : }
      77                 : 
      78                 : impl LogicalSize {
      79             617 :     pub(super) fn empty_initial() -> Self {
      80             617 :         Self {
      81             617 :             initial_logical_size: OnceCell::with_value(0),
      82             617 :             //  initial_logical_size already computed, so, don't admit any calculations
      83             617 :             initial_size_computation: Arc::new(Semaphore::new(0)),
      84             617 :             initial_part_end: None,
      85             617 :             size_added_after_initial: AtomicI64::new(0),
      86             617 :         }
      87             617 :     }
      88                 : 
      89             685 :     pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
      90             685 :         Self {
      91             685 :             initial_logical_size: OnceCell::new(),
      92             685 :             initial_size_computation: Arc::new(Semaphore::new(1)),
      93             685 :             initial_part_end: Some(compute_to),
      94             685 :             size_added_after_initial: AtomicI64::new(0),
      95             685 :         }
      96             685 :     }
      97                 : 
      98         1808627 :     pub(super) fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
      99         1808627 :         let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
     100         1808627 :         //                  ^^^ keep this type explicit so that the casts in this function break if
     101         1808627 :         //                  we change the type.
     102         1808627 :         match self.initial_logical_size.get() {
     103         1806418 :             Some(initial_size) => {
     104         1806418 :                 initial_size.checked_add_signed(size_increment)
     105         1806418 :                     .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
     106         1806418 :                     .map(CurrentLogicalSize::Exact)
     107                 :             }
     108                 :             None => {
     109            2209 :                 let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
     110            2209 :                 Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
     111                 :             }
     112                 :         }
     113         1808627 :     }
     114                 : 
     115         1029042 :     pub(super) fn increment_size(&self, delta: i64) {
     116         1029042 :         self.size_added_after_initial
     117         1029042 :             .fetch_add(delta, AtomicOrdering::SeqCst);
     118         1029042 :     }
     119                 : 
     120                 :     /// Make the value computed by initial logical size computation
     121                 :     /// available for re-use. This doesn't contain the incremental part.
     122                 :     pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
     123             400 :         match self.initial_part_end {
     124             400 :             Some(v) if v == lsn => self.initial_logical_size.get().copied(),
     125              40 :             _ => None,
     126                 :         }
     127             416 :     }
     128                 : }
        

Generated by: LCOV version 2.1-beta