//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC

use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;

static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
    once_cell::sync::Lazy::new(|| {
        let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
        let permits = usize::max(
            1,
            // While a lot of the work is done on spawn_blocking, we still do
            // repartitioning in the async context. This should leave some workers
            // free to pick up other work, hopefully easing any externally visible
            // effects of restarts.
            //
            // 6/8 is a guess; previously we ran with an unlimited 8 and more from
            // spawn_blocking.
            (total_threads * 3).checked_div(4).unwrap_or(0),
        );
        assert_ne!(permits, 0, "we will not be adding in permits later");
        assert!(
            permits < total_threads,
            "need threads avail for shorter work"
        );
        tokio::sync::Semaphore::new(permits)
    });

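// A quick sanity check of the permit arithmetic above (thread counts here are
// illustrative, not taken from any real configuration): with 8 background
// worker threads we get max(1, 8 * 3 / 4) = 6 permits, leaving 2 threads for
// shorter work. A single worker thread would clamp to 1 permit and then trip
// the `permits < total_threads` assert, so the runtime needs at least 2 workers.
#[cfg(test)]
mod permit_math_sketch {
    #[test]
    fn permits_leave_headroom() {
        // Mirror of the computation in CONCURRENT_BACKGROUND_TASKS.
        let permits_for =
            |total_threads: usize| usize::max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
        assert_eq!(permits_for(8), 6);
        assert_eq!(permits_for(4), 3);
        // With 2 threads the clamp still holds one thread back for short work.
        assert_eq!(permits_for(2), 1);
    }
}
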
#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
    Compaction,
    Gc,
    Eviction,
    ConsumptionMetricsCollectMetrics,
    ConsumptionMetricsSyntheticSizeWorker,
}

impl BackgroundLoopKind {
    fn as_static_str(&self) -> &'static str {
        let s: &'static str = self.into();
        s
    }
}

pub(crate) enum RateLimitError {
    Cancelled,
}

pub(crate) async fn concurrent_background_tasks_rate_limit(
    loop_kind: BackgroundLoopKind,
    _ctx: &RequestContext,
    cancel: &CancellationToken,
) -> Result<impl Drop, RateLimitError> {
    crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT
        .with_label_values(&[loop_kind.as_static_str()])
        .inc();
    scopeguard::defer!(
        crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc();
    );
    tokio::select! {
        permit = CONCURRENT_BACKGROUND_TASKS.acquire() => {
            match permit {
                Ok(permit) => Ok(permit),
                Err(_closed) => unreachable!("we never close the semaphore"),
            }
        },
        _ = cancel.cancelled() => {
            Err(RateLimitError::Cancelled)
        }
    }
}
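// A minimal usage sketch for the rate limiter above (the surrounding loop body
// is hypothetical, not part of this module): a background iteration acquires a
// permit before doing work, and the permit is released when the returned guard
// is dropped at the end of the scope.
//
//     let _permit = match concurrent_background_tasks_rate_limit(
//         BackgroundLoopKind::Compaction,
//         &ctx,
//         &cancel,
//     )
//     .await
//     {
//         Ok(permit) => permit,
//         Err(RateLimitError::Cancelled) => return,
//     };
//     // ... run one compaction or GC iteration while holding the permit ...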

/// Start per tenant background loops: compaction and gc.
pub fn start_background_loops(
    tenant: &Arc<Tenant>,
    background_jobs_can_start: Option<&completion::Barrier>,
) {
    let tenant_id = tenant.tenant_id;
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::Compaction,
        Some(tenant_id),
        None,
        &format!("compactor for tenant {tenant_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                compaction_loop(tenant, cancel)
                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
                    .await;
                Ok(())
            }
        },
    );
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::GarbageCollector,
        Some(tenant_id),
        None,
        &format!("garbage collector for tenant {tenant_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                gc_loop(tenant, cancel)
                    .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
                    .await;
                Ok(())
            }
        },
    );
}
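// Hypothetical call-site sketch (tenant activation happens elsewhere in the
// crate; `tenant` and `barrier` below are illustrative bindings):
//
//     start_background_loops(&tenant, Some(&barrier));
//
// With `Some(barrier)`, both loops idle in `Barrier::maybe_wait` until
// background jobs are allowed to start (or exit early on shutdown); passing
// `None` lets them start immediately.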

///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    let wait_duration = Duration::from_secs(2);
    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_compaction_period();

            // TODO: we shouldn't need to await to find the tenant, and this could be moved
            // outside of the loop, #3501. There are also additional "allowed_errors" in tests.
            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let sleep_duration = if period == Duration::ZERO {
                #[cfg(not(feature = "testing"))]
                info!("automatic compaction is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run compaction
                if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
                    error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
                    wait_duration
                } else {
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    let wait_duration = Duration::from_secs(2);
    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        // GC might require downloading, to find the cutoff LSN that corresponds to the
        // cutoff specified as time.
        let ctx =
            RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_gc_period();

            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let gc_horizon = tenant.get_gc_horizon();
            let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
                #[cfg(not(feature = "testing"))]
                info!("automatic GC is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run gc
                let res = tenant
                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
                    .await;
                if let Err(e) = res {
                    error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
                    wait_duration
                } else {
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
    // if the tenant has a proper status already, no need to wait for anything
    if tenant.current_state() == TenantState::Active {
        ControlFlow::Continue(())
    } else {
        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
        loop {
            match tenant_state_updates.changed().await {
                Ok(()) => {
                    let new_state = &*tenant_state_updates.borrow();
                    match new_state {
                        TenantState::Active => {
                            debug!("Tenant state changed to active, continuing the task loop");
                            return ControlFlow::Continue(());
                        }
                        state => {
                            debug!("Not running the task loop, tenant is not active: {state:?}");
                            continue;
                        }
                    }
                }
                Err(_sender_dropped_error) => {
                    return ControlFlow::Break(());
                }
            }
        }
    }
}
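// The wait above follows the tokio `watch` channel pattern: `changed().await`
// resolves once a new value is published, and `borrow()` reads it.
// `subscribe_for_state_updates` presumably hands back such a receiver. A
// self-contained sketch of the same pattern (illustrative test, assuming
// tokio's `macros` and `sync` features; not part of the original module):
#[cfg(test)]
mod state_watch_idiom {
    #[tokio::test]
    async fn observes_published_state() {
        let (tx, mut rx) = tokio::sync::watch::channel("attaching");
        // Publish a new state, then wait for and read it on the receiver side.
        tx.send("active").expect("receiver still alive");
        rx.changed().await.expect("sender still alive");
        assert_eq!(*rx.borrow(), "active");
    }
}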

#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;

/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them
/// out of phase with one another, for more stable load.
pub(crate) async fn random_init_delay(
    period: Duration,
    cancel: &CancellationToken,
) -> Result<(), Cancelled> {
    use rand::Rng;

    if period == Duration::ZERO {
        return Ok(());
    }

    let d = {
        let mut rng = rand::thread_rng();
        rng.gen_range(Duration::ZERO..=period)
    };

    match tokio::time::timeout(d, cancel.cancelled()).await {
        Ok(_) => Err(Cancelled),
        Err(_) => Ok(()),
    }
}
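// The timeout-as-cancellable-sleep idiom above is easy to misread: `Ok` from
// `tokio::time::timeout` means the *cancellation* future finished first, so it
// maps to Err(Cancelled), while the timer elapsing (`Err`) is the success path.
// A self-contained sketch of that mapping (illustrative tests, assuming tokio's
// `macros` and `time` features; not part of the original module):
#[cfg(test)]
mod cancellable_sleep_idiom {
    use tokio_util::sync::CancellationToken;

    #[tokio::test]
    async fn cancellation_wins_over_timer() {
        let cancel = CancellationToken::new();
        cancel.cancel();
        // Already-cancelled token: timeout resolves Ok(()) immediately.
        let res =
            tokio::time::timeout(std::time::Duration::from_secs(60), cancel.cancelled()).await;
        assert!(res.is_ok(), "cancelled before the timer elapsed");
    }

    #[tokio::test]
    async fn timer_elapses_without_cancellation() {
        let cancel = CancellationToken::new();
        let res =
            tokio::time::timeout(std::time::Duration::from_millis(10), cancel.cancelled()).await;
        assert!(res.is_err(), "timer elapsed: the 'slept fine' path");
    }
}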

/// Attention: the `task` and `period` become labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(
    elapsed: Duration,
    period: Duration,
    task: BackgroundLoopKind,
) {
    // Duration::ZERO will happen because it's the "disable [bgtask]" value.
    if elapsed >= period && period != Duration::ZERO {
        // humantime does no significant-digit clamping whereas Duration's Debug output is a
        // bit more intelligent. However, it makes sense to keep the "configuration format"
        // for the period, even though there's no way to output the actual config value.
        warn!(
            ?elapsed,
            period = %humantime::format_duration(period),
            ?task,
            "task iteration took longer than the configured period"
        );
        crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
            .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
            .inc();
    }
}
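// Label sketch (values illustrative): a compaction iteration that takes 12s
// against a 10s configured period increments BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
// with label values ("compaction", "10") -- the loop kind's snake_case name and
// the period in whole seconds.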