LCOV - code coverage report
Current view: top level - proxy/src/rate_limiter/limit_algorithm - aimd.rs (source / functions) Coverage Total Hit
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info Lines: 99.5 % 208 207
Test Date: 2024-09-20 13:14:58 Functions: 76.2 % 21 16

            Line data    Source code
       1              : use super::{LimitAlgorithm, Outcome, Sample};
       2              : 
       3              : /// Loss-based congestion avoidance.
       4              : ///
       5              : /// Additive-increase, multiplicative decrease.
       6              : ///
       7              : /// Adds available currency when:
       8              : /// 1. no load-based errors are observed, and
       9              : /// 2. the utilisation of the current limit is high.
      10              : ///
      11              : /// Reduces available concurrency by a factor when load-based errors are detected.
      12            6 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      13              : pub(crate) struct Aimd {
      14              :     /// Minimum limit for AIMD algorithm.
      15              :     pub(crate) min: usize,
      16              :     /// Maximum limit for AIMD algorithm.
      17              :     pub(crate) max: usize,
      18              :     /// Decrease AIMD decrease by value in case of error.
      19              :     pub(crate) dec: f32,
      20              :     /// Increase AIMD increase by value in case of success.
      21              :     pub(crate) inc: usize,
      22              :     /// A threshold below which the limit won't be increased.
      23              :     pub(crate) utilisation: f32,
      24              : }
      25              : 
      26              : impl LimitAlgorithm for Aimd {
      27            8 :     fn update(&self, old_limit: usize, sample: Sample) -> usize {
      28            8 :         match sample.outcome {
      29              :             Outcome::Success => {
      30            5 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      31            5 : 
      32            5 :                 if utilisation > self.utilisation {
      33            3 :                     let limit = old_limit + self.inc;
      34            3 :                     let increased_limit = limit.clamp(self.min, self.max);
      35            3 :                     if increased_limit > old_limit {
      36            3 :                         tracing::info!(increased_limit, "limit increased");
      37            0 :                     }
      38              : 
      39            3 :                     increased_limit
      40              :                 } else {
      41            2 :                     old_limit
      42              :                 }
      43              :             }
      44              :             Outcome::Overload => {
      45            3 :                 let limit = old_limit as f32 * self.dec;
      46            3 : 
      47            3 :                 // Floor instead of round, so the limit reduces even with small numbers.
      48            3 :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      49            3 :                 let limit = limit.floor() as usize;
      50            3 : 
      51            3 :                 let limit = limit.clamp(self.min, self.max);
      52            3 :                 tracing::info!(limit, "limit decreased");
      53            3 :                 limit
      54              :             }
      55              :         }
      56            8 :     }
      57              : }
      58              : 
      59              : #[cfg(test)]
      60              : mod tests {
      61              :     use std::time::Duration;
      62              : 
      63              :     use crate::rate_limiter::limit_algorithm::{
      64              :         DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
      65              :     };
      66              : 
      67              :     use super::*;
      68              : 
      69              :     #[tokio::test(start_paused = true)]
      70            1 :     async fn increase_decrease() {
      71            1 :         let config = RateLimiterConfig {
      72            1 :             initial_limit: 1,
      73            1 :             algorithm: RateLimitAlgorithm::Aimd {
      74            1 :                 conf: Aimd {
      75            1 :                     min: 1,
      76            1 :                     max: 2,
      77            1 :                     inc: 10,
      78            1 :                     dec: 0.5,
      79            1 :                     utilisation: 0.8,
      80            1 :                 },
      81            1 :             },
      82            1 :         };
      83            1 : 
      84            1 :         let limiter = DynamicLimiter::new(config);
      85            1 : 
      86            1 :         let token = limiter
      87            1 :             .acquire_timeout(Duration::from_millis(1))
      88            1 :             .await
      89            1 :             .unwrap();
      90            1 :         token.release(Outcome::Success);
      91            1 : 
      92            1 :         assert_eq!(limiter.state().limit(), 2);
      93            1 : 
      94            1 :         let token = limiter
      95            1 :             .acquire_timeout(Duration::from_millis(1))
      96            1 :             .await
      97            1 :             .unwrap();
      98            1 :         token.release(Outcome::Success);
      99            1 :         assert_eq!(limiter.state().limit(), 2);
     100            1 : 
     101            1 :         let token = limiter
     102            1 :             .acquire_timeout(Duration::from_millis(1))
     103            1 :             .await
     104            1 :             .unwrap();
     105            1 :         token.release(Outcome::Overload);
     106            1 :         assert_eq!(limiter.state().limit(), 1);
     107            1 : 
     108            1 :         let token = limiter
     109            1 :             .acquire_timeout(Duration::from_millis(1))
     110            1 :             .await
     111            1 :             .unwrap();
     112            1 :         token.release(Outcome::Overload);
     113            1 :         assert_eq!(limiter.state().limit(), 1);
     114            1 :     }
     115              : 
     116              :     #[tokio::test(start_paused = true)]
     117            1 :     async fn should_decrease_limit_on_overload() {
     118            1 :         let config = RateLimiterConfig {
     119            1 :             initial_limit: 10,
     120            1 :             algorithm: RateLimitAlgorithm::Aimd {
     121            1 :                 conf: Aimd {
     122            1 :                     min: 1,
     123            1 :                     max: 1500,
     124            1 :                     inc: 10,
     125            1 :                     dec: 0.5,
     126            1 :                     utilisation: 0.8,
     127            1 :                 },
     128            1 :             },
     129            1 :         };
     130            1 : 
     131            1 :         let limiter = DynamicLimiter::new(config);
     132            1 : 
     133            1 :         let token = limiter
     134            1 :             .acquire_timeout(Duration::from_millis(100))
     135            1 :             .await
     136            1 :             .unwrap();
     137            1 :         token.release(Outcome::Overload);
     138            1 : 
     139            1 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
     140            1 :     }
     141              : 
     142              :     #[tokio::test(start_paused = true)]
     143            1 :     async fn acquire_timeout_times_out() {
     144            1 :         let config = RateLimiterConfig {
     145            1 :             initial_limit: 1,
     146            1 :             algorithm: RateLimitAlgorithm::Aimd {
     147            1 :                 conf: Aimd {
     148            1 :                     min: 1,
     149            1 :                     max: 2,
     150            1 :                     inc: 10,
     151            1 :                     dec: 0.5,
     152            1 :                     utilisation: 0.8,
     153            1 :                 },
     154            1 :             },
     155            1 :         };
     156            1 : 
     157            1 :         let limiter = DynamicLimiter::new(config);
     158            1 : 
     159            1 :         let token = limiter
     160            1 :             .acquire_timeout(Duration::from_millis(1))
     161            1 :             .await
     162            1 :             .unwrap();
     163            1 :         let now = tokio::time::Instant::now();
     164            1 :         limiter
     165            1 :             .acquire_timeout(Duration::from_secs(1))
     166            1 :             .await
     167            1 :             .err()
     168            1 :             .unwrap();
     169            1 : 
     170            1 :         assert!(now.elapsed() >= Duration::from_secs(1));
     171            1 : 
     172            1 :         token.release(Outcome::Success);
     173            1 : 
     174            1 :         assert_eq!(limiter.state().limit(), 2);
     175            1 :     }
     176              : 
     177              :     #[tokio::test(start_paused = true)]
     178            1 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     179            1 :         let config = RateLimiterConfig {
     180            1 :             initial_limit: 4,
     181            1 :             algorithm: RateLimitAlgorithm::Aimd {
     182            1 :                 conf: Aimd {
     183            1 :                     min: 1,
     184            1 :                     max: 1500,
     185            1 :                     inc: 1,
     186            1 :                     dec: 0.5,
     187            1 :                     utilisation: 0.5,
     188            1 :                 },
     189            1 :             },
     190            1 :         };
     191            1 : 
     192            1 :         let limiter = DynamicLimiter::new(config);
     193            1 : 
     194            1 :         let token = limiter
     195            1 :             .acquire_timeout(Duration::from_millis(1))
     196            1 :             .await
     197            1 :             .unwrap();
     198            1 :         let _token = limiter
     199            1 :             .acquire_timeout(Duration::from_millis(1))
     200            1 :             .await
     201            1 :             .unwrap();
     202            1 :         let _token = limiter
     203            1 :             .acquire_timeout(Duration::from_millis(1))
     204            1 :             .await
     205            1 :             .unwrap();
     206            1 : 
     207            1 :         token.release(Outcome::Success);
     208            1 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     209            1 :     }
     210              : 
     211              :     #[tokio::test(start_paused = true)]
     212            1 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     213            1 :         let config = RateLimiterConfig {
     214            1 :             initial_limit: 4,
     215            1 :             algorithm: RateLimitAlgorithm::Aimd {
     216            1 :                 conf: Aimd {
     217            1 :                     min: 1,
     218            1 :                     max: 1500,
     219            1 :                     inc: 10,
     220            1 :                     dec: 0.5,
     221            1 :                     utilisation: 0.5,
     222            1 :                 },
     223            1 :             },
     224            1 :         };
     225            1 : 
     226            1 :         let limiter = DynamicLimiter::new(config);
     227            1 : 
     228            1 :         let token = limiter
     229            1 :             .acquire_timeout(Duration::from_millis(1))
     230            1 :             .await
     231            1 :             .unwrap();
     232            1 : 
     233            1 :         token.release(Outcome::Success);
     234            1 :         assert_eq!(
     235            1 :             limiter.state().limit(),
     236            1 :             4,
     237            1 :             "success: ignore when < half limit"
     238            1 :         );
     239            1 :     }
     240              : 
     241              :     #[tokio::test(start_paused = true)]
     242            1 :     async fn should_not_change_limit_when_no_outcome() {
     243            1 :         let config = RateLimiterConfig {
     244            1 :             initial_limit: 10,
     245            1 :             algorithm: RateLimitAlgorithm::Aimd {
     246            1 :                 conf: Aimd {
     247            1 :                     min: 1,
     248            1 :                     max: 1500,
     249            1 :                     inc: 10,
     250            1 :                     dec: 0.5,
     251            1 :                     utilisation: 0.5,
     252            1 :                 },
     253            1 :             },
     254            1 :         };
     255            1 : 
     256            1 :         let limiter = DynamicLimiter::new(config);
     257            1 : 
     258            1 :         let token = limiter
     259            1 :             .acquire_timeout(Duration::from_millis(1))
     260            1 :             .await
     261            1 :             .unwrap();
     262            1 :         drop(token);
     263            1 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     264            1 :     }
     265              : }
        

Generated by: LCOV version 2.1-beta