Line data Source code
1 : //! Algorithms for controlling concurrency limits.
2 : use parking_lot::Mutex;
3 : use std::{pin::pin, sync::Arc, time::Duration};
4 : use tokio::{
5 : sync::Notify,
6 : time::{error::Elapsed, Instant},
7 : };
8 :
9 : use self::aimd::Aimd;
10 :
11 : pub(crate) mod aimd;
12 :
13 : /// Whether a job succeeded or failed as a result of congestion/overload.
14 : ///
15 : /// Errors not considered to be caused by overload should be ignored.
16 : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
17 : pub(crate) enum Outcome {
18 : /// The job succeeded, or failed in a way unrelated to overload.
19 : Success,
20 : /// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
21 : /// was observed.
22 : Overload,
23 : }
24 :
25 : /// An algorithm for controlling a concurrency limit.
26 : pub(crate) trait LimitAlgorithm: Send + Sync + 'static {
27 : /// Update the concurrency limit in response to a new job completion.
28 : fn update(&self, old_limit: usize, sample: Sample) -> usize;
29 : }
30 :
31 : /// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
32 : #[derive(Debug, Clone, PartialEq, Eq, Copy)]
33 : pub(crate) struct Sample {
34 : pub(crate) latency: Duration,
35 : /// Jobs in flight when the sample was taken.
36 : pub(crate) in_flight: usize,
37 : pub(crate) outcome: Outcome,
38 : }
39 :
40 3 : #[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
41 : #[serde(rename_all = "snake_case")]
42 : pub(crate) enum RateLimitAlgorithm {
43 : #[default]
44 : Fixed,
45 : Aimd {
46 : #[serde(flatten)]
47 : conf: Aimd,
48 : },
49 : }
50 :
51 : pub(crate) struct Fixed;
52 :
53 : impl LimitAlgorithm for Fixed {
54 0 : fn update(&self, old_limit: usize, _sample: Sample) -> usize {
55 0 : old_limit
56 0 : }
57 : }
58 :
59 3 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
60 : pub struct RateLimiterConfig {
61 : #[serde(flatten)]
62 : pub(crate) algorithm: RateLimitAlgorithm,
63 : pub(crate) initial_limit: usize,
64 : }
65 :
66 : impl RateLimiterConfig {
67 6 : pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
68 6 : match self.algorithm {
69 0 : RateLimitAlgorithm::Fixed => Box::new(Fixed),
70 6 : RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
71 : }
72 6 : }
73 : }
74 :
75 : pub(crate) struct LimiterInner {
76 : alg: Box<dyn LimitAlgorithm>,
77 : available: usize,
78 : limit: usize,
79 : in_flight: usize,
80 : }
81 :
82 : impl LimiterInner {
83 11 : fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
84 11 : if let Some(outcome) = outcome {
85 8 : let sample = Sample {
86 8 : latency,
87 8 : in_flight: self.in_flight,
88 8 : outcome,
89 8 : };
90 8 : self.limit = self.alg.update(self.limit, sample);
91 8 : }
92 11 : }
93 :
94 11 : fn take(&mut self, ready: &Notify) -> Option<()> {
95 11 : if self.available >= 1 {
96 11 : self.available -= 1;
97 11 : self.in_flight += 1;
98 11 :
99 11 : // tell the next in the queue that there is a permit ready
100 11 : if self.available >= 1 {
101 8 : ready.notify_one();
102 8 : }
103 11 : Some(())
104 : } else {
105 0 : None
106 : }
107 11 : }
108 : }
109 :
110 : /// Limits the number of concurrent jobs.
111 : ///
112 : /// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
113 : /// token once the job is finished.
114 : ///
115 : /// The limit will be automatically adjusted based on observed latency (delay) and/or failures
116 : /// caused by overload (loss).
117 : pub(crate) struct DynamicLimiter {
118 : config: RateLimiterConfig,
119 : inner: Mutex<LimiterInner>,
120 : // to notify when a token is available
121 : ready: Notify,
122 : }
123 :
124 : /// A concurrency token, required to run a job.
125 : ///
126 : /// Release the token back to the [`DynamicLimiter`] after the job is complete.
127 : pub(crate) struct Token {
128 : start: Instant,
129 : limiter: Option<Arc<DynamicLimiter>>,
130 : }
131 :
132 : /// A snapshot of the state of the [`DynamicLimiter`].
133 : ///
134 : /// Not guaranteed to be consistent under high concurrency.
135 : #[derive(Debug, Clone, Copy)]
136 : #[cfg(test)]
137 : struct LimiterState {
138 : limit: usize,
139 : }
140 :
141 : impl DynamicLimiter {
142 : /// Create a limiter with a given limit control algorithm.
143 6 : pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
144 6 : let ready = Notify::new();
145 6 : ready.notify_one();
146 6 :
147 6 : Arc::new(Self {
148 6 : inner: Mutex::new(LimiterInner {
149 6 : alg: config.create_rate_limit_algorithm(),
150 6 : available: config.initial_limit,
151 6 : limit: config.initial_limit,
152 6 : in_flight: 0,
153 6 : }),
154 6 : ready,
155 6 : config,
156 6 : })
157 6 : }
158 :
159 : /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
160 12 : pub(crate) async fn acquire_timeout(
161 12 : self: &Arc<Self>,
162 12 : duration: Duration,
163 12 : ) -> Result<Token, Elapsed> {
164 12 : tokio::time::timeout(duration, self.acquire()).await?
165 12 : }
166 :
167 : /// Try to acquire a concurrency [Token].
168 12 : async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
169 12 : if self.config.initial_limit == 0 {
170 : // If the rate limiter is disabled, we can always acquire a token.
171 0 : Ok(Token::disabled())
172 : } else {
173 12 : let mut notified = pin!(self.ready.notified());
174 12 : let mut ready = notified.as_mut().enable();
175 : loop {
176 12 : if ready {
177 11 : let mut inner = self.inner.lock();
178 11 : if inner.take(&self.ready).is_some() {
179 11 : break Ok(Token::new(self.clone()));
180 0 : }
181 0 : notified.set(self.ready.notified());
182 1 : }
183 1 : notified.as_mut().await;
184 0 : ready = true;
185 : }
186 : }
187 11 : }
188 :
189 : /// Return the concurrency [Token], along with the outcome of the job.
190 : ///
191 : /// The [Outcome] of the job, and the time taken to perform it, may be used
192 : /// to update the concurrency limit.
193 : ///
194 : /// Set the outcome to `None` to ignore the job.
195 11 : fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
196 11 : tracing::info!("outcome is {:?}", outcome);
197 11 : if self.config.initial_limit == 0 {
198 0 : return;
199 11 : }
200 11 :
201 11 : let mut inner = self.inner.lock();
202 11 :
203 11 : inner.update_limit(start.elapsed(), outcome);
204 11 :
205 11 : inner.in_flight -= 1;
206 11 : if inner.in_flight < inner.limit {
207 11 : inner.available = inner.limit - inner.in_flight;
208 11 : // At least 1 permit is now available
209 11 : self.ready.notify_one();
210 11 : }
211 11 : }
212 :
213 : /// The current state of the limiter.
214 : #[cfg(test)]
215 9 : fn state(&self) -> LimiterState {
216 9 : let inner = self.inner.lock();
217 9 : LimiterState { limit: inner.limit }
218 9 : }
219 : }
220 :
221 : impl Token {
222 11 : fn new(limiter: Arc<DynamicLimiter>) -> Self {
223 11 : Self {
224 11 : start: Instant::now(),
225 11 : limiter: Some(limiter),
226 11 : }
227 11 : }
228 0 : pub(crate) fn disabled() -> Self {
229 0 : Self {
230 0 : start: Instant::now(),
231 0 : limiter: None,
232 0 : }
233 0 : }
234 :
235 0 : pub(crate) fn is_disabled(&self) -> bool {
236 0 : self.limiter.is_none()
237 0 : }
238 :
239 8 : pub(crate) fn release(mut self, outcome: Outcome) {
240 8 : self.release_mut(Some(outcome));
241 8 : }
242 :
243 19 : pub(crate) fn release_mut(&mut self, outcome: Option<Outcome>) {
244 19 : if let Some(limiter) = self.limiter.take() {
245 11 : limiter.release_inner(self.start, outcome);
246 11 : }
247 19 : }
248 : }
249 :
250 : impl Drop for Token {
251 11 : fn drop(&mut self) {
252 11 : self.release_mut(None);
253 11 : }
254 : }
255 :
256 : #[cfg(test)]
257 : impl LimiterState {
258 : /// The current concurrency limit.
259 9 : fn limit(self) -> usize {
260 9 : self.limit
261 9 : }
262 : }
|