Line data Source code
1 : use super::{LimitAlgorithm, Outcome, Sample};
2 :
3 : /// Loss-based congestion avoidance.
4 : ///
5 : /// Additive-increase, multiplicative decrease.
6 : ///
7 : /// Adds available currency when:
8 : /// 1. no load-based errors are observed, and
9 : /// 2. the utilisation of the current limit is high.
10 : ///
11 : /// Reduces available concurrency by a factor when load-based errors are detected.
12 6 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
13 : pub(crate) struct Aimd {
14 : /// Minimum limit for AIMD algorithm.
15 : pub(crate) min: usize,
16 : /// Maximum limit for AIMD algorithm.
17 : pub(crate) max: usize,
18 : /// Decrease AIMD decrease by value in case of error.
19 : pub(crate) dec: f32,
20 : /// Increase AIMD increase by value in case of success.
21 : pub(crate) inc: usize,
22 : /// A threshold below which the limit won't be increased.
23 : pub(crate) utilisation: f32,
24 : }
25 :
26 : impl LimitAlgorithm for Aimd {
27 8 : fn update(&self, old_limit: usize, sample: Sample) -> usize {
28 8 : match sample.outcome {
29 : Outcome::Success => {
30 5 : let utilisation = sample.in_flight as f32 / old_limit as f32;
31 5 :
32 5 : if utilisation > self.utilisation {
33 3 : let limit = old_limit + self.inc;
34 3 : let increased_limit = limit.clamp(self.min, self.max);
35 3 : if increased_limit > old_limit {
36 3 : tracing::info!(increased_limit, "limit increased");
37 0 : }
38 :
39 3 : increased_limit
40 : } else {
41 2 : old_limit
42 : }
43 : }
44 : Outcome::Overload => {
45 3 : let limit = old_limit as f32 * self.dec;
46 3 :
47 3 : // Floor instead of round, so the limit reduces even with small numbers.
48 3 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
49 3 : let limit = limit.floor() as usize;
50 3 :
51 3 : let limit = limit.clamp(self.min, self.max);
52 3 : tracing::info!(limit, "limit decreased");
53 3 : limit
54 : }
55 : }
56 8 : }
57 : }
58 :
59 : #[cfg(test)]
60 : mod tests {
61 : use std::time::Duration;
62 :
63 : use crate::rate_limiter::limit_algorithm::{
64 : DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
65 : };
66 :
67 : use super::*;
68 :
69 : #[tokio::test(start_paused = true)]
70 1 : async fn increase_decrease() {
71 1 : let config = RateLimiterConfig {
72 1 : initial_limit: 1,
73 1 : algorithm: RateLimitAlgorithm::Aimd {
74 1 : conf: Aimd {
75 1 : min: 1,
76 1 : max: 2,
77 1 : inc: 10,
78 1 : dec: 0.5,
79 1 : utilisation: 0.8,
80 1 : },
81 1 : },
82 1 : };
83 1 :
84 1 : let limiter = DynamicLimiter::new(config);
85 1 :
86 1 : let token = limiter
87 1 : .acquire_timeout(Duration::from_millis(1))
88 1 : .await
89 1 : .unwrap();
90 1 : token.release(Outcome::Success);
91 1 :
92 1 : assert_eq!(limiter.state().limit(), 2);
93 1 :
94 1 : let token = limiter
95 1 : .acquire_timeout(Duration::from_millis(1))
96 1 : .await
97 1 : .unwrap();
98 1 : token.release(Outcome::Success);
99 1 : assert_eq!(limiter.state().limit(), 2);
100 1 :
101 1 : let token = limiter
102 1 : .acquire_timeout(Duration::from_millis(1))
103 1 : .await
104 1 : .unwrap();
105 1 : token.release(Outcome::Overload);
106 1 : assert_eq!(limiter.state().limit(), 1);
107 1 :
108 1 : let token = limiter
109 1 : .acquire_timeout(Duration::from_millis(1))
110 1 : .await
111 1 : .unwrap();
112 1 : token.release(Outcome::Overload);
113 1 : assert_eq!(limiter.state().limit(), 1);
114 1 : }
115 :
116 : #[tokio::test(start_paused = true)]
117 1 : async fn should_decrease_limit_on_overload() {
118 1 : let config = RateLimiterConfig {
119 1 : initial_limit: 10,
120 1 : algorithm: RateLimitAlgorithm::Aimd {
121 1 : conf: Aimd {
122 1 : min: 1,
123 1 : max: 1500,
124 1 : inc: 10,
125 1 : dec: 0.5,
126 1 : utilisation: 0.8,
127 1 : },
128 1 : },
129 1 : };
130 1 :
131 1 : let limiter = DynamicLimiter::new(config);
132 1 :
133 1 : let token = limiter
134 1 : .acquire_timeout(Duration::from_millis(100))
135 1 : .await
136 1 : .unwrap();
137 1 : token.release(Outcome::Overload);
138 1 :
139 1 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
140 1 : }
141 :
142 : #[tokio::test(start_paused = true)]
143 1 : async fn acquire_timeout_times_out() {
144 1 : let config = RateLimiterConfig {
145 1 : initial_limit: 1,
146 1 : algorithm: RateLimitAlgorithm::Aimd {
147 1 : conf: Aimd {
148 1 : min: 1,
149 1 : max: 2,
150 1 : inc: 10,
151 1 : dec: 0.5,
152 1 : utilisation: 0.8,
153 1 : },
154 1 : },
155 1 : };
156 1 :
157 1 : let limiter = DynamicLimiter::new(config);
158 1 :
159 1 : let token = limiter
160 1 : .acquire_timeout(Duration::from_millis(1))
161 1 : .await
162 1 : .unwrap();
163 1 : let now = tokio::time::Instant::now();
164 1 : limiter
165 1 : .acquire_timeout(Duration::from_secs(1))
166 1 : .await
167 1 : .err()
168 1 : .unwrap();
169 1 :
170 1 : assert!(now.elapsed() >= Duration::from_secs(1));
171 1 :
172 1 : token.release(Outcome::Success);
173 1 :
174 1 : assert_eq!(limiter.state().limit(), 2);
175 1 : }
176 :
177 : #[tokio::test(start_paused = true)]
178 1 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
179 1 : let config = RateLimiterConfig {
180 1 : initial_limit: 4,
181 1 : algorithm: RateLimitAlgorithm::Aimd {
182 1 : conf: Aimd {
183 1 : min: 1,
184 1 : max: 1500,
185 1 : inc: 1,
186 1 : dec: 0.5,
187 1 : utilisation: 0.5,
188 1 : },
189 1 : },
190 1 : };
191 1 :
192 1 : let limiter = DynamicLimiter::new(config);
193 1 :
194 1 : let token = limiter
195 1 : .acquire_timeout(Duration::from_millis(1))
196 1 : .await
197 1 : .unwrap();
198 1 : let _token = limiter
199 1 : .acquire_timeout(Duration::from_millis(1))
200 1 : .await
201 1 : .unwrap();
202 1 : let _token = limiter
203 1 : .acquire_timeout(Duration::from_millis(1))
204 1 : .await
205 1 : .unwrap();
206 1 :
207 1 : token.release(Outcome::Success);
208 1 : assert_eq!(limiter.state().limit(), 5, "success: increase");
209 1 : }
210 :
211 : #[tokio::test(start_paused = true)]
212 1 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
213 1 : let config = RateLimiterConfig {
214 1 : initial_limit: 4,
215 1 : algorithm: RateLimitAlgorithm::Aimd {
216 1 : conf: Aimd {
217 1 : min: 1,
218 1 : max: 1500,
219 1 : inc: 10,
220 1 : dec: 0.5,
221 1 : utilisation: 0.5,
222 1 : },
223 1 : },
224 1 : };
225 1 :
226 1 : let limiter = DynamicLimiter::new(config);
227 1 :
228 1 : let token = limiter
229 1 : .acquire_timeout(Duration::from_millis(1))
230 1 : .await
231 1 : .unwrap();
232 1 :
233 1 : token.release(Outcome::Success);
234 1 : assert_eq!(
235 1 : limiter.state().limit(),
236 1 : 4,
237 1 : "success: ignore when < half limit"
238 1 : );
239 1 : }
240 :
241 : #[tokio::test(start_paused = true)]
242 1 : async fn should_not_change_limit_when_no_outcome() {
243 1 : let config = RateLimiterConfig {
244 1 : initial_limit: 10,
245 1 : algorithm: RateLimitAlgorithm::Aimd {
246 1 : conf: Aimd {
247 1 : min: 1,
248 1 : max: 1500,
249 1 : inc: 10,
250 1 : dec: 0.5,
251 1 : utilisation: 0.5,
252 1 : },
253 1 : },
254 1 : };
255 1 :
256 1 : let limiter = DynamicLimiter::new(config);
257 1 :
258 1 : let token = limiter
259 1 : .acquire_timeout(Duration::from_millis(1))
260 1 : .await
261 1 : .unwrap();
262 1 : drop(token);
263 1 : assert_eq!(limiter.state().limit(), 10, "ignore");
264 1 : }
265 : }
|