Line data Source code
1 : //! Algorithms for controlling concurrency limits.
2 : use std::pin::pin;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use parking_lot::Mutex;
7 : use tokio::sync::Notify;
8 : use tokio::time::error::Elapsed;
9 : use tokio::time::Instant;
10 :
11 : use self::aimd::Aimd;
12 :
13 : pub(crate) mod aimd;
14 :
/// Classification of a finished job with respect to congestion/overload.
///
/// Only failures attributable to overload should be reported as
/// [`Outcome::Overload`]; any other kind of error is to be ignored for the
/// purpose of limit control.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Outcome {
    /// The job completed successfully, or its failure had nothing to do with overload.
    Success,
    /// The job failed due to overload, for example a timeout or an explicit
    /// backpressure signal.
    Overload,
}
26 :
27 : /// An algorithm for controlling a concurrency limit.
28 : pub(crate) trait LimitAlgorithm: Send + Sync + 'static {
29 : /// Update the concurrency limit in response to a new job completion.
30 : fn update(&self, old_limit: usize, sample: Sample) -> usize;
31 : }
32 :
33 : /// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
34 : #[derive(Debug, Clone, PartialEq, Eq, Copy)]
35 : pub(crate) struct Sample {
36 : pub(crate) latency: Duration,
37 : /// Jobs in flight when the sample was taken.
38 : pub(crate) in_flight: usize,
39 : pub(crate) outcome: Outcome,
40 : }
41 :
42 3 : #[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
43 : #[serde(rename_all = "snake_case")]
44 : pub(crate) enum RateLimitAlgorithm {
45 : #[default]
46 : Fixed,
47 : Aimd {
48 : #[serde(flatten)]
49 : conf: Aimd,
50 : },
51 : }
52 :
53 : pub(crate) struct Fixed;
54 :
55 : impl LimitAlgorithm for Fixed {
56 0 : fn update(&self, old_limit: usize, _sample: Sample) -> usize {
57 0 : old_limit
58 0 : }
59 : }
60 :
61 3 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
62 : pub struct RateLimiterConfig {
63 : #[serde(flatten)]
64 : pub(crate) algorithm: RateLimitAlgorithm,
65 : pub(crate) initial_limit: usize,
66 : }
67 :
68 : impl RateLimiterConfig {
69 6 : pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
70 6 : match self.algorithm {
71 0 : RateLimitAlgorithm::Fixed => Box::new(Fixed),
72 6 : RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
73 : }
74 6 : }
75 : }
76 :
77 : pub(crate) struct LimiterInner {
78 : alg: Box<dyn LimitAlgorithm>,
79 : available: usize,
80 : limit: usize,
81 : in_flight: usize,
82 : }
83 :
84 : impl LimiterInner {
85 11 : fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
86 11 : if let Some(outcome) = outcome {
87 8 : let sample = Sample {
88 8 : latency,
89 8 : in_flight: self.in_flight,
90 8 : outcome,
91 8 : };
92 8 : self.limit = self.alg.update(self.limit, sample);
93 8 : }
94 11 : }
95 :
96 11 : fn take(&mut self, ready: &Notify) -> Option<()> {
97 11 : if self.available >= 1 {
98 11 : self.available -= 1;
99 11 : self.in_flight += 1;
100 11 :
101 11 : // tell the next in the queue that there is a permit ready
102 11 : if self.available >= 1 {
103 8 : ready.notify_one();
104 8 : }
105 11 : Some(())
106 : } else {
107 0 : None
108 : }
109 11 : }
110 : }
111 :
112 : /// Limits the number of concurrent jobs.
113 : ///
114 : /// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
115 : /// token once the job is finished.
116 : ///
117 : /// The limit will be automatically adjusted based on observed latency (delay) and/or failures
118 : /// caused by overload (loss).
119 : pub(crate) struct DynamicLimiter {
120 : config: RateLimiterConfig,
121 : inner: Mutex<LimiterInner>,
122 : // to notify when a token is available
123 : ready: Notify,
124 : }
125 :
126 : /// A concurrency token, required to run a job.
127 : ///
128 : /// Release the token back to the [`DynamicLimiter`] after the job is complete.
129 : pub(crate) struct Token {
130 : start: Instant,
131 : limiter: Option<Arc<DynamicLimiter>>,
132 : }
133 :
/// Point-in-time view of a [`DynamicLimiter`], for tests.
///
/// Consistency is not guaranteed under high concurrency.
#[cfg(test)]
#[derive(Clone, Copy, Debug)]
struct LimiterState {
    /// The concurrency limit at the time the snapshot was taken.
    limit: usize,
}
142 :
143 : impl DynamicLimiter {
144 : /// Create a limiter with a given limit control algorithm.
145 6 : pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
146 6 : let ready = Notify::new();
147 6 : ready.notify_one();
148 6 :
149 6 : Arc::new(Self {
150 6 : inner: Mutex::new(LimiterInner {
151 6 : alg: config.create_rate_limit_algorithm(),
152 6 : available: config.initial_limit,
153 6 : limit: config.initial_limit,
154 6 : in_flight: 0,
155 6 : }),
156 6 : ready,
157 6 : config,
158 6 : })
159 6 : }
160 :
161 : /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
162 12 : pub(crate) async fn acquire_timeout(
163 12 : self: &Arc<Self>,
164 12 : duration: Duration,
165 12 : ) -> Result<Token, Elapsed> {
166 12 : tokio::time::timeout(duration, self.acquire()).await?
167 12 : }
168 :
169 : /// Try to acquire a concurrency [Token].
170 12 : async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
171 12 : if self.config.initial_limit == 0 {
172 : // If the rate limiter is disabled, we can always acquire a token.
173 0 : Ok(Token::disabled())
174 : } else {
175 12 : let mut notified = pin!(self.ready.notified());
176 12 : let mut ready = notified.as_mut().enable();
177 : loop {
178 12 : if ready {
179 11 : let mut inner = self.inner.lock();
180 11 : if inner.take(&self.ready).is_some() {
181 11 : break Ok(Token::new(self.clone()));
182 0 : }
183 0 : notified.set(self.ready.notified());
184 1 : }
185 1 : notified.as_mut().await;
186 0 : ready = true;
187 : }
188 : }
189 11 : }
190 :
191 : /// Return the concurrency [Token], along with the outcome of the job.
192 : ///
193 : /// The [Outcome] of the job, and the time taken to perform it, may be used
194 : /// to update the concurrency limit.
195 : ///
196 : /// Set the outcome to `None` to ignore the job.
197 11 : fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
198 11 : tracing::info!("outcome is {:?}", outcome);
199 11 : if self.config.initial_limit == 0 {
200 0 : return;
201 11 : }
202 11 :
203 11 : let mut inner = self.inner.lock();
204 11 :
205 11 : inner.update_limit(start.elapsed(), outcome);
206 11 :
207 11 : inner.in_flight -= 1;
208 11 : if inner.in_flight < inner.limit {
209 11 : inner.available = inner.limit - inner.in_flight;
210 11 : // At least 1 permit is now available
211 11 : self.ready.notify_one();
212 11 : }
213 11 : }
214 :
215 : /// The current state of the limiter.
216 : #[cfg(test)]
217 9 : fn state(&self) -> LimiterState {
218 9 : let inner = self.inner.lock();
219 9 : LimiterState { limit: inner.limit }
220 9 : }
221 : }
222 :
223 : impl Token {
224 11 : fn new(limiter: Arc<DynamicLimiter>) -> Self {
225 11 : Self {
226 11 : start: Instant::now(),
227 11 : limiter: Some(limiter),
228 11 : }
229 11 : }
230 0 : pub(crate) fn disabled() -> Self {
231 0 : Self {
232 0 : start: Instant::now(),
233 0 : limiter: None,
234 0 : }
235 0 : }
236 :
237 0 : pub(crate) fn is_disabled(&self) -> bool {
238 0 : self.limiter.is_none()
239 0 : }
240 :
241 8 : pub(crate) fn release(mut self, outcome: Outcome) {
242 8 : self.release_mut(Some(outcome));
243 8 : }
244 :
245 19 : pub(crate) fn release_mut(&mut self, outcome: Option<Outcome>) {
246 19 : if let Some(limiter) = self.limiter.take() {
247 11 : limiter.release_inner(self.start, outcome);
248 11 : }
249 19 : }
250 : }
251 :
252 : impl Drop for Token {
253 11 : fn drop(&mut self) {
254 11 : self.release_mut(None);
255 11 : }
256 : }
257 :
#[cfg(test)]
impl LimiterState {
    /// The concurrency limit recorded in this snapshot.
    fn limit(self) -> usize {
        let Self { limit } = self;
        limit
    }
}
|