Line data Source code
1 : use super::{LimitAlgorithm, Outcome, Sample};
2 :
3 : /// Loss-based congestion avoidance.
4 : ///
5 : /// Additive-increase, multiplicative decrease.
6 : ///
7 : /// Adds available currency when:
8 : /// 1. no load-based errors are observed, and
9 : /// 2. the utilisation of the current limit is high.
10 : ///
11 : /// Reduces available concurrency by a factor when load-based errors are detected.
12 12 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
13 : pub struct Aimd {
14 : /// Minimum limit for AIMD algorithm.
15 : pub min: usize,
16 : /// Maximum limit for AIMD algorithm.
17 : pub max: usize,
18 : /// Decrease AIMD decrease by value in case of error.
19 : pub dec: f32,
20 : /// Increase AIMD increase by value in case of success.
21 : pub inc: usize,
22 : /// A threshold below which the limit won't be increased.
23 : pub utilisation: f32,
24 : }
25 :
26 : impl LimitAlgorithm for Aimd {
27 16 : fn update(&self, old_limit: usize, sample: Sample) -> usize {
28 16 : use Outcome::*;
29 16 : match sample.outcome {
30 : Success => {
31 10 : let utilisation = sample.in_flight as f32 / old_limit as f32;
32 10 :
33 10 : if utilisation > self.utilisation {
34 6 : let limit = old_limit + self.inc;
35 6 : let increased_limit = limit.clamp(self.min, self.max);
36 6 : if increased_limit > old_limit {
37 6 : tracing::info!(increased_limit, "limit increased");
38 0 : }
39 :
40 6 : increased_limit
41 : } else {
42 4 : old_limit
43 : }
44 : }
45 : Overload => {
46 6 : let limit = old_limit as f32 * self.dec;
47 6 :
48 6 : // Floor instead of round, so the limit reduces even with small numbers.
49 6 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
50 6 : let limit = limit.floor() as usize;
51 6 :
52 6 : let limit = limit.clamp(self.min, self.max);
53 6 : tracing::info!(limit, "limit decreased");
54 6 : limit
55 : }
56 : }
57 16 : }
58 : }
59 :
60 : #[cfg(test)]
61 : mod tests {
62 : use std::time::Duration;
63 :
64 : use crate::rate_limiter::limit_algorithm::{
65 : DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
66 : };
67 :
68 : use super::*;
69 :
70 : #[tokio::test(start_paused = true)]
71 2 : async fn increase_decrease() {
72 2 : let config = RateLimiterConfig {
73 2 : initial_limit: 1,
74 2 : algorithm: RateLimitAlgorithm::Aimd {
75 2 : conf: Aimd {
76 2 : min: 1,
77 2 : max: 2,
78 2 : inc: 10,
79 2 : dec: 0.5,
80 2 : utilisation: 0.8,
81 2 : },
82 2 : },
83 2 : };
84 2 :
85 2 : let limiter = DynamicLimiter::new(config);
86 2 :
87 2 : let token = limiter
88 2 : .acquire_timeout(Duration::from_millis(1))
89 2 : .await
90 2 : .unwrap();
91 2 : token.release(Outcome::Success);
92 2 :
93 2 : assert_eq!(limiter.state().limit(), 2);
94 2 :
95 2 : let token = limiter
96 2 : .acquire_timeout(Duration::from_millis(1))
97 2 : .await
98 2 : .unwrap();
99 2 : token.release(Outcome::Success);
100 2 : assert_eq!(limiter.state().limit(), 2);
101 2 :
102 2 : let token = limiter
103 2 : .acquire_timeout(Duration::from_millis(1))
104 2 : .await
105 2 : .unwrap();
106 2 : token.release(Outcome::Overload);
107 2 : assert_eq!(limiter.state().limit(), 1);
108 2 :
109 2 : let token = limiter
110 2 : .acquire_timeout(Duration::from_millis(1))
111 2 : .await
112 2 : .unwrap();
113 2 : token.release(Outcome::Overload);
114 2 : assert_eq!(limiter.state().limit(), 1);
115 2 : }
116 :
117 : #[tokio::test(start_paused = true)]
118 2 : async fn should_decrease_limit_on_overload() {
119 2 : let config = RateLimiterConfig {
120 2 : initial_limit: 10,
121 2 : algorithm: RateLimitAlgorithm::Aimd {
122 2 : conf: Aimd {
123 2 : min: 1,
124 2 : max: 1500,
125 2 : inc: 10,
126 2 : dec: 0.5,
127 2 : utilisation: 0.8,
128 2 : },
129 2 : },
130 2 : };
131 2 :
132 2 : let limiter = DynamicLimiter::new(config);
133 2 :
134 2 : let token = limiter
135 2 : .acquire_timeout(Duration::from_millis(100))
136 2 : .await
137 2 : .unwrap();
138 2 : token.release(Outcome::Overload);
139 2 :
140 2 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
141 2 : }
142 :
143 : #[tokio::test(start_paused = true)]
144 2 : async fn acquire_timeout_times_out() {
145 2 : let config = RateLimiterConfig {
146 2 : initial_limit: 1,
147 2 : algorithm: RateLimitAlgorithm::Aimd {
148 2 : conf: Aimd {
149 2 : min: 1,
150 2 : max: 2,
151 2 : inc: 10,
152 2 : dec: 0.5,
153 2 : utilisation: 0.8,
154 2 : },
155 2 : },
156 2 : };
157 2 :
158 2 : let limiter = DynamicLimiter::new(config);
159 2 :
160 2 : let token = limiter
161 2 : .acquire_timeout(Duration::from_millis(1))
162 2 : .await
163 2 : .unwrap();
164 2 : let now = tokio::time::Instant::now();
165 2 : limiter
166 2 : .acquire_timeout(Duration::from_secs(1))
167 2 : .await
168 2 : .err()
169 2 : .unwrap();
170 2 :
171 2 : assert!(now.elapsed() >= Duration::from_secs(1));
172 2 :
173 2 : token.release(Outcome::Success);
174 2 :
175 2 : assert_eq!(limiter.state().limit(), 2);
176 2 : }
177 :
178 : #[tokio::test(start_paused = true)]
179 2 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
180 2 : let config = RateLimiterConfig {
181 2 : initial_limit: 4,
182 2 : algorithm: RateLimitAlgorithm::Aimd {
183 2 : conf: Aimd {
184 2 : min: 1,
185 2 : max: 1500,
186 2 : inc: 1,
187 2 : dec: 0.5,
188 2 : utilisation: 0.5,
189 2 : },
190 2 : },
191 2 : };
192 2 :
193 2 : let limiter = DynamicLimiter::new(config);
194 2 :
195 2 : let token = limiter
196 2 : .acquire_timeout(Duration::from_millis(1))
197 2 : .await
198 2 : .unwrap();
199 2 : let _token = limiter
200 2 : .acquire_timeout(Duration::from_millis(1))
201 2 : .await
202 2 : .unwrap();
203 2 : let _token = limiter
204 2 : .acquire_timeout(Duration::from_millis(1))
205 2 : .await
206 2 : .unwrap();
207 2 :
208 2 : token.release(Outcome::Success);
209 2 : assert_eq!(limiter.state().limit(), 5, "success: increase");
210 2 : }
211 :
212 : #[tokio::test(start_paused = true)]
213 2 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
214 2 : let config = RateLimiterConfig {
215 2 : initial_limit: 4,
216 2 : algorithm: RateLimitAlgorithm::Aimd {
217 2 : conf: Aimd {
218 2 : min: 1,
219 2 : max: 1500,
220 2 : inc: 10,
221 2 : dec: 0.5,
222 2 : utilisation: 0.5,
223 2 : },
224 2 : },
225 2 : };
226 2 :
227 2 : let limiter = DynamicLimiter::new(config);
228 2 :
229 2 : let token = limiter
230 2 : .acquire_timeout(Duration::from_millis(1))
231 2 : .await
232 2 : .unwrap();
233 2 :
234 2 : token.release(Outcome::Success);
235 2 : assert_eq!(
236 2 : limiter.state().limit(),
237 2 : 4,
238 2 : "success: ignore when < half limit"
239 2 : );
240 2 : }
241 :
242 : #[tokio::test(start_paused = true)]
243 2 : async fn should_not_change_limit_when_no_outcome() {
244 2 : let config = RateLimiterConfig {
245 2 : initial_limit: 10,
246 2 : algorithm: RateLimitAlgorithm::Aimd {
247 2 : conf: Aimd {
248 2 : min: 1,
249 2 : max: 1500,
250 2 : inc: 10,
251 2 : dec: 0.5,
252 2 : utilisation: 0.5,
253 2 : },
254 2 : },
255 2 : };
256 2 :
257 2 : let limiter = DynamicLimiter::new(config);
258 2 :
259 2 : let token = limiter
260 2 : .acquire_timeout(Duration::from_millis(1))
261 2 : .await
262 2 : .unwrap();
263 2 : drop(token);
264 2 : assert_eq!(limiter.state().limit(), 10, "ignore");
265 2 : }
266 : }
|