|             Line data    Source code 
       1              : //! Algorithms for controlling concurrency limits.
       2              : use std::pin::pin;
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use parking_lot::Mutex;
       7              : use tokio::sync::Notify;
       8              : use tokio::time::error::Elapsed;
       9              : use tokio::time::Instant;
      10              : 
      11              : use self::aimd::Aimd;
      12              : 
      13              : pub(crate) mod aimd;
      14              : 
/// Whether a job succeeded or failed as a result of congestion/overload.
///
/// Errors not considered to be caused by overload should be ignored.
///
/// This is the loss signal carried by [`Sample::outcome`] and handed to
/// [`LimitAlgorithm::update`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Outcome {
    /// The job succeeded, or failed in a way unrelated to overload.
    Success,
    /// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
    /// was observed.
    Overload,
}
      26              : 
/// An algorithm for controlling a concurrency limit.
///
/// Implementations are stored as `Box<dyn LimitAlgorithm>` and shared across threads, hence the
/// `Send + Sync + 'static` bounds.
pub(crate) trait LimitAlgorithm: Send + Sync + 'static {
    /// Update the concurrency limit in response to a new job completion.
    ///
    /// `old_limit` is the limit currently in force and `sample` describes the job that just
    /// completed. The returned value becomes the new limit.
    fn update(&self, old_limit: usize, sample: Sample) -> usize;
}
      32              : 
/// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub(crate) struct Sample {
    /// Time that elapsed between the job's start and the moment the sample was taken.
    pub(crate) latency: Duration,
    /// Jobs in flight when the sample was taken.
    pub(crate) in_flight: usize,
    /// Whether the job signalled overload.
    pub(crate) outcome: Outcome,
}
      41              : 
/// Selects which limit control algorithm a limiter uses.
///
/// Deserialized with `snake_case` variant names; defaults to [`RateLimitAlgorithm::Fixed`].
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum RateLimitAlgorithm {
    /// Keep the limit at its initial value.
    #[default]
    Fixed,
    /// Adjust the limit with the [`Aimd`] algorithm.
    Aimd {
        /// Algorithm parameters; flattened, so they are read from the same level as the variant.
        #[serde(flatten)]
        conf: Aimd,
    },
}
      52              : 
      53              : pub(crate) struct Fixed;
      54              : 
      55              : impl LimitAlgorithm for Fixed {
      56            0 :     fn update(&self, old_limit: usize, _sample: Sample) -> usize {
      57            0 :         old_limit
      58            0 :     }
      59              : }
      60              : 
/// Configuration for a [`DynamicLimiter`].
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
pub struct RateLimiterConfig {
    /// Which algorithm adjusts the limit; flattened into this struct when deserializing.
    #[serde(flatten)]
    pub(crate) algorithm: RateLimitAlgorithm,
    /// Limit the limiter starts with. A value of `0` disables limiting: tokens are then always
    /// granted and releasing them has no effect.
    pub(crate) initial_limit: usize,
}
      67              : 
      68              : impl RateLimiterConfig {
      69            6 :     pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
      70            6 :         match self.algorithm {
      71            0 :             RateLimitAlgorithm::Fixed => Box::new(Fixed),
      72            6 :             RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
      73              :         }
      74            6 :     }
      75              : }
      76              : 
/// Mutable state of a [`DynamicLimiter`], kept behind its mutex.
pub(crate) struct LimiterInner {
    /// Algorithm consulted to compute a new `limit` when a job reports an outcome.
    alg: Box<dyn LimitAlgorithm>,
    /// Permits that can currently be handed out; decremented by `take` and refreshed when
    /// tokens are released.
    available: usize,
    /// Current concurrency limit.
    limit: usize,
    /// Number of tokens handed out and not yet released.
    in_flight: usize,
}
      83              : 
      84              : impl LimiterInner {
      85           11 :     fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
      86           11 :         if let Some(outcome) = outcome {
      87            8 :             let sample = Sample {
      88            8 :                 latency,
      89            8 :                 in_flight: self.in_flight,
      90            8 :                 outcome,
      91            8 :             };
      92            8 :             self.limit = self.alg.update(self.limit, sample);
      93            8 :         }
      94           11 :     }
      95              : 
      96           11 :     fn take(&mut self, ready: &Notify) -> Option<()> {
      97           11 :         if self.available >= 1 {
      98           11 :             self.available -= 1;
      99           11 :             self.in_flight += 1;
     100           11 : 
     101           11 :             // tell the next in the queue that there is a permit ready
     102           11 :             if self.available >= 1 {
     103            8 :                 ready.notify_one();
     104            8 :             }
     105           11 :             Some(())
     106              :         } else {
     107            0 :             None
     108              :         }
     109           11 :     }
     110              : }
     111              : 
/// Limits the number of concurrent jobs.
///
/// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
/// token once the job is finished.
///
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub(crate) struct DynamicLimiter {
    /// Configuration the limiter was created with; an `initial_limit` of 0 disables limiting.
    config: RateLimiterConfig,
    /// Permit accounting and the limit algorithm, guarded by a mutex.
    inner: Mutex<LimiterInner>,
    // to notify when a token is available
    ready: Notify,
}
     125              : 
/// A concurrency token, required to run a job.
///
/// Release the token back to the [`DynamicLimiter`] after the job is complete.
pub(crate) struct Token {
    /// When the token was created; the job latency is measured from this instant.
    start: Instant,
    /// Limiter to release back to. `None` for a disabled token, or once the token has been
    /// released.
    limiter: Option<Arc<DynamicLimiter>>,
}
     133              : 
