LCOV - code coverage report
Current view: top level - pageserver/src/tenant - throttle.rs (source / functions)
Test:      4f58e98c51285c7fa348e0b410c88a10caf68ad2.info
Test Date: 2025-01-07 20:58:07

             Coverage    Total    Hit
Lines:       44.3 %      79       35
Functions:   28.6 %      7        2

            Line data    Source code
       1              : use std::{
       2              :     sync::{
       3              :         atomic::{AtomicU64, Ordering},
       4              :         Arc,
       5              :     },
       6              :     time::{Duration, Instant},
       7              : };
       8              : 
       9              : use arc_swap::ArcSwap;
      10              : use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
      11              : 
      12              : /// Throttle for `async` functions.
      13              : ///
      14              : /// Runtime reconfigurable.
      15              : ///
      16              : /// To share a throttle among multiple entities, wrap it in an [`Arc`].
      17              : ///
       18              : /// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
      19              : pub struct Throttle<M: Metric> {
      20              :     inner: ArcSwap<Inner>,
      21              :     metric: M,
      22              :     /// will be turned into [`Stats::count_accounted_start`]
      23              :     count_accounted_start: AtomicU64,
      24              :     /// will be turned into [`Stats::count_accounted_finish`]
      25              :     count_accounted_finish: AtomicU64,
      26              :     /// will be turned into [`Stats::count_throttled`]
      27              :     count_throttled: AtomicU64,
      28              :     /// will be turned into [`Stats::sum_throttled_usecs`]
      29              :     sum_throttled_usecs: AtomicU64,
      30              : }
      31              : 
      32              : pub struct Inner {
      33              :     enabled: bool,
      34              :     rate_limiter: Arc<RateLimiter>,
      35              : }
      36              : 
      37              : pub type Config = pageserver_api::models::ThrottleConfig;
      38              : 
      39              : pub struct Observation {
      40              :     pub wait_time: Duration,
      41              : }
      42              : pub trait Metric {
      43              :     fn accounting_start(&self);
      44              :     fn accounting_finish(&self);
      45              :     fn observe_throttling(&self, observation: &Observation);
      46              : }
      47              : 
      48              : /// See [`Throttle::reset_stats`].
      49              : pub struct Stats {
      50              :     /// Number of requests that started [`Throttle::throttle`] calls.
      51              :     pub count_accounted_start: u64,
      52              :     /// Number of requests that finished [`Throttle::throttle`] calls.
      53              :     pub count_accounted_finish: u64,
      54              :     /// Subset of the `accounted` requests that were actually throttled.
       55              :     /// Note that the counters are stored as independent atomics, so there might be a slight drift.
      56              :     pub count_throttled: u64,
      57              :     /// Sum of microseconds that throttled requests spent waiting for throttling.
      58              :     pub sum_throttled_usecs: u64,
      59              : }
      60              : 
      61              : pub enum ThrottleResult {
      62              :     NotThrottled { start: Instant },
      63              :     Throttled { start: Instant, end: Instant },
      64              : }
      65              : 
      66              : impl<M> Throttle<M>
      67              : where
      68              :     M: Metric,
      69              : {
      70          196 :     pub fn new(config: Config, metric: M) -> Self {
      71          196 :         Self {
      72          196 :             inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
      73          196 :             metric,
      74          196 :             count_accounted_start: AtomicU64::new(0),
      75          196 :             count_accounted_finish: AtomicU64::new(0),
      76          196 :             count_throttled: AtomicU64::new(0),
      77          196 :             sum_throttled_usecs: AtomicU64::new(0),
      78          196 :         }
      79          196 :     }
      80          196 :     fn new_inner(config: Config) -> Inner {
      81          196 :         let Config {
      82          196 :             enabled,
      83          196 :             initial,
      84          196 :             refill_interval,
      85          196 :             refill_amount,
      86          196 :             max,
      87          196 :         } = config;
      88          196 : 
       89          196 :         // Steady rate: we expect `refill_amount` requests per `refill_interval`;
       90          196 :         // dividing the two gives the requests per second (rps).
      91          196 :         let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
      92          196 :         let config = LeakyBucketConfig::new(rps, f64::from(max));
      93          196 : 
      94          196 :         // initial tracks how many tokens are available to put in the bucket
      95          196 :         // we want how many tokens are currently in the bucket
      96          196 :         let initial_tokens = max - initial;
      97          196 : 
      98          196 :         let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
      99          196 : 
     100          196 :         Inner {
     101          196 :             enabled: enabled.is_enabled(),
     102          196 :             rate_limiter: Arc::new(rate_limiter),
     103          196 :         }
     104          196 :     }
     105            0 :     pub fn reconfigure(&self, config: Config) {
     106            0 :         self.inner.store(Arc::new(Self::new_inner(config)));
     107            0 :     }
     108              : 
      109              :     /// The [`Throttle`] keeps internal counters of accounted and throttled requests
      110              :     /// and of the total time spent waiting. This method retrieves and resets those counters.
      111              :     /// Useful for periodic reporting.
     112            0 :     pub fn reset_stats(&self) -> Stats {
     113            0 :         let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
     114            0 :         let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
     115            0 :         let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
     116            0 :         let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
     117            0 :         Stats {
     118            0 :             count_accounted_start,
     119            0 :             count_accounted_finish,
     120            0 :             count_throttled,
     121            0 :             sum_throttled_usecs,
     122            0 :         }
     123            0 :     }
     124              : 
     125              :     /// See [`Config::steady_rps`].
     126            0 :     pub fn steady_rps(&self) -> f64 {
     127            0 :         self.inner.load().rate_limiter.steady_rps()
     128            0 :     }
     129              : 
     130            0 :     pub async fn throttle(&self, key_count: usize) -> ThrottleResult {
     131            0 :         let inner = self.inner.load_full(); // clones the `Inner` Arc
     132            0 : 
     133            0 :         let start = std::time::Instant::now();
     134            0 : 
     135            0 :         if !inner.enabled {
     136            0 :             return ThrottleResult::NotThrottled { start };
     137            0 :         }
     138            0 : 
     139            0 :         self.metric.accounting_start();
     140            0 :         self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
     141            0 :         let did_throttle = inner.rate_limiter.acquire(key_count).await;
     142            0 :         self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
     143            0 :         self.metric.accounting_finish();
     144            0 : 
     145            0 :         if did_throttle {
     146            0 :             self.count_throttled.fetch_add(1, Ordering::Relaxed);
     147            0 :             let now = Instant::now();
     148            0 :             let wait_time = now - start;
     149            0 :             self.sum_throttled_usecs
     150            0 :                 .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
     151            0 :             let observation = Observation { wait_time };
     152            0 :             self.metric.observe_throttling(&observation);
     153            0 :             ThrottleResult::Throttled { start, end: now }
     154              :         } else {
     155            0 :             ThrottleResult::NotThrottled { start }
     156              :         }
     157            0 :     }
     158              : }
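
Usage sketch: a minimal example of the API shown in the listing above, assuming it is placed next to this module so that `Throttle`, `Config`, `Metric`, `Observation`, `Stats`, and `ThrottleResult` are in scope. `NoopMetric` and `example` are hypothetical names; real callers would wire the metric methods to actual counters/histograms and obtain the `Config` from tenant configuration.

    use std::sync::Arc;
    use std::time::Duration;

    /// Hypothetical no-op metrics sink.
    struct NoopMetric;

    impl Metric for NoopMetric {
        fn accounting_start(&self) {}
        fn accounting_finish(&self) {}
        fn observe_throttling(&self, _observation: &Observation) {}
    }

    async fn example(config: Config) {
        // Wrap the throttle in an Arc so it can be shared by all request handlers
        // of a tenant; `reconfigure` can later swap in a new rate at runtime.
        let throttle = Arc::new(Throttle::new(config, NoopMetric));

        // Acquire one token for a request touching a single key. The future only
        // waits if the leaky bucket is at capacity.
        match throttle.throttle(1).await {
            ThrottleResult::NotThrottled { .. } => {
                // Proceed immediately.
            }
            ThrottleResult::Throttled { start, end } => {
                let waited = end - start;
                // e.g. attach `waited` to request tracing.
                let _ = waited;
            }
        }

        // Periodic reporting task: drain and reset the counters.
        let stats: Stats = throttle.reset_stats();
        if stats.count_throttled > 0 {
            let avg_wait =
                Duration::from_micros(stats.sum_throttled_usecs / stats.count_throttled);
            let _ = avg_wait;
        }
    }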
        

Generated by: LCOV version 2.1-beta