LCOV - code coverage report
Current view: top level - pageserver/src/tenant - throttle.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 47.2 % 72 34
Test Date: 2025-03-12 00:01:28 Functions: 28.6 % 7 2

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::sync::atomic::{AtomicU64, Ordering};
       3              : use std::time::Instant;
       4              : 
       5              : use arc_swap::ArcSwap;
       6              : use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
       7              : 
       8              : /// Throttle for `async` functions.
       9              : ///
      10              : /// Runtime reconfigurable.
      11              : ///
      12              : /// To share a throttle among multiple entities, wrap it in an [`Arc`].
      13              : ///
      14              : /// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
      15              : pub struct Throttle {
      16              :     inner: ArcSwap<Inner>,
      17              :     /// will be turned into [`Stats::count_accounted_start`]
      18              :     count_accounted_start: AtomicU64,
      19              :     /// will be turned into [`Stats::count_accounted_finish`]
      20              :     count_accounted_finish: AtomicU64,
      21              :     /// will be turned into [`Stats::count_throttled`]
      22              :     count_throttled: AtomicU64,
      23              :     /// will be turned into [`Stats::sum_throttled_usecs`]
      24              :     sum_throttled_usecs: AtomicU64,
      25              : }
      26              : 
      27              : pub struct Inner {
      28              :     enabled: bool,
      29              :     rate_limiter: Arc<RateLimiter>,
      30              : }
      31              : 
      32              : pub type Config = pageserver_api::models::ThrottleConfig;
      33              : 
      34              : /// See [`Throttle::reset_stats`].
      35              : pub struct Stats {
      36              :     /// Number of requests that started [`Throttle::throttle`] calls.
      37              :     pub count_accounted_start: u64,
      38              :     /// Number of requests that finished [`Throttle::throttle`] calls.
      39              :     pub count_accounted_finish: u64,
      40              :     /// Subset of the `accounted` requests that were actually throttled.
      41              :     /// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
      42              :     pub count_throttled: u64,
      43              :     /// Sum of microseconds that throttled requests spent waiting for throttling.
      44              :     pub sum_throttled_usecs: u64,
      45              : }
      46              : 
      47              : pub enum ThrottleResult {
      48              :     NotThrottled { end: Instant },
      49              :     Throttled { end: Instant },
      50              : }
      51              : 
      52              : impl Throttle {
      53          452 :     pub fn new(config: Config) -> Self {
      54          452 :         Self {
      55          452 :             inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
      56          452 :             count_accounted_start: AtomicU64::new(0),
      57          452 :             count_accounted_finish: AtomicU64::new(0),
      58          452 :             count_throttled: AtomicU64::new(0),
      59          452 :             sum_throttled_usecs: AtomicU64::new(0),
      60          452 :         }
      61          452 :     }
      62          452 :     fn new_inner(config: Config) -> Inner {
      63          452 :         let Config {
      64          452 :             enabled,
      65          452 :             initial,
      66          452 :             refill_interval,
      67          452 :             refill_amount,
      68          452 :             max,
      69          452 :         } = config;
      70          452 : 
      71          452 :         // steady rate, we expect `refill_amount` requests per `refill_interval`.
      72          452 :         // dividing gives us the rps.
      73          452 :         let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
      74          452 :         let config = LeakyBucketConfig::new(rps, f64::from(max));
      75          452 : 
      76          452 :         // initial tracks how many tokens are available to put in the bucket
      77          452 :         // we want how many tokens are currently in the bucket
      78          452 :         let initial_tokens = max - initial;
      79          452 : 
      80          452 :         let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
      81          452 : 
      82          452 :         Inner {
      83          452 :             enabled: enabled.is_enabled(),
      84          452 :             rate_limiter: Arc::new(rate_limiter),
      85          452 :         }
      86          452 :     }
      87            0 :     pub fn reconfigure(&self, config: Config) {
      88            0 :         self.inner.store(Arc::new(Self::new_inner(config)));
      89            0 :     }
      90              : 
      91              :     /// The [`Throttle`] keeps internal counters of accounted and throttled requests.
      92              :     /// This method retrieves those counters as [`Stats`] and resets them to zero.
      93              :     /// Useful for periodic reporting.
      94            0 :     pub fn reset_stats(&self) -> Stats {
      95            0 :         let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
      96            0 :         let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
      97            0 :         let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
      98            0 :         let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
      99            0 :         Stats {
     100            0 :             count_accounted_start,
     101            0 :             count_accounted_finish,
     102            0 :             count_throttled,
     103            0 :             sum_throttled_usecs,
     104            0 :         }
     105            0 :     }
     106              : 
     107              :     /// See [`Config::steady_rps`].
     108            0 :     pub fn steady_rps(&self) -> f64 {
     109            0 :         self.inner.load().rate_limiter.steady_rps()
     110            0 :     }
     111              : 
     112              :     /// `start` must be [`Instant::now`] or earlier.
     113            0 :     pub async fn throttle(&self, key_count: usize, start: Instant) -> ThrottleResult {
     114            0 :         let inner = self.inner.load_full(); // clones the `Inner` Arc
     115            0 : 
     116            0 :         if !inner.enabled {
     117            0 :             return ThrottleResult::NotThrottled { end: start };
     118            0 :         }
     119            0 : 
     120            0 :         self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
     121            0 :         let did_throttle = inner.rate_limiter.acquire(key_count).await;
     122            0 :         self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
     123            0 : 
     124            0 :         if did_throttle {
     125            0 :             self.count_throttled.fetch_add(1, Ordering::Relaxed);
     126            0 :             let end = Instant::now();
     127            0 :             let wait_time = end - start;
     128            0 :             self.sum_throttled_usecs
     129            0 :                 .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
     130            0 :             ThrottleResult::Throttled { end }
     131              :         } else {
     132            0 :             ThrottleResult::NotThrottled { end: start }
     133              :         }
     134            0 :     }
     135              : }
        

Generated by: LCOV version 2.1-beta