Line data Source code
1 : use super::{LimitAlgorithm, Outcome, Sample};
2 :
3 : /// Loss-based congestion avoidance.
4 : ///
5 : /// Additive-increase, multiplicative decrease.
6 : ///
7 : /// Adds available currency when:
8 : /// 1. no load-based errors are observed, and
9 : /// 2. the utilisation of the current limit is high.
10 : ///
11 : /// Reduces available concurrency by a factor when load-based errors are detected.
12 5 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
13 : pub(crate) struct Aimd {
14 : /// Minimum limit for AIMD algorithm.
15 : pub(crate) min: usize,
16 : /// Maximum limit for AIMD algorithm.
17 : pub(crate) max: usize,
18 : /// Decrease AIMD decrease by value in case of error.
19 : pub(crate) dec: f32,
20 : /// Increase AIMD increase by value in case of success.
21 : pub(crate) inc: usize,
22 : /// A threshold below which the limit won't be increased.
23 : pub(crate) utilisation: f32,
24 : }
25 :
26 : impl LimitAlgorithm for Aimd {
27 8 : fn update(&self, old_limit: usize, sample: Sample) -> usize {
28 8 : match sample.outcome {
29 : Outcome::Success => {
30 5 : let utilisation = sample.in_flight as f32 / old_limit as f32;
31 5 :
32 5 : if utilisation > self.utilisation {
33 3 : let limit = old_limit + self.inc;
34 3 : let new_limit = limit.clamp(self.min, self.max);
35 3 : if new_limit > old_limit {
36 3 : tracing::info!(old_limit, new_limit, "limit increased");
37 : } else {
38 0 : tracing::debug!(old_limit, new_limit, "limit clamped at max");
39 : }
40 :
41 3 : new_limit
42 : } else {
43 2 : old_limit
44 : }
45 : }
46 : Outcome::Overload => {
47 3 : let new_limit = old_limit as f32 * self.dec;
48 3 :
49 3 : // Floor instead of round, so the limit reduces even with small numbers.
50 3 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
51 3 : let new_limit = new_limit.floor() as usize;
52 3 :
53 3 : let new_limit = new_limit.clamp(self.min, self.max);
54 3 : if new_limit < old_limit {
55 2 : tracing::info!(old_limit, new_limit, "limit decreased");
56 : } else {
57 1 : tracing::debug!(old_limit, new_limit, "limit clamped at min");
58 : }
59 3 : new_limit
60 : }
61 : }
62 8 : }
63 : }
64 :
65 : #[cfg(test)]
66 : #[expect(clippy::unwrap_used)]
67 : mod tests {
68 : use std::time::Duration;
69 :
70 : use super::*;
71 : use crate::rate_limiter::limit_algorithm::{
72 : DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
73 : };
74 :
75 : #[tokio::test(start_paused = true)]
76 1 : async fn increase_decrease() {
77 1 : let config = RateLimiterConfig {
78 1 : initial_limit: 1,
79 1 : algorithm: RateLimitAlgorithm::Aimd {
80 1 : conf: Aimd {
81 1 : min: 1,
82 1 : max: 2,
83 1 : inc: 10,
84 1 : dec: 0.5,
85 1 : utilisation: 0.8,
86 1 : },
87 1 : },
88 1 : };
89 1 :
90 1 : let limiter = DynamicLimiter::new(config);
91 1 :
92 1 : let token = limiter
93 1 : .acquire_timeout(Duration::from_millis(1))
94 1 : .await
95 1 : .unwrap();
96 1 : token.release(Outcome::Success);
97 1 :
98 1 : assert_eq!(limiter.state().limit(), 2);
99 1 :
100 1 : let token = limiter
101 1 : .acquire_timeout(Duration::from_millis(1))
102 1 : .await
103 1 : .unwrap();
104 1 : token.release(Outcome::Success);
105 1 : assert_eq!(limiter.state().limit(), 2);
106 1 :
107 1 : let token = limiter
108 1 : .acquire_timeout(Duration::from_millis(1))
109 1 : .await
110 1 : .unwrap();
111 1 : token.release(Outcome::Overload);
112 1 : assert_eq!(limiter.state().limit(), 1);
113 1 :
114 1 : let token = limiter
115 1 : .acquire_timeout(Duration::from_millis(1))
116 1 : .await
117 1 : .unwrap();
118 1 : token.release(Outcome::Overload);
119 1 : assert_eq!(limiter.state().limit(), 1);
120 1 : }
121 :
122 : #[tokio::test(start_paused = true)]
123 1 : async fn should_decrease_limit_on_overload() {
124 1 : let config = RateLimiterConfig {
125 1 : initial_limit: 10,
126 1 : algorithm: RateLimitAlgorithm::Aimd {
127 1 : conf: Aimd {
128 1 : min: 1,
129 1 : max: 1500,
130 1 : inc: 10,
131 1 : dec: 0.5,
132 1 : utilisation: 0.8,
133 1 : },
134 1 : },
135 1 : };
136 1 :
137 1 : let limiter = DynamicLimiter::new(config);
138 1 :
139 1 : let token = limiter
140 1 : .acquire_timeout(Duration::from_millis(100))
141 1 : .await
142 1 : .unwrap();
143 1 : token.release(Outcome::Overload);
144 1 :
145 1 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
146 1 : }
147 :
148 : #[tokio::test(start_paused = true)]
149 1 : async fn acquire_timeout_times_out() {
150 1 : let config = RateLimiterConfig {
151 1 : initial_limit: 1,
152 1 : algorithm: RateLimitAlgorithm::Aimd {
153 1 : conf: Aimd {
154 1 : min: 1,
155 1 : max: 2,
156 1 : inc: 10,
157 1 : dec: 0.5,
158 1 : utilisation: 0.8,
159 1 : },
160 1 : },
161 1 : };
162 1 :
163 1 : let limiter = DynamicLimiter::new(config);
164 1 :
165 1 : let token = limiter
166 1 : .acquire_timeout(Duration::from_millis(1))
167 1 : .await
168 1 : .unwrap();
169 1 : let now = tokio::time::Instant::now();
170 1 : limiter
171 1 : .acquire_timeout(Duration::from_secs(1))
172 1 : .await
173 1 : .err()
174 1 : .unwrap();
175 1 :
176 1 : assert!(now.elapsed() >= Duration::from_secs(1));
177 1 :
178 1 : token.release(Outcome::Success);
179 1 :
180 1 : assert_eq!(limiter.state().limit(), 2);
181 1 : }
182 :
183 : #[tokio::test(start_paused = true)]
184 1 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
185 1 : let config = RateLimiterConfig {
186 1 : initial_limit: 4,
187 1 : algorithm: RateLimitAlgorithm::Aimd {
188 1 : conf: Aimd {
189 1 : min: 1,
190 1 : max: 1500,
191 1 : inc: 1,
192 1 : dec: 0.5,
193 1 : utilisation: 0.5,
194 1 : },
195 1 : },
196 1 : };
197 1 :
198 1 : let limiter = DynamicLimiter::new(config);
199 1 :
200 1 : let token = limiter
201 1 : .acquire_timeout(Duration::from_millis(1))
202 1 : .await
203 1 : .unwrap();
204 1 : let _token = limiter
205 1 : .acquire_timeout(Duration::from_millis(1))
206 1 : .await
207 1 : .unwrap();
208 1 : let _token = limiter
209 1 : .acquire_timeout(Duration::from_millis(1))
210 1 : .await
211 1 : .unwrap();
212 1 :
213 1 : token.release(Outcome::Success);
214 1 : assert_eq!(limiter.state().limit(), 5, "success: increase");
215 1 : }
216 :
217 : #[tokio::test(start_paused = true)]
218 1 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
219 1 : let config = RateLimiterConfig {
220 1 : initial_limit: 4,
221 1 : algorithm: RateLimitAlgorithm::Aimd {
222 1 : conf: Aimd {
223 1 : min: 1,
224 1 : max: 1500,
225 1 : inc: 10,
226 1 : dec: 0.5,
227 1 : utilisation: 0.5,
228 1 : },
229 1 : },
230 1 : };
231 1 :
232 1 : let limiter = DynamicLimiter::new(config);
233 1 :
234 1 : let token = limiter
235 1 : .acquire_timeout(Duration::from_millis(1))
236 1 : .await
237 1 : .unwrap();
238 1 :
239 1 : token.release(Outcome::Success);
240 1 : assert_eq!(
241 1 : limiter.state().limit(),
242 1 : 4,
243 1 : "success: ignore when < half limit"
244 1 : );
245 1 : }
246 :
247 : #[tokio::test(start_paused = true)]
248 1 : async fn should_not_change_limit_when_no_outcome() {
249 1 : let config = RateLimiterConfig {
250 1 : initial_limit: 10,
251 1 : algorithm: RateLimitAlgorithm::Aimd {
252 1 : conf: Aimd {
253 1 : min: 1,
254 1 : max: 1500,
255 1 : inc: 10,
256 1 : dec: 0.5,
257 1 : utilisation: 0.5,
258 1 : },
259 1 : },
260 1 : };
261 1 :
262 1 : let limiter = DynamicLimiter::new(config);
263 1 :
264 1 : let token = limiter
265 1 : .acquire_timeout(Duration::from_millis(1))
266 1 : .await
267 1 : .unwrap();
268 1 : drop(token);
269 1 : assert_eq!(limiter.state().limit(), 10, "ignore");
270 1 : }
271 : }
|