use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::Instant,
};

use arc_swap::ArcSwap;
use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};

/// Throttle for `async` functions.
///
/// Runtime reconfigurable.
///
/// To share a throttle among multiple entities, wrap it in an [`Arc`].
///
/// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
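///
/// # Example
///
/// A minimal usage sketch; it assumes a `config: Config` value is already in hand
/// (construction of the [`Config`] is elided here):
///
/// ```ignore
/// let throttle = Arc::new(Throttle::new(config));
/// // Share by cloning the `Arc`; each caller awaits admission before doing its work.
/// let start = Instant::now();
/// match throttle.throttle(1, start).await {
///     ThrottleResult::Throttled { end } => { /* waited until `end` */ }
///     ThrottleResult::NotThrottled { end } => { /* `end == start`, no waiting */ }
/// }
/// ```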
pub struct Throttle {
    inner: ArcSwap<Inner>,
    /// will be turned into [`Stats::count_accounted_start`]
    count_accounted_start: AtomicU64,
    /// will be turned into [`Stats::count_accounted_finish`]
    count_accounted_finish: AtomicU64,
    /// will be turned into [`Stats::count_throttled`]
    count_throttled: AtomicU64,
    /// will be turned into [`Stats::sum_throttled_usecs`]
    sum_throttled_usecs: AtomicU64,
}

pub struct Inner {
    enabled: bool,
    rate_limiter: Arc<RateLimiter>,
}

pub type Config = pageserver_api::models::ThrottleConfig;

/// See [`Throttle::reset_stats`].
pub struct Stats {
    /// Number of requests that started [`Throttle::throttle`] calls.
    pub count_accounted_start: u64,
    /// Number of requests that finished [`Throttle::throttle`] calls.
    pub count_accounted_finish: u64,
    /// Subset of the `accounted` requests that were actually throttled.
    /// Note that the numbers are stored as two independent atomics, so there might be a slight drift.
    pub count_throttled: u64,
    /// Sum of microseconds that throttled requests spent waiting for throttling.
    pub sum_throttled_usecs: u64,
}

/// Result of a [`Throttle::throttle`] call.
pub enum ThrottleResult {
    /// The call was not throttled; `end` is the `start` instant that was passed in.
    NotThrottled { end: Instant },
    /// The call was throttled; `end` is the instant at which the wait ended.
    Throttled { end: Instant },
}

impl Throttle {
    pub fn new(config: Config) -> Self {
        Self {
            inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
            count_accounted_start: AtomicU64::new(0),
            count_accounted_finish: AtomicU64::new(0),
            count_throttled: AtomicU64::new(0),
            sum_throttled_usecs: AtomicU64::new(0),
        }
    }
    fn new_inner(config: Config) -> Inner {
        let Config {
            enabled,
            initial,
            refill_interval,
            refill_amount,
            max,
        } = config;

        // steady rate, we expect `refill_amount` requests per `refill_interval`.
        // dividing gives us the rps.
        let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
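        // For illustration (hypothetical numbers): refill_amount = 100 with
        // refill_interval = 1s gives rps = 100.0; with refill_interval = 500ms it would be 200.0.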
        let config = LeakyBucketConfig::new(rps, f64::from(max));

        // `initial` tracks how many tokens are still available to put in the bucket,
        // but the rate limiter wants how many tokens are currently in the bucket.
        let initial_tokens = max - initial;
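        // For illustration (hypothetical numbers): max = 100 and initial = 100 mean the bucket
        // starts with 0 tokens in it, i.e. all 100 tokens can still be put into the bucket.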

        let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));

        Inner {
            enabled: enabled.is_enabled(),
            rate_limiter: Arc::new(rate_limiter),
        }
    }
    pub fn reconfigure(&self, config: Config) {
        self.inner.store(Arc::new(Self::new_inner(config)));
    }

    /// The [`Throttle`] keeps internal counters that aggregate into a [`Stats`] value.
    /// This method allows retrieving & resetting those counters.
    /// Useful for periodic reporting.
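    ///
    /// A minimal periodic-reporting sketch (the 60s interval and the logging are
    /// illustrative, not part of this module):
    ///
    /// ```ignore
    /// loop {
    ///     tokio::time::sleep(std::time::Duration::from_secs(60)).await;
    ///     let stats = throttle.reset_stats();
    ///     tracing::info!(
    ///         count_throttled = stats.count_throttled,
    ///         sum_throttled_usecs = stats.sum_throttled_usecs,
    ///         "throttle stats since last report"
    ///     );
    /// }
    /// ```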
    pub fn reset_stats(&self) -> Stats {
        let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
        let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
        let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
        let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
        Stats {
            count_accounted_start,
            count_accounted_finish,
            count_throttled,
            sum_throttled_usecs,
        }
    }

    /// See [`Config::steady_rps`].
    pub fn steady_rps(&self) -> f64 {
        self.inner.load().rate_limiter.steady_rps()
    }

    /// `start` must be [`Instant::now`] or earlier.
    pub async fn throttle(&self, key_count: usize, start: Instant) -> ThrottleResult {
        let inner = self.inner.load_full(); // clones the `Inner` Arc

        if !inner.enabled {
            return ThrottleResult::NotThrottled { end: start };
        }

        self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
        let did_throttle = inner.rate_limiter.acquire(key_count).await;
        self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);

        if did_throttle {
            self.count_throttled.fetch_add(1, Ordering::Relaxed);
            let end = Instant::now();
            let wait_time = end - start;
            self.sum_throttled_usecs
                .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
            ThrottleResult::Throttled { end }
        } else {
            ThrottleResult::NotThrottled { end: start }
        }
    }
}