Line data Source code
1 : //! This module contains functions to serve per-tenant background processes,
2 : //! such as compaction and GC
3 :
4 : use std::ops::ControlFlow;
5 : use std::sync::Arc;
6 : use std::time::{Duration, Instant};
7 :
8 : use crate::context::{DownloadBehavior, RequestContext};
9 : use crate::metrics::TENANT_TASK_EVENTS;
10 : use crate::task_mgr;
11 : use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
12 : use crate::tenant::timeline::CompactionError;
13 : use crate::tenant::{Tenant, TenantState};
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::*;
16 : use utils::{backoff, completion};
17 :
18 : static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
19 379 : once_cell::sync::Lazy::new(|| {
20 379 : let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
21 379 : let permits = usize::max(
22 379 : 1,
23 379 : // while a lot of the work is done on spawn_blocking, we still do
24 379 : // repartitioning in the async context. this should give leave us some workers
25 379 : // unblocked to be blocked on other work, hopefully easing any outside visible
26 379 : // effects of restarts.
27 379 : //
28 379 : // 6/8 is a guess; previously we ran with unlimited 8 and more from
29 379 : // spawn_blocking.
30 379 : (total_threads * 3).checked_div(4).unwrap_or(0),
31 379 : );
32 379 : assert_ne!(permits, 0, "we will not be adding in permits later");
33 379 : assert!(
34 379 : permits < total_threads,
35 0 : "need threads avail for shorter work"
36 : );
37 379 : tokio::sync::Semaphore::new(permits)
38 379 : });
39 :
40 2236 : #[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
41 : #[strum(serialize_all = "snake_case")]
42 : pub(crate) enum BackgroundLoopKind {
43 : Compaction,
44 : Gc,
45 : Eviction,
46 : ConsumptionMetricsCollectMetrics,
47 : ConsumptionMetricsSyntheticSizeWorker,
48 : InitialLogicalSizeCalculation,
49 : HeatmapUpload,
50 : SecondaryDownload,
51 : }
52 :
53 : impl BackgroundLoopKind {
54 2236 : fn as_static_str(&self) -> &'static str {
55 2236 : let s: &'static str = self.into();
56 2236 : s
57 2236 : }
58 : }
59 :
60 : /// Cancellation safe.
61 2289 : pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
62 2289 : loop_kind: BackgroundLoopKind,
63 2289 : _ctx: &RequestContext,
64 2289 : ) -> impl Drop {
65 2220 : let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
66 2220 : .with_label_values(&[loop_kind.as_static_str()])
67 2220 : .guard();
68 2220 :
69 2220 : pausable_failpoint!(
70 565 : "initial-size-calculation-permit-pause",
71 565 : loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
72 565 : );
73 :
74 1869 : match CONCURRENT_BACKGROUND_TASKS.acquire().await {
75 1869 : Ok(permit) => permit,
76 0 : Err(_closed) => unreachable!("we never close the semaphore"),
77 : }
78 1869 : }
79 :
80 : /// Start per tenant background loops: compaction and gc.
81 856 : pub fn start_background_loops(
82 856 : tenant: &Arc<Tenant>,
83 856 : background_jobs_can_start: Option<&completion::Barrier>,
84 856 : ) {
85 856 : let tenant_shard_id = tenant.tenant_shard_id;
86 856 : task_mgr::spawn(
87 856 : BACKGROUND_RUNTIME.handle(),
88 856 : TaskKind::Compaction,
89 856 : Some(tenant_shard_id),
90 856 : None,
91 856 : &format!("compactor for tenant {tenant_shard_id}"),
92 856 : false,
93 856 : {
94 856 : let tenant = Arc::clone(tenant);
95 856 : let background_jobs_can_start = background_jobs_can_start.cloned();
96 856 : async move {
97 856 : let cancel = task_mgr::shutdown_token();
98 856 : tokio::select! {
99 856 : _ = cancel.cancelled() => { return Ok(()) },
100 856 : _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
101 856 : };
102 856 : compaction_loop(tenant, cancel)
103 856 : .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
104 95172 : .await;
105 400 : Ok(())
106 856 : }
107 856 : },
108 856 : );
109 856 : task_mgr::spawn(
110 856 : BACKGROUND_RUNTIME.handle(),
111 856 : TaskKind::GarbageCollector,
112 856 : Some(tenant_shard_id),
113 856 : None,
114 856 : &format!("garbage collector for tenant {tenant_shard_id}"),
115 856 : false,
116 856 : {
117 856 : let tenant = Arc::clone(tenant);
118 856 : let background_jobs_can_start = background_jobs_can_start.cloned();
119 856 : async move {
120 856 : let cancel = task_mgr::shutdown_token();
121 856 : tokio::select! {
122 856 : _ = cancel.cancelled() => { return Ok(()) },
123 856 : _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
124 856 : };
125 856 : gc_loop(tenant, cancel)
126 856 : .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
127 276338 : .await;
128 400 : Ok(())
129 856 : }
130 856 : },
131 856 : );
132 856 : }
133 :
134 : ///
135 : /// Compaction task's main loop
136 : ///
137 856 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
138 856 : const MAX_BACKOFF_SECS: f64 = 300.0;
139 856 : // How many errors we have seen consequtively
140 856 : let mut error_run_count = 0;
141 856 :
142 856 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
143 856 : async {
144 856 : let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
145 856 : let mut first = true;
146 1241 : loop {
147 1464 : tokio::select! {
148 : _ = cancel.cancelled() => {
149 : return;
150 : },
151 1236 : tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
152 : ControlFlow::Break(()) => return,
153 : ControlFlow::Continue(()) => (),
154 : },
155 1241 : }
156 :
157 1236 : let period = tenant.get_compaction_period();
158 1236 :
159 1236 : // TODO: we shouldn't need to await to find tenant and this could be moved outside of
160 1236 : // loop, #3501. There are also additional "allowed_errors" in tests.
161 1236 : if first {
162 851 : first = false;
163 851 : if random_init_delay(period, &cancel).await.is_err() {
164 230 : break;
165 423 : }
166 385 : }
167 :
168 808 : let started_at = Instant::now();
169 :
170 808 : let sleep_duration = if period == Duration::ZERO {
171 : #[cfg(not(feature = "testing"))]
172 : info!("automatic compaction is disabled");
173 : // check again in 10 seconds, in case it's been enabled again.
174 400 : Duration::from_secs(10)
175 : } else {
176 : // Run compaction
177 93928 : if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
178 0 : let wait_duration = backoff::exponential_backoff_duration_seconds(
179 0 : error_run_count + 1,
180 0 : 1.0,
181 0 : MAX_BACKOFF_SECS,
182 0 : );
183 0 : error_run_count += 1;
184 0 : let wait_duration = Duration::from_secs_f64(wait_duration);
185 0 : log_compaction_error(
186 0 : &e,
187 0 : error_run_count,
188 0 : &wait_duration,
189 0 : cancel.is_cancelled(),
190 0 : );
191 0 : wait_duration
192 : } else {
193 406 : error_run_count = 0;
194 406 : period
195 : }
196 : };
197 :
198 806 : warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
199 :
200 : // Perhaps we did no work and the walredo process has been idle for some time:
201 : // give it a chance to shut down to avoid leaving walredo process running indefinitely.
202 806 : if let Some(walredo_mgr) = &tenant.walredo_mgr {
203 806 : walredo_mgr.maybe_quiesce(period * 10);
204 806 : }
205 :
206 : // Sleep
207 806 : if tokio::time::timeout(sleep_duration, cancel.cancelled())
208 551 : .await
209 550 : .is_ok()
210 : {
211 165 : break;
212 385 : }
213 : }
214 400 : }
215 95172 : .await;
216 400 : TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
217 400 : }
218 :
219 0 : fn log_compaction_error(
220 0 : e: &CompactionError,
221 0 : error_run_count: u32,
222 0 : sleep_duration: &std::time::Duration,
223 0 : task_cancelled: bool,
224 0 : ) {
225 : use crate::tenant::upload_queue::NotInitialized;
226 : use crate::tenant::PageReconstructError;
227 : use CompactionError::*;
228 :
229 : enum LooksLike {
230 : Info,
231 : Error,
232 : }
233 :
234 0 : let decision = match e {
235 0 : ShuttingDown => None,
236 0 : _ if task_cancelled => Some(LooksLike::Info),
237 0 : Other(e) => {
238 0 : let root_cause = e.root_cause();
239 :
240 0 : let is_stopping = {
241 0 : let upload_queue = root_cause
242 0 : .downcast_ref::<NotInitialized>()
243 0 : .is_some_and(|e| e.is_stopping());
244 0 :
245 0 : let timeline = root_cause
246 0 : .downcast_ref::<PageReconstructError>()
247 0 : .is_some_and(|e| e.is_stopping());
248 0 :
249 0 : upload_queue || timeline
250 : };
251 :
252 0 : if is_stopping {
253 0 : Some(LooksLike::Info)
254 : } else {
255 0 : Some(LooksLike::Error)
256 : }
257 : }
258 : };
259 :
260 0 : match decision {
261 0 : Some(LooksLike::Info) => info!(
262 0 : "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
263 0 : ),
264 0 : Some(LooksLike::Error) => error!(
265 0 : "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
266 0 : ),
267 0 : None => {}
268 : }
269 0 : }
270 :
271 : ///
272 : /// GC task's main loop
273 : ///
274 856 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
275 856 : const MAX_BACKOFF_SECS: f64 = 300.0;
276 856 : // How many errors we have seen consequtively
277 856 : let mut error_run_count = 0;
278 856 :
279 856 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
280 856 : async {
281 856 : // GC might require downloading, to find the cutoff LSN that corresponds to the
282 856 : // cutoff specified as time.
283 856 : let ctx =
284 856 : RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
285 856 : let mut first = true;
286 1098 : loop {
287 1295 : tokio::select! {
288 : _ = cancel.cancelled() => {
289 : return;
290 : },
291 1092 : tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
292 : ControlFlow::Break(()) => return,
293 : ControlFlow::Continue(()) => (),
294 : },
295 1098 : }
296 :
297 1092 : let period = tenant.get_gc_period();
298 1092 :
299 1092 : if first {
300 850 : first = false;
301 850 : if random_init_delay(period, &cancel).await.is_err() {
302 296 : break;
303 186 : }
304 242 : }
305 :
306 428 : let started_at = Instant::now();
307 428 :
308 428 : let gc_horizon = tenant.get_gc_horizon();
309 428 : let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
310 : #[cfg(not(feature = "testing"))]
311 : info!("automatic GC is disabled");
312 : // check again in 10 seconds, in case it's been enabled again.
313 401 : Duration::from_secs(10)
314 : } else {
315 : // Run gc
316 27 : let res = tenant
317 27 : .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
318 275501 : .await;
319 26 : if let Err(e) = res {
320 0 : let wait_duration = backoff::exponential_backoff_duration_seconds(
321 0 : error_run_count + 1,
322 0 : 1.0,
323 0 : MAX_BACKOFF_SECS,
324 0 : );
325 0 : error_run_count += 1;
326 0 : let wait_duration = Duration::from_secs_f64(wait_duration);
327 0 : error!(
328 0 : "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
329 0 : );
330 0 : wait_duration
331 : } else {
332 26 : error_run_count = 0;
333 26 : period
334 : }
335 : };
336 :
337 427 : warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
338 427 :
339 427 : // Sleep
340 427 : if tokio::time::timeout(sleep_duration, cancel.cancelled())
341 340 : .await
342 340 : .is_ok()
343 : {
344 98 : break;
345 242 : }
346 : }
347 400 : }
348 276338 : .await;
349 400 : TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
350 400 : }
351 :
352 2339 : async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
353 2339 : // if the tenant has a proper status already, no need to wait for anything
354 2339 : if tenant.current_state() == TenantState::Active {
355 1926 : ControlFlow::Continue(())
356 : } else {
357 413 : let mut tenant_state_updates = tenant.subscribe_for_state_updates();
358 : loop {
359 420 : match tenant_state_updates.changed().await {
360 : Ok(()) => {
361 409 : let new_state = &*tenant_state_updates.borrow();
362 409 : match new_state {
363 : TenantState::Active => {
364 0 : debug!("Tenant state changed to active, continuing the task loop");
365 402 : return ControlFlow::Continue(());
366 : }
367 7 : state => {
368 0 : debug!("Not running the task loop, tenant is not active: {state:?}");
369 7 : continue;
370 : }
371 : }
372 : }
373 0 : Err(_sender_dropped_error) => {
374 0 : return ControlFlow::Break(());
375 : }
376 : }
377 : }
378 : }
379 2328 : }
380 :
381 0 : #[derive(thiserror::Error, Debug)]
382 : #[error("cancelled")]
383 : pub(crate) struct Cancelled;
384 :
385 : /// Provide a random delay for background task initialization.
386 : ///
387 : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
388 : /// different periods for more stable load.
389 2960 : pub(crate) async fn random_init_delay(
390 2960 : period: Duration,
391 2960 : cancel: &CancellationToken,
392 2960 : ) -> Result<(), Cancelled> {
393 2960 : use rand::Rng;
394 2960 :
395 2960 : if period == Duration::ZERO {
396 365 : return Ok(());
397 2595 : }
398 2595 :
399 2595 : let d = {
400 2595 : let mut rng = rand::thread_rng();
401 2595 : rng.gen_range(Duration::ZERO..=period)
402 2595 : };
403 2595 :
404 2595 : match tokio::time::timeout(d, cancel.cancelled()).await {
405 832 : Ok(_) => Err(Cancelled),
406 871 : Err(_) => Ok(()),
407 : }
408 2068 : }
409 :
410 : /// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
411 1318 : pub(crate) fn warn_when_period_overrun(
412 1318 : elapsed: Duration,
413 1318 : period: Duration,
414 1318 : task: BackgroundLoopKind,
415 1318 : ) {
416 1318 : // Duration::ZERO will happen because it's the "disable [bgtask]" value.
417 1318 : if elapsed >= period && period != Duration::ZERO {
418 : // humantime does no significant digits clamping whereas Duration's debug is a bit more
419 : // intelligent. however it makes sense to keep the "configuration format" for period, even
420 : // though there's no way to output the actual config value.
421 16 : info!(
422 16 : ?elapsed,
423 16 : period = %humantime::format_duration(period),
424 16 : ?task,
425 16 : "task iteration took longer than the configured period"
426 16 : );
427 16 : crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
428 16 : .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
429 16 : .inc();
430 1302 : }
431 1318 : }
|