use std::{
    str::FromStr,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex,
    },
    time::{Duration, Instant},
};

use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::{error, warn};
use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};

use crate::{context::RequestContext, task_mgr::TaskKind};

/// Throttle for `async` functions.
///
/// Runtime reconfigurable.
///
/// To share a throttle among multiple entities, wrap it in an [`Arc`].
///
/// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
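///
/// # Example
///
/// A minimal usage sketch, assuming a `config` and `ctx` in scope and a
/// hypothetical [`Metric`] implementation named `NoopMetric` (see the sketch
/// on the trait below; neither is part of this module):
///
/// ```ignore
/// let throttle = Arc::new(Throttle::new(config, NoopMetric));
/// // Clone the Arc into each task that needs throttling; on the request
/// // path, await the throttle before doing the work:
/// if let Some(waited) = throttle.throttle(&ctx, 1).await {
///     tracing::debug!(?waited, "request was throttled");
/// }
/// ```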
pub struct Throttle<M: Metric> {
    inner: ArcSwap<Inner>,
    metric: M,
    /// will be turned into [`Stats::count_accounted_start`]
    count_accounted_start: AtomicU64,
    /// will be turned into [`Stats::count_accounted_finish`]
    count_accounted_finish: AtomicU64,
    /// will be turned into [`Stats::count_throttled`]
    count_throttled: AtomicU64,
    /// will be turned into [`Stats::sum_throttled_usecs`]
    sum_throttled_usecs: AtomicU64,
}

pub struct Inner {
    task_kinds: EnumSet<TaskKind>,
    rate_limiter: Arc<RateLimiter>,
}

pub type Config = pageserver_api::models::ThrottleConfig;

pub struct Observation {
    pub wait_time: Duration,
}
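
/// Hooks for recording throttle activity in metrics.
///
/// A minimal no-op implementation sketch (illustrative only; such an impl is
/// not part of this module):
///
/// ```ignore
/// struct NoopMetric;
/// impl Metric for NoopMetric {
///     fn accounting_start(&self) {}
///     fn accounting_finish(&self) {}
///     fn observe_throttling(&self, _observation: &Observation) {}
/// }
/// ```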
pub trait Metric {
    fn accounting_start(&self);
    fn accounting_finish(&self);
    fn observe_throttling(&self, observation: &Observation);
}

/// See [`Throttle::reset_stats`].
pub struct Stats {
    /// Number of requests that started [`Throttle::throttle`] calls.
    pub count_accounted_start: u64,
    /// Number of requests that finished [`Throttle::throttle`] calls.
    pub count_accounted_finish: u64,
    /// Subset of the accounted requests that were actually throttled.
    /// Note that these counts are kept in independent atomics, so they may
    /// drift slightly relative to each other.
    pub count_throttled: u64,
    /// Sum of microseconds that throttled requests spent waiting for throttling.
    pub sum_throttled_usecs: u64,
}

impl<M> Throttle<M>
where
    M: Metric,
{
    pub fn new(config: Config, metric: M) -> Self {
        Self {
            inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
            metric,
            count_accounted_start: AtomicU64::new(0),
            count_accounted_finish: AtomicU64::new(0),
            count_throttled: AtomicU64::new(0),
            sum_throttled_usecs: AtomicU64::new(0),
        }
    }

    fn new_inner(config: Config) -> Inner {
        let Config {
            task_kinds,
            initial,
            refill_interval,
            refill_amount,
            max,
        } = config;
        let task_kinds: EnumSet<TaskKind> = task_kinds
            .iter()
            .filter_map(|s| match TaskKind::from_str(s) {
                Ok(v) => Some(v),
                Err(e) => {
                    // TODO: avoid this failure mode
                    error!(
                        "cannot parse task kind, ignoring for rate limiting {}",
                        utils::error::report_compact_sources(&e)
                    );
                    None
                }
            })
            .collect();

        // Steady state: we expect `refill_amount` requests per `refill_interval`;
        // dividing the two gives the steady rate in requests per second.
        let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
        let config = LeakyBucketConfig::new(rps, f64::from(max));

        // `initial` is the number of tokens initially available to requests,
        // whereas the leaky bucket tracks consumed capacity, so the bucket
        // must start with `max - initial` tokens in it.
        let initial_tokens = max - initial;
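        // Worked example (hypothetical numbers): refill_amount = 100,
        // refill_interval = 1s, max = 1000, initial = 1000 yields
        // rps = 100.0 and initial_tokens = 0: the bucket starts empty,
        // so the full burst capacity is immediately available.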

        let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));

        Inner {
            task_kinds,
            rate_limiter: Arc::new(rate_limiter),
        }
    }

    pub fn reconfigure(&self, config: Config) {
        self.inner.store(Arc::new(Self::new_inner(config)));
    }

    /// The [`Throttle`] keeps internal counters of throttling activity.
    /// This method retrieves the current counter values and resets them to zero.
    /// Useful for periodic reporting.
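    ///
    /// A periodic-reporting sketch (the surrounding loop and `throttle`
    /// handle are assumed, not part of this module):
    ///
    /// ```ignore
    /// let stats = throttle.reset_stats();
    /// if stats.count_throttled > 0 {
    ///     tracing::info!(
    ///         count_throttled = stats.count_throttled,
    ///         sum_throttled_usecs = stats.sum_throttled_usecs,
    ///         "requests were throttled since the last report"
    ///     );
    /// }
    /// ```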
    pub fn reset_stats(&self) -> Stats {
        let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
        let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
        let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
        let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
        Stats {
            count_accounted_start,
            count_accounted_finish,
            count_throttled,
            sum_throttled_usecs,
        }
    }

    /// See [`Config::steady_rps`].
    pub fn steady_rps(&self) -> f64 {
        self.inner.load().rate_limiter.steady_rps()
    }

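    /// Acquire `key_count` units of rate-limit budget for the request
    /// described by `ctx`, waiting if the limit has been exceeded.
    /// Returns the time spent waiting if the request was throttled, and
    /// `None` otherwise (including when `ctx`'s task kind is not subject
    /// to throttling).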
    pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
        let inner = self.inner.load_full(); // clones the `Inner` Arc
        if !inner.task_kinds.contains(ctx.task_kind()) {
            return None;
        };
        let start = std::time::Instant::now();

        self.metric.accounting_start();
        self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
        let did_throttle = inner.rate_limiter.acquire(key_count).await;
        self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
        self.metric.accounting_finish();

        if did_throttle {
            self.count_throttled.fetch_add(1, Ordering::Relaxed);
            let now = Instant::now();
            let wait_time = now - start;
            self.sum_throttled_usecs
                .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
            let observation = Observation { wait_time };
            self.metric.observe_throttling(&observation);
            match ctx.micros_spent_throttled.add(wait_time) {
                Ok(res) => res,
                Err(error) => {
                    use once_cell::sync::Lazy;
                    use utils::rate_limit::RateLimit;
                    static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
                        Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
                    let mut guard = WARN_RATE_LIMIT.lock().unwrap();
                    guard.call(move || {
                        warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
                    });
                }
            }
            Some(wait_time)
        } else {
            None
        }
    }
}