Line data Source code
1 : use std::usize;
2 :
3 : use async_trait::async_trait;
4 :
5 : use super::limit_algorithm::{AimdConfig, LimitAlgorithm, Sample};
6 :
7 : use super::limiter::Outcome;
8 :
9 : /// Loss-based congestion avoidance.
10 : ///
11 : /// Additive-increase, multiplicative decrease.
12 : ///
13 : /// Adds available currency when:
14 : /// 1. no load-based errors are observed, and
15 : /// 2. the utilisation of the current limit is high.
16 : ///
17 : /// Reduces available concurrency by a factor when load-based errors are detected.
18 : pub struct Aimd {
19 : min_limit: usize,
20 : max_limit: usize,
21 : decrease_factor: f32,
22 : increase_by: usize,
23 : min_utilisation_threshold: f32,
24 : }
25 :
26 : impl Aimd {
27 11 : pub fn new(config: AimdConfig) -> Self {
28 11 : Self {
29 11 : min_limit: config.aimd_min_limit,
30 11 : max_limit: config.aimd_max_limit,
31 11 : decrease_factor: config.aimd_decrease_factor,
32 11 : increase_by: config.aimd_increase_by,
33 11 : min_utilisation_threshold: config.aimd_min_utilisation_threshold,
34 11 : }
35 11 : }
36 : }
37 :
38 : #[async_trait]
39 : impl LimitAlgorithm for Aimd {
40 13 : async fn update(&mut self, old_limit: usize, sample: Sample) -> usize {
41 : use Outcome::*;
42 13 : match sample.outcome {
43 : Success => {
44 9 : let utilisation = sample.in_flight as f32 / old_limit as f32;
45 9 :
46 9 : if utilisation > self.min_utilisation_threshold {
47 3 : let limit = old_limit + self.increase_by;
48 3 : limit.clamp(self.min_limit, self.max_limit)
49 : } else {
50 6 : old_limit
51 : }
52 : }
53 : Overload => {
54 4 : let limit = old_limit as f32 * self.decrease_factor;
55 4 :
56 4 : // Floor instead of round, so the limit reduces even with small numbers.
57 4 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
58 4 : let limit = limit.floor() as usize;
59 4 :
60 4 : limit.clamp(self.min_limit, self.max_limit)
61 : }
62 : }
63 26 : }
64 : }
65 :
66 : #[cfg(test)]
67 : mod tests {
68 : use std::sync::Arc;
69 :
70 : use tokio::sync::Notify;
71 :
72 : use super::*;
73 :
74 : use crate::rate_limiter::{Limiter, RateLimiterConfig};
75 :
76 2 : #[tokio::test]
77 2 : async fn should_decrease_limit_on_overload() {
78 2 : let config = RateLimiterConfig {
79 2 : initial_limit: 10,
80 2 : aimd_config: Some(AimdConfig {
81 2 : aimd_decrease_factor: 0.5,
82 2 : ..Default::default()
83 2 : }),
84 2 : disable: false,
85 2 : ..Default::default()
86 2 : };
87 2 :
88 2 : let release_notifier = Arc::new(Notify::new());
89 2 :
90 2 : let limiter = Limiter::new(config).with_release_notifier(release_notifier.clone());
91 2 :
92 2 : let token = limiter.try_acquire().unwrap();
93 2 : limiter.release(token, Some(Outcome::Overload)).await;
94 2 : release_notifier.notified().await;
95 2 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
96 : }
97 :
98 2 : #[tokio::test]
99 2 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
100 2 : let config = RateLimiterConfig {
101 2 : initial_limit: 4,
102 2 : aimd_config: Some(AimdConfig {
103 2 : aimd_decrease_factor: 0.5,
104 2 : aimd_min_utilisation_threshold: 0.5,
105 2 : aimd_increase_by: 1,
106 2 : ..Default::default()
107 2 : }),
108 2 : disable: false,
109 2 : ..Default::default()
110 2 : };
111 2 :
112 2 : let limiter = Limiter::new(config);
113 2 :
114 2 : let token = limiter.try_acquire().unwrap();
115 2 : let _token = limiter.try_acquire().unwrap();
116 2 : let _token = limiter.try_acquire().unwrap();
117 2 :
118 2 : limiter.release(token, Some(Outcome::Success)).await;
119 2 : assert_eq!(limiter.state().limit(), 5, "success: increase");
120 : }
121 :
122 2 : #[tokio::test]
123 2 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
124 2 : let config = RateLimiterConfig {
125 2 : initial_limit: 4,
126 2 : aimd_config: Some(AimdConfig {
127 2 : aimd_decrease_factor: 0.5,
128 2 : aimd_min_utilisation_threshold: 0.5,
129 2 : ..Default::default()
130 2 : }),
131 2 : disable: false,
132 2 : ..Default::default()
133 2 : };
134 2 :
135 2 : let limiter = Limiter::new(config);
136 2 :
137 2 : let token = limiter.try_acquire().unwrap();
138 2 :
139 2 : limiter.release(token, Some(Outcome::Success)).await;
140 2 : assert_eq!(
141 2 : limiter.state().limit(),
142 : 4,
143 0 : "success: ignore when < half limit"
144 : );
145 : }
146 :
147 2 : #[tokio::test]
148 2 : async fn should_not_change_limit_when_no_outcome() {
149 2 : let config = RateLimiterConfig {
150 2 : initial_limit: 10,
151 2 : aimd_config: Some(AimdConfig {
152 2 : aimd_decrease_factor: 0.5,
153 2 : aimd_min_utilisation_threshold: 0.5,
154 2 : ..Default::default()
155 2 : }),
156 2 : disable: false,
157 2 : ..Default::default()
158 2 : };
159 2 :
160 2 : let limiter = Limiter::new(config);
161 2 :
162 2 : let token = limiter.try_acquire().unwrap();
163 2 : limiter.release(token, None).await;
164 2 : assert_eq!(limiter.state().limit(), 10, "ignore");
165 : }
166 : }
|