LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter - limit_algorithm.rs (source / functions) Coverage Total Hit
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info Lines: 83.9 % 118 99
Test Date: 2024-09-20 13:14:58 Functions: 36.8 % 68 25

            Line data    Source code
       1              : //! Algorithms for controlling concurrency limits.
       2              : use parking_lot::Mutex;
       3              : use std::{pin::pin, sync::Arc, time::Duration};
       4              : use tokio::{
       5              :     sync::Notify,
       6              :     time::{error::Elapsed, Instant},
       7              : };
       8              : 
       9              : use self::aimd::Aimd;
      10              : 
      11              : pub(crate) mod aimd;
      12              : 
      13              : /// Whether a job succeeded or failed as a result of congestion/overload.
      14              : ///
      15              : /// Errors not considered to be caused by overload should be ignored.
      16              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
      17              : pub(crate) enum Outcome {
      18              :     /// The job succeeded, or failed in a way unrelated to overload.
      19              :     Success,
      20              :     /// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
      21              :     /// was observed.
      22              :     Overload,
      23              : }
      24              : 
      25              : /// An algorithm for controlling a concurrency limit.
      26              : pub(crate) trait LimitAlgorithm: Send + Sync + 'static {
      27              :     /// Update the concurrency limit in response to a new job completion.
      28              :     fn update(&self, old_limit: usize, sample: Sample) -> usize;
      29              : }
      30              : 
      31              : /// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
      32              : #[derive(Debug, Clone, PartialEq, Eq, Copy)]
      33              : pub(crate) struct Sample {
      34              :     pub(crate) latency: Duration,
      35              :     /// Jobs in flight when the sample was taken.
      36              :     pub(crate) in_flight: usize,
      37              :     pub(crate) outcome: Outcome,
      38              : }
      39              : 
      40            3 : #[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
      41              : #[serde(rename_all = "snake_case")]
      42              : pub(crate) enum RateLimitAlgorithm {
      43              :     #[default]
      44              :     Fixed,
      45              :     Aimd {
      46              :         #[serde(flatten)]
      47              :         conf: Aimd,
      48              :     },
      49              : }
      50              : 
      51              : pub(crate) struct Fixed;
      52              : 
      53              : impl LimitAlgorithm for Fixed {
      54            0 :     fn update(&self, old_limit: usize, _sample: Sample) -> usize {
      55            0 :         old_limit
      56            0 :     }
      57              : }
      58              : 
      59            3 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      60              : pub struct RateLimiterConfig {
      61              :     #[serde(flatten)]
      62              :     pub(crate) algorithm: RateLimitAlgorithm,
      63              :     pub(crate) initial_limit: usize,
      64              : }
      65              : 
      66              : impl RateLimiterConfig {
      67            6 :     pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
      68            6 :         match self.algorithm {
      69            0 :             RateLimitAlgorithm::Fixed => Box::new(Fixed),
      70            6 :             RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
      71              :         }
      72            6 :     }
      73              : }
      74              : 
      75              : pub(crate) struct LimiterInner {
      76              :     alg: Box<dyn LimitAlgorithm>,
      77              :     available: usize,
      78              :     limit: usize,
      79              :     in_flight: usize,
      80              : }
      81              : 
      82              : impl LimiterInner {
      83           11 :     fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
      84           11 :         if let Some(outcome) = outcome {
      85            8 :             let sample = Sample {
      86            8 :                 latency,
      87            8 :                 in_flight: self.in_flight,
      88            8 :                 outcome,
      89            8 :             };
      90            8 :             self.limit = self.alg.update(self.limit, sample);
      91            8 :         }
      92           11 :     }
      93              : 
      94           11 :     fn take(&mut self, ready: &Notify) -> Option<()> {
      95           11 :         if self.available >= 1 {
      96           11 :             self.available -= 1;
      97           11 :             self.in_flight += 1;
      98           11 : 
      99           11 :             // tell the next in the queue that there is a permit ready
     100           11 :             if self.available >= 1 {
     101            8 :                 ready.notify_one();
     102            8 :             }
     103           11 :             Some(())
     104              :         } else {
     105            0 :             None
     106              :         }
     107           11 :     }
     108              : }
     109              : 
     110              : /// Limits the number of concurrent jobs.
     111              : ///
     112              : /// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
     113              : /// token once the job is finished.
     114              : ///
     115              : /// The limit will be automatically adjusted based on observed latency (delay) and/or failures
     116              : /// caused by overload (loss).
     117              : pub(crate) struct DynamicLimiter {
     118              :     config: RateLimiterConfig,
     119              :     inner: Mutex<LimiterInner>,
     120              :     // to notify when a token is available
     121              :     ready: Notify,
     122              : }
     123              : 
     124              : /// A concurrency token, required to run a job.
     125              : ///
     126              : /// Release the token back to the [`DynamicLimiter`] after the job is complete.
     127              : pub(crate) struct Token {
     128              :     start: Instant,
     129              :     limiter: Option<Arc<DynamicLimiter>>,
     130              : }
     131              : 
     132              : /// A snapshot of the state of the [`DynamicLimiter`].
     133              : ///
     134              : /// Not guaranteed to be consistent under high concurrency.
     135              : #[derive(Debug, Clone, Copy)]
     136              : #[cfg(test)]
     137              : struct LimiterState {
     138              :     limit: usize,
     139              : }
     140              : 
     141              : impl DynamicLimiter {
     142              :     /// Create a limiter with a given limit control algorithm.
     143            6 :     pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
     144            6 :         let ready = Notify::new();
     145            6 :         ready.notify_one();
     146            6 : 
     147            6 :         Arc::new(Self {
     148            6 :             inner: Mutex::new(LimiterInner {
     149            6 :                 alg: config.create_rate_limit_algorithm(),
     150            6 :                 available: config.initial_limit,
     151            6 :                 limit: config.initial_limit,
     152            6 :                 in_flight: 0,
     153            6 :             }),
     154            6 :             ready,
     155            6 :             config,
     156            6 :         })
     157            6 :     }
     158              : 
     159              :     /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
     160           12 :     pub(crate) async fn acquire_timeout(
     161           12 :         self: &Arc<Self>,
     162           12 :         duration: Duration,
     163           12 :     ) -> Result<Token, Elapsed> {
     164           12 :         tokio::time::timeout(duration, self.acquire()).await?
     165           12 :     }
     166              : 
     167              :     /// Try to acquire a concurrency [Token].
     168           12 :     async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
     169           12 :         if self.config.initial_limit == 0 {
     170              :             // If the rate limiter is disabled, we can always acquire a token.
     171            0 :             Ok(Token::disabled())
     172              :         } else {
     173           12 :             let mut notified = pin!(self.ready.notified());
     174           12 :             let mut ready = notified.as_mut().enable();
     175              :             loop {
     176           12 :                 if ready {
     177           11 :                     let mut inner = self.inner.lock();
     178           11 :                     if inner.take(&self.ready).is_some() {
     179           11 :                         break Ok(Token::new(self.clone()));
     180            0 :                     }
     181            0 :                     notified.set(self.ready.notified());
     182            1 :                 }
     183            1 :                 notified.as_mut().await;
     184            0 :                 ready = true;
     185              :             }
     186              :         }
     187           11 :     }
     188              : 
     189              :     /// Return the concurrency [Token], along with the outcome of the job.
     190              :     ///
     191              :     /// The [Outcome] of the job, and the time taken to perform it, may be used
     192              :     /// to update the concurrency limit.
     193              :     ///
     194              :     /// Set the outcome to `None` to ignore the job.
     195           11 :     fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
     196           11 :         tracing::info!("outcome is {:?}", outcome);
     197           11 :         if self.config.initial_limit == 0 {
     198            0 :             return;
     199           11 :         }
     200           11 : 
     201           11 :         let mut inner = self.inner.lock();
     202           11 : 
     203           11 :         inner.update_limit(start.elapsed(), outcome);
     204           11 : 
     205           11 :         inner.in_flight -= 1;
     206           11 :         if inner.in_flight < inner.limit {
     207           11 :             inner.available = inner.limit - inner.in_flight;
     208           11 :             // At least 1 permit is now available
     209           11 :             self.ready.notify_one();
     210           11 :         }
     211           11 :     }
     212              : 
     213              :     /// The current state of the limiter.
     214              :     #[cfg(test)]
     215            9 :     fn state(&self) -> LimiterState {
     216            9 :         let inner = self.inner.lock();
     217            9 :         LimiterState { limit: inner.limit }
     218            9 :     }
     219              : }
     220              : 
     221              : impl Token {
     222           11 :     fn new(limiter: Arc<DynamicLimiter>) -> Self {
     223           11 :         Self {
     224           11 :             start: Instant::now(),
     225           11 :             limiter: Some(limiter),
     226           11 :         }
     227           11 :     }
     228            0 :     pub(crate) fn disabled() -> Self {
     229            0 :         Self {
     230            0 :             start: Instant::now(),
     231            0 :             limiter: None,
     232            0 :         }
     233            0 :     }
     234              : 
     235            0 :     pub(crate) fn is_disabled(&self) -> bool {
     236            0 :         self.limiter.is_none()
     237            0 :     }
     238              : 
     239            8 :     pub(crate) fn release(mut self, outcome: Outcome) {
     240            8 :         self.release_mut(Some(outcome));
     241            8 :     }
     242              : 
     243           19 :     pub(crate) fn release_mut(&mut self, outcome: Option<Outcome>) {
     244           19 :         if let Some(limiter) = self.limiter.take() {
     245           11 :             limiter.release_inner(self.start, outcome);
     246           11 :         }
     247           19 :     }
     248              : }
     249              : 
     250              : impl Drop for Token {
     251           11 :     fn drop(&mut self) {
     252           11 :         self.release_mut(None);
     253           11 :     }
     254              : }
     255              : 
     256              : #[cfg(test)]
     257              : impl LimiterState {
     258              :     /// The current concurrency limit.
     259            9 :     fn limit(self) -> usize {
     260            9 :         self.limit
     261            9 :     }
     262              : }
        

Generated by: LCOV version 2.1-beta