LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter/limit_algorithm - aimd.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 99.5 % 210 209
Test Date: 2025-02-20 13:11:02 Functions: 77.8 % 18 14

            Line data    Source code
       1              : use super::{LimitAlgorithm, Outcome, Sample};
       2              : 
       3              : /// Loss-based congestion avoidance.
       4              : ///
       5              : /// Additive-increase, multiplicative decrease.
       6              : ///
       7              : /// Adds available currency when:
       8              : /// 1. no load-based errors are observed, and
       9              : /// 2. the utilisation of the current limit is high.
      10              : ///
      11              : /// Reduces available concurrency by a factor when load-based errors are detected.
      12            5 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      13              : pub(crate) struct Aimd {
      14              :     /// Minimum limit for AIMD algorithm.
      15              :     pub(crate) min: usize,
      16              :     /// Maximum limit for AIMD algorithm.
      17              :     pub(crate) max: usize,
      18              :     /// Decrease AIMD decrease by value in case of error.
      19              :     pub(crate) dec: f32,
      20              :     /// Increase AIMD increase by value in case of success.
      21              :     pub(crate) inc: usize,
      22              :     /// A threshold below which the limit won't be increased.
      23              :     pub(crate) utilisation: f32,
      24              : }
      25              : 
      26              : impl LimitAlgorithm for Aimd {
      27            8 :     fn update(&self, old_limit: usize, sample: Sample) -> usize {
      28            8 :         match sample.outcome {
      29              :             Outcome::Success => {
      30            5 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      31            5 : 
      32            5 :                 if utilisation > self.utilisation {
      33            3 :                     let limit = old_limit + self.inc;
      34            3 :                     let new_limit = limit.clamp(self.min, self.max);
      35            3 :                     if new_limit > old_limit {
      36            3 :                         tracing::info!(old_limit, new_limit, "limit increased");
      37              :                     } else {
      38            0 :                         tracing::debug!(old_limit, new_limit, "limit clamped at max");
      39              :                     }
      40              : 
      41            3 :                     new_limit
      42              :                 } else {
      43            2 :                     old_limit
      44              :                 }
      45              :             }
      46              :             Outcome::Overload => {
      47            3 :                 let new_limit = old_limit as f32 * self.dec;
      48            3 : 
      49            3 :                 // Floor instead of round, so the limit reduces even with small numbers.
      50            3 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      51            3 :                 let new_limit = new_limit.floor() as usize;
      52            3 : 
      53            3 :                 let new_limit = new_limit.clamp(self.min, self.max);
      54            3 :                 if new_limit < old_limit {
      55            2 :                     tracing::info!(old_limit, new_limit, "limit decreased");
      56              :                 } else {
      57            1 :                     tracing::debug!(old_limit, new_limit, "limit clamped at min");
      58              :                 }
      59            3 :                 new_limit
      60              :             }
      61              :         }
      62            8 :     }
      63              : }
      64              : 
      65              : #[cfg(test)]
      66              : #[expect(clippy::unwrap_used)]
      67              : mod tests {
      68              :     use std::time::Duration;
      69              : 
      70              :     use super::*;
      71              :     use crate::rate_limiter::limit_algorithm::{
      72              :         DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
      73              :     };
      74              : 
      75              :     #[tokio::test(start_paused = true)]
      76            1 :     async fn increase_decrease() {
      77            1 :         let config = RateLimiterConfig {
      78            1 :             initial_limit: 1,
      79            1 :             algorithm: RateLimitAlgorithm::Aimd {
      80            1 :                 conf: Aimd {
      81            1 :                     min: 1,
      82            1 :                     max: 2,
      83            1 :                     inc: 10,
      84            1 :                     dec: 0.5,
      85            1 :                     utilisation: 0.8,
      86            1 :                 },
      87            1 :             },
      88            1 :         };
      89            1 : 
      90            1 :         let limiter = DynamicLimiter::new(config);
      91            1 : 
      92            1 :         let token = limiter
      93            1 :             .acquire_timeout(Duration::from_millis(1))
      94            1 :             .await
      95            1 :             .unwrap();
      96            1 :         token.release(Outcome::Success);
      97            1 : 
      98            1 :         assert_eq!(limiter.state().limit(), 2);
      99            1 : 
     100            1 :         let token = limiter
     101            1 :             .acquire_timeout(Duration::from_millis(1))
     102            1 :             .await
     103            1 :             .unwrap();
     104            1 :         token.release(Outcome::Success);
     105            1 :         assert_eq!(limiter.state().limit(), 2);
     106            1 : 
     107            1 :         let token = limiter
     108            1 :             .acquire_timeout(Duration::from_millis(1))
     109            1 :             .await
     110            1 :             .unwrap();
     111            1 :         token.release(Outcome::Overload);
     112            1 :         assert_eq!(limiter.state().limit(), 1);
     113            1 : 
     114            1 :         let token = limiter
     115            1 :             .acquire_timeout(Duration::from_millis(1))
     116            1 :             .await
     117            1 :             .unwrap();
     118            1 :         token.release(Outcome::Overload);
     119            1 :         assert_eq!(limiter.state().limit(), 1);
     120            1 :     }
     121              : 
     122              :     #[tokio::test(start_paused = true)]
     123            1 :     async fn should_decrease_limit_on_overload() {
     124            1 :         let config = RateLimiterConfig {
     125            1 :             initial_limit: 10,
     126            1 :             algorithm: RateLimitAlgorithm::Aimd {
     127            1 :                 conf: Aimd {
     128            1 :                     min: 1,
     129            1 :                     max: 1500,
     130            1 :                     inc: 10,
     131            1 :                     dec: 0.5,
     132            1 :                     utilisation: 0.8,
     133            1 :                 },
     134            1 :             },
     135            1 :         };
     136            1 : 
     137            1 :         let limiter = DynamicLimiter::new(config);
     138            1 : 
     139            1 :         let token = limiter
     140            1 :             .acquire_timeout(Duration::from_millis(100))
     141            1 :             .await
     142            1 :             .unwrap();
     143            1 :         token.release(Outcome::Overload);
     144            1 : 
     145            1 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
     146            1 :     }
     147              : 
     148              :     #[tokio::test(start_paused = true)]
     149            1 :     async fn acquire_timeout_times_out() {
     150            1 :         let config = RateLimiterConfig {
     151            1 :             initial_limit: 1,
     152            1 :             algorithm: RateLimitAlgorithm::Aimd {
     153            1 :                 conf: Aimd {
     154            1 :                     min: 1,
     155            1 :                     max: 2,
     156            1 :                     inc: 10,
     157            1 :                     dec: 0.5,
     158            1 :                     utilisation: 0.8,
     159            1 :                 },
     160            1 :             },
     161            1 :         };
     162            1 : 
     163            1 :         let limiter = DynamicLimiter::new(config);
     164            1 : 
     165            1 :         let token = limiter
     166            1 :             .acquire_timeout(Duration::from_millis(1))
     167            1 :             .await
     168            1 :             .unwrap();
     169            1 :         let now = tokio::time::Instant::now();
     170            1 :         limiter
     171            1 :             .acquire_timeout(Duration::from_secs(1))
     172            1 :             .await
     173            1 :             .err()
     174            1 :             .unwrap();
     175            1 : 
     176            1 :         assert!(now.elapsed() >= Duration::from_secs(1));
     177            1 : 
     178            1 :         token.release(Outcome::Success);
     179            1 : 
     180            1 :         assert_eq!(limiter.state().limit(), 2);
     181            1 :     }
     182              : 
     183              :     #[tokio::test(start_paused = true)]
     184            1 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     185            1 :         let config = RateLimiterConfig {
     186            1 :             initial_limit: 4,
     187            1 :             algorithm: RateLimitAlgorithm::Aimd {
     188            1 :                 conf: Aimd {
     189            1 :                     min: 1,
     190            1 :                     max: 1500,
     191            1 :                     inc: 1,
     192            1 :                     dec: 0.5,
     193            1 :                     utilisation: 0.5,
     194            1 :                 },
     195            1 :             },
     196            1 :         };
     197            1 : 
     198            1 :         let limiter = DynamicLimiter::new(config);
     199            1 : 
     200            1 :         let token = limiter
     201            1 :             .acquire_timeout(Duration::from_millis(1))
     202            1 :             .await
     203            1 :             .unwrap();
     204            1 :         let _token = limiter
     205            1 :             .acquire_timeout(Duration::from_millis(1))
     206            1 :             .await
     207            1 :             .unwrap();
     208            1 :         let _token = limiter
     209            1 :             .acquire_timeout(Duration::from_millis(1))
     210            1 :             .await
     211            1 :             .unwrap();
     212            1 : 
     213            1 :         token.release(Outcome::Success);
     214            1 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     215            1 :     }
     216              : 
     217              :     #[tokio::test(start_paused = true)]
     218            1 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     219            1 :         let config = RateLimiterConfig {
     220            1 :             initial_limit: 4,
     221            1 :             algorithm: RateLimitAlgorithm::Aimd {
     222            1 :                 conf: Aimd {
     223            1 :                     min: 1,
     224            1 :                     max: 1500,
     225            1 :                     inc: 10,
     226            1 :                     dec: 0.5,
     227            1 :                     utilisation: 0.5,
     228            1 :                 },
     229            1 :             },
     230            1 :         };
     231            1 : 
     232            1 :         let limiter = DynamicLimiter::new(config);
     233            1 : 
     234            1 :         let token = limiter
     235            1 :             .acquire_timeout(Duration::from_millis(1))
     236            1 :             .await
     237            1 :             .unwrap();
     238            1 : 
     239            1 :         token.release(Outcome::Success);
     240            1 :         assert_eq!(
     241            1 :             limiter.state().limit(),
     242            1 :             4,
     243            1 :             "success: ignore when < half limit"
     244            1 :         );
     245            1 :     }
     246              : 
     247              :     #[tokio::test(start_paused = true)]
     248            1 :     async fn should_not_change_limit_when_no_outcome() {
     249            1 :         let config = RateLimiterConfig {
     250            1 :             initial_limit: 10,
     251            1 :             algorithm: RateLimitAlgorithm::Aimd {
     252            1 :                 conf: Aimd {
     253            1 :                     min: 1,
     254            1 :                     max: 1500,
     255            1 :                     inc: 10,
     256            1 :                     dec: 0.5,
     257            1 :                     utilisation: 0.5,
     258            1 :                 },
     259            1 :             },
     260            1 :         };
     261            1 : 
     262            1 :         let limiter = DynamicLimiter::new(config);
     263            1 : 
     264            1 :         let token = limiter
     265            1 :             .acquire_timeout(Duration::from_millis(1))
     266            1 :             .await
     267            1 :             .unwrap();
     268            1 :         drop(token);
     269            1 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     270            1 :     }
     271              : }
        

Generated by: LCOV version 2.1-beta