Line data Source code
1 : use super::{LimitAlgorithm, Outcome, Sample};
2 :
3 : /// Loss-based congestion avoidance.
4 : ///
5 : /// Additive-increase, multiplicative decrease.
6 : ///
7 : /// Adds available currency when:
8 : /// 1. no load-based errors are observed, and
9 : /// 2. the utilisation of the current limit is high.
10 : ///
11 : /// Reduces available concurrency by a factor when load-based errors are detected.
12 36 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
13 : pub(crate) struct Aimd {
14 : /// Minimum limit for AIMD algorithm.
15 : pub(crate) min: usize,
16 : /// Maximum limit for AIMD algorithm.
17 : pub(crate) max: usize,
18 : /// Decrease AIMD decrease by value in case of error.
19 : pub(crate) dec: f32,
20 : /// Increase AIMD increase by value in case of success.
21 : pub(crate) inc: usize,
22 : /// A threshold below which the limit won't be increased.
23 : pub(crate) utilisation: f32,
24 : }
25 :
26 : impl LimitAlgorithm for Aimd {
27 48 : fn update(&self, old_limit: usize, sample: Sample) -> usize {
28 48 : match sample.outcome {
29 : Outcome::Success => {
30 30 : let utilisation = sample.in_flight as f32 / old_limit as f32;
31 30 :
32 30 : if utilisation > self.utilisation {
33 18 : let limit = old_limit + self.inc;
34 18 : let increased_limit = limit.clamp(self.min, self.max);
35 18 : if increased_limit > old_limit {
36 18 : tracing::info!(increased_limit, "limit increased");
37 0 : }
38 :
39 18 : increased_limit
40 : } else {
41 12 : old_limit
42 : }
43 : }
44 : Outcome::Overload => {
45 18 : let limit = old_limit as f32 * self.dec;
46 18 :
47 18 : // Floor instead of round, so the limit reduces even with small numbers.
48 18 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
49 18 : let limit = limit.floor() as usize;
50 18 :
51 18 : let limit = limit.clamp(self.min, self.max);
52 18 : tracing::info!(limit, "limit decreased");
53 18 : limit
54 : }
55 : }
56 48 : }
57 : }
58 :
59 : #[cfg(test)]
60 : mod tests {
61 : use std::time::Duration;
62 :
63 : use crate::rate_limiter::limit_algorithm::{
64 : DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
65 : };
66 :
67 : use super::*;
68 :
69 : #[tokio::test(start_paused = true)]
70 6 : async fn increase_decrease() {
71 6 : let config = RateLimiterConfig {
72 6 : initial_limit: 1,
73 6 : algorithm: RateLimitAlgorithm::Aimd {
74 6 : conf: Aimd {
75 6 : min: 1,
76 6 : max: 2,
77 6 : inc: 10,
78 6 : dec: 0.5,
79 6 : utilisation: 0.8,
80 6 : },
81 6 : },
82 6 : };
83 6 :
84 6 : let limiter = DynamicLimiter::new(config);
85 6 :
86 6 : let token = limiter
87 6 : .acquire_timeout(Duration::from_millis(1))
88 6 : .await
89 6 : .unwrap();
90 6 : token.release(Outcome::Success);
91 6 :
92 6 : assert_eq!(limiter.state().limit(), 2);
93 6 :
94 6 : let token = limiter
95 6 : .acquire_timeout(Duration::from_millis(1))
96 6 : .await
97 6 : .unwrap();
98 6 : token.release(Outcome::Success);
99 6 : assert_eq!(limiter.state().limit(), 2);
100 6 :
101 6 : let token = limiter
102 6 : .acquire_timeout(Duration::from_millis(1))
103 6 : .await
104 6 : .unwrap();
105 6 : token.release(Outcome::Overload);
106 6 : assert_eq!(limiter.state().limit(), 1);
107 6 :
108 6 : let token = limiter
109 6 : .acquire_timeout(Duration::from_millis(1))
110 6 : .await
111 6 : .unwrap();
112 6 : token.release(Outcome::Overload);
113 6 : assert_eq!(limiter.state().limit(), 1);
114 6 : }
115 :
116 : #[tokio::test(start_paused = true)]
117 6 : async fn should_decrease_limit_on_overload() {
118 6 : let config = RateLimiterConfig {
119 6 : initial_limit: 10,
120 6 : algorithm: RateLimitAlgorithm::Aimd {
121 6 : conf: Aimd {
122 6 : min: 1,
123 6 : max: 1500,
124 6 : inc: 10,
125 6 : dec: 0.5,
126 6 : utilisation: 0.8,
127 6 : },
128 6 : },
129 6 : };
130 6 :
131 6 : let limiter = DynamicLimiter::new(config);
132 6 :
133 6 : let token = limiter
134 6 : .acquire_timeout(Duration::from_millis(100))
135 6 : .await
136 6 : .unwrap();
137 6 : token.release(Outcome::Overload);
138 6 :
139 6 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
140 6 : }
141 :
142 : #[tokio::test(start_paused = true)]
143 6 : async fn acquire_timeout_times_out() {
144 6 : let config = RateLimiterConfig {
145 6 : initial_limit: 1,
146 6 : algorithm: RateLimitAlgorithm::Aimd {
147 6 : conf: Aimd {
148 6 : min: 1,
149 6 : max: 2,
150 6 : inc: 10,
151 6 : dec: 0.5,
152 6 : utilisation: 0.8,
153 6 : },
154 6 : },
155 6 : };
156 6 :
157 6 : let limiter = DynamicLimiter::new(config);
158 6 :
159 6 : let token = limiter
160 6 : .acquire_timeout(Duration::from_millis(1))
161 6 : .await
162 6 : .unwrap();
163 6 : let now = tokio::time::Instant::now();
164 6 : limiter
165 6 : .acquire_timeout(Duration::from_secs(1))
166 6 : .await
167 6 : .err()
168 6 : .unwrap();
169 6 :
170 6 : assert!(now.elapsed() >= Duration::from_secs(1));
171 6 :
172 6 : token.release(Outcome::Success);
173 6 :
174 6 : assert_eq!(limiter.state().limit(), 2);
175 6 : }
176 :
177 : #[tokio::test(start_paused = true)]
178 6 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
179 6 : let config = RateLimiterConfig {
180 6 : initial_limit: 4,
181 6 : algorithm: RateLimitAlgorithm::Aimd {
182 6 : conf: Aimd {
183 6 : min: 1,
184 6 : max: 1500,
185 6 : inc: 1,
186 6 : dec: 0.5,
187 6 : utilisation: 0.5,
188 6 : },
189 6 : },
190 6 : };
191 6 :
192 6 : let limiter = DynamicLimiter::new(config);
193 6 :
194 6 : let token = limiter
195 6 : .acquire_timeout(Duration::from_millis(1))
196 6 : .await
197 6 : .unwrap();
198 6 : let _token = limiter
199 6 : .acquire_timeout(Duration::from_millis(1))
200 6 : .await
201 6 : .unwrap();
202 6 : let _token = limiter
203 6 : .acquire_timeout(Duration::from_millis(1))
204 6 : .await
205 6 : .unwrap();
206 6 :
207 6 : token.release(Outcome::Success);
208 6 : assert_eq!(limiter.state().limit(), 5, "success: increase");
209 6 : }
210 :
211 : #[tokio::test(start_paused = true)]
212 6 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
213 6 : let config = RateLimiterConfig {
214 6 : initial_limit: 4,
215 6 : algorithm: RateLimitAlgorithm::Aimd {
216 6 : conf: Aimd {
217 6 : min: 1,
218 6 : max: 1500,
219 6 : inc: 10,
220 6 : dec: 0.5,
221 6 : utilisation: 0.5,
222 6 : },
223 6 : },
224 6 : };
225 6 :
226 6 : let limiter = DynamicLimiter::new(config);
227 6 :
228 6 : let token = limiter
229 6 : .acquire_timeout(Duration::from_millis(1))
230 6 : .await
231 6 : .unwrap();
232 6 :
233 6 : token.release(Outcome::Success);
234 6 : assert_eq!(
235 6 : limiter.state().limit(),
236 6 : 4,
237 6 : "success: ignore when < half limit"
238 6 : );
239 6 : }
240 :
241 : #[tokio::test(start_paused = true)]
242 6 : async fn should_not_change_limit_when_no_outcome() {
243 6 : let config = RateLimiterConfig {
244 6 : initial_limit: 10,
245 6 : algorithm: RateLimitAlgorithm::Aimd {
246 6 : conf: Aimd {
247 6 : min: 1,
248 6 : max: 1500,
249 6 : inc: 10,
250 6 : dec: 0.5,
251 6 : utilisation: 0.5,
252 6 : },
253 6 : },
254 6 : };
255 6 :
256 6 : let limiter = DynamicLimiter::new(config);
257 6 :
258 6 : let token = limiter
259 6 : .acquire_timeout(Duration::from_millis(1))
260 6 : .await
261 6 : .unwrap();
262 6 : drop(token);
263 6 : assert_eq!(limiter.state().limit(), 10, "ignore");
264 6 : }
265 : }
|