|             Line data    Source code 
       1              : use super::{LimitAlgorithm, Outcome, Sample};
       2              : 
       3              : /// Loss-based congestion avoidance.
       4              : ///
       5              : /// Additive-increase, multiplicative decrease.
       6              : ///
       7              : /// Adds available currency when:
       8              : /// 1. no load-based errors are observed, and
       9              : /// 2. the utilisation of the current limit is high.
      10              : ///
      11              : /// Reduces available concurrency by a factor when load-based errors are detected.
      12            0 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
      13              : pub(crate) struct Aimd {
      14              :     /// Minimum limit for AIMD algorithm.
      15              :     pub(crate) min: usize,
      16              :     /// Maximum limit for AIMD algorithm.
      17              :     pub(crate) max: usize,
      18              :     /// Decrease AIMD decrease by value in case of error.
      19              :     pub(crate) dec: f32,
      20              :     /// Increase AIMD increase by value in case of success.
      21              :     pub(crate) inc: usize,
      22              :     /// A threshold below which the limit won't be increased.
      23              :     pub(crate) utilisation: f32,
      24              : }
      25              : 
      26              : impl LimitAlgorithm for Aimd {
      27            8 :     fn update(&self, old_limit: usize, sample: Sample) -> usize {
      28            8 :         match sample.outcome {
      29              :             Outcome::Success => {
      30            5 :                 let utilisation = sample.in_flight as f32 / old_limit as f32;
      31              : 
      32            5 :                 if utilisation > self.utilisation {
      33            3 :                     let limit = old_limit + self.inc;
      34            3 :                     let new_limit = limit.clamp(self.min, self.max);
      35            3 :                     if new_limit > old_limit {
      36            3 :                         tracing::info!(old_limit, new_limit, "limit increased");
      37              :                     } else {
      38            0 :                         tracing::debug!(old_limit, new_limit, "limit clamped at max");
      39              :                     }
      40              : 
      41            3 :                     new_limit
      42              :                 } else {
      43            2 :                     old_limit
      44              :                 }
      45              :             }
      46              :             Outcome::Overload => {
      47            3 :                 let new_limit = old_limit as f32 * self.dec;
      48              : 
      49              :                 // Floor instead of round, so the limit reduces even with small numbers.
      50              :                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
      51            3 :                 let new_limit = new_limit.floor() as usize;
      52              : 
      53            3 :                 let new_limit = new_limit.clamp(self.min, self.max);
      54            3 :                 if new_limit < old_limit {
      55            2 :                     tracing::info!(old_limit, new_limit, "limit decreased");
      56              :                 } else {
      57            1 :                     tracing::debug!(old_limit, new_limit, "limit clamped at min");
      58              :                 }
      59            3 :                 new_limit
      60              :             }
      61              :         }
      62            8 :     }
      63              : }
      64              : 
      65              : #[cfg(test)]
      66              : mod tests {
      67              :     use std::time::Duration;
      68              : 
      69              :     use super::*;
      70              :     use crate::rate_limiter::limit_algorithm::{
      71              :         DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
      72              :     };
      73              : 
      74              :     #[tokio::test(start_paused = true)]
      75            1 :     async fn increase_decrease() {
      76            1 :         let config = RateLimiterConfig {
      77            1 :             initial_limit: 1,
      78            1 :             algorithm: RateLimitAlgorithm::Aimd {
      79            1 :                 conf: Aimd {
      80            1 :                     min: 1,
      81            1 :                     max: 2,
      82            1 :                     inc: 10,
      83            1 :                     dec: 0.5,
      84            1 :                     utilisation: 0.8,
      85            1 :                 },
      86            1 :             },
      87            1 :         };
      88              : 
      89            1 :         let limiter = DynamicLimiter::new(config);
      90              : 
      91            1 :         let token = limiter
      92            1 :             .acquire_timeout(Duration::from_millis(1))
      93            1 :             .await
      94            1 :             .unwrap();
      95            1 :         token.release(Outcome::Success);
      96              : 
      97            1 :         assert_eq!(limiter.state().limit(), 2);
      98              : 
      99            1 :         let token = limiter
     100            1 :             .acquire_timeout(Duration::from_millis(1))
     101            1 :             .await
     102            1 :             .unwrap();
     103            1 :         token.release(Outcome::Success);
     104            1 :         assert_eq!(limiter.state().limit(), 2);
     105              : 
     106            1 :         let token = limiter
     107            1 :             .acquire_timeout(Duration::from_millis(1))
     108            1 :             .await
     109            1 :             .unwrap();
     110            1 :         token.release(Outcome::Overload);
     111            1 :         assert_eq!(limiter.state().limit(), 1);
     112              : 
     113            1 :         let token = limiter
     114            1 :             .acquire_timeout(Duration::from_millis(1))
     115            1 :             .await
     116            1 :             .unwrap();
     117            1 :         token.release(Outcome::Overload);
     118            1 :         assert_eq!(limiter.state().limit(), 1);
     119            1 :     }
     120              : 
     121              :     #[tokio::test(start_paused = true)]
     122            1 :     async fn should_decrease_limit_on_overload() {
     123            1 :         let config = RateLimiterConfig {
     124            1 :             initial_limit: 10,
     125            1 :             algorithm: RateLimitAlgorithm::Aimd {
     126            1 :                 conf: Aimd {
     127            1 :                     min: 1,
     128            1 :                     max: 1500,
     129            1 :                     inc: 10,
     130            1 :                     dec: 0.5,
     131            1 :                     utilisation: 0.8,
     132            1 :                 },
     133            1 :             },
     134            1 :         };
     135              : 
     136            1 :         let limiter = DynamicLimiter::new(config);
     137              : 
     138            1 :         let token = limiter
     139            1 :             .acquire_timeout(Duration::from_millis(100))
     140            1 :             .await
     141            1 :             .unwrap();
     142            1 :         token.release(Outcome::Overload);
     143              : 
     144            1 :         assert_eq!(limiter.state().limit(), 5, "overload: decrease");
     145            1 :     }
     146              : 
     147              :     #[tokio::test(start_paused = true)]
     148            1 :     async fn acquire_timeout_times_out() {
     149            1 :         let config = RateLimiterConfig {
     150            1 :             initial_limit: 1,
     151            1 :             algorithm: RateLimitAlgorithm::Aimd {
     152            1 :                 conf: Aimd {
     153            1 :                     min: 1,
     154            1 :                     max: 2,
     155            1 :                     inc: 10,
     156            1 :                     dec: 0.5,
     157            1 :                     utilisation: 0.8,
     158            1 :                 },
     159            1 :             },
     160            1 :         };
     161              : 
     162            1 :         let limiter = DynamicLimiter::new(config);
     163              : 
     164            1 :         let token = limiter
     165            1 :             .acquire_timeout(Duration::from_millis(1))
     166            1 :             .await
     167            1 :             .unwrap();
     168            1 :         let now = tokio::time::Instant::now();
     169            1 :         limiter
     170            1 :             .acquire_timeout(Duration::from_secs(1))
     171            1 :             .await
     172            1 :             .err()
     173            1 :             .unwrap();
     174              : 
     175            1 :         assert!(now.elapsed() >= Duration::from_secs(1));
     176              : 
     177            1 :         token.release(Outcome::Success);
     178              : 
     179            1 :         assert_eq!(limiter.state().limit(), 2);
     180            1 :     }
     181              : 
     182              :     #[tokio::test(start_paused = true)]
     183            1 :     async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
     184            1 :         let config = RateLimiterConfig {
     185            1 :             initial_limit: 4,
     186            1 :             algorithm: RateLimitAlgorithm::Aimd {
     187            1 :                 conf: Aimd {
     188            1 :                     min: 1,
     189            1 :                     max: 1500,
     190            1 :                     inc: 1,
     191            1 :                     dec: 0.5,
     192            1 :                     utilisation: 0.5,
     193            1 :                 },
     194            1 :             },
     195            1 :         };
     196              : 
     197            1 :         let limiter = DynamicLimiter::new(config);
     198              : 
     199            1 :         let token = limiter
     200            1 :             .acquire_timeout(Duration::from_millis(1))
     201            1 :             .await
     202            1 :             .unwrap();
     203            1 :         let _token = limiter
     204            1 :             .acquire_timeout(Duration::from_millis(1))
     205            1 :             .await
     206            1 :             .unwrap();
     207            1 :         let _token = limiter
     208            1 :             .acquire_timeout(Duration::from_millis(1))
     209            1 :             .await
     210            1 :             .unwrap();
     211              : 
     212            1 :         token.release(Outcome::Success);
     213            1 :         assert_eq!(limiter.state().limit(), 5, "success: increase");
     214            1 :     }
     215              : 
     216              :     #[tokio::test(start_paused = true)]
     217            1 :     async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
     218            1 :         let config = RateLimiterConfig {
     219            1 :             initial_limit: 4,
     220            1 :             algorithm: RateLimitAlgorithm::Aimd {
     221            1 :                 conf: Aimd {
     222            1 :                     min: 1,
     223            1 :                     max: 1500,
     224            1 :                     inc: 10,
     225            1 :                     dec: 0.5,
     226            1 :                     utilisation: 0.5,
     227            1 :                 },
     228            1 :             },
     229            1 :         };
     230              : 
     231            1 :         let limiter = DynamicLimiter::new(config);
     232              : 
     233            1 :         let token = limiter
     234            1 :             .acquire_timeout(Duration::from_millis(1))
     235            1 :             .await
     236            1 :             .unwrap();
     237              : 
     238            1 :         token.release(Outcome::Success);
     239            1 :         assert_eq!(
     240            1 :             limiter.state().limit(),
     241            1 :             4,
     242            1 :             "success: ignore when < half limit"
     243            1 :         );
     244            1 :     }
     245              : 
     246              :     #[tokio::test(start_paused = true)]
     247            1 :     async fn should_not_change_limit_when_no_outcome() {
     248            1 :         let config = RateLimiterConfig {
     249            1 :             initial_limit: 10,
     250            1 :             algorithm: RateLimitAlgorithm::Aimd {
     251            1 :                 conf: Aimd {
     252            1 :                     min: 1,
     253            1 :                     max: 1500,
     254            1 :                     inc: 10,
     255            1 :                     dec: 0.5,
     256            1 :                     utilisation: 0.5,
     257            1 :                 },
     258            1 :             },
     259            1 :         };
     260              : 
     261            1 :         let limiter = DynamicLimiter::new(config);
     262              : 
     263            1 :         let token = limiter
     264            1 :             .acquire_timeout(Duration::from_millis(1))
     265            1 :             .await
     266            1 :             .unwrap();
     267            1 :         drop(token);
     268            1 :         assert_eq!(limiter.state().limit(), 10, "ignore");
     269            1 :     }
     270              : }
         |