Line data Source code
1 : use super::{LimitAlgorithm, Outcome, Sample};
2 :
3 : /// Loss-based congestion avoidance.
4 : ///
5 : /// Additive-increase, multiplicative decrease.
6 : ///
7 : /// Adds available currency when:
8 : /// 1. no load-based errors are observed, and
9 : /// 2. the utilisation of the current limit is high.
10 : ///
11 : /// Reduces available concurrency by a factor when load-based errors are detected.
12 12 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
13 : pub struct Aimd {
14 : /// Minimum limit for AIMD algorithm.
15 : pub min: usize,
16 : /// Maximum limit for AIMD algorithm.
17 : pub max: usize,
18 : /// Decrease AIMD decrease by value in case of error.
19 : pub dec: f32,
20 : /// Increase AIMD increase by value in case of success.
21 : pub inc: usize,
22 : /// A threshold below which the limit won't be increased.
23 : pub utilisation: f32,
24 : }
25 :
26 : impl LimitAlgorithm for Aimd {
27 16 : fn update(&self, old_limit: usize, sample: Sample) -> usize {
28 16 : match sample.outcome {
29 : Outcome::Success => {
30 10 : let utilisation = sample.in_flight as f32 / old_limit as f32;
31 10 :
32 10 : if utilisation > self.utilisation {
33 6 : let limit = old_limit + self.inc;
34 6 : let increased_limit = limit.clamp(self.min, self.max);
35 6 : if increased_limit > old_limit {
36 6 : tracing::info!(increased_limit, "limit increased");
37 0 : }
38 :
39 6 : increased_limit
40 : } else {
41 4 : old_limit
42 : }
43 : }
44 : Outcome::Overload => {
45 6 : let limit = old_limit as f32 * self.dec;
46 6 :
47 6 : // Floor instead of round, so the limit reduces even with small numbers.
48 6 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
49 6 : let limit = limit.floor() as usize;
50 6 :
51 6 : let limit = limit.clamp(self.min, self.max);
52 6 : tracing::info!(limit, "limit decreased");
53 6 : limit
54 : }
55 : }
56 16 : }
57 : }
58 :
59 : #[cfg(test)]
60 : mod tests {
61 : use std::time::Duration;
62 :
63 : use crate::rate_limiter::limit_algorithm::{
64 : DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
65 : };
66 :
67 : use super::*;
68 :
69 : #[tokio::test(start_paused = true)]
70 2 : async fn increase_decrease() {
71 2 : let config = RateLimiterConfig {
72 2 : initial_limit: 1,
73 2 : algorithm: RateLimitAlgorithm::Aimd {
74 2 : conf: Aimd {
75 2 : min: 1,
76 2 : max: 2,
77 2 : inc: 10,
78 2 : dec: 0.5,
79 2 : utilisation: 0.8,
80 2 : },
81 2 : },
82 2 : };
83 2 :
84 2 : let limiter = DynamicLimiter::new(config);
85 2 :
86 2 : let token = limiter
87 2 : .acquire_timeout(Duration::from_millis(1))
88 2 : .await
89 2 : .unwrap();
90 2 : token.release(Outcome::Success);
91 2 :
92 2 : assert_eq!(limiter.state().limit(), 2);
93 2 :
94 2 : let token = limiter
95 2 : .acquire_timeout(Duration::from_millis(1))
96 2 : .await
97 2 : .unwrap();
98 2 : token.release(Outcome::Success);
99 2 : assert_eq!(limiter.state().limit(), 2);
100 2 :
101 2 : let token = limiter
102 2 : .acquire_timeout(Duration::from_millis(1))
103 2 : .await
104 2 : .unwrap();
105 2 : token.release(Outcome::Overload);
106 2 : assert_eq!(limiter.state().limit(), 1);
107 2 :
108 2 : let token = limiter
109 2 : .acquire_timeout(Duration::from_millis(1))
110 2 : .await
111 2 : .unwrap();
112 2 : token.release(Outcome::Overload);
113 2 : assert_eq!(limiter.state().limit(), 1);
114 2 : }
115 :
116 : #[tokio::test(start_paused = true)]
117 2 : async fn should_decrease_limit_on_overload() {
118 2 : let config = RateLimiterConfig {
119 2 : initial_limit: 10,
120 2 : algorithm: RateLimitAlgorithm::Aimd {
121 2 : conf: Aimd {
122 2 : min: 1,
123 2 : max: 1500,
124 2 : inc: 10,
125 2 : dec: 0.5,
126 2 : utilisation: 0.8,
127 2 : },
128 2 : },
129 2 : };
130 2 :
131 2 : let limiter = DynamicLimiter::new(config);
132 2 :
133 2 : let token = limiter
134 2 : .acquire_timeout(Duration::from_millis(100))
135 2 : .await
136 2 : .unwrap();
137 2 : token.release(Outcome::Overload);
138 2 :
139 2 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
140 2 : }
141 :
142 : #[tokio::test(start_paused = true)]
143 2 : async fn acquire_timeout_times_out() {
144 2 : let config = RateLimiterConfig {
145 2 : initial_limit: 1,
146 2 : algorithm: RateLimitAlgorithm::Aimd {
147 2 : conf: Aimd {
148 2 : min: 1,
149 2 : max: 2,
150 2 : inc: 10,
151 2 : dec: 0.5,
152 2 : utilisation: 0.8,
153 2 : },
154 2 : },
155 2 : };
156 2 :
157 2 : let limiter = DynamicLimiter::new(config);
158 2 :
159 2 : let token = limiter
160 2 : .acquire_timeout(Duration::from_millis(1))
161 2 : .await
162 2 : .unwrap();
163 2 : let now = tokio::time::Instant::now();
164 2 : limiter
165 2 : .acquire_timeout(Duration::from_secs(1))
166 2 : .await
167 2 : .err()
168 2 : .unwrap();
169 2 :
170 2 : assert!(now.elapsed() >= Duration::from_secs(1));
171 2 :
172 2 : token.release(Outcome::Success);
173 2 :
174 2 : assert_eq!(limiter.state().limit(), 2);
175 2 : }
176 :
177 : #[tokio::test(start_paused = true)]
178 2 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
179 2 : let config = RateLimiterConfig {
180 2 : initial_limit: 4,
181 2 : algorithm: RateLimitAlgorithm::Aimd {
182 2 : conf: Aimd {
183 2 : min: 1,
184 2 : max: 1500,
185 2 : inc: 1,
186 2 : dec: 0.5,
187 2 : utilisation: 0.5,
188 2 : },
189 2 : },
190 2 : };
191 2 :
192 2 : let limiter = DynamicLimiter::new(config);
193 2 :
194 2 : let token = limiter
195 2 : .acquire_timeout(Duration::from_millis(1))
196 2 : .await
197 2 : .unwrap();
198 2 : let _token = limiter
199 2 : .acquire_timeout(Duration::from_millis(1))
200 2 : .await
201 2 : .unwrap();
202 2 : let _token = limiter
203 2 : .acquire_timeout(Duration::from_millis(1))
204 2 : .await
205 2 : .unwrap();
206 2 :
207 2 : token.release(Outcome::Success);
208 2 : assert_eq!(limiter.state().limit(), 5, "success: increase");
209 2 : }
210 :
211 : #[tokio::test(start_paused = true)]
212 2 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
213 2 : let config = RateLimiterConfig {
214 2 : initial_limit: 4,
215 2 : algorithm: RateLimitAlgorithm::Aimd {
216 2 : conf: Aimd {
217 2 : min: 1,
218 2 : max: 1500,
219 2 : inc: 10,
220 2 : dec: 0.5,
221 2 : utilisation: 0.5,
222 2 : },
223 2 : },
224 2 : };
225 2 :
226 2 : let limiter = DynamicLimiter::new(config);
227 2 :
228 2 : let token = limiter
229 2 : .acquire_timeout(Duration::from_millis(1))
230 2 : .await
231 2 : .unwrap();
232 2 :
233 2 : token.release(Outcome::Success);
234 2 : assert_eq!(
235 2 : limiter.state().limit(),
236 2 : 4,
237 2 : "success: ignore when < half limit"
238 2 : );
239 2 : }
240 :
241 : #[tokio::test(start_paused = true)]
242 2 : async fn should_not_change_limit_when_no_outcome() {
243 2 : let config = RateLimiterConfig {
244 2 : initial_limit: 10,
245 2 : algorithm: RateLimitAlgorithm::Aimd {
246 2 : conf: Aimd {
247 2 : min: 1,
248 2 : max: 1500,
249 2 : inc: 10,
250 2 : dec: 0.5,
251 2 : utilisation: 0.5,
252 2 : },
253 2 : },
254 2 : };
255 2 :
256 2 : let limiter = DynamicLimiter::new(config);
257 2 :
258 2 : let token = limiter
259 2 : .acquire_timeout(Duration::from_millis(1))
260 2 : .await
261 2 : .unwrap();
262 2 : drop(token);
263 2 : assert_eq!(limiter.state().limit(), 10, "ignore");
264 2 : }
265 : }
|