LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter/limit_algorithm - aimd.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 99.5 % 208 207
Test Date: 2024-08-29 11:33:10 Functions: 76.2 % 21 16

            Line data    Source code
       1              : use super::{LimitAlgorithm, Outcome, Sample};
       2              : 
       3              : /// Loss-based congestion avoidance.
       4              : ///
       5              : /// Additive-increase, multiplicative decrease.
       6              : ///
       7              : /// Adds available currency when:
       8              : /// 1. no load-based errors are observed, and
       9              : /// 2. the utilisation of the current limit is high.
      10              : ///
      11              : /// Reduces available concurrency by a factor when load-based errors are detected.
      12           36 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      13              : pub(crate) struct Aimd {
      14              :     /// Minimum limit for AIMD algorithm.
      15              :     pub(crate) min: usize,
      16              :     /// Maximum limit for AIMD algorithm.
      17              :     pub(crate) max: usize,
      18              :     /// Decrease AIMD decrease by value in case of error.
      19              :     pub(crate) dec: f32,
      20              :     /// Increase AIMD increase by value in case of success.
      21              :     pub(crate) inc: usize,
      22              :     /// A threshold below which the limit won't be increased.
      23              :     pub(crate) utilisation: f32,
      24              : }
      25              : 
      26              : impl LimitAlgorithm for Aimd {
      27           48 :     fn update(&self, old_limit: usize, sample: Sample) -> usize {
      28           48 :         match sample.outcome {
      29              :             Outcome::Success => {
      30           30 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      31           30 : 
      32           30 :                 if utilisation > self.utilisation {
      33           18 :                     let limit = old_limit + self.inc;
      34           18 :                     let increased_limit = limit.clamp(self.min, self.max);
      35           18 :                     if increased_limit > old_limit {
      36           18 :                         tracing::info!(increased_limit, "limit increased");
      37            0 :                     }
      38              : 
      39           18 :                     increased_limit
      40              :                 } else {
      41           12 :                     old_limit
      42              :                 }
      43              :             }
      44              :             Outcome::Overload => {
      45           18 :                 let limit = old_limit as f32 * self.dec;
      46           18 : 
      47           18 :                 // Floor instead of round, so the limit reduces even with small numbers.
      48           18 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      49           18 :                 let limit = limit.floor() as usize;
      50           18 : 
      51           18 :                 let limit = limit.clamp(self.min, self.max);
      52           18 :                 tracing::info!(limit, "limit decreased");
      53           18 :                 limit
      54              :             }
      55              :         }
      56           48 :     }
      57              : }
      58              : 
      59              : #[cfg(test)]
      60              : mod tests {
      61              :     use std::time::Duration;
      62              : 
      63              :     use crate::rate_limiter::limit_algorithm::{
      64              :         DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
      65              :     };
      66              : 
      67              :     use super::*;
      68              : 
      69              :     #[tokio::test(start_paused = true)]
      70            6 :     async fn increase_decrease() {
      71            6 :         let config = RateLimiterConfig {
      72            6 :             initial_limit: 1,
      73            6 :             algorithm: RateLimitAlgorithm::Aimd {
      74            6 :                 conf: Aimd {
      75            6 :                     min: 1,
      76            6 :                     max: 2,
      77            6 :                     inc: 10,
      78            6 :                     dec: 0.5,
      79            6 :                     utilisation: 0.8,
      80            6 :                 },
      81            6 :             },
      82            6 :         };
      83            6 : 
      84            6 :         let limiter = DynamicLimiter::new(config);
      85            6 : 
      86            6 :         let token = limiter
      87            6 :             .acquire_timeout(Duration::from_millis(1))
      88            6 :             .await
      89            6 :             .unwrap();
      90            6 :         token.release(Outcome::Success);
      91            6 : 
      92            6 :         assert_eq!(limiter.state().limit(), 2);
      93            6 : 
      94            6 :         let token = limiter
      95            6 :             .acquire_timeout(Duration::from_millis(1))
      96            6 :             .await
      97            6 :             .unwrap();
      98            6 :         token.release(Outcome::Success);
      99            6 :         assert_eq!(limiter.state().limit(), 2);
     100            6 : 
     101            6 :         let token = limiter
     102            6 :             .acquire_timeout(Duration::from_millis(1))
     103            6 :             .await
     104            6 :             .unwrap();
     105            6 :         token.release(Outcome::Overload);
     106            6 :         assert_eq!(limiter.state().limit(), 1);
     107            6 : 
     108            6 :         let token = limiter
     109            6 :             .acquire_timeout(Duration::from_millis(1))
     110            6 :             .await
     111            6 :             .unwrap();
     112            6 :         token.release(Outcome::Overload);
     113            6 :         assert_eq!(limiter.state().limit(), 1);
     114            6 :     }
     115              : 
     116              :     #[tokio::test(start_paused = true)]
     117            6 :     async fn should_decrease_limit_on_overload() {
     118            6 :         let config = RateLimiterConfig {
     119            6 :             initial_limit: 10,
     120            6 :             algorithm: RateLimitAlgorithm::Aimd {
     121            6 :                 conf: Aimd {
     122            6 :                     min: 1,
     123            6 :                     max: 1500,
     124            6 :                     inc: 10,
     125            6 :                     dec: 0.5,
     126            6 :                     utilisation: 0.8,
     127            6 :                 },
     128            6 :             },
     129            6 :         };
     130            6 : 
     131            6 :         let limiter = DynamicLimiter::new(config);
     132            6 : 
     133            6 :         let token = limiter
     134            6 :             .acquire_timeout(Duration::from_millis(100))
     135            6 :             .await
     136            6 :             .unwrap();
     137            6 :         token.release(Outcome::Overload);
     138            6 : 
     139            6 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
     140            6 :     }
     141              : 
     142              :     #[tokio::test(start_paused = true)]
     143            6 :     async fn acquire_timeout_times_out() {
     144            6 :         let config = RateLimiterConfig {
     145            6 :             initial_limit: 1,
     146            6 :             algorithm: RateLimitAlgorithm::Aimd {
     147            6 :                 conf: Aimd {
     148            6 :                     min: 1,
     149            6 :                     max: 2,
     150            6 :                     inc: 10,
     151            6 :                     dec: 0.5,
     152            6 :                     utilisation: 0.8,
     153            6 :                 },
     154            6 :             },
     155            6 :         };
     156            6 : 
     157            6 :         let limiter = DynamicLimiter::new(config);
     158            6 : 
     159            6 :         let token = limiter
     160            6 :             .acquire_timeout(Duration::from_millis(1))
     161            6 :             .await
     162            6 :             .unwrap();
     163            6 :         let now = tokio::time::Instant::now();
     164            6 :         limiter
     165            6 :             .acquire_timeout(Duration::from_secs(1))
     166            6 :             .await
     167            6 :             .err()
     168            6 :             .unwrap();
     169            6 : 
     170            6 :         assert!(now.elapsed() >= Duration::from_secs(1));
     171            6 : 
     172            6 :         token.release(Outcome::Success);
     173            6 : 
     174            6 :         assert_eq!(limiter.state().limit(), 2);
     175            6 :     }
     176              : 
     177              :     #[tokio::test(start_paused = true)]
     178            6 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     179            6 :         let config = RateLimiterConfig {
     180            6 :             initial_limit: 4,
     181            6 :             algorithm: RateLimitAlgorithm::Aimd {
     182            6 :                 conf: Aimd {
     183            6 :                     min: 1,
     184            6 :                     max: 1500,
     185            6 :                     inc: 1,
     186            6 :                     dec: 0.5,
     187            6 :                     utilisation: 0.5,
     188            6 :                 },
     189            6 :             },
     190            6 :         };
     191            6 : 
     192            6 :         let limiter = DynamicLimiter::new(config);
     193            6 : 
     194            6 :         let token = limiter
     195            6 :             .acquire_timeout(Duration::from_millis(1))
     196            6 :             .await
     197            6 :             .unwrap();
     198            6 :         let _token = limiter
     199            6 :             .acquire_timeout(Duration::from_millis(1))
     200            6 :             .await
     201            6 :             .unwrap();
     202            6 :         let _token = limiter
     203            6 :             .acquire_timeout(Duration::from_millis(1))
     204            6 :             .await
     205            6 :             .unwrap();
     206            6 : 
     207            6 :         token.release(Outcome::Success);
     208            6 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     209            6 :     }
     210              : 
     211              :     #[tokio::test(start_paused = true)]
     212            6 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     213            6 :         let config = RateLimiterConfig {
     214            6 :             initial_limit: 4,
     215            6 :             algorithm: RateLimitAlgorithm::Aimd {
     216            6 :                 conf: Aimd {
     217            6 :                     min: 1,
     218            6 :                     max: 1500,
     219            6 :                     inc: 10,
     220            6 :                     dec: 0.5,
     221            6 :                     utilisation: 0.5,
     222            6 :                 },
     223            6 :             },
     224            6 :         };
     225            6 : 
     226            6 :         let limiter = DynamicLimiter::new(config);
     227            6 : 
     228            6 :         let token = limiter
     229            6 :             .acquire_timeout(Duration::from_millis(1))
     230            6 :             .await
     231            6 :             .unwrap();
     232            6 : 
     233            6 :         token.release(Outcome::Success);
     234            6 :         assert_eq!(
     235            6 :             limiter.state().limit(),
     236            6 :             4,
     237            6 :             "success: ignore when < half limit"
     238            6 :         );
     239            6 :     }
     240              : 
     241              :     #[tokio::test(start_paused = true)]
     242            6 :     async fn should_not_change_limit_when_no_outcome() {
     243            6 :         let config = RateLimiterConfig {
     244            6 :             initial_limit: 10,
     245            6 :             algorithm: RateLimitAlgorithm::Aimd {
     246            6 :                 conf: Aimd {
     247            6 :                     min: 1,
     248            6 :                     max: 1500,
     249            6 :                     inc: 10,
     250            6 :                     dec: 0.5,
     251            6 :                     utilisation: 0.5,
     252            6 :                 },
     253            6 :             },
     254            6 :         };
     255            6 : 
     256            6 :         let limiter = DynamicLimiter::new(config);
     257            6 : 
     258            6 :         let token = limiter
     259            6 :             .acquire_timeout(Duration::from_millis(1))
     260            6 :             .await
     261            6 :             .unwrap();
     262            6 :         drop(token);
     263            6 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     264            6 :     }
     265              : }
        

Generated by: LCOV version 2.1-beta