Line data Source code
1 : use super::{LimitAlgorithm, Outcome, Sample};
2 :
3 : /// Loss-based congestion avoidance.
4 : ///
5 : /// Additive-increase, multiplicative decrease.
6 : ///
7 : /// Adds available currency when:
8 : /// 1. no load-based errors are observed, and
9 : /// 2. the utilisation of the current limit is high.
10 : ///
11 : /// Reduces available concurrency by a factor when load-based errors are detected.
12 6 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
13 : pub(crate) struct Aimd {
14 : /// Minimum limit for AIMD algorithm.
15 : pub(crate) min: usize,
16 : /// Maximum limit for AIMD algorithm.
17 : pub(crate) max: usize,
18 : /// Decrease AIMD decrease by value in case of error.
19 : pub(crate) dec: f32,
20 : /// Increase AIMD increase by value in case of success.
21 : pub(crate) inc: usize,
22 : /// A threshold below which the limit won't be increased.
23 : pub(crate) utilisation: f32,
24 : }
25 :
26 : impl LimitAlgorithm for Aimd {
27 8 : fn update(&self, old_limit: usize, sample: Sample) -> usize {
28 8 : match sample.outcome {
29 : Outcome::Success => {
30 5 : let utilisation = sample.in_flight as f32 / old_limit as f32;
31 5 :
32 5 : if utilisation > self.utilisation {
33 3 : let limit = old_limit + self.inc;
34 3 : let increased_limit = limit.clamp(self.min, self.max);
35 3 : if increased_limit > old_limit {
36 3 : tracing::info!(increased_limit, "limit increased");
37 0 : }
38 :
39 3 : increased_limit
40 : } else {
41 2 : old_limit
42 : }
43 : }
44 : Outcome::Overload => {
45 3 : let limit = old_limit as f32 * self.dec;
46 3 :
47 3 : // Floor instead of round, so the limit reduces even with small numbers.
48 3 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
49 3 : let limit = limit.floor() as usize;
50 3 :
51 3 : let limit = limit.clamp(self.min, self.max);
52 3 : tracing::info!(limit, "limit decreased");
53 3 : limit
54 : }
55 : }
56 8 : }
57 : }
58 :
59 : #[cfg(test)]
60 : mod tests {
61 : use std::time::Duration;
62 :
63 : use super::*;
64 : use crate::rate_limiter::limit_algorithm::{
65 : DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
66 : };
67 :
68 : #[tokio::test(start_paused = true)]
69 1 : async fn increase_decrease() {
70 1 : let config = RateLimiterConfig {
71 1 : initial_limit: 1,
72 1 : algorithm: RateLimitAlgorithm::Aimd {
73 1 : conf: Aimd {
74 1 : min: 1,
75 1 : max: 2,
76 1 : inc: 10,
77 1 : dec: 0.5,
78 1 : utilisation: 0.8,
79 1 : },
80 1 : },
81 1 : };
82 1 :
83 1 : let limiter = DynamicLimiter::new(config);
84 1 :
85 1 : let token = limiter
86 1 : .acquire_timeout(Duration::from_millis(1))
87 1 : .await
88 1 : .unwrap();
89 1 : token.release(Outcome::Success);
90 1 :
91 1 : assert_eq!(limiter.state().limit(), 2);
92 1 :
93 1 : let token = limiter
94 1 : .acquire_timeout(Duration::from_millis(1))
95 1 : .await
96 1 : .unwrap();
97 1 : token.release(Outcome::Success);
98 1 : assert_eq!(limiter.state().limit(), 2);
99 1 :
100 1 : let token = limiter
101 1 : .acquire_timeout(Duration::from_millis(1))
102 1 : .await
103 1 : .unwrap();
104 1 : token.release(Outcome::Overload);
105 1 : assert_eq!(limiter.state().limit(), 1);
106 1 :
107 1 : let token = limiter
108 1 : .acquire_timeout(Duration::from_millis(1))
109 1 : .await
110 1 : .unwrap();
111 1 : token.release(Outcome::Overload);
112 1 : assert_eq!(limiter.state().limit(), 1);
113 1 : }
114 :
115 : #[tokio::test(start_paused = true)]
116 1 : async fn should_decrease_limit_on_overload() {
117 1 : let config = RateLimiterConfig {
118 1 : initial_limit: 10,
119 1 : algorithm: RateLimitAlgorithm::Aimd {
120 1 : conf: Aimd {
121 1 : min: 1,
122 1 : max: 1500,
123 1 : inc: 10,
124 1 : dec: 0.5,
125 1 : utilisation: 0.8,
126 1 : },
127 1 : },
128 1 : };
129 1 :
130 1 : let limiter = DynamicLimiter::new(config);
131 1 :
132 1 : let token = limiter
133 1 : .acquire_timeout(Duration::from_millis(100))
134 1 : .await
135 1 : .unwrap();
136 1 : token.release(Outcome::Overload);
137 1 :
138 1 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
139 1 : }
140 :
141 : #[tokio::test(start_paused = true)]
142 1 : async fn acquire_timeout_times_out() {
143 1 : let config = RateLimiterConfig {
144 1 : initial_limit: 1,
145 1 : algorithm: RateLimitAlgorithm::Aimd {
146 1 : conf: Aimd {
147 1 : min: 1,
148 1 : max: 2,
149 1 : inc: 10,
150 1 : dec: 0.5,
151 1 : utilisation: 0.8,
152 1 : },
153 1 : },
154 1 : };
155 1 :
156 1 : let limiter = DynamicLimiter::new(config);
157 1 :
158 1 : let token = limiter
159 1 : .acquire_timeout(Duration::from_millis(1))
160 1 : .await
161 1 : .unwrap();
162 1 : let now = tokio::time::Instant::now();
163 1 : limiter
164 1 : .acquire_timeout(Duration::from_secs(1))
165 1 : .await
166 1 : .err()
167 1 : .unwrap();
168 1 :
169 1 : assert!(now.elapsed() >= Duration::from_secs(1));
170 1 :
171 1 : token.release(Outcome::Success);
172 1 :
173 1 : assert_eq!(limiter.state().limit(), 2);
174 1 : }
175 :
176 : #[tokio::test(start_paused = true)]
177 1 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
178 1 : let config = RateLimiterConfig {
179 1 : initial_limit: 4,
180 1 : algorithm: RateLimitAlgorithm::Aimd {
181 1 : conf: Aimd {
182 1 : min: 1,
183 1 : max: 1500,
184 1 : inc: 1,
185 1 : dec: 0.5,
186 1 : utilisation: 0.5,
187 1 : },
188 1 : },
189 1 : };
190 1 :
191 1 : let limiter = DynamicLimiter::new(config);
192 1 :
193 1 : let token = limiter
194 1 : .acquire_timeout(Duration::from_millis(1))
195 1 : .await
196 1 : .unwrap();
197 1 : let _token = limiter
198 1 : .acquire_timeout(Duration::from_millis(1))
199 1 : .await
200 1 : .unwrap();
201 1 : let _token = limiter
202 1 : .acquire_timeout(Duration::from_millis(1))
203 1 : .await
204 1 : .unwrap();
205 1 :
206 1 : token.release(Outcome::Success);
207 1 : assert_eq!(limiter.state().limit(), 5, "success: increase");
208 1 : }
209 :
210 : #[tokio::test(start_paused = true)]
211 1 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
212 1 : let config = RateLimiterConfig {
213 1 : initial_limit: 4,
214 1 : algorithm: RateLimitAlgorithm::Aimd {
215 1 : conf: Aimd {
216 1 : min: 1,
217 1 : max: 1500,
218 1 : inc: 10,
219 1 : dec: 0.5,
220 1 : utilisation: 0.5,
221 1 : },
222 1 : },
223 1 : };
224 1 :
225 1 : let limiter = DynamicLimiter::new(config);
226 1 :
227 1 : let token = limiter
228 1 : .acquire_timeout(Duration::from_millis(1))
229 1 : .await
230 1 : .unwrap();
231 1 :
232 1 : token.release(Outcome::Success);
233 1 : assert_eq!(
234 1 : limiter.state().limit(),
235 1 : 4,
236 1 : "success: ignore when < half limit"
237 1 : );
238 1 : }
239 :
240 : #[tokio::test(start_paused = true)]
241 1 : async fn should_not_change_limit_when_no_outcome() {
242 1 : let config = RateLimiterConfig {
243 1 : initial_limit: 10,
244 1 : algorithm: RateLimitAlgorithm::Aimd {
245 1 : conf: Aimd {
246 1 : min: 1,
247 1 : max: 1500,
248 1 : inc: 10,
249 1 : dec: 0.5,
250 1 : utilisation: 0.5,
251 1 : },
252 1 : },
253 1 : };
254 1 :
255 1 : let limiter = DynamicLimiter::new(config);
256 1 :
257 1 : let token = limiter
258 1 : .acquire_timeout(Duration::from_millis(1))
259 1 : .await
260 1 : .unwrap();
261 1 : drop(token);
262 1 : assert_eq!(limiter.state().limit(), 10, "ignore");
263 1 : }
264 : }
|