LCOV - differential code coverage report
Current view: top level - proxy/src/rate_limiter - aimd.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 99.1 % 106 105 1 105
Current Date: 2024-01-09 02:06:09 Functions: 100.0 % 11 11 11
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::usize;
       2                 : 
       3                 : use async_trait::async_trait;
       4                 : 
       5                 : use super::limit_algorithm::{AimdConfig, LimitAlgorithm, Sample};
       6                 : 
       7                 : use super::limiter::Outcome;
       8                 : 
       9                 : /// Loss-based congestion avoidance.
      10                 : ///
      11                 : /// Additive-increase, multiplicative-decrease.
      12                 : ///
      13                 : /// Adds available concurrency when:
      14                 : /// 1. no load-based errors are observed, and
      15                 : /// 2. the utilisation of the current limit is high.
      16                 : ///
      17                 : /// Reduces available concurrency by a factor when load-based errors are detected.
      18                 : pub struct Aimd {
      19                 :     min_limit: usize,
      20                 :     max_limit: usize,
      21                 :     decrease_factor: f32,
      22                 :     increase_by: usize,
      23                 :     min_utilisation_threshold: f32,
      24                 : }
      25                 : 
      26                 : impl Aimd {
      27 CBC           6 :     pub fn new(config: AimdConfig) -> Self {
      28               6 :         Self {
      29               6 :             min_limit: config.aimd_min_limit,
      30               6 :             max_limit: config.aimd_max_limit,
      31               6 :             decrease_factor: config.aimd_decrease_factor,
      32               6 :             increase_by: config.aimd_increase_by,
      33               6 :             min_utilisation_threshold: config.aimd_min_utilisation_threshold,
      34               6 :         }
      35               6 :     }
      36                 : }
      37                 : 
      38                 : #[async_trait]
      39                 : impl LimitAlgorithm for Aimd {
      40               8 :     async fn update(&mut self, old_limit: usize, sample: Sample) -> usize {
      41                 :         use Outcome::*;
      42               8 :         match sample.outcome {
      43                 :             Success => {
      44               5 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      45               5 : 
      46               5 :                 if utilisation > self.min_utilisation_threshold {
      47               2 :                     let limit = old_limit + self.increase_by;
      48               2 :                     limit.clamp(self.min_limit, self.max_limit)
      49                 :                 } else {
      50               3 :                     old_limit
      51                 :                 }
      52                 :             }
      53                 :             Overload => {
      54               3 :                 let limit = old_limit as f32 * self.decrease_factor;
      55               3 : 
      56               3 :                 // Floor instead of round, so the limit reduces even with small numbers.
      57               3 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      58               3 :                 let limit = limit.floor() as usize;
      59               3 : 
      60               3 :                 limit.clamp(self.min_limit, self.max_limit)
      61                 :             }
      62                 :         }
      63              16 :     }
      64                 : }
      65                 : 
      66                 : #[cfg(test)]
      67                 : mod tests {
      68                 :     use std::sync::Arc;
      69                 : 
      70                 :     use tokio::sync::Notify;
      71                 : 
      72                 :     use super::*;
      73                 : 
      74                 :     use crate::rate_limiter::{Limiter, RateLimiterConfig};
      75                 : 
      76               1 :     #[tokio::test]
      77               1 :     async fn should_decrease_limit_on_overload() {
      78               1 :         let config = RateLimiterConfig {
      79               1 :             initial_limit: 10,
      80               1 :             aimd_config: Some(AimdConfig {
      81               1 :                 aimd_decrease_factor: 0.5,
      82               1 :                 ..Default::default()
      83               1 :             }),
      84               1 :             disable: false,
      85               1 :             ..Default::default()
      86               1 :         };
      87               1 : 
      88               1 :         let release_notifier = Arc::new(Notify::new());
      89               1 : 
      90               1 :         let limiter = Limiter::new(config).with_release_notifier(release_notifier.clone());
      91               1 : 
      92               1 :         let token = limiter.try_acquire().unwrap();
      93               1 :         limiter.release(token, Some(Outcome::Overload)).await;
      94               1 :         release_notifier.notified().await;
      95               1 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
      96                 :     }
      97                 : 
      98               1 :     #[tokio::test]
      99               1 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     100               1 :         let config = RateLimiterConfig {
     101               1 :             initial_limit: 4,
     102               1 :             aimd_config: Some(AimdConfig {
     103               1 :                 aimd_decrease_factor: 0.5,
     104               1 :                 aimd_min_utilisation_threshold: 0.5,
     105               1 :                 aimd_increase_by: 1,
     106               1 :                 ..Default::default()
     107               1 :             }),
     108               1 :             disable: false,
     109               1 :             ..Default::default()
     110               1 :         };
     111               1 : 
     112               1 :         let limiter = Limiter::new(config);
     113               1 : 
     114               1 :         let token = limiter.try_acquire().unwrap();
     115               1 :         let _token = limiter.try_acquire().unwrap();
     116               1 :         let _token = limiter.try_acquire().unwrap();
     117               1 : 
     118               1 :         limiter.release(token, Some(Outcome::Success)).await;
     119               1 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     120                 :     }
     121                 : 
     122               1 :     #[tokio::test]
     123               1 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     124               1 :         let config = RateLimiterConfig {
     125               1 :             initial_limit: 4,
     126               1 :             aimd_config: Some(AimdConfig {
     127               1 :                 aimd_decrease_factor: 0.5,
     128               1 :                 aimd_min_utilisation_threshold: 0.5,
     129               1 :                 ..Default::default()
     130               1 :             }),
     131               1 :             disable: false,
     132               1 :             ..Default::default()
     133               1 :         };
     134               1 : 
     135               1 :         let limiter = Limiter::new(config);
     136               1 : 
     137               1 :         let token = limiter.try_acquire().unwrap();
     138               1 : 
     139               1 :         limiter.release(token, Some(Outcome::Success)).await;
     140               1 :         assert_eq!(
     141               1 :             limiter.state().limit(),
     142                 :             4,
     143 UBC           0 :             "success: ignore when < half limit"
     144                 :         );
     145                 :     }
     146                 : 
     147 CBC           1 :     #[tokio::test]
     148               1 :     async fn should_not_change_limit_when_no_outcome() {
     149               1 :         let config = RateLimiterConfig {
     150               1 :             initial_limit: 10,
     151               1 :             aimd_config: Some(AimdConfig {
     152               1 :                 aimd_decrease_factor: 0.5,
     153               1 :                 aimd_min_utilisation_threshold: 0.5,
     154               1 :                 ..Default::default()
     155               1 :             }),
     156               1 :             disable: false,
     157               1 :             ..Default::default()
     158               1 :         };
     159               1 : 
     160               1 :         let limiter = Limiter::new(config);
     161               1 : 
     162               1 :         let token = limiter.try_acquire().unwrap();
     163               1 :         limiter.release(token, None).await;
     164               1 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     165                 :     }
     166                 : }
        

Generated by: LCOV version 2.1-beta