LCOV - code coverage report
Current view: top level - pageserver/src/tenant - throttle.rs (source / functions) Coverage Total Hit
Test: 7eb96e224e685167ad85f58f858387d8cf253f63.info Lines: 47.5 % 101 48
Test Date: 2024-09-23 21:23:07 Functions: 50.0 % 10 5

            Line data    Source code
       1              : use std::{
       2              :     str::FromStr,
       3              :     sync::{
       4              :         atomic::{AtomicU64, Ordering},
       5              :         Arc, Mutex,
       6              :     },
       7              :     time::{Duration, Instant},
       8              : };
       9              : 
      10              : use arc_swap::ArcSwap;
      11              : use enumset::EnumSet;
      12              : use tracing::{error, warn};
      13              : use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
      14              : 
      15              : use crate::{context::RequestContext, task_mgr::TaskKind};
      16              : 
      17              : /// Throttle for `async` functions.
      18              : ///
      19              : /// Runtime reconfigurable.
      20              : ///
      21              : /// To share a throttle among multiple entities, wrap it in an [`Arc`].
      22              : ///
      23              : /// The intial use case for this is tenant-wide throttling of getpage@lsn requests.
      24              : pub struct Throttle<M: Metric> {
      25              :     inner: ArcSwap<Inner>,
      26              :     metric: M,
      27              :     /// will be turned into [`Stats::count_accounted_start`]
      28              :     count_accounted_start: AtomicU64,
      29              :     /// will be turned into [`Stats::count_accounted_finish`]
      30              :     count_accounted_finish: AtomicU64,
      31              :     /// will be turned into [`Stats::count_throttled`]
      32              :     count_throttled: AtomicU64,
      33              :     /// will be turned into [`Stats::sum_throttled_usecs`]
      34              :     sum_throttled_usecs: AtomicU64,
      35              : }
      36              : 
      37              : pub struct Inner {
      38              :     task_kinds: EnumSet<TaskKind>,
      39              :     rate_limiter: Arc<RateLimiter>,
      40              : }
      41              : 
      42              : pub type Config = pageserver_api::models::ThrottleConfig;
      43              : 
      44              : pub struct Observation {
      45              :     pub wait_time: Duration,
      46              : }
      47              : pub trait Metric {
      48              :     fn accounting_start(&self);
      49              :     fn accounting_finish(&self);
      50              :     fn observe_throttling(&self, observation: &Observation);
      51              : }
      52              : 
      53              : /// See [`Throttle::reset_stats`].
      54              : pub struct Stats {
      55              :     /// Number of requests that started [`Throttle::throttle`] calls.
      56              :     pub count_accounted_start: u64,
      57              :     /// Number of requests that finished [`Throttle::throttle`] calls.
      58              :     pub count_accounted_finish: u64,
      59              :     /// Subset of the `accounted` requests that were actually throttled.
      60              :     /// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
      61              :     pub count_throttled: u64,
      62              :     /// Sum of microseconds that throttled requests spent waiting for throttling.
      63              :     pub sum_throttled_usecs: u64,
      64              : }
      65              : 
      66              : impl<M> Throttle<M>
      67              : where
      68              :     M: Metric,
      69              : {
      70          570 :     pub fn new(config: Config, metric: M) -> Self {
      71          570 :         Self {
      72          570 :             inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
      73          570 :             metric,
      74          570 :             count_accounted_start: AtomicU64::new(0),
      75          570 :             count_accounted_finish: AtomicU64::new(0),
      76          570 :             count_throttled: AtomicU64::new(0),
      77          570 :             sum_throttled_usecs: AtomicU64::new(0),
      78          570 :         }
      79          570 :     }
      80          594 :     fn new_inner(config: Config) -> Inner {
      81          594 :         let Config {
      82          594 :             task_kinds,
      83          594 :             initial,
      84          594 :             refill_interval,
      85          594 :             refill_amount,
      86          594 :             max,
      87          594 :         } = config;
      88          594 :         let task_kinds: EnumSet<TaskKind> = task_kinds
      89          594 :             .iter()
      90          594 :             .filter_map(|s| match TaskKind::from_str(s) {
      91            0 :                 Ok(v) => Some(v),
      92            0 :                 Err(e) => {
      93            0 :                     // TODO: avoid this failure mode
      94            0 :                     error!(
      95            0 :                         "cannot parse task kind, ignoring for rate limiting {}",
      96            0 :                         utils::error::report_compact_sources(&e)
      97              :                     );
      98            0 :                     None
      99              :                 }
     100          594 :             })
     101          594 :             .collect();
     102          594 : 
     103          594 :         // steady rate, we expect `refill_amount` requests per `refill_interval`.
     104          594 :         // dividing gives us the rps.
     105          594 :         let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
     106          594 :         let config = LeakyBucketConfig::new(rps, f64::from(max));
     107          594 : 
     108          594 :         // initial tracks how many tokens are available to put in the bucket
     109          594 :         // we want how many tokens are currently in the bucket
     110          594 :         let initial_tokens = max - initial;
     111          594 : 
     112          594 :         let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
     113          594 : 
     114          594 :         Inner {
     115          594 :             task_kinds,
     116          594 :             rate_limiter: Arc::new(rate_limiter),
     117          594 :         }
     118          594 :     }
     119           24 :     pub fn reconfigure(&self, config: Config) {
     120           24 :         self.inner.store(Arc::new(Self::new_inner(config)));
     121           24 :     }
     122              : 
     123              :     /// The [`Throttle`] keeps an internal flag that is true if there was ever any actual throttling.
     124              :     /// This method allows retrieving & resetting that flag.
     125              :     /// Useful for periodic reporting.
     126            0 :     pub fn reset_stats(&self) -> Stats {
     127            0 :         let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
     128            0 :         let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
     129            0 :         let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
     130            0 :         let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
     131            0 :         Stats {
     132            0 :             count_accounted_start,
     133            0 :             count_accounted_finish,
     134            0 :             count_throttled,
     135            0 :             sum_throttled_usecs,
     136            0 :         }
     137            0 :     }
     138              : 
     139              :     /// See [`Config::steady_rps`].
     140            0 :     pub fn steady_rps(&self) -> f64 {
     141            0 :         self.inner.load().rate_limiter.steady_rps()
     142            0 :     }
     143              : 
     144      1879563 :     pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
     145      1879563 :         let inner = self.inner.load_full(); // clones the `Inner` Arc
     146      1879563 :         if !inner.task_kinds.contains(ctx.task_kind()) {
     147      1879563 :             return None;
     148            0 :         };
     149            0 :         let start = std::time::Instant::now();
     150            0 : 
     151            0 :         self.metric.accounting_start();
     152            0 :         self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
     153            0 :         let did_throttle = inner.rate_limiter.acquire(key_count).await;
     154            0 :         self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
     155            0 :         self.metric.accounting_finish();
     156            0 : 
     157            0 :         if did_throttle {
     158            0 :             self.count_throttled.fetch_add(1, Ordering::Relaxed);
     159            0 :             let now = Instant::now();
     160            0 :             let wait_time = now - start;
     161            0 :             self.sum_throttled_usecs
     162            0 :                 .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
     163            0 :             let observation = Observation { wait_time };
     164            0 :             self.metric.observe_throttling(&observation);
     165            0 :             match ctx.micros_spent_throttled.add(wait_time) {
     166            0 :                 Ok(res) => res,
     167            0 :                 Err(error) => {
     168            0 :                     use once_cell::sync::Lazy;
     169            0 :                     use utils::rate_limit::RateLimit;
     170            0 :                     static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
     171            0 :                         Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
     172            0 :                     let mut guard = WARN_RATE_LIMIT.lock().unwrap();
     173            0 :                     guard.call(move || {
     174            0 :                         warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
     175            0 :                     });
     176            0 :                 }
     177              :             }
     178            0 :             Some(wait_time)
     179              :         } else {
     180            0 :             None
     181              :         }
     182      1879563 :     }
     183              : }
        

Generated by: LCOV version 2.1-beta