Line data Source code
1 : use std::{
2 : sync::{
3 : atomic::{AtomicU64, Ordering},
4 : Arc,
5 : },
6 : time::{Duration, Instant},
7 : };
8 :
9 : use arc_swap::ArcSwap;
10 : use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
11 :
12 : /// Throttle for `async` functions.
13 : ///
14 : /// Runtime reconfigurable.
15 : ///
16 : /// To share a throttle among multiple entities, wrap it in an [`Arc`].
17 : ///
18 : /// The intial use case for this is tenant-wide throttling of getpage@lsn requests.
19 : pub struct Throttle<M: Metric> {
20 : inner: ArcSwap<Inner>,
21 : metric: M,
22 : /// will be turned into [`Stats::count_accounted_start`]
23 : count_accounted_start: AtomicU64,
24 : /// will be turned into [`Stats::count_accounted_finish`]
25 : count_accounted_finish: AtomicU64,
26 : /// will be turned into [`Stats::count_throttled`]
27 : count_throttled: AtomicU64,
28 : /// will be turned into [`Stats::sum_throttled_usecs`]
29 : sum_throttled_usecs: AtomicU64,
30 : }
31 :
32 : pub struct Inner {
33 : enabled: bool,
34 : rate_limiter: Arc<RateLimiter>,
35 : }
36 :
37 : pub type Config = pageserver_api::models::ThrottleConfig;
38 :
/// Details of a single call that was actually throttled, handed to
/// [`Metric::observe_throttling`].
pub struct Observation {
    /// Time measured from the start of the throttled call until the rate
    /// limiter let it through.
    pub wait_time: Duration,
}
42 : pub trait Metric {
43 : fn accounting_start(&self);
44 : fn accounting_finish(&self);
45 : fn observe_throttling(&self, observation: &Observation);
46 : }
47 :
/// Snapshot of the counters returned by [`Throttle::reset_stats`].
///
/// Each counter is kept in its own atomic and is read and reset
/// independently, so the values may drift slightly relative to each other.
pub struct Stats {
    /// Number of [`Throttle::throttle`] calls that started while the throttle was enabled.
    pub count_accounted_start: u64,
    /// Number of those calls for which the rate limiter has returned.
    pub count_accounted_finish: u64,
    /// Subset of the accounted calls that were actually throttled.
    pub count_throttled: u64,
    /// Total time, in microseconds, that throttled calls spent waiting.
    pub sum_throttled_usecs: u64,
}
60 :
61 : impl<M> Throttle<M>
62 : where
63 : M: Metric,
64 : {
65 192 : pub fn new(config: Config, metric: M) -> Self {
66 192 : Self {
67 192 : inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
68 192 : metric,
69 192 : count_accounted_start: AtomicU64::new(0),
70 192 : count_accounted_finish: AtomicU64::new(0),
71 192 : count_throttled: AtomicU64::new(0),
72 192 : sum_throttled_usecs: AtomicU64::new(0),
73 192 : }
74 192 : }
75 192 : fn new_inner(config: Config) -> Inner {
76 192 : let Config {
77 192 : enabled,
78 192 : initial,
79 192 : refill_interval,
80 192 : refill_amount,
81 192 : max,
82 192 : } = config;
83 192 :
84 192 : // steady rate, we expect `refill_amount` requests per `refill_interval`.
85 192 : // dividing gives us the rps.
86 192 : let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
87 192 : let config = LeakyBucketConfig::new(rps, f64::from(max));
88 192 :
89 192 : // initial tracks how many tokens are available to put in the bucket
90 192 : // we want how many tokens are currently in the bucket
91 192 : let initial_tokens = max - initial;
92 192 :
93 192 : let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
94 192 :
95 192 : Inner {
96 192 : enabled: enabled.is_enabled(),
97 192 : rate_limiter: Arc::new(rate_limiter),
98 192 : }
99 192 : }
100 0 : pub fn reconfigure(&self, config: Config) {
101 0 : self.inner.store(Arc::new(Self::new_inner(config)));
102 0 : }
103 :
104 : /// The [`Throttle`] keeps an internal flag that is true if there was ever any actual throttling.
105 : /// This method allows retrieving & resetting that flag.
106 : /// Useful for periodic reporting.
107 0 : pub fn reset_stats(&self) -> Stats {
108 0 : let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
109 0 : let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
110 0 : let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
111 0 : let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
112 0 : Stats {
113 0 : count_accounted_start,
114 0 : count_accounted_finish,
115 0 : count_throttled,
116 0 : sum_throttled_usecs,
117 0 : }
118 0 : }
119 :
120 : /// See [`Config::steady_rps`].
121 0 : pub fn steady_rps(&self) -> f64 {
122 0 : self.inner.load().rate_limiter.steady_rps()
123 0 : }
124 :
125 0 : pub async fn throttle(&self, key_count: usize) -> Option<Duration> {
126 0 : let inner = self.inner.load_full(); // clones the `Inner` Arc
127 0 :
128 0 : if !inner.enabled {
129 0 : return None;
130 0 : }
131 0 :
132 0 : let start = std::time::Instant::now();
133 0 :
134 0 : self.metric.accounting_start();
135 0 : self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
136 0 : let did_throttle = inner.rate_limiter.acquire(key_count).await;
137 0 : self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
138 0 : self.metric.accounting_finish();
139 0 :
140 0 : if did_throttle {
141 0 : self.count_throttled.fetch_add(1, Ordering::Relaxed);
142 0 : let now = Instant::now();
143 0 : let wait_time = now - start;
144 0 : self.sum_throttled_usecs
145 0 : .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
146 0 : let observation = Observation { wait_time };
147 0 : self.metric.observe_throttling(&observation);
148 0 : Some(wait_time)
149 : } else {
150 0 : None
151 : }
152 0 : }
153 : }
|