Line data Source code
1 : use super::{LimitAlgorithm, Outcome, Sample};
2 :
3 : /// Loss-based congestion avoidance.
4 : ///
5 : /// Additive-increase, multiplicative decrease.
6 : ///
7 : /// Adds available currency when:
8 : /// 1. no load-based errors are observed, and
9 : /// 2. the utilisation of the current limit is high.
10 : ///
11 : /// Reduces available concurrency by a factor when load-based errors are detected.
12 6 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
13 : pub(crate) struct Aimd {
14 : /// Minimum limit for AIMD algorithm.
15 : pub(crate) min: usize,
16 : /// Maximum limit for AIMD algorithm.
17 : pub(crate) max: usize,
18 : /// Decrease AIMD decrease by value in case of error.
19 : pub(crate) dec: f32,
20 : /// Increase AIMD increase by value in case of success.
21 : pub(crate) inc: usize,
22 : /// A threshold below which the limit won't be increased.
23 : pub(crate) utilisation: f32,
24 : }
25 :
26 : impl LimitAlgorithm for Aimd {
27 8 : fn update(&self, old_limit: usize, sample: Sample) -> usize {
28 8 : match sample.outcome {
29 : Outcome::Success => {
30 5 : let utilisation = sample.in_flight as f32 / old_limit as f32;
31 5 :
32 5 : if utilisation > self.utilisation {
33 3 : let limit = old_limit + self.inc;
34 3 : let new_limit = limit.clamp(self.min, self.max);
35 3 : if new_limit > old_limit {
36 3 : tracing::info!(old_limit, new_limit, "limit increased");
37 : } else {
38 0 : tracing::debug!(old_limit, new_limit, "limit clamped at max");
39 : }
40 :
41 3 : new_limit
42 : } else {
43 2 : old_limit
44 : }
45 : }
46 : Outcome::Overload => {
47 3 : let new_limit = old_limit as f32 * self.dec;
48 3 :
49 3 : // Floor instead of round, so the limit reduces even with small numbers.
50 3 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
51 3 : let new_limit = new_limit.floor() as usize;
52 3 :
53 3 : let new_limit = new_limit.clamp(self.min, self.max);
54 3 : if new_limit < old_limit {
55 2 : tracing::info!(old_limit, new_limit, "limit decreased");
56 : } else {
57 1 : tracing::debug!(old_limit, new_limit, "limit clamped at min");
58 : }
59 3 : new_limit
60 : }
61 : }
62 8 : }
63 : }
64 :
65 : #[cfg(test)]
66 : mod tests {
67 : use std::time::Duration;
68 :
69 : use super::*;
70 : use crate::rate_limiter::limit_algorithm::{
71 : DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
72 : };
73 :
74 : #[tokio::test(start_paused = true)]
75 1 : async fn increase_decrease() {
76 1 : let config = RateLimiterConfig {
77 1 : initial_limit: 1,
78 1 : algorithm: RateLimitAlgorithm::Aimd {
79 1 : conf: Aimd {
80 1 : min: 1,
81 1 : max: 2,
82 1 : inc: 10,
83 1 : dec: 0.5,
84 1 : utilisation: 0.8,
85 1 : },
86 1 : },
87 1 : };
88 1 :
89 1 : let limiter = DynamicLimiter::new(config);
90 1 :
91 1 : let token = limiter
92 1 : .acquire_timeout(Duration::from_millis(1))
93 1 : .await
94 1 : .unwrap();
95 1 : token.release(Outcome::Success);
96 1 :
97 1 : assert_eq!(limiter.state().limit(), 2);
98 1 :
99 1 : let token = limiter
100 1 : .acquire_timeout(Duration::from_millis(1))
101 1 : .await
102 1 : .unwrap();
103 1 : token.release(Outcome::Success);
104 1 : assert_eq!(limiter.state().limit(), 2);
105 1 :
106 1 : let token = limiter
107 1 : .acquire_timeout(Duration::from_millis(1))
108 1 : .await
109 1 : .unwrap();
110 1 : token.release(Outcome::Overload);
111 1 : assert_eq!(limiter.state().limit(), 1);
112 1 :
113 1 : let token = limiter
114 1 : .acquire_timeout(Duration::from_millis(1))
115 1 : .await
116 1 : .unwrap();
117 1 : token.release(Outcome::Overload);
118 1 : assert_eq!(limiter.state().limit(), 1);
119 1 : }
120 :
121 : #[tokio::test(start_paused = true)]
122 1 : async fn should_decrease_limit_on_overload() {
123 1 : let config = RateLimiterConfig {
124 1 : initial_limit: 10,
125 1 : algorithm: RateLimitAlgorithm::Aimd {
126 1 : conf: Aimd {
127 1 : min: 1,
128 1 : max: 1500,
129 1 : inc: 10,
130 1 : dec: 0.5,
131 1 : utilisation: 0.8,
132 1 : },
133 1 : },
134 1 : };
135 1 :
136 1 : let limiter = DynamicLimiter::new(config);
137 1 :
138 1 : let token = limiter
139 1 : .acquire_timeout(Duration::from_millis(100))
140 1 : .await
141 1 : .unwrap();
142 1 : token.release(Outcome::Overload);
143 1 :
144 1 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
145 1 : }
146 :
147 : #[tokio::test(start_paused = true)]
148 1 : async fn acquire_timeout_times_out() {
149 1 : let config = RateLimiterConfig {
150 1 : initial_limit: 1,
151 1 : algorithm: RateLimitAlgorithm::Aimd {
152 1 : conf: Aimd {
153 1 : min: 1,
154 1 : max: 2,
155 1 : inc: 10,
156 1 : dec: 0.5,
157 1 : utilisation: 0.8,
158 1 : },
159 1 : },
160 1 : };
161 1 :
162 1 : let limiter = DynamicLimiter::new(config);
163 1 :
164 1 : let token = limiter
165 1 : .acquire_timeout(Duration::from_millis(1))
166 1 : .await
167 1 : .unwrap();
168 1 : let now = tokio::time::Instant::now();
169 1 : limiter
170 1 : .acquire_timeout(Duration::from_secs(1))
171 1 : .await
172 1 : .err()
173 1 : .unwrap();
174 1 :
175 1 : assert!(now.elapsed() >= Duration::from_secs(1));
176 1 :
177 1 : token.release(Outcome::Success);
178 1 :
179 1 : assert_eq!(limiter.state().limit(), 2);
180 1 : }
181 :
182 : #[tokio::test(start_paused = true)]
183 1 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
184 1 : let config = RateLimiterConfig {
185 1 : initial_limit: 4,
186 1 : algorithm: RateLimitAlgorithm::Aimd {
187 1 : conf: Aimd {
188 1 : min: 1,
189 1 : max: 1500,
190 1 : inc: 1,
191 1 : dec: 0.5,
192 1 : utilisation: 0.5,
193 1 : },
194 1 : },
195 1 : };
196 1 :
197 1 : let limiter = DynamicLimiter::new(config);
198 1 :
199 1 : let token = limiter
200 1 : .acquire_timeout(Duration::from_millis(1))
201 1 : .await
202 1 : .unwrap();
203 1 : let _token = limiter
204 1 : .acquire_timeout(Duration::from_millis(1))
205 1 : .await
206 1 : .unwrap();
207 1 : let _token = limiter
208 1 : .acquire_timeout(Duration::from_millis(1))
209 1 : .await
210 1 : .unwrap();
211 1 :
212 1 : token.release(Outcome::Success);
213 1 : assert_eq!(limiter.state().limit(), 5, "success: increase");
214 1 : }
215 :
216 : #[tokio::test(start_paused = true)]
217 1 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
218 1 : let config = RateLimiterConfig {
219 1 : initial_limit: 4,
220 1 : algorithm: RateLimitAlgorithm::Aimd {
221 1 : conf: Aimd {
222 1 : min: 1,
223 1 : max: 1500,
224 1 : inc: 10,
225 1 : dec: 0.5,
226 1 : utilisation: 0.5,
227 1 : },
228 1 : },
229 1 : };
230 1 :
231 1 : let limiter = DynamicLimiter::new(config);
232 1 :
233 1 : let token = limiter
234 1 : .acquire_timeout(Duration::from_millis(1))
235 1 : .await
236 1 : .unwrap();
237 1 :
238 1 : token.release(Outcome::Success);
239 1 : assert_eq!(
240 1 : limiter.state().limit(),
241 1 : 4,
242 1 : "success: ignore when < half limit"
243 1 : );
244 1 : }
245 :
246 : #[tokio::test(start_paused = true)]
247 1 : async fn should_not_change_limit_when_no_outcome() {
248 1 : let config = RateLimiterConfig {
249 1 : initial_limit: 10,
250 1 : algorithm: RateLimitAlgorithm::Aimd {
251 1 : conf: Aimd {
252 1 : min: 1,
253 1 : max: 1500,
254 1 : inc: 10,
255 1 : dec: 0.5,
256 1 : utilisation: 0.5,
257 1 : },
258 1 : },
259 1 : };
260 1 :
261 1 : let limiter = DynamicLimiter::new(config);
262 1 :
263 1 : let token = limiter
264 1 : .acquire_timeout(Duration::from_millis(1))
265 1 : .await
266 1 : .unwrap();
267 1 : drop(token);
268 1 : assert_eq!(limiter.state().limit(), 10, "ignore");
269 1 : }
270 : }
|