Line data Source code
1 : //! Algorithms for controlling concurrency limits.
2 : use parking_lot::Mutex;
3 : use std::{pin::pin, sync::Arc, time::Duration};
4 : use tokio::{
5 : sync::Notify,
6 : time::{error::Elapsed, Instant},
7 : };
8 :
9 : use self::aimd::Aimd;
10 :
11 : pub(crate) mod aimd;
12 :
/// The congestion-related result of a finished job.
///
/// Only failures attributable to overload count as [`Outcome::Overload`]; errors with any
/// other cause should be ignored rather than reported as overload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum Outcome {
    /// The job completed, or it failed for a reason that has nothing to do with overload.
    Success,
    /// The job failed due to overload, for example it timed out or an explicit backpressure
    /// signal was seen.
    Overload,
}
24 :
25 : /// An algorithm for controlling a concurrency limit.
26 : pub(crate) trait LimitAlgorithm: Send + Sync + 'static {
27 : /// Update the concurrency limit in response to a new job completion.
28 : fn update(&self, old_limit: usize, sample: Sample) -> usize;
29 : }
30 :
31 : /// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
32 : #[derive(Debug, Clone, PartialEq, Eq, Copy)]
33 : pub(crate) struct Sample {
34 : pub(crate) latency: Duration,
35 : /// Jobs in flight when the sample was taken.
36 : pub(crate) in_flight: usize,
37 : pub(crate) outcome: Outcome,
38 : }
39 :
40 18 : #[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
41 : #[serde(rename_all = "snake_case")]
42 : pub(crate) enum RateLimitAlgorithm {
43 : #[default]
44 : Fixed,
45 : Aimd {
46 : #[serde(flatten)]
47 : conf: Aimd,
48 : },
49 : }
50 :
51 : pub(crate) struct Fixed;
52 :
53 : impl LimitAlgorithm for Fixed {
54 0 : fn update(&self, old_limit: usize, _sample: Sample) -> usize {
55 0 : old_limit
56 0 : }
57 : }
58 :
59 18 : #[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
60 : pub struct RateLimiterConfig {
61 : #[serde(flatten)]
62 : pub(crate) algorithm: RateLimitAlgorithm,
63 : pub(crate) initial_limit: usize,
64 : }
65 :
66 : impl RateLimiterConfig {
67 36 : pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
68 36 : match self.algorithm {
69 0 : RateLimitAlgorithm::Fixed => Box::new(Fixed),
70 36 : RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
71 : }
72 36 : }
73 : }
74 :
75 : pub(crate) struct LimiterInner {
76 : alg: Box<dyn LimitAlgorithm>,
77 : available: usize,
78 : limit: usize,
79 : in_flight: usize,
80 : }
81 :
82 : impl LimiterInner {
83 66 : fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
84 66 : if let Some(outcome) = outcome {
85 48 : let sample = Sample {
86 48 : latency,
87 48 : in_flight: self.in_flight,
88 48 : outcome,
89 48 : };
90 48 : self.limit = self.alg.update(self.limit, sample);
91 48 : }
92 66 : }
93 :
94 66 : fn take(&mut self, ready: &Notify) -> Option<()> {
95 66 : if self.available >= 1 {
96 66 : self.available -= 1;
97 66 : self.in_flight += 1;
98 66 :
99 66 : // tell the next in the queue that there is a permit ready
100 66 : if self.available >= 1 {
101 48 : ready.notify_one();
102 48 : }
103 66 : Some(())
104 : } else {
105 0 : None
106 : }
107 66 : }
108 : }
109 :
110 : /// Limits the number of concurrent jobs.
111 : ///
112 : /// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
113 : /// token once the job is finished.
114 : ///
115 : /// The limit will be automatically adjusted based on observed latency (delay) and/or failures
116 : /// caused by overload (loss).
117 : pub(crate) struct DynamicLimiter {
118 : config: RateLimiterConfig,
119 : inner: Mutex<LimiterInner>,
120 : // to notify when a token is available
121 : ready: Notify,
122 : }
123 :
124 : /// A concurrency token, required to run a job.
125 : ///
126 : /// Release the token back to the [`DynamicLimiter`] after the job is complete.
127 : pub(crate) struct Token {
128 : start: Instant,
129 : limiter: Option<Arc<DynamicLimiter>>,
130 : }
131 :
/// A point-in-time copy of a [`DynamicLimiter`]'s state (test builds only).
///
/// Under high concurrency the snapshot is not guaranteed to be consistent.
#[cfg(test)]
#[derive(Clone, Copy, Debug)]
struct LimiterState {
    /// The concurrency limit at the time of the snapshot.
    limit: usize,
}
140 :
141 : impl DynamicLimiter {
142 : /// Create a limiter with a given limit control algorithm.
143 36 : pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
144 36 : let ready = Notify::new();
145 36 : ready.notify_one();
146 36 :
147 36 : Arc::new(Self {
148 36 : inner: Mutex::new(LimiterInner {
149 36 : alg: config.create_rate_limit_algorithm(),
150 36 : available: config.initial_limit,
151 36 : limit: config.initial_limit,
152 36 : in_flight: 0,
153 36 : }),
154 36 : ready,
155 36 : config,
156 36 : })
157 36 : }
158 :
159 : /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
160 72 : pub(crate) async fn acquire_timeout(
161 72 : self: &Arc<Self>,
162 72 : duration: Duration,
163 72 : ) -> Result<Token, Elapsed> {
164 72 : tokio::time::timeout(duration, self.acquire()).await?
165 72 : }
166 :
167 : /// Try to acquire a concurrency [Token].
168 72 : async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
169 72 : if self.config.initial_limit == 0 {
170 : // If the rate limiter is disabled, we can always acquire a token.
171 0 : Ok(Token::disabled())
172 : } else {
173 72 : let mut notified = pin!(self.ready.notified());
174 72 : let mut ready = notified.as_mut().enable();
175 : loop {
176 72 : if ready {
177 66 : let mut inner = self.inner.lock();
178 66 : if inner.take(&self.ready).is_some() {
179 66 : break Ok(Token::new(self.clone()));
180 0 : }
181 0 : notified.set(self.ready.notified());
182 6 : }
183 6 : notified.as_mut().await;
184 0 : ready = true;
185 : }
186 : }
187 66 : }
188 :
189 : /// Return the concurrency [Token], along with the outcome of the job.
190 : ///
191 : /// The [Outcome] of the job, and the time taken to perform it, may be used
192 : /// to update the concurrency limit.
193 : ///
194 : /// Set the outcome to `None` to ignore the job.
195 66 : fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
196 66 : tracing::info!("outcome is {:?}", outcome);
197 66 : if self.config.initial_limit == 0 {
198 0 : return;
199 66 : }
200 66 :
201 66 : let mut inner = self.inner.lock();
202 66 :
203 66 : inner.update_limit(start.elapsed(), outcome);
204 66 :
205 66 : inner.in_flight -= 1;
206 66 : if inner.in_flight < inner.limit {
207 66 : inner.available = inner.limit - inner.in_flight;
208 66 : // At least 1 permit is now available
209 66 : self.ready.notify_one();
210 66 : }
211 66 : }
212 :
213 : /// The current state of the limiter.
214 : #[cfg(test)]
215 54 : fn state(&self) -> LimiterState {
216 54 : let inner = self.inner.lock();
217 54 : LimiterState { limit: inner.limit }
218 54 : }
219 : }
220 :
221 : impl Token {
222 66 : fn new(limiter: Arc<DynamicLimiter>) -> Self {
223 66 : Self {
224 66 : start: Instant::now(),
225 66 : limiter: Some(limiter),
226 66 : }
227 66 : }
228 0 : pub(crate) fn disabled() -> Self {
229 0 : Self {
230 0 : start: Instant::now(),
231 0 : limiter: None,
232 0 : }
233 0 : }
234 :
235 0 : pub(crate) fn is_disabled(&self) -> bool {
236 0 : self.limiter.is_none()
237 0 : }
238 :
239 48 : pub(crate) fn release(mut self, outcome: Outcome) {
240 48 : self.release_mut(Some(outcome));
241 48 : }
242 :
243 114 : pub(crate) fn release_mut(&mut self, outcome: Option<Outcome>) {
244 114 : if let Some(limiter) = self.limiter.take() {
245 66 : limiter.release_inner(self.start, outcome);
246 66 : }
247 114 : }
248 : }
249 :
250 : impl Drop for Token {
251 66 : fn drop(&mut self) {
252 66 : self.release_mut(None);
253 66 : }
254 : }
255 :
#[cfg(test)]
impl LimiterState {
    /// The concurrency limit recorded in this snapshot.
    fn limit(self) -> usize {
        let Self { limit } = self;
        limit
    }
}
|