//! This module implements the Generic Cell Rate Algorithm for a simplified
//! version of the Leaky Bucket rate limiting system.
//!
//! # Leaky Bucket
//!
//! If the bucket is full, new requests are not admitted: they are throttled or rejected with an error.
//! If the bucket is partially full (or empty), each new request adds a "token" to the bucket.
//!
//! Over time, tokens drain from the bucket, naturally admitting new requests at a steady rate.
//!
//! The bucket size tunes the burst support. The drain rate tunes the steady-state requests per second.
//!
//! # [GCRA](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm)
//!
//! GCRA is a continuous-rate leaky-bucket implementation that stores minimal state and requires
//! no background jobs to drain tokens: the design uses timestamps so the bucket drains automatically over time.
//!
//! We store an "empty_at" timestamp as the only state. As time progresses, we naturally approach
//! the empty state. The full-bucket state is calculated as `empty_at - config.bucket_width`.
//!
//! Another explanation can be found here: <https://brandur.org/rate-limiting>
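//!
//! As a worked example (the numbers here are illustrative, not from the original docs):
//! with `cost = 10ms` and `bucket_width = 1s`, the bucket holds 100 tokens.
//! If `empty_at` is 500ms in the future, the bucket currently holds 50 tokens,
//! and adding one more token is allowed because `now >= (empty_at + 10ms) - 1s`.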

use std::{sync::Mutex, time::Duration};

use tokio::{sync::Notify, time::Instant};

pub struct LeakyBucketConfig {
    /// This is the "time cost" of a single request unit.
    /// It should loosely represent how long it takes to handle a request unit in active resource time.
    /// Loosely speaking, this is the inverse of the steady-state requests-per-second.
    pub cost: Duration,

    /// Total size of the bucket.
    pub bucket_width: Duration,
}
37 :
38 : impl LeakyBucketConfig {
39 598 : pub fn new(rps: f64, bucket_size: f64) -> Self {
40 598 : let cost = Duration::from_secs_f64(rps.recip());
41 598 : let bucket_width = cost.mul_f64(bucket_size);
42 598 : Self { cost, bucket_width }
43 598 : }
44 : }
45 :
pub struct LeakyBucketState {
    /// The bucket is represented by `allow_at..empty_at` where `allow_at = empty_at - config.bucket_width`.
    ///
    /// At any given time, `empty_at - now` represents the number of tokens in the bucket, multiplied by the time cost.
    /// Adding `n` tokens to the bucket is done by moving `empty_at` forward by `n * config.cost`.
    /// If `now < allow_at`, the bucket is considered full and cannot accept any more tokens.
    /// Draining the bucket happens naturally as `now` moves forward.
    ///
    /// Let `n` be some "time cost" for the request:
    /// * If `now` is after `empty_at`, the bucket is empty and `empty_at` is reset to `now`.
    /// * If `now` is within `bucket_width` of `empty_at + n`, we are within the time budget.
    /// * If `now` is more than `bucket_width` before `empty_at + n`, we have run out of budget.
    ///
    /// This is inspired by the generic cell rate algorithm (GCRA) and works
    /// exactly the same as a leaky bucket.
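    ///
    /// As an illustration, the current token count can be recovered from this
    /// single timestamp (a hypothetical helper, not part of the source):
    ///
    /// ```ignore
    /// fn tokens(state: &LeakyBucketState, config: &LeakyBucketConfig, now: Instant) -> f64 {
    ///     // time remaining until empty, divided by the per-token time cost.
    ///     let remaining = state.empty_at.saturating_duration_since(now);
    ///     remaining.as_secs_f64() / config.cost.as_secs_f64()
    /// }
    /// ```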
    pub empty_at: Instant,
}
63 :
64 : impl LeakyBucketState {
65 594 : pub fn with_initial_tokens(config: &LeakyBucketConfig, initial_tokens: f64) -> Self {
66 594 : LeakyBucketState {
67 594 : empty_at: Instant::now() + config.cost.mul_f64(initial_tokens),
68 594 : }
69 594 : }
70 :
71 2 : pub fn bucket_is_empty(&self, now: Instant) -> bool {
72 2 : // if self.end is after now, the bucket is not empty
73 2 : self.empty_at <= now
74 2 : }

    /// Immediately adds tokens to the bucket, if there is space.
    ///
    /// If you are waiting for available rate rather than erroring immediately,
    /// `started` corresponds to when that waiting started.
    ///
    /// `n` is the number of tokens that will be added to the bucket.
    ///
    /// # Errors
    ///
    /// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
    /// there will be space again.
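    ///
    /// # Example
    ///
    /// A minimal sketch of the success/error behaviour (not part of the
    /// original docs; assumes the items are in scope):
    ///
    /// ```ignore
    /// // 100 rps with a burst capacity of 2 tokens.
    /// let config = LeakyBucketConfig::new(100.0, 2.0);
    /// let mut state = LeakyBucketState::with_initial_tokens(&config, 0.0);
    ///
    /// let start = Instant::now();
    /// assert!(state.add_tokens(&config, start, 1.0).is_ok());
    /// assert!(state.add_tokens(&config, start, 1.0).is_ok());
    /// // The bucket is now full; the error carries the instant when space frees up.
    /// assert!(state.add_tokens(&config, start, 1.0).is_err());
    /// ```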
    pub fn add_tokens(
        &mut self,
        config: &LeakyBucketConfig,
        started: Instant,
        n: f64,
    ) -> Result<(), Instant> {
        let now = Instant::now();

        // invariant: started <= now
        debug_assert!(started <= now);

        // If the bucket was empty when we started our search,
        // we should update the `empty_at` value accordingly.
        // This prevents us from having negative tokens in the bucket.
        let mut empty_at = self.empty_at;
        if empty_at < started {
            empty_at = started;
        }

        let n = config.cost.mul_f64(n);
        let new_empty_at = empty_at + n;
        let allow_at = new_empty_at.checked_sub(config.bucket_width);

        //                      empty_at
        //   allow_at          |   new_empty_at
        //          /          |   /
        // -------o-[---------o-|--]---------
        //   now1 ^      now2 ^
        //
        // at now1, adding n tokens would over-fill the bucket, so the request is rejected.
        // at now2, adding n tokens leaves the bucket partially filled, so the request is accepted.

        match allow_at {
            Some(allow_at) if now < allow_at => Err(allow_at),
            _ => {
                self.empty_at = new_empty_at;
                Ok(())
            }
        }
    }
}

pub struct RateLimiter {
    pub config: LeakyBucketConfig,
    pub state: Mutex<LeakyBucketState>,
    /// A queue to provide fair (FIFO) ordering of waiters.
    pub queue: Notify,
}

/// Guard that notifies the next waiter in the queue when dropped, so the queue
/// keeps making progress even if an `acquire` future is cancelled mid-wait.
struct Requeue<'a>(&'a Notify);

impl Drop for Requeue<'_> {
    fn drop(&mut self) {
        self.0.notify_one();
    }
}

impl RateLimiter {
    pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
        RateLimiter {
            state: Mutex::new(LeakyBucketState::with_initial_tokens(
                &config,
                initial_tokens,
            )),
            config,
            queue: {
                // seed the queue with one permit so the first caller to
                // `acquire` can proceed immediately.
                let queue = Notify::new();
                queue.notify_one();
                queue
            },
        }
    }

    pub fn steady_rps(&self) -> f64 {
        self.config.cost.as_secs_f64().recip()
    }

    /// Returns `true` if we were throttled (i.e. had to wait for capacity).
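    ///
    /// A hypothetical usage sketch, inside an async context (the 100 rps /
    /// 100-burst numbers are illustrative, not from the original docs):
    ///
    /// ```ignore
    /// let limiter = RateLimiter::with_initial_tokens(LeakyBucketConfig::new(100.0, 100.0), 0.0);
    /// let throttled = limiter.acquire(1).await;
    /// ```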
    pub async fn acquire(&self, count: usize) -> bool {
        let mut throttled = false;

        let start = tokio::time::Instant::now();

        // wait until we are the first in the queue
        let mut notified = std::pin::pin!(self.queue.notified());
        if !notified.as_mut().enable() {
            throttled = true;
            notified.await;
        }

        // notify the next waiter in the queue when we are done.
        let _guard = Requeue(&self.queue);

        loop {
            let res = self
                .state
                .lock()
                .unwrap()
                .add_tokens(&self.config, start, count as f64);
            match res {
                Ok(()) => return throttled,
                Err(ready_at) => {
                    throttled = true;
                    tokio::time::sleep_until(ready_at).await;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::time::Instant;

    use super::{LeakyBucketConfig, LeakyBucketState};

    #[tokio::test(start_paused = true)]
    async fn check() {
        let config = LeakyBucketConfig {
            // average 100rps
            cost: Duration::from_millis(10),
            // burst up to 100 requests
            bucket_width: Duration::from_millis(1000),
        };

        let mut state = LeakyBucketState {
            empty_at: Instant::now(),
        };

        // supports burst
        {
            // should work for 100 requests this instant
            for _ in 0..100 {
                state.add_tokens(&config, Instant::now(), 1.0).unwrap();
            }
            let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
            assert_eq!(ready - Instant::now(), Duration::from_millis(10));
        }

        // doesn't overfill
        {
            // after 1s we should have an empty bucket again.
            tokio::time::advance(Duration::from_secs(1)).await;
            assert!(state.bucket_is_empty(Instant::now()));

            // after 1s more, we should not over-count the tokens: still only 100 requests fit, not 200.
            tokio::time::advance(Duration::from_secs(1)).await;
            for _ in 0..100 {
                state.add_tokens(&config, Instant::now(), 1.0).unwrap();
            }
            let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
            assert_eq!(ready - Instant::now(), Duration::from_millis(10));
        }

        // supports sustained rate over a long period
        {
            tokio::time::advance(Duration::from_secs(1)).await;

            // should sustain 100rps
            for _ in 0..2000 {
                tokio::time::advance(Duration::from_millis(10)).await;
                state.add_tokens(&config, Instant::now(), 1.0).unwrap();
            }
        }

        // supports requesting more tokens than can be stored in the bucket;
        // we just wait a little bit longer upfront.
        {
            // start the bucket completely empty
            tokio::time::advance(Duration::from_secs(5)).await;
            assert!(state.bucket_is_empty(Instant::now()));

            // requesting 200 tokens of space should take 200*cost = 2s,
            // but we already have 1s available, so we wait 1s from start.
            let start = Instant::now();

            let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
            assert_eq!(ready - Instant::now(), Duration::from_secs(1));

            tokio::time::advance(Duration::from_millis(500)).await;
            let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
            assert_eq!(ready - Instant::now(), Duration::from_millis(500));

            tokio::time::advance(Duration::from_millis(500)).await;
            state.add_tokens(&config, start, 200.0).unwrap();

            // bucket should be completely full now
            let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
            assert_eq!(ready - Instant::now(), Duration::from_millis(10));
        }
    }
}