Line data Source code
1 : //! This module contains functions to serve per-tenant background processes,
2 : //! such as compaction and GC
3 :
4 : use std::ops::ControlFlow;
5 : use std::str::FromStr;
6 : use std::sync::Arc;
7 : use std::time::{Duration, Instant};
8 :
9 : use crate::context::{DownloadBehavior, RequestContext};
10 : use crate::metrics::TENANT_TASK_EVENTS;
11 : use crate::task_mgr;
12 : use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
13 : use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD;
14 : use crate::tenant::throttle::Stats;
15 : use crate::tenant::timeline::CompactionError;
16 : use crate::tenant::{Tenant, TenantState};
17 : use rand::Rng;
18 : use tokio_util::sync::CancellationToken;
19 : use tracing::*;
20 : use utils::{backoff, completion, pausable_failpoint};
21 :
22 : static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
23 20 : once_cell::sync::Lazy::new(|| {
24 20 : let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
25 20 : let permits = usize::max(
26 20 : 1,
27 20 : // while a lot of the work is done on spawn_blocking, we still do
28 20 : // repartitioning in the async context. this should leave us some workers
29 20 : // unblocked for other work, hopefully easing any externally visible
30 20 : // effects of restarts.
31 20 : //
32 20 : // 6/8 is a guess; previously we ran with unlimited 8 and more from
33 20 : // spawn_blocking.
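// (illustrative arithmetic, not a configured value: with 8 worker threads,
// permits = max(1, 8 * 3 / 4) = 6, leaving 2 threads free for shorter work)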
34 20 : (total_threads * 3).checked_div(4).unwrap_or(0),
35 20 : );
36 20 : assert_ne!(permits, 0, "we will not be adding in permits later");
37 20 : assert!(
38 20 : permits < total_threads,
39 0 : "need threads avail for shorter work"
40 : );
41 20 : tokio::sync::Semaphore::new(permits)
42 20 : });
43 :
44 180 : #[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr, enum_map::Enum)]
45 : #[strum(serialize_all = "snake_case")]
46 : pub(crate) enum BackgroundLoopKind {
47 : Compaction,
48 : Gc,
49 : Eviction,
50 : IngestHouseKeeping,
51 : ConsumptionMetricsCollectMetrics,
52 : ConsumptionMetricsSyntheticSizeWorker,
53 : InitialLogicalSizeCalculation,
54 : HeatmapUpload,
55 : SecondaryDownload,
56 : }
57 :
58 : impl BackgroundLoopKind {
59 0 : fn as_static_str(&self) -> &'static str {
60 0 : self.into()
61 0 : }
62 : }
63 :
64 : static PERMIT_GAUGES: once_cell::sync::Lazy<
65 : enum_map::EnumMap<BackgroundLoopKind, metrics::IntCounterPair>,
66 20 : > = once_cell::sync::Lazy::new(|| {
67 180 : enum_map::EnumMap::from_array(std::array::from_fn(|i| {
68 180 : let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
69 180 : crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE.with_label_values(&[kind.into()])
70 180 : }))
71 20 : });
72 :
73 : /// Cancellation safe.
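///
/// A minimal usage sketch (illustrative only; the loop kind and context shown here are
/// assumptions, not a specific call site):
/// ```ignore
/// let _permit = concurrent_background_tasks_rate_limit_permit(
///     BackgroundLoopKind::Compaction,
///     &ctx,
/// ).await;
/// // the permit is returned to the semaphore when `_permit` is dropped
/// ```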
74 364 : pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
75 364 : loop_kind: BackgroundLoopKind,
76 364 : _ctx: &RequestContext,
77 364 : ) -> tokio::sync::SemaphorePermit<'static> {
78 364 : let _guard = PERMIT_GAUGES[loop_kind].guard();
79 :
80 : pausable_failpoint!(
81 : "initial-size-calculation-permit-pause",
82 : loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
83 : );
84 :
85 : // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
86 364 : match CONCURRENT_BACKGROUND_TASKS.acquire().await {
87 364 : Ok(permit) => permit,
88 0 : Err(_closed) => unreachable!("we never close the semaphore"),
89 : }
90 364 : }
91 :
92 : /// Start per tenant background loops: compaction and gc.
93 0 : pub fn start_background_loops(
94 0 : tenant: &Arc<Tenant>,
95 0 : background_jobs_can_start: Option<&completion::Barrier>,
96 0 : ) {
97 0 : let tenant_shard_id = tenant.tenant_shard_id;
98 0 : task_mgr::spawn(
99 0 : BACKGROUND_RUNTIME.handle(),
100 0 : TaskKind::Compaction,
101 0 : Some(tenant_shard_id),
102 0 : None,
103 0 : &format!("compactor for tenant {tenant_shard_id}"),
104 0 : {
105 0 : let tenant = Arc::clone(tenant);
106 0 : let background_jobs_can_start = background_jobs_can_start.cloned();
107 0 : async move {
108 0 : let cancel = task_mgr::shutdown_token();
109 : tokio::select! {
110 : _ = cancel.cancelled() => { return Ok(()) },
111 : _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
112 : };
113 0 : compaction_loop(tenant, cancel)
114 0 : // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
115 0 : .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
116 0 : .await;
117 0 : Ok(())
118 0 : }
119 0 : },
120 0 : );
121 0 : task_mgr::spawn(
122 0 : BACKGROUND_RUNTIME.handle(),
123 0 : TaskKind::GarbageCollector,
124 0 : Some(tenant_shard_id),
125 0 : None,
126 0 : &format!("garbage collector for tenant {tenant_shard_id}"),
127 0 : {
128 0 : let tenant = Arc::clone(tenant);
129 0 : let background_jobs_can_start = background_jobs_can_start.cloned();
130 0 : async move {
131 0 : let cancel = task_mgr::shutdown_token();
132 : tokio::select! {
133 : _ = cancel.cancelled() => { return Ok(()) },
134 : _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
135 : };
136 0 : gc_loop(tenant, cancel)
137 0 : .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
138 0 : .await;
139 0 : Ok(())
140 0 : }
141 0 : },
142 0 : );
143 0 :
144 0 : task_mgr::spawn(
145 0 : BACKGROUND_RUNTIME.handle(),
146 0 : TaskKind::IngestHousekeeping,
147 0 : Some(tenant_shard_id),
148 0 : None,
149 0 : &format!("ingest housekeeping for tenant {tenant_shard_id}"),
150 0 : {
151 0 : let tenant = Arc::clone(tenant);
152 0 : let background_jobs_can_start = background_jobs_can_start.cloned();
153 0 : async move {
154 0 : let cancel = task_mgr::shutdown_token();
155 : tokio::select! {
156 : _ = cancel.cancelled() => { return Ok(()) },
157 : _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
158 : };
159 0 : ingest_housekeeping_loop(tenant, cancel)
160 0 : .instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
161 0 : .await;
162 0 : Ok(())
163 0 : }
164 0 : },
165 0 : );
166 0 : }
167 :
168 : ///
169 : /// Compaction task's main loop
170 : ///
171 0 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
172 0 : const MAX_BACKOFF_SECS: f64 = 300.0;
173 0 :     // How many errors we have seen consecutively
174 0 : let mut error_run_count = 0;
175 0 :
176 0 : let mut last_throttle_flag_reset_at = Instant::now();
177 0 :
178 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
179 0 : async {
180 0 : let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
181 0 : let mut first = true;
182 0 : loop {
183 0 : tokio::select! {
184 : _ = cancel.cancelled() => {
185 : return;
186 : },
187 : tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
188 : ControlFlow::Break(()) => return,
189 : ControlFlow::Continue(()) => (),
190 : },
191 : }
192 :
193 0 : let period = tenant.get_compaction_period();
194 0 :
195 0 :             // TODO: we shouldn't need to await to find the tenant, and this could be moved outside of the
196 0 :             // loop, #3501. There are also additional "allowed_errors" in tests.
197 0 : if first {
198 0 : first = false;
199 0 : if random_init_delay(period, &cancel).await.is_err() {
200 0 : break;
201 0 : }
202 0 : }
203 :
204 0 : let started_at = Instant::now();
205 :
206 0 : let sleep_duration = if period == Duration::ZERO {
207 : #[cfg(not(feature = "testing"))]
208 : info!("automatic compaction is disabled");
209 : // check again in 10 seconds, in case it's been enabled again.
210 0 : Duration::from_secs(10)
211 : } else {
212 : // Run compaction
213 0 : match tenant.compaction_iteration(&cancel, &ctx).await {
214 0 : Err(e) => {
215 0 : let wait_duration = backoff::exponential_backoff_duration_seconds(
216 0 : error_run_count + 1,
217 0 : 1.0,
218 0 : MAX_BACKOFF_SECS,
219 0 : );
220 0 : error_run_count += 1;
221 0 : let wait_duration = Duration::from_secs_f64(wait_duration);
222 0 : log_compaction_error(
223 0 : &e,
224 0 : error_run_count,
225 0 : &wait_duration,
226 0 : cancel.is_cancelled(),
227 0 : );
228 0 : wait_duration
229 : }
230 0 : Ok(has_pending_task) => {
231 0 : error_run_count = 0;
232 0 : // schedule the next compaction immediately in case there is a pending compaction task
233 0 : if has_pending_task { Duration::from_secs(0) } else { period }
234 : }
235 : }
236 : };
237 :
238 0 : let elapsed = started_at.elapsed();
239 0 : warn_when_period_overrun(elapsed, period, BackgroundLoopKind::Compaction);
240 0 :
241 0 : // the duration is recorded by performance tests by enabling debug in this function
242 0 : tracing::debug!(elapsed_ms=elapsed.as_millis(), "compaction iteration complete");
243 :
244 : // Perhaps we did no work and the walredo process has been idle for some time:
245 : // give it a chance to shut down to avoid leaving the walredo process running indefinitely.
246 0 : if let Some(walredo_mgr) = &tenant.walredo_mgr {
247 0 : walredo_mgr.maybe_quiesce(period * 10);
248 0 : }
249 :
250 : // TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
251 : // so we get some upper-bound guarantee on when the walredo quiesce / throttle reporting happens.
252 0 : info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
253 0 : let now = Instant::now();
254 0 : let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
255 0 : let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats();
256 0 : if count_throttled == 0 {
257 0 : return;
258 0 : }
259 0 : let allowed_rps = tenant.timeline_get_throttle.steady_rps();
260 0 : let delta = now - prev;
261 0 : info!(
262 0 : n_seconds=%format_args!("{:.3}",
263 0 : delta.as_secs_f64()),
264 : count_accounted,
265 : count_throttled,
266 : sum_throttled_usecs,
267 0 : allowed_rps=%format_args!("{allowed_rps:.0}"),
268 0 : "shard was throttled in the last n_seconds")
269 0 : });
270 0 :
271 0 : // Sleep
272 0 : if tokio::time::timeout(sleep_duration, cancel.cancelled())
273 0 : .await
274 0 : .is_ok()
275 : {
276 0 : break;
277 0 : }
278 : }
279 0 : }
280 0 : .await;
281 0 : TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
282 0 : }
283 :
284 0 : fn log_compaction_error(
285 0 : e: &CompactionError,
286 0 : error_run_count: u32,
287 0 : sleep_duration: &std::time::Duration,
288 0 : task_cancelled: bool,
289 0 : ) {
290 : use crate::tenant::upload_queue::NotInitialized;
291 : use crate::tenant::PageReconstructError;
292 : use CompactionError::*;
293 :
294 : enum LooksLike {
295 : Info,
296 : Error,
297 : }
298 :
299 0 : let decision = match e {
300 0 : ShuttingDown => None,
301 0 : _ if task_cancelled => Some(LooksLike::Info),
302 0 : Other(e) => {
303 0 : let root_cause = e.root_cause();
304 :
305 0 : let is_stopping = {
306 0 : let upload_queue = root_cause
307 0 : .downcast_ref::<NotInitialized>()
308 0 : .is_some_and(|e| e.is_stopping());
309 0 :
310 0 : let timeline = root_cause
311 0 : .downcast_ref::<PageReconstructError>()
312 0 : .is_some_and(|e| e.is_stopping());
313 0 :
314 0 : upload_queue || timeline
315 : };
316 :
317 0 : if is_stopping {
318 0 : Some(LooksLike::Info)
319 : } else {
320 0 : Some(LooksLike::Error)
321 : }
322 : }
323 : };
324 :
325 0 : match decision {
326 0 : Some(LooksLike::Info) => info!(
327 0 : "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
328 : ),
329 0 : Some(LooksLike::Error) => error!(
330 0 : "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
331 : ),
332 0 : None => {}
333 : }
334 0 : }
335 :
336 : ///
337 : /// GC task's main loop
338 : ///
339 0 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
340 0 : const MAX_BACKOFF_SECS: f64 = 300.0;
341 0 :     // How many errors we have seen consecutively
342 0 : let mut error_run_count = 0;
343 0 :
344 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
345 0 : async {
346 0 : // GC might require downloading, to find the cutoff LSN that corresponds to the
347 0 :         // cutoff specified as a time.
348 0 : let ctx =
349 0 : RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
350 0 :
351 0 : let mut first = true;
352 0 : loop {
353 0 : tokio::select! {
354 : _ = cancel.cancelled() => {
355 : return;
356 : },
357 : tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
358 : ControlFlow::Break(()) => return,
359 : ControlFlow::Continue(()) => (),
360 : },
361 : }
362 :
363 0 : let period = tenant.get_gc_period();
364 0 :
365 0 : if first {
366 0 : first = false;
367 0 :
368 0 : if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel)
369 0 : .await
370 0 : .is_err()
371 : {
372 0 : break;
373 0 : }
374 0 :
375 0 : if random_init_delay(period, &cancel).await.is_err() {
376 0 : break;
377 0 : }
378 0 : }
379 :
380 0 : let started_at = Instant::now();
381 0 :
382 0 : let gc_horizon = tenant.get_gc_horizon();
383 0 : let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
384 : #[cfg(not(feature = "testing"))]
385 : info!("automatic GC is disabled");
386 : // check again in 10 seconds, in case it's been enabled again.
387 0 : Duration::from_secs(10)
388 : } else {
389 : // Run gc
390 0 : let res = tenant
391 0 : .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
392 0 : .await;
393 0 : match res {
394 : Ok(_) => {
395 0 : error_run_count = 0;
396 0 : period
397 : }
398 : Err(crate::tenant::GcError::TenantCancelled) => {
399 0 : return;
400 : }
401 0 : Err(e) => {
402 0 : let wait_duration = backoff::exponential_backoff_duration_seconds(
403 0 : error_run_count + 1,
404 0 : 1.0,
405 0 : MAX_BACKOFF_SECS,
406 0 : );
407 0 : error_run_count += 1;
408 0 : let wait_duration = Duration::from_secs_f64(wait_duration);
409 0 :
410 0 : error!(
411 0 : "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
412 : );
413 0 : wait_duration
414 : }
415 : }
416 : };
417 :
418 0 : warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
419 0 :
420 0 : // Sleep
421 0 : if tokio::time::timeout(sleep_duration, cancel.cancelled())
422 0 : .await
423 0 : .is_ok()
424 : {
425 0 : break;
426 0 : }
427 : }
428 0 : }
429 0 : .await;
430 0 : TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
431 0 : }
432 :
433 0 : async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
434 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
435 0 : async {
436 0 : loop {
437 0 : tokio::select! {
438 : _ = cancel.cancelled() => {
439 : return;
440 : },
441 : tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
442 : ControlFlow::Break(()) => return,
443 : ControlFlow::Continue(()) => (),
444 : },
445 : }
446 :
447 : // We run ingest housekeeping with the same frequency as compaction: it is not worth
448 : // having a distinct setting. But we don't run it in the same task, because compaction
449 : // blocks on acquiring the background job semaphore.
450 0 : let period = tenant.get_compaction_period();
451 :
452 : // If the compaction period is set to zero (to disable it), then we will use a reasonable default
453 0 : let period = if period == Duration::ZERO {
454 0 : humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD)
455 0 : .unwrap()
456 0 : .into()
457 : } else {
458 0 : period
459 : };
460 :
461 : // Jitter the period by +/- 5%
462 0 : let period =
463 0 : rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100);
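// (illustrative: a 20s compaction period yields a sleep drawn from [19s, 21s))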
464 0 :
465 0 : // Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
466 0 : // a tenant, since it won't have started writing any ephemeral files yet.
467 0 : if tokio::time::timeout(period, cancel.cancelled())
468 0 : .await
469 0 : .is_ok()
470 : {
471 0 : break;
472 0 : }
473 0 :
474 0 : let started_at = Instant::now();
475 0 : tenant.ingest_housekeeping().await;
476 :
477 0 : warn_when_period_overrun(
478 0 : started_at.elapsed(),
479 0 : period,
480 0 : BackgroundLoopKind::IngestHouseKeeping,
481 0 : );
482 : }
483 0 : }
484 0 : .await;
485 0 : TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
486 0 : }
487 :
488 0 : async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
489 0 : // if the tenant has a proper status already, no need to wait for anything
490 0 : if tenant.current_state() == TenantState::Active {
491 0 : ControlFlow::Continue(())
492 : } else {
493 0 : let mut tenant_state_updates = tenant.subscribe_for_state_updates();
494 : loop {
495 0 : match tenant_state_updates.changed().await {
496 : Ok(()) => {
497 0 : let new_state = &*tenant_state_updates.borrow();
498 0 : match new_state {
499 : TenantState::Active => {
500 0 : debug!("Tenant state changed to active, continuing the task loop");
501 0 : return ControlFlow::Continue(());
502 : }
503 0 : state => {
504 0 : debug!("Not running the task loop, tenant is not active: {state:?}");
505 0 : continue;
506 : }
507 : }
508 : }
509 0 : Err(_sender_dropped_error) => {
510 0 : return ControlFlow::Break(());
511 : }
512 : }
513 : }
514 : }
515 0 : }
516 :
517 0 : #[derive(thiserror::Error, Debug)]
518 : #[error("cancelled")]
519 : pub(crate) struct Cancelled;
520 :
521 : /// Provide a random delay for background task initialization.
522 : ///
523 : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
524 : /// different periods for more stable load.
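///
/// The delay is drawn uniformly from `Duration::ZERO..=period`; if the cancellation token
/// fires before the delay elapses, `Err(Cancelled)` is returned. A minimal caller sketch
/// (illustrative only):
/// ```ignore
/// if random_init_delay(period, &cancel).await.is_err() {
///     return; // shutting down
/// }
/// ```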
525 0 : pub(crate) async fn random_init_delay(
526 0 : period: Duration,
527 0 : cancel: &CancellationToken,
528 0 : ) -> Result<(), Cancelled> {
529 0 : if period == Duration::ZERO {
530 0 : return Ok(());
531 0 : }
532 0 :
533 0 : let d = {
534 0 : let mut rng = rand::thread_rng();
535 0 : rng.gen_range(Duration::ZERO..=period)
536 0 : };
537 0 :
538 0 : match tokio::time::timeout(d, cancel.cancelled()).await {
539 0 : Ok(_) => Err(Cancelled),
540 0 : Err(_) => Ok(()),
541 : }
542 0 : }
543 :
544 : /// Delays GC by the default lease length at restart.
545 : ///
546 : /// We do this because the lease mappings are not persisted to disk. By delaying GC by the
547 : /// default lease length, we guarantee that all leases granted before the restart will have
548 : /// expired when we run GC for the first time after the restart.
549 0 : pub(crate) async fn delay_by_lease_length(
550 0 : length: Duration,
551 0 : cancel: &CancellationToken,
552 0 : ) -> Result<(), Cancelled> {
553 0 : match tokio::time::timeout(length, cancel.cancelled()).await {
554 0 : Ok(_) => Err(Cancelled),
555 0 : Err(_) => Ok(()),
556 : }
557 0 : }
558 :
559 : /// Attention: the `task` and `period` become labels of a pageserver-wide prometheus metric.
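/// (For example, an overrun of a 20-second compaction period would be recorded with the label
/// values `compaction` and `20`; the numbers here are illustrative.)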
560 0 : pub(crate) fn warn_when_period_overrun(
561 0 : elapsed: Duration,
562 0 : period: Duration,
563 0 : task: BackgroundLoopKind,
564 0 : ) {
565 0 : // Duration::ZERO will happen because it's the "disable [bgtask]" value.
566 0 : if elapsed >= period && period != Duration::ZERO {
567 : // humantime does not clamp significant digits, whereas Duration's Debug output is a bit more
568 : // intelligent. However, it makes sense to keep the "configuration format" for period, even
569 : // though there's no way to output the actual config value.
570 0 : info!(
571 : ?elapsed,
572 0 : period = %humantime::format_duration(period),
573 0 : ?task,
574 0 : "task iteration took longer than the configured period"
575 : );
576 0 : crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
577 0 : .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
578 0 : .inc();
579 0 : }
580 0 : }