Line data Source code
1 : use std::{
2 : str::FromStr,
3 : sync::{
4 : atomic::{AtomicU64, Ordering},
5 : Arc, Mutex,
6 : },
7 : time::{Duration, Instant},
8 : };
9 :
10 : use arc_swap::ArcSwap;
11 : use enumset::EnumSet;
12 : use tracing::{error, warn};
13 : use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
14 :
15 : use crate::{context::RequestContext, task_mgr::TaskKind};
16 :
17 : /// Throttle for `async` functions.
18 : ///
19 : /// Runtime reconfigurable.
20 : ///
21 : /// To share a throttle among multiple entities, wrap it in an [`Arc`].
22 : ///
23 : /// The intial use case for this is tenant-wide throttling of getpage@lsn requests.
24 : pub struct Throttle<M: Metric> {
25 : inner: ArcSwap<Inner>,
26 : metric: M,
27 : /// will be turned into [`Stats::count_accounted`]
28 : count_accounted: AtomicU64,
29 : /// will be turned into [`Stats::count_throttled`]
30 : count_throttled: AtomicU64,
31 : /// will be turned into [`Stats::sum_throttled_usecs`]
32 : sum_throttled_usecs: AtomicU64,
33 : }
34 :
/// The part of a [`Throttle`] that is derived from its [`Config`].
///
/// `task_kinds` is the set of task kinds whose requests are subject to throttling;
/// requests issued by any other task kind bypass the throttle.
/// `rate_limiter` is the leaky-bucket rate limiter that eligible requests acquire from.
35 : pub struct Inner {
36 : task_kinds: EnumSet<TaskKind>,
37 : rate_limiter: Arc<RateLimiter>,
38 : }
39 :
/// Configuration of a [`Throttle`]; an alias for the API model type so that the
/// rest of this module can simply refer to it as `Config`.
40 : pub type Config = pageserver_api::models::ThrottleConfig;
41 :
/// A single throttling event, as reported to a [`Metric`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Observation {
    /// How long the request waited before the throttle let it proceed.
    pub wait_time: Duration,
}
/// Sink for throttling observations.
///
/// `observe_throttling` is invoked once for every request that was actually
/// throttled, with the time that request spent waiting.
45 : pub trait Metric {
46 : fn observe_throttling(&self, observation: &Observation);
47 : }
48 :
/// Counters accumulated since the previous reset. See [`Throttle::reset_stats`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Stats {
    /// Number of requests that were subject to throttling, i.e., requests of the
    /// configured [`Config::task_kinds`].
    pub count_accounted: u64,
    /// Subset of the `accounted` requests that were actually throttled.
    ///
    /// Note that the numbers are stored as two independent atomics, so there might
    /// be a slight drift.
    pub count_throttled: u64,
    /// Sum of microseconds that throttled requests spent waiting for throttling.
    pub sum_throttled_usecs: u64,
}
59 :
60 : impl<M> Throttle<M>
61 : where
62 : M: Metric,
63 : {
64 564 : pub fn new(config: Config, metric: M) -> Self {
65 564 : Self {
66 564 : inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
67 564 : metric,
68 564 : count_accounted: AtomicU64::new(0),
69 564 : count_throttled: AtomicU64::new(0),
70 564 : sum_throttled_usecs: AtomicU64::new(0),
71 564 : }
72 564 : }
73 588 : fn new_inner(config: Config) -> Inner {
74 588 : let Config {
75 588 : task_kinds,
76 588 : initial,
77 588 : refill_interval,
78 588 : refill_amount,
79 588 : max,
80 588 : } = config;
81 588 : let task_kinds: EnumSet<TaskKind> = task_kinds
82 588 : .iter()
83 588 : .filter_map(|s| match TaskKind::from_str(s) {
84 0 : Ok(v) => Some(v),
85 0 : Err(e) => {
86 0 : // TODO: avoid this failure mode
87 0 : error!(
88 0 : "cannot parse task kind, ignoring for rate limiting {}",
89 0 : utils::error::report_compact_sources(&e)
90 : );
91 0 : None
92 : }
93 588 : })
94 588 : .collect();
95 588 :
96 588 : // steady rate, we expect `refill_amount` requests per `refill_interval`.
97 588 : // dividing gives us the rps.
98 588 : let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
99 588 : let config = LeakyBucketConfig::new(rps, f64::from(max));
100 588 :
101 588 : // initial tracks how many tokens are available to put in the bucket
102 588 : // we want how many tokens are currently in the bucket
103 588 : let initial_tokens = max - initial;
104 588 :
105 588 : let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
106 588 :
107 588 : Inner {
108 588 : task_kinds,
109 588 : rate_limiter: Arc::new(rate_limiter),
110 588 : }
111 588 : }
112 24 : pub fn reconfigure(&self, config: Config) {
113 24 : self.inner.store(Arc::new(Self::new_inner(config)));
114 24 : }
115 :
116 : /// The [`Throttle`] keeps an internal flag that is true if there was ever any actual throttling.
117 : /// This method allows retrieving & resetting that flag.
118 : /// Useful for periodic reporting.
119 0 : pub fn reset_stats(&self) -> Stats {
120 0 : let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed);
121 0 : let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
122 0 : let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
123 0 : Stats {
124 0 : count_accounted,
125 0 : count_throttled,
126 0 : sum_throttled_usecs,
127 0 : }
128 0 : }
129 :
130 : /// See [`Config::steady_rps`].
131 0 : pub fn steady_rps(&self) -> f64 {
132 0 : self.inner.load().rate_limiter.steady_rps()
133 0 : }
134 :
135 1879494 : pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
136 1879494 : let inner = self.inner.load_full(); // clones the `Inner` Arc
137 1879494 : if !inner.task_kinds.contains(ctx.task_kind()) {
138 1879494 : return None;
139 0 : };
140 0 : let start = std::time::Instant::now();
141 :
142 0 : let did_throttle = inner.rate_limiter.acquire(key_count).await;
143 :
144 0 : self.count_accounted.fetch_add(1, Ordering::Relaxed);
145 0 : if did_throttle {
146 0 : self.count_throttled.fetch_add(1, Ordering::Relaxed);
147 0 : let now = Instant::now();
148 0 : let wait_time = now - start;
149 0 : self.sum_throttled_usecs
150 0 : .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
151 0 : let observation = Observation { wait_time };
152 0 : self.metric.observe_throttling(&observation);
153 0 : match ctx.micros_spent_throttled.add(wait_time) {
154 0 : Ok(res) => res,
155 0 : Err(error) => {
156 0 : use once_cell::sync::Lazy;
157 0 : use utils::rate_limit::RateLimit;
158 0 : static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
159 0 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
160 0 : let mut guard = WARN_RATE_LIMIT.lock().unwrap();
161 0 : guard.call(move || {
162 0 : warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
163 0 : });
164 0 : }
165 : }
166 0 : Some(wait_time)
167 : } else {
168 0 : None
169 : }
170 1879494 : }
171 : }
|