LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter - aimd.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 99.1 % 106 105
Test Date: 2024-02-07 07:37:29 Functions: 100.0 % 11 11

            Line data    Source code
       1              : use std::usize;
       2              : 
       3              : use async_trait::async_trait;
       4              : 
       5              : use super::limit_algorithm::{AimdConfig, LimitAlgorithm, Sample};
       6              : 
       7              : use super::limiter::Outcome;
       8              : 
       9              : /// Loss-based congestion avoidance.
      10              : ///
      11              : /// Additive-increase, multiplicative decrease.
      12              : ///
      13              : /// Adds available currency when:
      14              : /// 1. no load-based errors are observed, and
      15              : /// 2. the utilisation of the current limit is high.
      16              : ///
      17              : /// Reduces available concurrency by a factor when load-based errors are detected.
      18              : pub struct Aimd {
      19              :     min_limit: usize,
      20              :     max_limit: usize,
      21              :     decrease_factor: f32,
      22              :     increase_by: usize,
      23              :     min_utilisation_threshold: f32,
      24              : }
      25              : 
      26              : impl Aimd {
      27           11 :     pub fn new(config: AimdConfig) -> Self {
      28           11 :         Self {
      29           11 :             min_limit: config.aimd_min_limit,
      30           11 :             max_limit: config.aimd_max_limit,
      31           11 :             decrease_factor: config.aimd_decrease_factor,
      32           11 :             increase_by: config.aimd_increase_by,
      33           11 :             min_utilisation_threshold: config.aimd_min_utilisation_threshold,
      34           11 :         }
      35           11 :     }
      36              : }
      37              : 
      38              : #[async_trait]
      39              : impl LimitAlgorithm for Aimd {
      40           13 :     async fn update(&mut self, old_limit: usize, sample: Sample) -> usize {
      41              :         use Outcome::*;
      42           13 :         match sample.outcome {
      43              :             Success => {
      44            9 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      45            9 : 
      46            9 :                 if utilisation > self.min_utilisation_threshold {
      47            3 :                     let limit = old_limit + self.increase_by;
      48            3 :                     limit.clamp(self.min_limit, self.max_limit)
      49              :                 } else {
      50            6 :                     old_limit
      51              :                 }
      52              :             }
      53              :             Overload => {
      54            4 :                 let limit = old_limit as f32 * self.decrease_factor;
      55            4 : 
      56            4 :                 // Floor instead of round, so the limit reduces even with small numbers.
      57            4 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      58            4 :                 let limit = limit.floor() as usize;
      59            4 : 
      60            4 :                 limit.clamp(self.min_limit, self.max_limit)
      61              :             }
      62              :         }
      63           26 :     }
      64              : }
      65              : 
      66              : #[cfg(test)]
      67              : mod tests {
      68              :     use std::sync::Arc;
      69              : 
      70              :     use tokio::sync::Notify;
      71              : 
      72              :     use super::*;
      73              : 
      74              :     use crate::rate_limiter::{Limiter, RateLimiterConfig};
      75              : 
      76            2 :     #[tokio::test]
      77            2 :     async fn should_decrease_limit_on_overload() {
      78            2 :         let config = RateLimiterConfig {
      79            2 :             initial_limit: 10,
      80            2 :             aimd_config: Some(AimdConfig {
      81            2 :                 aimd_decrease_factor: 0.5,
      82            2 :                 ..Default::default()
      83            2 :             }),
      84            2 :             disable: false,
      85            2 :             ..Default::default()
      86            2 :         };
      87            2 : 
      88            2 :         let release_notifier = Arc::new(Notify::new());
      89            2 : 
      90            2 :         let limiter = Limiter::new(config).with_release_notifier(release_notifier.clone());
      91            2 : 
      92            2 :         let token = limiter.try_acquire().unwrap();
      93            2 :         limiter.release(token, Some(Outcome::Overload)).await;
      94            2 :         release_notifier.notified().await;
      95            2 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
      96              :     }
      97              : 
      98            2 :     #[tokio::test]
      99            2 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     100            2 :         let config = RateLimiterConfig {
     101            2 :             initial_limit: 4,
     102            2 :             aimd_config: Some(AimdConfig {
     103            2 :                 aimd_decrease_factor: 0.5,
     104            2 :                 aimd_min_utilisation_threshold: 0.5,
     105            2 :                 aimd_increase_by: 1,
     106            2 :                 ..Default::default()
     107            2 :             }),
     108            2 :             disable: false,
     109            2 :             ..Default::default()
     110            2 :         };
     111            2 : 
     112            2 :         let limiter = Limiter::new(config);
     113            2 : 
     114            2 :         let token = limiter.try_acquire().unwrap();
     115            2 :         let _token = limiter.try_acquire().unwrap();
     116            2 :         let _token = limiter.try_acquire().unwrap();
     117            2 : 
     118            2 :         limiter.release(token, Some(Outcome::Success)).await;
     119            2 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     120              :     }
     121              : 
     122            2 :     #[tokio::test]
     123            2 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     124            2 :         let config = RateLimiterConfig {
     125            2 :             initial_limit: 4,
     126            2 :             aimd_config: Some(AimdConfig {
     127            2 :                 aimd_decrease_factor: 0.5,
     128            2 :                 aimd_min_utilisation_threshold: 0.5,
     129            2 :                 ..Default::default()
     130            2 :             }),
     131            2 :             disable: false,
     132            2 :             ..Default::default()
     133            2 :         };
     134            2 : 
     135            2 :         let limiter = Limiter::new(config);
     136            2 : 
     137            2 :         let token = limiter.try_acquire().unwrap();
     138            2 : 
     139            2 :         limiter.release(token, Some(Outcome::Success)).await;
     140            2 :         assert_eq!(
     141            2 :             limiter.state().limit(),
     142              :             4,
     143            0 :             "success: ignore when < half limit"
     144              :         );
     145              :     }
     146              : 
     147            2 :     #[tokio::test]
     148            2 :     async fn should_not_change_limit_when_no_outcome() {
     149            2 :         let config = RateLimiterConfig {
     150            2 :             initial_limit: 10,
     151            2 :             aimd_config: Some(AimdConfig {
     152            2 :                 aimd_decrease_factor: 0.5,
     153            2 :                 aimd_min_utilisation_threshold: 0.5,
     154            2 :                 ..Default::default()
     155            2 :             }),
     156            2 :             disable: false,
     157            2 :             ..Default::default()
     158            2 :         };
     159            2 : 
     160            2 :         let limiter = Limiter::new(config);
     161            2 : 
     162            2 :         let token = limiter.try_acquire().unwrap();
     163            2 :         limiter.release(token, None).await;
     164            2 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     165              :     }
     166              : }
        

Generated by: LCOV version 2.1-beta