TLA Line data Source code
1 : use std::usize;
2 :
3 : use async_trait::async_trait;
4 :
5 : use super::limit_algorithm::{AimdConfig, LimitAlgorithm, Sample};
6 :
7 : use super::limiter::Outcome;
8 :
9 : /// Loss-based congestion avoidance.
10 : ///
11 : /// Additive-increase, multiplicative decrease.
12 : ///
13 : /// Adds available currency when:
14 : /// 1. no load-based errors are observed, and
15 : /// 2. the utilisation of the current limit is high.
16 : ///
17 : /// Reduces available concurrency by a factor when load-based errors are detected.
18 : pub struct Aimd {
19 : min_limit: usize,
20 : max_limit: usize,
21 : decrease_factor: f32,
22 : increase_by: usize,
23 : min_utilisation_threshold: f32,
24 : }
25 :
26 : impl Aimd {
27 CBC 6 : pub fn new(config: AimdConfig) -> Self {
28 6 : Self {
29 6 : min_limit: config.aimd_min_limit,
30 6 : max_limit: config.aimd_max_limit,
31 6 : decrease_factor: config.aimd_decrease_factor,
32 6 : increase_by: config.aimd_increase_by,
33 6 : min_utilisation_threshold: config.aimd_min_utilisation_threshold,
34 6 : }
35 6 : }
36 : }
37 :
38 : #[async_trait]
39 : impl LimitAlgorithm for Aimd {
40 8 : async fn update(&mut self, old_limit: usize, sample: Sample) -> usize {
41 : use Outcome::*;
42 8 : match sample.outcome {
43 : Success => {
44 5 : let utilisation = sample.in_flight as f32 / old_limit as f32;
45 5 :
46 5 : if utilisation > self.min_utilisation_threshold {
47 2 : let limit = old_limit + self.increase_by;
48 2 : limit.clamp(self.min_limit, self.max_limit)
49 : } else {
50 3 : old_limit
51 : }
52 : }
53 : Overload => {
54 3 : let limit = old_limit as f32 * self.decrease_factor;
55 3 :
56 3 : // Floor instead of round, so the limit reduces even with small numbers.
57 3 : // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
58 3 : let limit = limit.floor() as usize;
59 3 :
60 3 : limit.clamp(self.min_limit, self.max_limit)
61 : }
62 : }
63 16 : }
64 : }
65 :
66 : #[cfg(test)]
67 : mod tests {
68 : use std::sync::Arc;
69 :
70 : use tokio::sync::Notify;
71 :
72 : use super::*;
73 :
74 : use crate::rate_limiter::{Limiter, RateLimiterConfig};
75 :
76 1 : #[tokio::test]
77 1 : async fn should_decrease_limit_on_overload() {
78 1 : let config = RateLimiterConfig {
79 1 : initial_limit: 10,
80 1 : aimd_config: Some(AimdConfig {
81 1 : aimd_decrease_factor: 0.5,
82 1 : ..Default::default()
83 1 : }),
84 1 : disable: false,
85 1 : ..Default::default()
86 1 : };
87 1 :
88 1 : let release_notifier = Arc::new(Notify::new());
89 1 :
90 1 : let limiter = Limiter::new(config).with_release_notifier(release_notifier.clone());
91 1 :
92 1 : let token = limiter.try_acquire().unwrap();
93 1 : limiter.release(token, Some(Outcome::Overload)).await;
94 1 : release_notifier.notified().await;
95 1 : assert_eq!(limiter.state().limit(), 5, "overload: decrease");
96 : }
97 :
98 1 : #[tokio::test]
99 1 : async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
100 1 : let config = RateLimiterConfig {
101 1 : initial_limit: 4,
102 1 : aimd_config: Some(AimdConfig {
103 1 : aimd_decrease_factor: 0.5,
104 1 : aimd_min_utilisation_threshold: 0.5,
105 1 : aimd_increase_by: 1,
106 1 : ..Default::default()
107 1 : }),
108 1 : disable: false,
109 1 : ..Default::default()
110 1 : };
111 1 :
112 1 : let limiter = Limiter::new(config);
113 1 :
114 1 : let token = limiter.try_acquire().unwrap();
115 1 : let _token = limiter.try_acquire().unwrap();
116 1 : let _token = limiter.try_acquire().unwrap();
117 1 :
118 1 : limiter.release(token, Some(Outcome::Success)).await;
119 1 : assert_eq!(limiter.state().limit(), 5, "success: increase");
120 : }
121 :
122 1 : #[tokio::test]
123 1 : async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
124 1 : let config = RateLimiterConfig {
125 1 : initial_limit: 4,
126 1 : aimd_config: Some(AimdConfig {
127 1 : aimd_decrease_factor: 0.5,
128 1 : aimd_min_utilisation_threshold: 0.5,
129 1 : ..Default::default()
130 1 : }),
131 1 : disable: false,
132 1 : ..Default::default()
133 1 : };
134 1 :
135 1 : let limiter = Limiter::new(config);
136 1 :
137 1 : let token = limiter.try_acquire().unwrap();
138 1 :
139 1 : limiter.release(token, Some(Outcome::Success)).await;
140 1 : assert_eq!(
141 1 : limiter.state().limit(),
142 : 4,
143 UBC 0 : "success: ignore when < half limit"
144 : );
145 : }
146 :
147 CBC 1 : #[tokio::test]
148 1 : async fn should_not_change_limit_when_no_outcome() {
149 1 : let config = RateLimiterConfig {
150 1 : initial_limit: 10,
151 1 : aimd_config: Some(AimdConfig {
152 1 : aimd_decrease_factor: 0.5,
153 1 : aimd_min_utilisation_threshold: 0.5,
154 1 : ..Default::default()
155 1 : }),
156 1 : disable: false,
157 1 : ..Default::default()
158 1 : };
159 1 :
160 1 : let limiter = Limiter::new(config);
161 1 :
162 1 : let token = limiter.try_acquire().unwrap();
163 1 : limiter.release(token, None).await;
164 1 : assert_eq!(limiter.state().limit(), 10, "ignore");
165 : }
166 : }
|