use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use arc_swap::ArcSwap;
use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};

/// Throttle for `async` functions.
///
/// Runtime reconfigurable.
///
/// To share a throttle among multiple entities, wrap it in an [`Arc`].
///
/// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
pub struct Throttle {
    inner: ArcSwap<Inner>,
    /// will be turned into [`Stats::count_accounted_start`]
    count_accounted_start: AtomicU64,
    /// will be turned into [`Stats::count_accounted_finish`]
    count_accounted_finish: AtomicU64,
    /// will be turned into [`Stats::count_throttled`]
    count_throttled: AtomicU64,
    /// will be turned into [`Stats::sum_throttled_usecs`]
    sum_throttled_usecs: AtomicU64,
}

pub struct Inner {
    enabled: bool,
    rate_limiter: Arc<RateLimiter>,
}

pub type Config = pageserver_api::models::ThrottleConfig;

/// See [`Throttle::reset_stats`].
pub struct Stats {
    /// Number of requests that started [`Throttle::throttle`] calls.
    pub count_accounted_start: u64,
    /// Number of requests that finished [`Throttle::throttle`] calls.
    pub count_accounted_finish: u64,
    /// Subset of the `accounted` requests that were actually throttled.
    /// Note that the numbers are stored as independent atomics, so there might be a slight drift.
    pub count_throttled: u64,
    /// Sum of microseconds that throttled requests spent waiting for throttling.
    pub sum_throttled_usecs: u64,
}

pub enum ThrottleResult {
    NotThrottled { end: Instant },
    Throttled { end: Instant },
}

impl Throttle {
    pub fn new(config: Config) -> Self {
        Self {
            inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
            count_accounted_start: AtomicU64::new(0),
            count_accounted_finish: AtomicU64::new(0),
            count_throttled: AtomicU64::new(0),
            sum_throttled_usecs: AtomicU64::new(0),
        }
    }
    fn new_inner(config: Config) -> Inner {
        let Config {
            enabled,
            initial,
            refill_interval,
            refill_amount,
            max,
        } = config;

        // steady rate, we expect `refill_amount` requests per `refill_interval`.
        // dividing gives us the rps.
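        // Illustrative arithmetic (hypothetical numbers, not config defaults):
        // refill_amount = 100 per refill_interval = 100ms gives rps = 100 / 0.1 = 1000.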
        let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
        let config = LeakyBucketConfig::new(rps, f64::from(max));

        // initial tracks how many tokens are available to put in the bucket
        // we want how many tokens are currently in the bucket
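        // Illustrative arithmetic (hypothetical numbers): max = 100 and initial = 20
        // yield initial_tokens = 80, which is handed to the rate limiter below.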
        let initial_tokens = max - initial;

        let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));

        Inner {
            enabled: enabled.is_enabled(),
            rate_limiter: Arc::new(rate_limiter),
        }
    }
    pub fn reconfigure(&self, config: Config) {
        self.inner.store(Arc::new(Self::new_inner(config)));
    }

    /// The [`Throttle`] keeps internal counters of throttling activity (see [`Stats`]).
    /// This method allows retrieving & resetting those counters.
    /// Useful for periodic reporting.
    pub fn reset_stats(&self) -> Stats {
        let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
        let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
        let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
        let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
        Stats {
            count_accounted_start,
            count_accounted_finish,
            count_throttled,
            sum_throttled_usecs,
        }
    }

    /// See [`Config::steady_rps`].
    pub fn steady_rps(&self) -> f64 {
        self.inner.load().rate_limiter.steady_rps()
    }

    /// `start` must be [`Instant::now`] or earlier.
    pub async fn throttle(&self, key_count: usize, start: Instant) -> ThrottleResult {
        let inner = self.inner.load_full(); // clones the `Inner` Arc

        if !inner.enabled {
            return ThrottleResult::NotThrottled { end: start };
        }

        self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
        let did_throttle = inner.rate_limiter.acquire(key_count).await;
        self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);

        if did_throttle {
            self.count_throttled.fetch_add(1, Ordering::Relaxed);
            let end = Instant::now();
            let wait_time = end - start;
            self.sum_throttled_usecs
                .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
            ThrottleResult::Throttled { end }
        } else {
            ThrottleResult::NotThrottled { end: start }
        }
    }
}
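
// Usage sketch (illustrative, not part of the original module): how a caller might
// share the throttle across tasks and periodically report stats. The `Config`
// (`pageserver_api::models::ThrottleConfig`) is taken as a parameter because its
// construction is defined elsewhere; only the API above is exercised here.
#[allow(dead_code)]
async fn usage_sketch(config: Config) {
    let throttle = Arc::new(Throttle::new(config));

    // Per-request path: account for one key before doing the work.
    let start = Instant::now();
    match throttle.throttle(1, start).await {
        ThrottleResult::Throttled { end } => {
            // `end - start` is the time this request spent waiting in the bucket.
            let _waited = end - start;
        }
        ThrottleResult::NotThrottled { end: _ } => {}
    }

    // Periodic reporting path: read and reset the counters.
    let stats = throttle.reset_stats();
    let _ = (stats.count_accounted_start, stats.count_throttled);
}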