LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter - limit_algorithm.rs (source / functions)
Test: 02e8c57acd6e2b986849f552ca30280d54699b79.info | Lines: 81.1 % coverage (122 total, 99 hit)
Test Date: 2024-06-26 17:13:54 | Functions: 36.2 % coverage (69 total, 25 hit)

            Line data    Source code
       1              : //! Algorithms for controlling concurrency limits.
       2              : use parking_lot::Mutex;
       3              : use std::{pin::pin, sync::Arc, time::Duration};
       4              : use tokio::{
       5              :     sync::Notify,
       6              :     time::{error::Elapsed, Instant},
       7              : };
       8              : 
       9              : use self::aimd::Aimd;
      10              : 
      11              : pub mod aimd;
      12              : 
      13              : /// Whether a job succeeded or failed as a result of congestion/overload.
      14              : ///
      15              : /// Errors not considered to be caused by overload should be ignored.
      16              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
      17              : pub enum Outcome {
      18              :     /// The job succeeded, or failed in a way unrelated to overload.
      19              :     Success,
      20              :     /// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
      21              :     /// was observed.
      22              :     Overload,
      23              : }
      24              : 
      25              : /// An algorithm for controlling a concurrency limit.
      26              : pub trait LimitAlgorithm: Send + Sync + 'static {
      27              :     /// Update the concurrency limit in response to a new job completion.
      28              :     fn update(&self, old_limit: usize, sample: Sample) -> usize;
      29              : }
      30              : 
      31              : /// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
      32              : #[derive(Debug, Clone, PartialEq, Eq, Copy)]
      33              : pub struct Sample {
      34              :     pub(crate) latency: Duration,
      35              :     /// Jobs in flight when the sample was taken.
      36              :     pub(crate) in_flight: usize,
      37              :     pub(crate) outcome: Outcome,
      38              : }
      39              : 
      40            6 : #[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
      41              : #[serde(rename_all = "snake_case")]
      42              : pub enum RateLimitAlgorithm {
      43              :     #[default]
      44              :     Fixed,
      45              :     Aimd {
      46              :         #[serde(flatten)]
      47              :         conf: Aimd,
      48              :     },
      49              : }
      50              : 
      51              : pub struct Fixed;
      52              : 
      53              : impl LimitAlgorithm for Fixed {
      54            0 :     fn update(&self, old_limit: usize, _sample: Sample) -> usize {
      55            0 :         old_limit
      56            0 :     }
      57              : }
      58              : 
      59            6 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      60              : pub struct RateLimiterConfig {
      61              :     #[serde(flatten)]
      62              :     pub algorithm: RateLimitAlgorithm,
      63              :     pub initial_limit: usize,
      64              : }
      65              : 
      66              : impl RateLimiterConfig {
      67           12 :     pub fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
      68           12 :         match self.algorithm {
      69            0 :             RateLimitAlgorithm::Fixed => Box::new(Fixed),
      70           12 :             RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
      71              :         }
      72           12 :     }
      73              : }
      74              : 
      75              : pub struct LimiterInner {
      76              :     alg: Box<dyn LimitAlgorithm>,
      77              :     available: usize,
      78              :     limit: usize,
      79              :     in_flight: usize,
      80              : }
      81              : 
      82              : impl LimiterInner {
      83           22 :     fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
      84           22 :         if let Some(outcome) = outcome {
      85           16 :             let sample = Sample {
      86           16 :                 latency,
      87           16 :                 in_flight: self.in_flight,
      88           16 :                 outcome,
      89           16 :             };
      90           16 :             self.limit = self.alg.update(self.limit, sample);
      91           16 :         }
      92           22 :     }
      93              : 
      94           22 :     fn take(&mut self, ready: &Notify) -> Option<()> {
      95           22 :         if self.available >= 1 {
      96           22 :             self.available -= 1;
      97           22 :             self.in_flight += 1;
      98           22 : 
      99           22 :             // tell the next in the queue that there is a permit ready
     100           22 :             if self.available >= 1 {
     101           16 :                 ready.notify_one();
     102           16 :             }
     103           22 :             Some(())
     104              :         } else {
     105            0 :             None
     106              :         }
     107           22 :     }
     108              : }
     109              : 
     110              : /// Limits the number of concurrent jobs.
     111              : ///
     112              : /// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
     113              : /// token once the job is finished.
     114              : ///
     115              : /// The limit will be automatically adjusted based on observed latency (delay) and/or failures
     116              : /// caused by overload (loss).
     117              : pub struct DynamicLimiter {
     118              :     config: RateLimiterConfig,
     119              :     inner: Mutex<LimiterInner>,
     120              :     // to notify when a token is available
     121              :     ready: Notify,
     122              : }
     123              : 
     124              : /// A concurrency token, required to run a job.
     125              : ///
     126              : /// Release the token back to the [`DynamicLimiter`] after the job is complete.
     127              : pub struct Token {
     128              :     start: Instant,
     129              :     limiter: Option<Arc<DynamicLimiter>>,
     130              : }
     131              : 
     132              : /// A snapshot of the state of the [`DynamicLimiter`].
     133              : ///
     134              : /// Not guaranteed to be consistent under high concurrency.
     135              : #[derive(Debug, Clone, Copy)]
     136              : pub struct LimiterState {
     137              :     limit: usize,
     138              :     in_flight: usize,
     139              : }
     140              : 
     141              : impl DynamicLimiter {
     142              :     /// Create a limiter with a given limit control algorithm.
     143           12 :     pub fn new(config: RateLimiterConfig) -> Arc<Self> {
     144           12 :         let ready = Notify::new();
     145           12 :         ready.notify_one();
     146           12 : 
     147           12 :         Arc::new(Self {
     148           12 :             inner: Mutex::new(LimiterInner {
     149           12 :                 alg: config.create_rate_limit_algorithm(),
     150           12 :                 available: config.initial_limit,
     151           12 :                 limit: config.initial_limit,
     152           12 :                 in_flight: 0,
     153           12 :             }),
     154           12 :             ready,
     155           12 :             config,
     156           12 :         })
     157           12 :     }
     158              : 
     159              :     /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
     160           24 :     pub async fn acquire_timeout(self: &Arc<Self>, duration: Duration) -> Result<Token, Elapsed> {
     161           24 :         tokio::time::timeout(duration, self.acquire()).await?
     162           24 :     }
     163              : 
     164              :     /// Try to acquire a concurrency [Token].
     165           24 :     async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
     166           24 :         if self.config.initial_limit == 0 {
     167              :             // If the rate limiter is disabled, we can always acquire a token.
     168            0 :             Ok(Token::disabled())
     169              :         } else {
     170           24 :             let mut notified = pin!(self.ready.notified());
     171           24 :             let mut ready = notified.as_mut().enable();
     172              :             loop {
     173           24 :                 if ready {
     174           22 :                     let mut inner = self.inner.lock();
     175           22 :                     if inner.take(&self.ready).is_some() {
     176           22 :                         break Ok(Token::new(self.clone()));
     177            0 :                     } else {
     178            0 :                         notified.set(self.ready.notified());
     179            0 :                     }
     180            2 :                 }
     181            2 :                 notified.as_mut().await;
     182            0 :                 ready = true;
     183              :             }
     184              :         }
     185           22 :     }
     186              : 
     187              :     /// Return the concurrency [Token], along with the outcome of the job.
     188              :     ///
     189              :     /// The [Outcome] of the job, and the time taken to perform it, may be used
     190              :     /// to update the concurrency limit.
     191              :     ///
     192              :     /// Set the outcome to `None` to ignore the job.
     193           22 :     fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
     194           22 :         tracing::info!("outcome is {:?}", outcome);
     195           22 :         if self.config.initial_limit == 0 {
     196            0 :             return;
     197           22 :         }
     198           22 : 
     199           22 :         let mut inner = self.inner.lock();
     200           22 : 
     201           22 :         inner.update_limit(start.elapsed(), outcome);
     202           22 : 
     203           22 :         inner.in_flight -= 1;
     204           22 :         if inner.in_flight < inner.limit {
     205           22 :             inner.available = inner.limit - inner.in_flight;
     206           22 :             // At least 1 permit is now available
     207           22 :             self.ready.notify_one();
     208           22 :         }
     209           22 :     }
     210              : 
     211              :     /// The current state of the limiter.
     212           18 :     pub fn state(&self) -> LimiterState {
     213           18 :         let inner = self.inner.lock();
     214           18 :         LimiterState {
     215           18 :             limit: inner.limit,
     216           18 :             in_flight: inner.in_flight,
     217           18 :         }
     218           18 :     }
     219              : }
     220              : 
     221              : impl Token {
     222           22 :     fn new(limiter: Arc<DynamicLimiter>) -> Self {
     223           22 :         Self {
     224           22 :             start: Instant::now(),
     225           22 :             limiter: Some(limiter),
     226           22 :         }
     227           22 :     }
     228            0 :     pub fn disabled() -> Self {
     229            0 :         Self {
     230            0 :             start: Instant::now(),
     231            0 :             limiter: None,
     232            0 :         }
     233            0 :     }
     234              : 
     235            0 :     pub fn is_disabled(&self) -> bool {
     236            0 :         self.limiter.is_none()
     237            0 :     }
     238              : 
     239           16 :     pub fn release(mut self, outcome: Outcome) {
     240           16 :         self.release_mut(Some(outcome))
     241           16 :     }
     242              : 
     243           38 :     pub fn release_mut(&mut self, outcome: Option<Outcome>) {
     244           38 :         if let Some(limiter) = self.limiter.take() {
     245           22 :             limiter.release_inner(self.start, outcome);
     246           22 :         }
     247           38 :     }
     248              : }
     249              : 
     250              : impl Drop for Token {
     251           22 :     fn drop(&mut self) {
     252           22 :         self.release_mut(None)
     253           22 :     }
     254              : }
     255              : 
     256              : impl LimiterState {
     257              :     /// The current concurrency limit.
     258           18 :     pub fn limit(&self) -> usize {
     259           18 :         self.limit
     260           18 :     }
     261              :     /// The number of jobs in flight.
     262            0 :     pub fn in_flight(&self) -> usize {
     263            0 :         self.in_flight
     264            0 :     }
     265              : }
        

Generated by: LCOV version 2.1-beta