LCOV - code coverage report
Current view: top level - libs/utils/src - leaky_bucket.rs (source / functions) Coverage Total Hit
Test: 53437f7e869ac68c86c7d3e4c20964c0156f158c.info Lines: 81.0 % 153 124
Test Date: 2024-09-20 16:14:12 Functions: 63.6 % 11 7

            Line data    Source code
       1              : //! This module implements the Generic Cell Rate Algorithm for a simplified
       2              : //! version of the Leaky Bucket rate limiting system.
       3              : //!
       4              : //! # Leaky Bucket
       5              : //!
       6              : //! If the bucket is full, new requests are not allowed and are throttled/errored.
       7              : //! If the bucket is partially full/empty, new requests are added to the bucket in
       8              : //! terms of "tokens".
       9              : //!
      10              : //! Over time, tokens are removed from the bucket, naturally allowing new requests at a steady rate.
      11              : //!
      12              : //! The bucket size tunes the burst support. The drain rate tunes the steady-rate requests per second.
      13              : //!
      14              : //! # [GCRA](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm)
      15              : //!
      16              : //! GCRA is a continuous rate leaky-bucket impl that stores minimal state and requires
      17              : //! no background jobs to drain tokens, as the design utilises timestamps to drain automatically over time.
      18              : //!
      19              : //! We store an "empty_at" timestamp as the only state. As time progresses, we will naturally approach
      20              : //! the empty state. The full-bucket state is calculated from `empty_at - config.bucket_width`.
      21              : //!
      22              : //! Another explanation can be found here: <https://brandur.org/rate-limiting>
      23              : 
      24              : use std::{sync::Mutex, time::Duration};
      25              : 
      26              : use tokio::{sync::Notify, time::Instant};
      27              : 
      28              : pub struct LeakyBucketConfig {
      29              :     /// This is the "time cost" of a single request unit.
      30              :     /// Should loosely represent how long it takes to handle a request unit in active resource time.
      31              :     /// Loosely speaking this is the inverse of the steady-rate requests-per-second
      32              :     pub cost: Duration,
      33              : 
      34              :     /// total size of the bucket
      35              :     pub bucket_width: Duration,
      36              : }
      37              : 
      38              : impl LeakyBucketConfig {
      39          598 :     pub fn new(rps: f64, bucket_size: f64) -> Self {
      40          598 :         let cost = Duration::from_secs_f64(rps.recip());
      41          598 :         let bucket_width = cost.mul_f64(bucket_size);
      42          598 :         Self { cost, bucket_width }
      43          598 :     }
      44              : }
      45              : 
      46              : pub struct LeakyBucketState {
      47              :     /// Bucket is represented by `allow_at..empty_at` where `allow_at = empty_at - config.bucket_width`.
      48              :     ///
      49              :     /// At any given time, `empty_at - now` represents the number of tokens in the bucket, multiplied by `config.cost`.
      50              :     /// Adding `n` tokens to the bucket is done by moving `empty_at` forward by `n * config.cost`.
      51              :     /// If `now < allow_at`, the bucket is considered filled and cannot accept any more tokens.
      52              :     /// Draining the bucket will happen naturally as `now` moves forward.
      53              :     ///
      54              :     /// Let `n` be some "time cost" for the request,
      55              :     /// If now is after empty_at, the bucket is empty and the empty_at is reset to now,
      56              :     /// If now is within the `bucket window + n`, we are within time budget.
      57              :     /// If now is before the `bucket window + n`, we have run out of budget.
      58              :     ///
      59              :     /// This is inspired by the generic cell rate algorithm (GCRA) and works
      60              :     /// exactly the same as a leaky-bucket.
      61              :     pub empty_at: Instant,
      62              : }
      63              : 
      64              : impl LeakyBucketState {
      65          594 :     pub fn with_initial_tokens(config: &LeakyBucketConfig, initial_tokens: f64) -> Self {
      66          594 :         LeakyBucketState {
      67          594 :             empty_at: Instant::now() + config.cost.mul_f64(initial_tokens),
      68          594 :         }
      69          594 :     }
      70              : 
      71            2 :     pub fn bucket_is_empty(&self, now: Instant) -> bool {
      72            2 :         // if self.empty_at is after now, the bucket is not empty
      73            2 :         self.empty_at <= now
      74            2 :     }
      75              : 
      76              :     /// Immediately adds tokens to the bucket, if there is space.
      77              :     ///
      78              :     /// In a scenario where you are waiting for available rate,
      79              :     /// rather than just erroring immediately, `started` corresponds to when this waiting started.
      80              :     ///
      81              :     /// `n` is the number of tokens that will be filled in the bucket.
      82              :     ///
      83              :     /// # Errors
      84              :     ///
      85              :     /// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
      86              :     /// there will be space again.
      87        16219 :     pub fn add_tokens(
      88        16219 :         &mut self,
      89        16219 :         config: &LeakyBucketConfig,
      90        16219 :         started: Instant,
      91        16219 :         n: f64,
      92        16219 :     ) -> Result<(), Instant> {
      93        16219 :         let now = Instant::now();
      94        16219 : 
      95        16219 :         // invariant: started <= now
      96        16219 :         debug_assert!(started <= now);
      97              : 
      98              :         // If the bucket was empty when we started our wait,
      99              :         // we should update the `empty_at` value accordingly.
     100              :         // this prevents us from having negative tokens in the bucket.
     101        16219 :         let mut empty_at = self.empty_at;
     102        16219 :         if empty_at < started {
     103            6 :             empty_at = started;
     104        16213 :         }
     105              : 
     106        16219 :         let n = config.cost.mul_f64(n);
     107        16219 :         let new_empty_at = empty_at + n;
     108        16219 :         let allow_at = new_empty_at.checked_sub(config.bucket_width);
     109              : 
     110              :         //                     empty_at
     111              :         //          allow_at    |   new_empty_at
     112              :         //           /          |   /
     113              :         // -------o-[---------o-|--]---------
     114              :         //   now1 ^      now2 ^
     115              :         //
     116              :         // at now1, the bucket would be completely filled if we add n tokens.
     117              :         // at now2, the bucket would be partially filled if we add n tokens.
     118              : 
     119        16219 :         match allow_at {
     120        16219 :             Some(allow_at) if now < allow_at => Err(allow_at),
     121              :             _ => {
     122        16210 :                 self.empty_at = new_empty_at;
     123        16210 :                 Ok(())
     124              :             }
     125              :         }
     126        16219 :     }
     127              : }
     128              : 
     129              : pub struct RateLimiter {
     130              :     pub config: LeakyBucketConfig,
     131              :     pub state: Mutex<LeakyBucketState>,
     132              :     /// A queue of waiters, used to give callers of `acquire` a fair ordering.
     133              :     pub queue: Notify,
     134              : }
     135              : 
     136              : struct Requeue<'a>(&'a Notify);
     137              : 
     138              : impl Drop for Requeue<'_> {
     139            0 :     fn drop(&mut self) {
     140            0 :         self.0.notify_one();
     141            0 :     }
     142              : }
     143              : 
     144              : impl RateLimiter {
     145          594 :     pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
     146          594 :         RateLimiter {
     147          594 :             state: Mutex::new(LeakyBucketState::with_initial_tokens(
     148          594 :                 &config,
     149          594 :                 initial_tokens,
     150          594 :             )),
     151          594 :             config,
     152          594 :             queue: {
     153          594 :                 let queue = Notify::new();
     154          594 :                 queue.notify_one();
     155          594 :                 queue
     156          594 :             },
     157          594 :         }
     158          594 :     }
     159              : 
     160            0 :     pub fn steady_rps(&self) -> f64 {
     161            0 :         self.config.cost.as_secs_f64().recip()
     162            0 :     }
     163              : 
     164              :     /// returns true if we did throttle
     165            0 :     pub async fn acquire(&self, count: usize) -> bool {
     166            0 :         let mut throttled = false;
     167            0 : 
     168            0 :         let start = tokio::time::Instant::now();
     169            0 : 
     170            0 :         // wait until we are the first in the queue
     171            0 :         let mut notified = std::pin::pin!(self.queue.notified());
     172            0 :         if !notified.as_mut().enable() {
     173            0 :             throttled = true;
     174            0 :             notified.await;
     175            0 :         }
     176              : 
     177              :         // notify the next waiter in the queue when we are done.
     178            0 :         let _guard = Requeue(&self.queue);
     179              : 
     180              :         loop {
     181            0 :             let res = self
     182            0 :                 .state
     183            0 :                 .lock()
     184            0 :                 .unwrap()
     185            0 :                 .add_tokens(&self.config, start, count as f64);
     186            0 :             match res {
     187            0 :                 Ok(()) => return throttled,
     188            0 :                 Err(ready_at) => {
     189            0 :                     throttled = true;
     190            0 :                     tokio::time::sleep_until(ready_at).await;
     191              :                 }
     192              :             }
     193              :         }
     194            0 :     }
     195              : }
     196              : 
     197              : #[cfg(test)]
     198              : mod tests {
     199              :     use std::time::Duration;
     200              : 
     201              :     use tokio::time::Instant;
     202              : 
     203              :     use super::{LeakyBucketConfig, LeakyBucketState};
     204              : 
     205              :     #[tokio::test(start_paused = true)]
     206            1 :     async fn check() {
     207            1 :         let config = LeakyBucketConfig {
     208            1 :             // average 100rps
     209            1 :             cost: Duration::from_millis(10),
     210            1 :             // burst up to 100 requests
     211            1 :             bucket_width: Duration::from_millis(1000),
     212            1 :         };
     213            1 : 
     214            1 :         let mut state = LeakyBucketState {
     215            1 :             empty_at: Instant::now(),
     216            1 :         };
     217            1 : 
     218            1 :         // supports burst
     219            1 :         {
     220            1 :             // should work for 100 requests this instant
     221          101 :             for _ in 0..100 {
     222          100 :                 state.add_tokens(&config, Instant::now(), 1.0).unwrap();
     223          100 :             }
     224            1 :             let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
     225            1 :             assert_eq!(ready - Instant::now(), Duration::from_millis(10));
     226            1 :         }
     227            1 : 
     228            1 :         // doesn't overfill
     229            1 :         {
     230            1 :             // after 1s we should have an empty bucket again.
     231            1 :             tokio::time::advance(Duration::from_secs(1)).await;
     232            1 :             assert!(state.bucket_is_empty(Instant::now()));
     233            1 : 
     234            1 :             // after 1s more, we should not over count the tokens and allow more than 200 requests.
     235            1 :             tokio::time::advance(Duration::from_secs(1)).await;
     236          101 :             for _ in 0..100 {
     237          100 :                 state.add_tokens(&config, Instant::now(), 1.0).unwrap();
     238          100 :             }
     239            1 :             let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
     240            1 :             assert_eq!(ready - Instant::now(), Duration::from_millis(10));
     241            1 :         }
     242            1 : 
     243            1 :         // supports sustained rate over a long period
     244            1 :         {
     245            1 :             tokio::time::advance(Duration::from_secs(1)).await;
     246            1 : 
     247            1 :             // should sustain 100rps
     248         2001 :             for _ in 0..2000 {
     249         2000 :                 tokio::time::advance(Duration::from_millis(10)).await;
     250         2000 :                 state.add_tokens(&config, Instant::now(), 1.0).unwrap();
     251            1 :             }
     252            1 :         }
     253            1 : 
     254            1 :         // supports requesting more tokens than can be stored in the bucket
     255            1 :         // we just wait a little bit longer upfront.
     256            1 :         {
     257            1 :             // start the bucket completely empty
     258            1 :             tokio::time::advance(Duration::from_secs(5)).await;
     259            1 :             assert!(state.bucket_is_empty(Instant::now()));
     260            1 : 
     261            1 :             // requesting 200 tokens of space should take 200*cost = 2s
     262            1 :             // but we already have 1s available, so we wait 1s from start.
     263            1 :             let start = Instant::now();
     264            1 : 
     265            1 :             let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
     266            1 :             assert_eq!(ready - Instant::now(), Duration::from_secs(1));
     267            1 : 
     268            1 :             tokio::time::advance(Duration::from_millis(500)).await;
     269            1 :             let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
     270            1 :             assert_eq!(ready - Instant::now(), Duration::from_millis(500));
     271            1 : 
     272            1 :             tokio::time::advance(Duration::from_millis(500)).await;
     273            1 :             state.add_tokens(&config, start, 200.0).unwrap();
     274            1 : 
     275            1 :             // bucket should be completely full now
     276            1 :             let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
     277            1 :             assert_eq!(ready - Instant::now(), Duration::from_millis(10));
     278            1 :         }
     279            1 :     }
     280              : }
        

Generated by: LCOV version 2.1-beta