/// A snapshot of the state of the [`DynamicLimiter`].
///
/// Not guaranteed to be consistent under high concurrency.
#[derive(Debug, Clone, Copy)]
#[cfg(test)]
struct LimiterState {
    /// The concurrency limit at the time the snapshot was taken.
    limit: usize,
}
     142              : 
     143              : impl DynamicLimiter {
     144              :     /// Create a limiter with a given limit control algorithm.
     145            6 :     pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
     146            6 :         let ready = Notify::new();
     147            6 :         ready.notify_one();
     148            6 : 
     149            6 :         Arc::new(Self {
     150            6 :             inner: Mutex::new(LimiterInner {
     151            6 :                 alg: config.create_rate_limit_algorithm(),
     152            6 :                 available: config.initial_limit,
     153            6 :                 limit: config.initial_limit,
     154            6 :                 in_flight: 0,
     155            6 :             }),
     156            6 :             ready,
     157            6 :             config,
     158            6 :         })
     159            6 :     }
     160              : 
     161              :     /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
     162           12 :     pub(crate) async fn acquire_timeout(
     163           12 :         self: &Arc<Self>,
     164           12 :         duration: Duration,
     165           12 :     ) -> Result<Token, Elapsed> {
     166           12 :         tokio::time::timeout(duration, self.acquire()).await?
     167           12 :     }
     168              : 
     169              :     /// Try to acquire a concurrency [Token].
     170           12 :     async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
     171           12 :         if self.config.initial_limit == 0 {
     172              :             // If the rate limiter is disabled, we can always acquire a token.
     173            0 :             Ok(Token::disabled())
     174              :         } else {
     175           12 :             let mut notified = pin!(self.ready.notified());
     176           12 :             let mut ready = notified.as_mut().enable();
     177              :             loop {
     178           12 :                 if ready {
     179           11 :                     let mut inner = self.inner.lock();
     180           11 :                     if inner.take(&self.ready).is_some() {
     181           11 :                         break Ok(Token::new(self.clone()));
     182            0 :                     }
     183            0 :                     notified.set(self.ready.notified());
     184            1 :                 }
     185            1 :                 notified.as_mut().await;
     186            0 :                 ready = true;
     187              :             }
     188              :         }
     189           11 :     }
     190              : 
     191              :     /// Return the concurrency [Token], along with the outcome of the job.
     192              :     ///
     193              :     /// The [Outcome] of the job, and the time taken to perform it, may be used
     194              :     /// to update the concurrency limit.
     195              :     ///
     196              :     /// Set the outcome to `None` to ignore the job.
     197           11 :     fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
     198           11 :         if outcome.is_none() {
     199            3 :             tracing::warn!("outcome is {:?}", outcome);
     200              :         } else {
     201            8 :             tracing::debug!("outcome is {:?}", outcome);
     202              :         }
     203           11 :         if self.config.initial_limit == 0 {
     204            0 :             return;
     205           11 :         }
     206           11 : 
     207           11 :         let mut inner = self.inner.lock();
     208           11 : 
     209           11 :         inner.update_limit(start.elapsed(), outcome);
     210           11 : 
     211           11 :         inner.in_flight -= 1;
     212           11 :         if inner.in_flight < inner.limit {
     213           11 :             inner.available = inner.limit - inner.in_flight;
     214           11 :             // At least 1 permit is now available
     215           11 :             self.ready.notify_one();
     216           11 :         }
     217           11 :     }
     218              : 
     219              :     /// The current state of the limiter.
     220              :     #[cfg(test)]
     221            9 :     fn state(&self) -> LimiterState {
     222            9 :         let inner = self.inner.lock();
     223            9 :         LimiterState { limit: inner.limit }
     224            9 :     }
     225              : }
     226              : 
     227              : impl Token {
     228           11 :     fn new(limiter: Arc<DynamicLimiter>) -> Self {
     229           11 :         Self {
     230           11 :             start: Instant::now(),
     231           11 :             limiter: Some(limiter),
     232           11 :         }
     233           11 :     }
     234            0 :     pub(crate) fn disabled() -> Self {
     235            0 :         Self {
     236            0 :             start: Instant::now(),
     237            0 :             limiter: None,
     238            0 :         }
     239            0 :     }
     240              : 
     241            0 :     pub(crate) fn is_disabled(&self) -> bool {
     242            0 :         self.limiter.is_none()
     243            0 :     }
     244              : 
     245            8 :     pub(crate) fn release(mut self, outcome: Outcome) {
     246            8 :         self.release_mut(Some(outcome));
     247            8 :     }
     248              : 
     249           19 :     pub(crate) fn release_mut(&mut self, outcome: Option<Outcome>) {
     250           19 :         if let Some(limiter) = self.limiter.take() {
     251           11 :             limiter.release_inner(self.start, outcome);
     252           11 :         }
     253           19 :     }
     254              : }
     255              : 
impl Drop for Token {
    /// Releases the token without an outcome if it was not released explicitly.
    ///
    /// `release_mut` does nothing once the limiter handle has been taken, so a token that
    /// already went through `release` is not released a second time here.
    fn drop(&mut self) {
        self.release_mut(None);
    }
}
     261              : 
     262              : #[cfg(test)]
     263              : impl LimiterState {
     264              :     /// The current concurrency limit.
     265            9 :     fn limit(self) -> usize {
     266            9 :         self.limit
     267            9 :     }
     268              : }
         |