LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter/limit_algorithm - aimd.rs (source / functions) Coverage Total Hit
Test: a2f0f8a80fbf1089336086fa360ce27fa555cb1a.info Lines: 99.5 % 208 207
Test Date: 2024-11-20 17:59:39 Functions: 76.2 % 21 16

            Line data    Source code
       1              : use super::{LimitAlgorithm, Outcome, Sample};
       2              : 
       3              : /// Loss-based congestion avoidance.
       4              : ///
       5              : /// Additive-increase, multiplicative decrease.
       6              : ///
       7              : /// Adds available currency when:
       8              : /// 1. no load-based errors are observed, and
       9              : /// 2. the utilisation of the current limit is high.
      10              : ///
      11              : /// Reduces available concurrency by a factor when load-based errors are detected.
      12            6 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      13              : pub(crate) struct Aimd {
      14              :     /// Minimum limit for AIMD algorithm.
      15              :     pub(crate) min: usize,
      16              :     /// Maximum limit for AIMD algorithm.
      17              :     pub(crate) max: usize,
      18              :     /// Decrease AIMD decrease by value in case of error.
      19              :     pub(crate) dec: f32,
      20              :     /// Increase AIMD increase by value in case of success.
      21              :     pub(crate) inc: usize,
      22              :     /// A threshold below which the limit won't be increased.
      23              :     pub(crate) utilisation: f32,
      24              : }
      25              : 
      26              : impl LimitAlgorithm for Aimd {
      27            8 :     fn update(&self, old_limit: usize, sample: Sample) -> usize {
      28            8 :         match sample.outcome {
      29              :             Outcome::Success => {
      30            5 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      31            5 : 
      32            5 :                 if utilisation > self.utilisation {
      33            3 :                     let limit = old_limit + self.inc;
      34            3 :                     let increased_limit = limit.clamp(self.min, self.max);
      35            3 :                     if increased_limit > old_limit {
      36            3 :                         tracing::info!(increased_limit, "limit increased");
      37            0 :                     }
      38              : 
      39            3 :                     increased_limit
      40              :                 } else {
      41            2 :                     old_limit
      42              :                 }
      43              :             }
      44              :             Outcome::Overload => {
      45            3 :                 let limit = old_limit as f32 * self.dec;
      46            3 : 
      47            3 :                 // Floor instead of round, so the limit reduces even with small numbers.
      48            3 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      49            3 :                 let limit = limit.floor() as usize;
      50            3 : 
      51            3 :                 let limit = limit.clamp(self.min, self.max);
      52            3 :                 tracing::info!(limit, "limit decreased");
      53            3 :                 limit
      54              :             }
      55              :         }
      56            8 :     }
      57              : }
      58              : 
      59              : #[cfg(test)]
      60              : mod tests {
      61              :     use std::time::Duration;
      62              : 
      63              :     use super::*;
      64              :     use crate::rate_limiter::limit_algorithm::{
      65              :         DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
      66              :     };
      67              : 
      68              :     #[tokio::test(start_paused = true)]
      69            1 :     async fn increase_decrease() {
      70            1 :         let config = RateLimiterConfig {
      71            1 :             initial_limit: 1,
      72            1 :             algorithm: RateLimitAlgorithm::Aimd {
      73            1 :                 conf: Aimd {
      74            1 :                     min: 1,
      75            1 :                     max: 2,
      76            1 :                     inc: 10,
      77            1 :                     dec: 0.5,
      78            1 :                     utilisation: 0.8,
      79            1 :                 },
      80            1 :             },
      81            1 :         };
      82            1 : 
      83            1 :         let limiter = DynamicLimiter::new(config);
      84            1 : 
      85            1 :         let token = limiter
      86            1 :             .acquire_timeout(Duration::from_millis(1))
      87            1 :             .await
      88            1 :             .unwrap();
      89            1 :         token.release(Outcome::Success);
      90            1 : 
      91            1 :         assert_eq!(limiter.state().limit(), 2);
      92            1 : 
      93            1 :         let token = limiter
      94            1 :             .acquire_timeout(Duration::from_millis(1))
      95            1 :             .await
      96            1 :             .unwrap();
      97            1 :         token.release(Outcome::Success);
      98            1 :         assert_eq!(limiter.state().limit(), 2);
      99            1 : 
     100            1 :         let token = limiter
     101            1 :             .acquire_timeout(Duration::from_millis(1))
     102            1 :             .await
     103            1 :             .unwrap();
     104            1 :         token.release(Outcome::Overload);
     105            1 :         assert_eq!(limiter.state().limit(), 1);
     106            1 : 
     107            1 :         let token = limiter
     108            1 :             .acquire_timeout(Duration::from_millis(1))
     109            1 :             .await
     110            1 :             .unwrap();
     111            1 :         token.release(Outcome::Overload);
     112            1 :         assert_eq!(limiter.state().limit(), 1);
     113            1 :     }
     114              : 
     115              :     #[tokio::test(start_paused = true)]
     116            1 :     async fn should_decrease_limit_on_overload() {
     117            1 :         let config = RateLimiterConfig {
     118            1 :             initial_limit: 10,
     119            1 :             algorithm: RateLimitAlgorithm::Aimd {
     120            1 :                 conf: Aimd {
     121            1 :                     min: 1,
     122            1 :                     max: 1500,
     123            1 :                     inc: 10,
     124            1 :                     dec: 0.5,
     125            1 :                     utilisation: 0.8,
     126            1 :                 },
     127            1 :             },
     128            1 :         };
     129            1 : 
     130            1 :         let limiter = DynamicLimiter::new(config);
     131            1 : 
     132            1 :         let token = limiter
     133            1 :             .acquire_timeout(Duration::from_millis(100))
     134            1 :             .await
     135            1 :             .unwrap();
     136            1 :         token.release(Outcome::Overload);
     137            1 : 
     138            1 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
     139            1 :     }
     140              : 
     141              :     #[tokio::test(start_paused = true)]
     142            1 :     async fn acquire_timeout_times_out() {
     143            1 :         let config = RateLimiterConfig {
     144            1 :             initial_limit: 1,
     145            1 :             algorithm: RateLimitAlgorithm::Aimd {
     146            1 :                 conf: Aimd {
     147            1 :                     min: 1,
     148            1 :                     max: 2,
     149            1 :                     inc: 10,
     150            1 :                     dec: 0.5,
     151            1 :                     utilisation: 0.8,
     152            1 :                 },
     153            1 :             },
     154            1 :         };
     155            1 : 
     156            1 :         let limiter = DynamicLimiter::new(config);
     157            1 : 
     158            1 :         let token = limiter
     159            1 :             .acquire_timeout(Duration::from_millis(1))
     160            1 :             .await
     161            1 :             .unwrap();
     162            1 :         let now = tokio::time::Instant::now();
     163            1 :         limiter
     164            1 :             .acquire_timeout(Duration::from_secs(1))
     165            1 :             .await
     166            1 :             .err()
     167            1 :             .unwrap();
     168            1 : 
     169            1 :         assert!(now.elapsed() >= Duration::from_secs(1));
     170            1 : 
     171            1 :         token.release(Outcome::Success);
     172            1 : 
     173            1 :         assert_eq!(limiter.state().limit(), 2);
     174            1 :     }
     175              : 
     176              :     #[tokio::test(start_paused = true)]
     177            1 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     178            1 :         let config = RateLimiterConfig {
     179            1 :             initial_limit: 4,
     180            1 :             algorithm: RateLimitAlgorithm::Aimd {
     181            1 :                 conf: Aimd {
     182            1 :                     min: 1,
     183            1 :                     max: 1500,
     184            1 :                     inc: 1,
     185            1 :                     dec: 0.5,
     186            1 :                     utilisation: 0.5,
     187            1 :                 },
     188            1 :             },
     189            1 :         };
     190            1 : 
     191            1 :         let limiter = DynamicLimiter::new(config);
     192            1 : 
     193            1 :         let token = limiter
     194            1 :             .acquire_timeout(Duration::from_millis(1))
     195            1 :             .await
     196            1 :             .unwrap();
     197            1 :         let _token = limiter
     198            1 :             .acquire_timeout(Duration::from_millis(1))
     199            1 :             .await
     200            1 :             .unwrap();
     201            1 :         let _token = limiter
     202            1 :             .acquire_timeout(Duration::from_millis(1))
     203            1 :             .await
     204            1 :             .unwrap();
     205            1 : 
     206            1 :         token.release(Outcome::Success);
     207            1 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     208            1 :     }
     209              : 
     210              :     #[tokio::test(start_paused = true)]
     211            1 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     212            1 :         let config = RateLimiterConfig {
     213            1 :             initial_limit: 4,
     214            1 :             algorithm: RateLimitAlgorithm::Aimd {
     215            1 :                 conf: Aimd {
     216            1 :                     min: 1,
     217            1 :                     max: 1500,
     218            1 :                     inc: 10,
     219            1 :                     dec: 0.5,
     220            1 :                     utilisation: 0.5,
     221            1 :                 },
     222            1 :             },
     223            1 :         };
     224            1 : 
     225            1 :         let limiter = DynamicLimiter::new(config);
     226            1 : 
     227            1 :         let token = limiter
     228            1 :             .acquire_timeout(Duration::from_millis(1))
     229            1 :             .await
     230            1 :             .unwrap();
     231            1 : 
     232            1 :         token.release(Outcome::Success);
     233            1 :         assert_eq!(
     234            1 :             limiter.state().limit(),
     235            1 :             4,
     236            1 :             "success: ignore when < half limit"
     237            1 :         );
     238            1 :     }
     239              : 
     240              :     #[tokio::test(start_paused = true)]
     241            1 :     async fn should_not_change_limit_when_no_outcome() {
     242            1 :         let config = RateLimiterConfig {
     243            1 :             initial_limit: 10,
     244            1 :             algorithm: RateLimitAlgorithm::Aimd {
     245            1 :                 conf: Aimd {
     246            1 :                     min: 1,
     247            1 :                     max: 1500,
     248            1 :                     inc: 10,
     249            1 :                     dec: 0.5,
     250            1 :                     utilisation: 0.5,
     251            1 :                 },
     252            1 :             },
     253            1 :         };
     254            1 : 
     255            1 :         let limiter = DynamicLimiter::new(config);
     256            1 : 
     257            1 :         let token = limiter
     258            1 :             .acquire_timeout(Duration::from_millis(1))
     259            1 :             .await
     260            1 :             .unwrap();
     261            1 :         drop(token);
     262            1 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     263            1 :     }
     264              : }
        

Generated by: LCOV version 2.1-beta