LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter/limit_algorithm - aimd.rs (source / functions) Coverage Total Hit
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info Lines: 99.5 % 208 207
Test Date: 2024-08-21 17:32:46 Functions: 76.2 % 21 16

            Line data    Source code
       1              : use super::{LimitAlgorithm, Outcome, Sample};
       2              : 
       3              : /// Loss-based congestion avoidance.
       4              : ///
       5              : /// Additive-increase, multiplicative decrease.
       6              : ///
       7              : /// Adds available currency when:
       8              : /// 1. no load-based errors are observed, and
       9              : /// 2. the utilisation of the current limit is high.
      10              : ///
      11              : /// Reduces available concurrency by a factor when load-based errors are detected.
      12           12 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      13              : pub struct Aimd {
      14              :     /// Minimum limit for AIMD algorithm.
      15              :     pub min: usize,
      16              :     /// Maximum limit for AIMD algorithm.
      17              :     pub max: usize,
      18              :     /// Decrease AIMD decrease by value in case of error.
      19              :     pub dec: f32,
      20              :     /// Increase AIMD increase by value in case of success.
      21              :     pub inc: usize,
      22              :     /// A threshold below which the limit won't be increased.
      23              :     pub utilisation: f32,
      24              : }
      25              : 
      26              : impl LimitAlgorithm for Aimd {
      27           16 :     fn update(&self, old_limit: usize, sample: Sample) -> usize {
      28           16 :         match sample.outcome {
      29              :             Outcome::Success => {
      30           10 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      31           10 : 
      32           10 :                 if utilisation > self.utilisation {
      33            6 :                     let limit = old_limit + self.inc;
      34            6 :                     let increased_limit = limit.clamp(self.min, self.max);
      35            6 :                     if increased_limit > old_limit {
      36            6 :                         tracing::info!(increased_limit, "limit increased");
      37            0 :                     }
      38              : 
      39            6 :                     increased_limit
      40              :                 } else {
      41            4 :                     old_limit
      42              :                 }
      43              :             }
      44              :             Outcome::Overload => {
      45            6 :                 let limit = old_limit as f32 * self.dec;
      46            6 : 
      47            6 :                 // Floor instead of round, so the limit reduces even with small numbers.
      48            6 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      49            6 :                 let limit = limit.floor() as usize;
      50            6 : 
      51            6 :                 let limit = limit.clamp(self.min, self.max);
      52            6 :                 tracing::info!(limit, "limit decreased");
      53            6 :                 limit
      54              :             }
      55              :         }
      56           16 :     }
      57              : }
      58              : 
      59              : #[cfg(test)]
      60              : mod tests {
      61              :     use std::time::Duration;
      62              : 
      63              :     use crate::rate_limiter::limit_algorithm::{
      64              :         DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
      65              :     };
      66              : 
      67              :     use super::*;
      68              : 
      69              :     #[tokio::test(start_paused = true)]
      70            2 :     async fn increase_decrease() {
      71            2 :         let config = RateLimiterConfig {
      72            2 :             initial_limit: 1,
      73            2 :             algorithm: RateLimitAlgorithm::Aimd {
      74            2 :                 conf: Aimd {
      75            2 :                     min: 1,
      76            2 :                     max: 2,
      77            2 :                     inc: 10,
      78            2 :                     dec: 0.5,
      79            2 :                     utilisation: 0.8,
      80            2 :                 },
      81            2 :             },
      82            2 :         };
      83            2 : 
      84            2 :         let limiter = DynamicLimiter::new(config);
      85            2 : 
      86            2 :         let token = limiter
      87            2 :             .acquire_timeout(Duration::from_millis(1))
      88            2 :             .await
      89            2 :             .unwrap();
      90            2 :         token.release(Outcome::Success);
      91            2 : 
      92            2 :         assert_eq!(limiter.state().limit(), 2);
      93            2 : 
      94            2 :         let token = limiter
      95            2 :             .acquire_timeout(Duration::from_millis(1))
      96            2 :             .await
      97            2 :             .unwrap();
      98            2 :         token.release(Outcome::Success);
      99            2 :         assert_eq!(limiter.state().limit(), 2);
     100            2 : 
     101            2 :         let token = limiter
     102            2 :             .acquire_timeout(Duration::from_millis(1))
     103            2 :             .await
     104            2 :             .unwrap();
     105            2 :         token.release(Outcome::Overload);
     106            2 :         assert_eq!(limiter.state().limit(), 1);
     107            2 : 
     108            2 :         let token = limiter
     109            2 :             .acquire_timeout(Duration::from_millis(1))
     110            2 :             .await
     111            2 :             .unwrap();
     112            2 :         token.release(Outcome::Overload);
     113            2 :         assert_eq!(limiter.state().limit(), 1);
     114            2 :     }
     115              : 
     116              :     #[tokio::test(start_paused = true)]
     117            2 :     async fn should_decrease_limit_on_overload() {
     118            2 :         let config = RateLimiterConfig {
     119            2 :             initial_limit: 10,
     120            2 :             algorithm: RateLimitAlgorithm::Aimd {
     121            2 :                 conf: Aimd {
     122            2 :                     min: 1,
     123            2 :                     max: 1500,
     124            2 :                     inc: 10,
     125            2 :                     dec: 0.5,
     126            2 :                     utilisation: 0.8,
     127            2 :                 },
     128            2 :             },
     129            2 :         };
     130            2 : 
     131            2 :         let limiter = DynamicLimiter::new(config);
     132            2 : 
     133            2 :         let token = limiter
     134            2 :             .acquire_timeout(Duration::from_millis(100))
     135            2 :             .await
     136            2 :             .unwrap();
     137            2 :         token.release(Outcome::Overload);
     138            2 : 
     139            2 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
     140            2 :     }
     141              : 
     142              :     #[tokio::test(start_paused = true)]
     143            2 :     async fn acquire_timeout_times_out() {
     144            2 :         let config = RateLimiterConfig {
     145            2 :             initial_limit: 1,
     146            2 :             algorithm: RateLimitAlgorithm::Aimd {
     147            2 :                 conf: Aimd {
     148            2 :                     min: 1,
     149            2 :                     max: 2,
     150            2 :                     inc: 10,
     151            2 :                     dec: 0.5,
     152            2 :                     utilisation: 0.8,
     153            2 :                 },
     154            2 :             },
     155            2 :         };
     156            2 : 
     157            2 :         let limiter = DynamicLimiter::new(config);
     158            2 : 
     159            2 :         let token = limiter
     160            2 :             .acquire_timeout(Duration::from_millis(1))
     161            2 :             .await
     162            2 :             .unwrap();
     163            2 :         let now = tokio::time::Instant::now();
     164            2 :         limiter
     165            2 :             .acquire_timeout(Duration::from_secs(1))
     166            2 :             .await
     167            2 :             .err()
     168            2 :             .unwrap();
     169            2 : 
     170            2 :         assert!(now.elapsed() >= Duration::from_secs(1));
     171            2 : 
     172            2 :         token.release(Outcome::Success);
     173            2 : 
     174            2 :         assert_eq!(limiter.state().limit(), 2);
     175            2 :     }
     176              : 
     177              :     #[tokio::test(start_paused = true)]
     178            2 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     179            2 :         let config = RateLimiterConfig {
     180            2 :             initial_limit: 4,
     181            2 :             algorithm: RateLimitAlgorithm::Aimd {
     182            2 :                 conf: Aimd {
     183            2 :                     min: 1,
     184            2 :                     max: 1500,
     185            2 :                     inc: 1,
     186            2 :                     dec: 0.5,
     187            2 :                     utilisation: 0.5,
     188            2 :                 },
     189            2 :             },
     190            2 :         };
     191            2 : 
     192            2 :         let limiter = DynamicLimiter::new(config);
     193            2 : 
     194            2 :         let token = limiter
     195            2 :             .acquire_timeout(Duration::from_millis(1))
     196            2 :             .await
     197            2 :             .unwrap();
     198            2 :         let _token = limiter
     199            2 :             .acquire_timeout(Duration::from_millis(1))
     200            2 :             .await
     201            2 :             .unwrap();
     202            2 :         let _token = limiter
     203            2 :             .acquire_timeout(Duration::from_millis(1))
     204            2 :             .await
     205            2 :             .unwrap();
     206            2 : 
     207            2 :         token.release(Outcome::Success);
     208            2 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     209            2 :     }
     210              : 
     211              :     #[tokio::test(start_paused = true)]
     212            2 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     213            2 :         let config = RateLimiterConfig {
     214            2 :             initial_limit: 4,
     215            2 :             algorithm: RateLimitAlgorithm::Aimd {
     216            2 :                 conf: Aimd {
     217            2 :                     min: 1,
     218            2 :                     max: 1500,
     219            2 :                     inc: 10,
     220            2 :                     dec: 0.5,
     221            2 :                     utilisation: 0.5,
     222            2 :                 },
     223            2 :             },
     224            2 :         };
     225            2 : 
     226            2 :         let limiter = DynamicLimiter::new(config);
     227            2 : 
     228            2 :         let token = limiter
     229            2 :             .acquire_timeout(Duration::from_millis(1))
     230            2 :             .await
     231            2 :             .unwrap();
     232            2 : 
     233            2 :         token.release(Outcome::Success);
     234            2 :         assert_eq!(
     235            2 :             limiter.state().limit(),
     236            2 :             4,
     237            2 :             "success: ignore when < half limit"
     238            2 :         );
     239            2 :     }
     240              : 
     241              :     #[tokio::test(start_paused = true)]
     242            2 :     async fn should_not_change_limit_when_no_outcome() {
     243            2 :         let config = RateLimiterConfig {
     244            2 :             initial_limit: 10,
     245            2 :             algorithm: RateLimitAlgorithm::Aimd {
     246            2 :                 conf: Aimd {
     247            2 :                     min: 1,
     248            2 :                     max: 1500,
     249            2 :                     inc: 10,
     250            2 :                     dec: 0.5,
     251            2 :                     utilisation: 0.5,
     252            2 :                 },
     253            2 :             },
     254            2 :         };
     255            2 : 
     256            2 :         let limiter = DynamicLimiter::new(config);
     257            2 : 
     258            2 :         let token = limiter
     259            2 :             .acquire_timeout(Duration::from_millis(1))
     260            2 :             .await
     261            2 :             .unwrap();
     262            2 :         drop(token);
     263            2 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     264            2 :     }
     265              : }
        

Generated by: LCOV version 2.1-beta