LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter/limit_algorithm - aimd.rs (source / functions) Coverage Total Hit
Test: fcf55189004bd3119eed75e2873a97da8078700c.info Lines: 99.5 % 209 208
Test Date: 2024-06-25 12:07:31 Functions: 76.2 % 21 16

            Line data    Source code
       1              : use super::{LimitAlgorithm, Outcome, Sample};
       2              : 
       3              : /// Loss-based congestion avoidance.
       4              : ///
       5              : /// Additive-increase, multiplicative decrease.
       6              : ///
       7              : /// Adds available currency when:
       8              : /// 1. no load-based errors are observed, and
       9              : /// 2. the utilisation of the current limit is high.
      10              : ///
      11              : /// Reduces available concurrency by a factor when load-based errors are detected.
      12           12 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      13              : pub struct Aimd {
      14              :     /// Minimum limit for AIMD algorithm.
      15              :     pub min: usize,
      16              :     /// Maximum limit for AIMD algorithm.
      17              :     pub max: usize,
      18              :     /// Decrease AIMD decrease by value in case of error.
      19              :     pub dec: f32,
      20              :     /// Increase AIMD increase by value in case of success.
      21              :     pub inc: usize,
      22              :     /// A threshold below which the limit won't be increased.
      23              :     pub utilisation: f32,
      24              : }
      25              : 
      26              : impl LimitAlgorithm for Aimd {
      27           16 :     fn update(&self, old_limit: usize, sample: Sample) -> usize {
      28           16 :         use Outcome::*;
      29           16 :         match sample.outcome {
      30              :             Success => {
      31           10 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      32           10 : 
      33           10 :                 if utilisation > self.utilisation {
      34            6 :                     let limit = old_limit + self.inc;
      35            6 :                     let increased_limit = limit.clamp(self.min, self.max);
      36            6 :                     if increased_limit > old_limit {
      37            6 :                         tracing::info!(increased_limit, "limit increased");
      38            0 :                     }
      39              : 
      40            6 :                     increased_limit
      41              :                 } else {
      42            4 :                     old_limit
      43              :                 }
      44              :             }
      45              :             Overload => {
      46            6 :                 let limit = old_limit as f32 * self.dec;
      47            6 : 
      48            6 :                 // Floor instead of round, so the limit reduces even with small numbers.
      49            6 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      50            6 :                 let limit = limit.floor() as usize;
      51            6 : 
      52            6 :                 let limit = limit.clamp(self.min, self.max);
      53            6 :                 tracing::info!(limit, "limit decreased");
      54            6 :                 limit
      55              :             }
      56              :         }
      57           16 :     }
      58              : }
      59              : 
      60              : #[cfg(test)]
      61              : mod tests {
      62              :     use std::time::Duration;
      63              : 
      64              :     use crate::rate_limiter::limit_algorithm::{
      65              :         DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
      66              :     };
      67              : 
      68              :     use super::*;
      69              : 
      70              :     #[tokio::test(start_paused = true)]
      71            2 :     async fn increase_decrease() {
      72            2 :         let config = RateLimiterConfig {
      73            2 :             initial_limit: 1,
      74            2 :             algorithm: RateLimitAlgorithm::Aimd {
      75            2 :                 conf: Aimd {
      76            2 :                     min: 1,
      77            2 :                     max: 2,
      78            2 :                     inc: 10,
      79            2 :                     dec: 0.5,
      80            2 :                     utilisation: 0.8,
      81            2 :                 },
      82            2 :             },
      83            2 :         };
      84            2 : 
      85            2 :         let limiter = DynamicLimiter::new(config);
      86            2 : 
      87            2 :         let token = limiter
      88            2 :             .acquire_timeout(Duration::from_millis(1))
      89            2 :             .await
      90            2 :             .unwrap();
      91            2 :         token.release(Outcome::Success);
      92            2 : 
      93            2 :         assert_eq!(limiter.state().limit(), 2);
      94            2 : 
      95            2 :         let token = limiter
      96            2 :             .acquire_timeout(Duration::from_millis(1))
      97            2 :             .await
      98            2 :             .unwrap();
      99            2 :         token.release(Outcome::Success);
     100            2 :         assert_eq!(limiter.state().limit(), 2);
     101            2 : 
     102            2 :         let token = limiter
     103            2 :             .acquire_timeout(Duration::from_millis(1))
     104            2 :             .await
     105            2 :             .unwrap();
     106            2 :         token.release(Outcome::Overload);
     107            2 :         assert_eq!(limiter.state().limit(), 1);
     108            2 : 
     109            2 :         let token = limiter
     110            2 :             .acquire_timeout(Duration::from_millis(1))
     111            2 :             .await
     112            2 :             .unwrap();
     113            2 :         token.release(Outcome::Overload);
     114            2 :         assert_eq!(limiter.state().limit(), 1);
     115            2 :     }
     116              : 
     117              :     #[tokio::test(start_paused = true)]
     118            2 :     async fn should_decrease_limit_on_overload() {
     119            2 :         let config = RateLimiterConfig {
     120            2 :             initial_limit: 10,
     121            2 :             algorithm: RateLimitAlgorithm::Aimd {
     122            2 :                 conf: Aimd {
     123            2 :                     min: 1,
     124            2 :                     max: 1500,
     125            2 :                     inc: 10,
     126            2 :                     dec: 0.5,
     127            2 :                     utilisation: 0.8,
     128            2 :                 },
     129            2 :             },
     130            2 :         };
     131            2 : 
     132            2 :         let limiter = DynamicLimiter::new(config);
     133            2 : 
     134            2 :         let token = limiter
     135            2 :             .acquire_timeout(Duration::from_millis(100))
     136            2 :             .await
     137            2 :             .unwrap();
     138            2 :         token.release(Outcome::Overload);
     139            2 : 
     140            2 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
     141            2 :     }
     142              : 
     143              :     #[tokio::test(start_paused = true)]
     144            2 :     async fn acquire_timeout_times_out() {
     145            2 :         let config = RateLimiterConfig {
     146            2 :             initial_limit: 1,
     147            2 :             algorithm: RateLimitAlgorithm::Aimd {
     148            2 :                 conf: Aimd {
     149            2 :                     min: 1,
     150            2 :                     max: 2,
     151            2 :                     inc: 10,
     152            2 :                     dec: 0.5,
     153            2 :                     utilisation: 0.8,
     154            2 :                 },
     155            2 :             },
     156            2 :         };
     157            2 : 
     158            2 :         let limiter = DynamicLimiter::new(config);
     159            2 : 
     160            2 :         let token = limiter
     161            2 :             .acquire_timeout(Duration::from_millis(1))
     162            2 :             .await
     163            2 :             .unwrap();
     164            2 :         let now = tokio::time::Instant::now();
     165            2 :         limiter
     166            2 :             .acquire_timeout(Duration::from_secs(1))
     167            2 :             .await
     168            2 :             .err()
     169            2 :             .unwrap();
     170            2 : 
     171            2 :         assert!(now.elapsed() >= Duration::from_secs(1));
     172            2 : 
     173            2 :         token.release(Outcome::Success);
     174            2 : 
     175            2 :         assert_eq!(limiter.state().limit(), 2);
     176            2 :     }
     177              : 
     178              :     #[tokio::test(start_paused = true)]
     179            2 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     180            2 :         let config = RateLimiterConfig {
     181            2 :             initial_limit: 4,
     182            2 :             algorithm: RateLimitAlgorithm::Aimd {
     183            2 :                 conf: Aimd {
     184            2 :                     min: 1,
     185            2 :                     max: 1500,
     186            2 :                     inc: 1,
     187            2 :                     dec: 0.5,
     188            2 :                     utilisation: 0.5,
     189            2 :                 },
     190            2 :             },
     191            2 :         };
     192            2 : 
     193            2 :         let limiter = DynamicLimiter::new(config);
     194            2 : 
     195            2 :         let token = limiter
     196            2 :             .acquire_timeout(Duration::from_millis(1))
     197            2 :             .await
     198            2 :             .unwrap();
     199            2 :         let _token = limiter
     200            2 :             .acquire_timeout(Duration::from_millis(1))
     201            2 :             .await
     202            2 :             .unwrap();
     203            2 :         let _token = limiter
     204            2 :             .acquire_timeout(Duration::from_millis(1))
     205            2 :             .await
     206            2 :             .unwrap();
     207            2 : 
     208            2 :         token.release(Outcome::Success);
     209            2 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     210            2 :     }
     211              : 
     212              :     #[tokio::test(start_paused = true)]
     213            2 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     214            2 :         let config = RateLimiterConfig {
     215            2 :             initial_limit: 4,
     216            2 :             algorithm: RateLimitAlgorithm::Aimd {
     217            2 :                 conf: Aimd {
     218            2 :                     min: 1,
     219            2 :                     max: 1500,
     220            2 :                     inc: 10,
     221            2 :                     dec: 0.5,
     222            2 :                     utilisation: 0.5,
     223            2 :                 },
     224            2 :             },
     225            2 :         };
     226            2 : 
     227            2 :         let limiter = DynamicLimiter::new(config);
     228            2 : 
     229            2 :         let token = limiter
     230            2 :             .acquire_timeout(Duration::from_millis(1))
     231            2 :             .await
     232            2 :             .unwrap();
     233            2 : 
     234            2 :         token.release(Outcome::Success);
     235            2 :         assert_eq!(
     236            2 :             limiter.state().limit(),
     237            2 :             4,
     238            2 :             "success: ignore when < half limit"
     239            2 :         );
     240            2 :     }
     241              : 
     242              :     #[tokio::test(start_paused = true)]
     243            2 :     async fn should_not_change_limit_when_no_outcome() {
     244            2 :         let config = RateLimiterConfig {
     245            2 :             initial_limit: 10,
     246            2 :             algorithm: RateLimitAlgorithm::Aimd {
     247            2 :                 conf: Aimd {
     248            2 :                     min: 1,
     249            2 :                     max: 1500,
     250            2 :                     inc: 10,
     251            2 :                     dec: 0.5,
     252            2 :                     utilisation: 0.5,
     253            2 :                 },
     254            2 :             },
     255            2 :         };
     256            2 : 
     257            2 :         let limiter = DynamicLimiter::new(config);
     258            2 : 
     259            2 :         let token = limiter
     260            2 :             .acquire_timeout(Duration::from_millis(1))
     261            2 :             .await
     262            2 :             .unwrap();
     263            2 :         drop(token);
     264            2 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     265            2 :     }
     266              : }
        

Generated by: LCOV version 2.1-beta