use std::{
    str::FromStr,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::error;

use crate::{context::RequestContext, task_mgr::TaskKind};

/// Throttle for `async` functions.
///
/// Runtime reconfigurable.
///
/// To share a throttle among multiple entities, wrap it in an [`Arc`].
///
/// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
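///
/// # Example
///
/// A minimal usage sketch; `config`, `ctx`, and `MyMetric` are assumptions of
/// the surrounding code, not provided by this module:
///
/// ```ignore
/// let throttle = Arc::new(Throttle::new(config, MyMetric::default()));
/// // In a request handler: waits if the rate limit is exceeded, returns
/// // immediately for task kinds that are not subject to throttling.
/// throttle.throttle(&ctx, /* key_count */ 1).await;
/// ```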
pub struct Throttle<M: Metric> {
    inner: ArcSwap<Inner>,
    metric: M,
    /// Will be turned into [`Stats::count_accounted`].
    count_accounted: AtomicU64,
    /// Will be turned into [`Stats::count_throttled`].
    count_throttled: AtomicU64,
    /// Will be turned into [`Stats::sum_throttled_usecs`].
    sum_throttled_usecs: AtomicU64,
}

pub struct Inner {
    task_kinds: EnumSet<TaskKind>,
    rate_limiter: Arc<leaky_bucket::RateLimiter>,
    config: Config,
}

pub type Config = pageserver_api::models::ThrottleConfig;

pub struct Observation {
    pub wait_time: Duration,
}
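
/// Sink for throttling observations, e.g., an exported metric.
///
/// A minimal sketch of an implementation (illustrative only, not part of this
/// crate): accumulate the total wait time in an atomic counter.
///
/// ```ignore
/// struct WaitSum(std::sync::atomic::AtomicU64);
///
/// impl Metric for WaitSum {
///     fn observe_throttling(&self, observation: &Observation) {
///         self.0.fetch_add(
///             observation.wait_time.as_micros() as u64,
///             std::sync::atomic::Ordering::Relaxed,
///         );
///     }
/// }
/// ```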
pub trait Metric {
    fn observe_throttling(&self, observation: &Observation);
}

/// See [`Throttle::reset_stats`].
pub struct Stats {
    /// Number of requests that were subject to throttling, i.e., requests of the
    /// configured [`Config::task_kinds`].
    pub count_accounted: u64,
    /// Subset of the `accounted` requests that were actually throttled.
    /// Note that the numbers are stored as independent atomics, so there might be
    /// a slight drift between them.
    pub count_throttled: u64,
    /// Sum of microseconds that throttled requests spent waiting for throttling.
    pub sum_throttled_usecs: u64,
}

impl<M> Throttle<M>
where
    M: Metric,
{
    pub fn new(config: Config, metric: M) -> Self {
        Self {
            inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
            metric,
            count_accounted: AtomicU64::new(0),
            count_throttled: AtomicU64::new(0),
            sum_throttled_usecs: AtomicU64::new(0),
        }
    }
    fn new_inner(config: Config) -> Inner {
        let Config {
            task_kinds,
            initial,
            refill_interval,
            refill_amount,
            max,
            fair,
        } = &config;
        let task_kinds: EnumSet<TaskKind> = task_kinds
            .iter()
            .filter_map(|s| match TaskKind::from_str(s) {
                Ok(v) => Some(v),
                Err(e) => {
                    // TODO: avoid this failure mode
                    error!(
                        "cannot parse task kind, ignoring it for rate limiting: {}",
                        utils::error::report_compact_sources(&e)
                    );
                    None
                }
            })
            .collect();
        Inner {
            task_kinds,
            rate_limiter: Arc::new(
                leaky_bucket::RateLimiter::builder()
                    .initial(*initial)
                    .interval(*refill_interval)
                    .refill(refill_amount.get())
                    .max(*max)
                    .fair(*fair)
                    .build(),
            ),
            config,
        }
    }
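
    // A minimal sketch of a `Config` as consumed by `new_inner` above. Field
    // values and the task kind name are illustrative; `refill_amount` is some
    // non-zero integer type (only `.get()` is used here). See
    // `pageserver_api::models::ThrottleConfig` for the authoritative definition.
    //
    //   Config {
    //       task_kinds: vec!["PageRequestHandler".to_string()],
    //       initial: 0,                                  // start with an empty bucket
    //       refill_interval: Duration::from_millis(100), // add tokens every 100ms ...
    //       refill_amount: 100.try_into().unwrap(),      // ... 100 at a time => 1000 RPS steady
    //       max: 1000,                                   // bucket capacity, i.e., max burst
    //       fair: true,                                  // waiters are served FIFO
    //   }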
    pub fn reconfigure(&self, config: Config) {
        self.inner.store(Arc::new(Self::new_inner(config)));
    }

    /// The [`Throttle`] keeps internal counters of how many requests were
    /// accounted and throttled, and how long throttled requests waited in total.
    /// This method retrieves those counters and resets them to zero.
    /// Useful for periodic reporting.
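    ///
    /// A minimal sketch of such a report (the log fields are illustrative):
    ///
    /// ```ignore
    /// let stats = throttle.reset_stats();
    /// if stats.count_throttled > 0 {
    ///     tracing::info!(
    ///         count_accounted = stats.count_accounted,
    ///         count_throttled = stats.count_throttled,
    ///         sum_throttled_usecs = stats.sum_throttled_usecs,
    ///         "throttling report"
    ///     );
    /// }
    /// ```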
    pub fn reset_stats(&self) -> Stats {
        let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed);
        let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
        let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
        Stats {
            count_accounted,
            count_throttled,
            sum_throttled_usecs,
        }
    }

    /// See [`Config::steady_rps`].
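    /// (Presumably `refill_amount` per `refill_interval`, expressed in requests
    /// per second; the definition lives in the config type.)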
    pub fn steady_rps(&self) -> f64 {
        self.inner.load().config.steady_rps()
    }

    pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) {
        let inner = self.inner.load_full(); // clones the `Inner` Arc
        if !inner.task_kinds.contains(ctx.task_kind()) {
            return;
        }
        let start = std::time::Instant::now();
        let mut did_throttle = false;
        let acquire = inner.rate_limiter.acquire(key_count);
        // Turn off runtime-induced preemption ("coop") so that a `Pending` caused
        // by an exhausted tokio task budget is not mistaken for actual throttling,
        // which would make `did_throttle` inaccurate.
        let acquire = tokio::task::unconstrained(acquire);
        let mut acquire = std::pin::pin!(acquire);
        // Poll the acquire future manually: if it is ever `Pending`, the rate
        // limiter made us wait, i.e., this request was throttled.
        std::future::poll_fn(|cx| {
            use std::future::Future;
            let poll = acquire.as_mut().poll(cx);
            did_throttle = did_throttle || poll.is_pending();
            poll
        })
        .await;
        self.count_accounted.fetch_add(1, Ordering::Relaxed);
        if did_throttle {
            self.count_throttled.fetch_add(1, Ordering::Relaxed);
            let now = Instant::now();
            let wait_time = now - start;
            self.sum_throttled_usecs
                .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
            let observation = Observation { wait_time };
            self.metric.observe_throttling(&observation);
        }
    }
}