LCOV - code coverage report
Current view: top level - pageserver/src/tenant - throttle.rs (source / functions) Coverage Total Hit
Test: f315fff9081e73c943c49dad06e5cc7779937e0b.info Lines: 44.1 % 102 45
Test Date: 2024-06-26 14:26:29 Functions: 45.5 % 11 5

            Line data    Source code
       1              : use std::{
       2              :     str::FromStr,
       3              :     sync::{
       4              :         atomic::{AtomicU64, Ordering},
       5              :         Arc, Mutex,
       6              :     },
       7              :     time::{Duration, Instant},
       8              : };
       9              : 
      10              : use arc_swap::ArcSwap;
      11              : use enumset::EnumSet;
      12              : use tracing::{error, warn};
      13              : 
      14              : use crate::{context::RequestContext, task_mgr::TaskKind};
      15              : 
      16              : /// Throttle for `async` functions.
      17              : ///
      18              : /// Runtime reconfigurable.
      19              : ///
      20              : /// To share a throttle among multiple entities, wrap it in an [`Arc`].
      21              : ///
      22              : /// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
      23              : pub struct Throttle<M: Metric> {
      24              :     inner: ArcSwap<Inner>,
      25              :     metric: M,
      26              :     /// will be turned into [`Stats::count_accounted`]
      27              :     count_accounted: AtomicU64,
      28              :     /// will be turned into [`Stats::count_throttled`]
      29              :     count_throttled: AtomicU64,
      30              :     /// will be turned into [`Stats::sum_throttled_usecs`]
      31              :     sum_throttled_usecs: AtomicU64,
      32              : }
      33              : 
      34              : pub struct Inner {
      35              :     task_kinds: EnumSet<TaskKind>,
      36              :     rate_limiter: Arc<leaky_bucket::RateLimiter>,
      37              :     config: Config,
      38              : }
      39              : 
      40              : pub type Config = pageserver_api::models::ThrottleConfig;
      41              : 
      42              : pub struct Observation {
      43              :     pub wait_time: Duration,
      44              : }
      45              : pub trait Metric {
      46              :     fn observe_throttling(&self, observation: &Observation);
      47              : }
      48              : 
      49              : /// See [`Throttle::reset_stats`].
      50              : pub struct Stats {
      51              :     // Number of requests that were subject to throttling, i.e., requests of the configured [`Config::task_kinds`].
      52              :     pub count_accounted: u64,
      53              :     // Subset of the `accounted` requests that were actually throttled.
      54              :     // Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
      55              :     pub count_throttled: u64,
      56              :     // Sum of microseconds that throttled requests spent waiting for throttling.
      57              :     pub sum_throttled_usecs: u64,
      58              : }
      59              : 
      60              : impl<M> Throttle<M>
      61              : where
      62              :     M: Metric,
      63              : {
      64          161 :     pub fn new(config: Config, metric: M) -> Self {
      65          161 :         Self {
      66          161 :             inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
      67          161 :             metric,
      68          161 :             count_accounted: AtomicU64::new(0),
      69          161 :             count_throttled: AtomicU64::new(0),
      70          161 :             sum_throttled_usecs: AtomicU64::new(0),
      71          161 :         }
      72          161 :     }
      73          169 :     fn new_inner(config: Config) -> Inner {
      74          169 :         let Config {
      75          169 :             task_kinds,
      76          169 :             initial,
      77          169 :             refill_interval,
      78          169 :             refill_amount,
      79          169 :             max,
      80          169 :             fair,
      81          169 :         } = &config;
      82          169 :         let task_kinds: EnumSet<TaskKind> = task_kinds
      83          169 :             .iter()
      84          169 :             .filter_map(|s| match TaskKind::from_str(s) {
      85            0 :                 Ok(v) => Some(v),
      86            0 :                 Err(e) => {
      87            0 :                     // TODO: avoid this failure mode
      88            0 :                     error!(
      89            0 :                         "cannot parse task kind, ignoring for rate limiting {}",
      90            0 :                         utils::error::report_compact_sources(&e)
      91              :                     );
      92            0 :                     None
      93              :                 }
      94          169 :             })
      95          169 :             .collect();
      96          169 :         Inner {
      97          169 :             task_kinds,
      98          169 :             rate_limiter: Arc::new(
      99          169 :                 leaky_bucket::RateLimiter::builder()
     100          169 :                     .initial(*initial)
     101          169 :                     .interval(*refill_interval)
     102          169 :                     .refill(refill_amount.get())
     103          169 :                     .max(*max)
     104          169 :                     .fair(*fair)
     105          169 :                     .build(),
     106          169 :             ),
     107          169 :             config,
     108          169 :         }
     109          169 :     }
     110            8 :     pub fn reconfigure(&self, config: Config) {
     111            8 :         self.inner.store(Arc::new(Self::new_inner(config)));
     112            8 :     }
     113              : 
     114              :     /// The [`Throttle`] keeps internal counters: requests accounted, requests throttled, and microseconds spent throttled.
     115              :     /// This method returns the current counter values as [`Stats`] and resets each counter to zero.
     116              :     /// Useful for periodic reporting.
     117            0 :     pub fn reset_stats(&self) -> Stats {
     118            0 :         let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed);
     119            0 :         let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
     120            0 :         let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
     121            0 :         Stats {
     122            0 :             count_accounted,
     123            0 :             count_throttled,
     124            0 :             sum_throttled_usecs,
     125            0 :         }
     126            0 :     }
     127              : 
     128              :     /// See [`Config::steady_rps`].
     129            0 :     pub fn steady_rps(&self) -> f64 {
     130            0 :         self.inner.load().config.steady_rps()
     131            0 :     }
     132              : 
     133       625272 :     pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
     134       625272 :         let inner = self.inner.load_full(); // clones the `Inner` Arc
     135       625272 :         if !inner.task_kinds.contains(ctx.task_kind()) {
     136       625272 :             return None;
     137            0 :         };
     138            0 :         let start = std::time::Instant::now();
     139            0 :         let mut did_throttle = false;
     140            0 :         let acquire = inner.rate_limiter.acquire(key_count);
     141            0 :         // turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate
     142            0 :         let acquire = tokio::task::unconstrained(acquire);
     143            0 :         let mut acquire = std::pin::pin!(acquire);
     144            0 :         std::future::poll_fn(|cx| {
     145            0 :             use std::future::Future;
     146            0 :             let poll = acquire.as_mut().poll(cx);
     147            0 :             did_throttle = did_throttle || poll.is_pending();
     148            0 :             poll
     149            0 :         })
     150            0 :         .await;
     151            0 :         self.count_accounted.fetch_add(1, Ordering::Relaxed);
     152            0 :         if did_throttle {
     153            0 :             self.count_throttled.fetch_add(1, Ordering::Relaxed);
     154            0 :             let now = Instant::now();
     155            0 :             let wait_time = now - start;
     156            0 :             self.sum_throttled_usecs
     157            0 :                 .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
     158            0 :             let observation = Observation { wait_time };
     159            0 :             self.metric.observe_throttling(&observation);
     160            0 :             match ctx.micros_spent_throttled.add(wait_time) {
     161            0 :                 Ok(res) => res,
     162            0 :                 Err(error) => {
     163            0 :                     use once_cell::sync::Lazy;
     164            0 :                     use utils::rate_limit::RateLimit;
     165            0 :                     static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
     166            0 :                         Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
     167            0 :                     let mut guard = WARN_RATE_LIMIT.lock().unwrap();
     168            0 :                     guard.call(move || {
     169            0 :                         warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
     170            0 :                     });
     171            0 :                 }
     172              :             }
     173            0 :             Some(wait_time)
     174              :         } else {
     175            0 :             None
     176              :         }
     177       625272 :     }
     178              : }
        

Generated by: LCOV version 2.1-beta