Line data Source code
1 : use std::{
2 : sync::{
3 : atomic::{AtomicU64, Ordering},
4 : Arc,
5 : },
6 : time::{Duration, Instant},
7 : };
8 :
9 : use arc_swap::ArcSwap;
10 : use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
11 :
12 : /// Throttle for `async` functions.
13 : ///
14 : /// Runtime reconfigurable.
15 : ///
16 : /// To share a throttle among multiple entities, wrap it in an [`Arc`].
17 : ///
18 : /// The intial use case for this is tenant-wide throttling of getpage@lsn requests.
19 : pub struct Throttle<M: Metric> {
20 : inner: ArcSwap<Inner>,
21 : metric: M,
22 : /// will be turned into [`Stats::count_accounted_start`]
23 : count_accounted_start: AtomicU64,
24 : /// will be turned into [`Stats::count_accounted_finish`]
25 : count_accounted_finish: AtomicU64,
26 : /// will be turned into [`Stats::count_throttled`]
27 : count_throttled: AtomicU64,
28 : /// will be turned into [`Stats::sum_throttled_usecs`]
29 : sum_throttled_usecs: AtomicU64,
30 : }
31 :
32 : pub struct Inner {
33 : enabled: bool,
34 : rate_limiter: Arc<RateLimiter>,
35 : }
36 :
37 : pub type Config = pageserver_api::models::ThrottleConfig;
38 :
/// Details about one request that actually had to wait, handed to
/// [`Metric::observe_throttling`].
pub struct Observation {
    /// Time from the start of the `throttle` call until the rate limiter let it through.
    pub wait_time: Duration,
}

/// Sink for per-request events emitted by `Throttle::throttle`.
pub trait Metric {
    /// Invoked just before an enabled throttle starts waiting on the rate limiter.
    fn accounting_start(&self);
    /// Invoked right after the rate limiter has admitted the request.
    fn accounting_finish(&self);
    /// Invoked after `accounting_finish`, only for requests that were actually delayed.
    fn observe_throttling(&self, observation: &Observation);
}
47 :
/// Counters accumulated by a [`Throttle`], as returned by [`Throttle::reset_stats`].
///
/// Each field is backed by its own independent atomic counter, and the counters are reset
/// one after another. The fields are therefore not a consistent snapshot of each other and
/// may drift slightly relative to one another. For example, a request can appear in
/// `count_accounted_start` of one report but in `count_accounted_finish` of the next.
///
/// Only [`Throttle::throttle`] calls made while throttling is enabled are counted.
pub struct Stats {
    /// Number of requests that started [`Throttle::throttle`] calls.
    pub count_accounted_start: u64,
    /// Number of requests that finished [`Throttle::throttle`] calls.
    pub count_accounted_finish: u64,
    /// Subset of the `accounted` requests that were actually throttled.
    pub count_throttled: u64,
    /// Sum of microseconds that throttled requests spent waiting for throttling.
    pub sum_throttled_usecs: u64,
}
60 :
/// Outcome of a single [`Throttle::throttle`] call.
pub enum ThrottleResult {
    /// The request went through without waiting (including when throttling is disabled).
    NotThrottled {
        /// When the `throttle` call began.
        start: Instant,
    },
    /// The request had to wait for the rate limiter.
    Throttled {
        /// When the `throttle` call began.
        start: Instant,
        /// When the rate limiter let the request through.
        end: Instant,
    },
}
65 :
66 : impl<M> Throttle<M>
67 : where
68 : M: Metric,
69 : {
70 196 : pub fn new(config: Config, metric: M) -> Self {
71 196 : Self {
72 196 : inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
73 196 : metric,
74 196 : count_accounted_start: AtomicU64::new(0),
75 196 : count_accounted_finish: AtomicU64::new(0),
76 196 : count_throttled: AtomicU64::new(0),
77 196 : sum_throttled_usecs: AtomicU64::new(0),
78 196 : }
79 196 : }
80 196 : fn new_inner(config: Config) -> Inner {
81 196 : let Config {
82 196 : enabled,
83 196 : initial,
84 196 : refill_interval,
85 196 : refill_amount,
86 196 : max,
87 196 : } = config;
88 196 :
89 196 : // steady rate, we expect `refill_amount` requests per `refill_interval`.
90 196 : // dividing gives us the rps.
91 196 : let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
92 196 : let config = LeakyBucketConfig::new(rps, f64::from(max));
93 196 :
94 196 : // initial tracks how many tokens are available to put in the bucket
95 196 : // we want how many tokens are currently in the bucket
96 196 : let initial_tokens = max - initial;
97 196 :
98 196 : let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
99 196 :
100 196 : Inner {
101 196 : enabled: enabled.is_enabled(),
102 196 : rate_limiter: Arc::new(rate_limiter),
103 196 : }
104 196 : }
105 0 : pub fn reconfigure(&self, config: Config) {
106 0 : self.inner.store(Arc::new(Self::new_inner(config)));
107 0 : }
108 :
109 : /// The [`Throttle`] keeps an internal flag that is true if there was ever any actual throttling.
110 : /// This method allows retrieving & resetting that flag.
111 : /// Useful for periodic reporting.
112 0 : pub fn reset_stats(&self) -> Stats {
113 0 : let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
114 0 : let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
115 0 : let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
116 0 : let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
117 0 : Stats {
118 0 : count_accounted_start,
119 0 : count_accounted_finish,
120 0 : count_throttled,
121 0 : sum_throttled_usecs,
122 0 : }
123 0 : }
124 :
125 : /// See [`Config::steady_rps`].
126 0 : pub fn steady_rps(&self) -> f64 {
127 0 : self.inner.load().rate_limiter.steady_rps()
128 0 : }
129 :
130 0 : pub async fn throttle(&self, key_count: usize) -> ThrottleResult {
131 0 : let inner = self.inner.load_full(); // clones the `Inner` Arc
132 0 :
133 0 : let start = std::time::Instant::now();
134 0 :
135 0 : if !inner.enabled {
136 0 : return ThrottleResult::NotThrottled { start };
137 0 : }
138 0 :
139 0 : self.metric.accounting_start();
140 0 : self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
141 0 : let did_throttle = inner.rate_limiter.acquire(key_count).await;
142 0 : self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
143 0 : self.metric.accounting_finish();
144 0 :
145 0 : if did_throttle {
146 0 : self.count_throttled.fetch_add(1, Ordering::Relaxed);
147 0 : let now = Instant::now();
148 0 : let wait_time = now - start;
149 0 : self.sum_throttled_usecs
150 0 : .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
151 0 : let observation = Observation { wait_time };
152 0 : self.metric.observe_throttling(&observation);
153 0 : ThrottleResult::Throttled { start, end: now }
154 : } else {
155 0 : ThrottleResult::NotThrottled { start }
156 : }
157 0 : }
158 : }
|