LCOV - code coverage report

Current view: top level - pageserver/src/tenant - tasks.rs (source / functions)
Test:         d5cbf29d97fdd23f6bda3d96cc57e517a06c5084.info
Test Date:    2025-03-18 14:05:41

                Coverage   Total   Hit
  Lines:           7.2 %     363    26
  Functions:      10.7 %      28     3

            Line data    Source code
       1              : //! This module contains per-tenant background processes, e.g. compaction and GC.
       2              : 
       3              : use std::cmp::max;
       4              : use std::future::Future;
       5              : use std::ops::{ControlFlow, RangeInclusive};
       6              : use std::pin::pin;
       7              : use std::sync::Arc;
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use once_cell::sync::Lazy;
      11              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
      12              : use rand::Rng;
      13              : use scopeguard::defer;
      14              : use tokio::sync::{Semaphore, SemaphorePermit};
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::*;
      17              : use utils::backoff::exponential_backoff_duration;
      18              : use utils::completion::Barrier;
      19              : use utils::pausable_failpoint;
      20              : 
      21              : use crate::context::{DownloadBehavior, RequestContext};
      22              : use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
      23              : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
      24              : use crate::tenant::throttle::Stats;
      25              : use crate::tenant::timeline::CompactionError;
      26              : use crate::tenant::timeline::compaction::CompactionOutcome;
      27              : use crate::tenant::{Tenant, TenantState};
      28              : 
      29              : /// Semaphore limiting concurrent background tasks (across all tenants).
      30              : ///
       31              : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work.
      32           40 : static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      33           40 :     let total_threads = TOKIO_WORKER_THREADS.get();
      34           40 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      35           40 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      36           40 :     assert!(permits < total_threads, "need threads for other work");
      37           40 :     Semaphore::new(permits)
      38           40 : });
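
For illustration of the permit arithmetic above (the thread count here is hypothetical, not a claim about the configured default): with 8 Tokio worker threads, permits = max(1, (8 * 3) / 4) = 6, leaving 2 threads free for non-background work. With a single worker thread, the max(1, ...) clamp still yields 1 permit, so the subsequent assert!(permits < total_threads) fails, because no threads would be left over for other work.
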
      39              : 
      40              : /// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
      41              : /// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
      42              : ///
      43              : /// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
      44              : /// to avoid high read amp during heavy write workloads. Regular image/GC compaction is less
      45              : /// important (e.g. due to page images in delta layers) and can wait for other background tasks.
      46              : ///
       47              : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work. Note
      48              : /// that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`, and shares the same
      49              : /// thread pool.
      50            0 : static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      51            0 :     let total_threads = TOKIO_WORKER_THREADS.get();
      52            0 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      53            0 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      54            0 :     assert!(permits < total_threads, "need threads for other work");
      55            0 :     Semaphore::new(permits)
      56            0 : });
      57              : 
      58              : /// Background jobs.
      59              : ///
      60              : /// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
      61              : /// do any significant IO or CPU work.
      62              : #[derive(
      63              :     Debug,
      64              :     PartialEq,
      65              :     Eq,
      66              :     Clone,
      67              :     Copy,
      68              :     strum_macros::IntoStaticStr,
      69              :     strum_macros::Display,
      70              :     enum_map::Enum,
      71              : )]
      72              : #[strum(serialize_all = "snake_case")]
      73              : pub(crate) enum BackgroundLoopKind {
      74              :     /// L0Compaction runs as a separate pass within the Compaction loop, not a separate loop. It is
      75              :     /// used to request the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and associated metrics.
      76              :     L0Compaction,
      77              :     Compaction,
      78              :     Gc,
      79              :     Eviction,
      80              :     TenantHouseKeeping,
      81              :     ConsumptionMetricsCollectMetrics,
      82              :     ConsumptionMetricsSyntheticSizeWorker,
      83              :     InitialLogicalSizeCalculation,
      84              :     HeatmapUpload,
      85              :     SecondaryDownload,
      86              : }
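
As a side note, a minimal sketch of what the derives above provide for metric labels and display (illustrative, not part of this file; the expected strings assume strum's snake_case serialization as configured by `serialize_all`):

    let kind = BackgroundLoopKind::InitialLogicalSizeCalculation;
    // strum_macros::IntoStaticStr: used when recording metrics, e.g. `task.into()` further below.
    let label: &'static str = kind.into();
    assert_eq!(label, "initial_logical_size_calculation");
    // strum_macros::Display follows the same snake_case serialization.
    assert_eq!(kind.to_string(), "initial_logical_size_calculation");
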
      87              : 
      88              : pub struct BackgroundLoopSemaphorePermit<'a> {
      89              :     _permit: SemaphorePermit<'static>,
      90              :     _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
      91              : }
      92              : 
      93              : /// Acquires a semaphore permit, to limit concurrent background jobs.
      94          725 : pub(crate) async fn acquire_concurrency_permit(
      95          725 :     loop_kind: BackgroundLoopKind,
      96          725 :     _ctx: &RequestContext,
      97          725 : ) -> BackgroundLoopSemaphorePermit<'static> {
      98          725 :     let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
      99          725 : 
     100          725 :     if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
     101            0 :         pausable_failpoint!("initial-size-calculation-permit-pause");
     102          725 :     }
     103              : 
     104              :     // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
     105          725 :     let semaphore = match loop_kind {
     106            0 :         BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
     107          725 :         _ => &CONCURRENT_BACKGROUND_TASKS,
     108              :     };
     109          725 :     let permit = semaphore.acquire().await.expect("should never close");
     110          725 : 
     111          725 :     recorder.acquired();
     112          725 : 
     113          725 :     BackgroundLoopSemaphorePermit {
     114          725 :         _permit: permit,
     115          725 :         _recorder: recorder,
     116          725 :     }
     117          725 : }
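
A minimal usage sketch for the function above (a hypothetical caller, not from this file; it assumes the caller lives in the same crate, since the function is pub(crate), and that a RequestContext is in scope):

    async fn run_expensive_step(ctx: &RequestContext) {
        // Hold the permit (and the metrics recorder bundled with it) for the whole expensive section.
        let _permit = acquire_concurrency_permit(BackgroundLoopKind::Compaction, ctx).await;
        // ... CPU/IO-heavy work goes here ...
    } // `_permit` drops here, releasing the semaphore slot to other background loops.
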
     118              : 
     119              : /// Start per-tenant background loops: compaction, GC, and tenant housekeeping.
     120            0 : pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
     121            0 :     let tenant_shard_id = tenant.tenant_shard_id;
     122            0 : 
     123            0 :     task_mgr::spawn(
     124            0 :         BACKGROUND_RUNTIME.handle(),
     125            0 :         TaskKind::Compaction,
     126            0 :         tenant_shard_id,
     127            0 :         None,
     128            0 :         &format!("compactor for tenant {tenant_shard_id}"),
     129            0 :         {
     130            0 :             let tenant = Arc::clone(tenant);
     131            0 :             let can_start = can_start.cloned();
     132            0 :             async move {
     133            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     134            0 :                 tokio::select! {
     135            0 :                     _ = cancel.cancelled() => return Ok(()),
     136            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     137            0 :                 };
     138            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     139            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     140            0 :                 compaction_loop(tenant, cancel)
     141            0 :                     // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
     142            0 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     143            0 :                     .await;
     144            0 :                 Ok(())
     145            0 :             }
     146            0 :         },
     147            0 :     );
     148            0 : 
     149            0 :     task_mgr::spawn(
     150            0 :         BACKGROUND_RUNTIME.handle(),
     151            0 :         TaskKind::GarbageCollector,
     152            0 :         tenant_shard_id,
     153            0 :         None,
     154            0 :         &format!("garbage collector for tenant {tenant_shard_id}"),
     155            0 :         {
     156            0 :             let tenant = Arc::clone(tenant);
     157            0 :             let can_start = can_start.cloned();
     158            0 :             async move {
     159            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     160            0 :                 tokio::select! {
     161            0 :                     _ = cancel.cancelled() => return Ok(()),
     162            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     163            0 :                 };
     164            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     165            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     166            0 :                 gc_loop(tenant, cancel)
     167            0 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     168            0 :                     .await;
     169            0 :                 Ok(())
     170            0 :             }
     171            0 :         },
     172            0 :     );
     173            0 : 
     174            0 :     task_mgr::spawn(
     175            0 :         BACKGROUND_RUNTIME.handle(),
     176            0 :         TaskKind::TenantHousekeeping,
     177            0 :         tenant_shard_id,
     178            0 :         None,
     179            0 :         &format!("housekeeping for tenant {tenant_shard_id}"),
     180            0 :         {
     181            0 :             let tenant = Arc::clone(tenant);
     182            0 :             let can_start = can_start.cloned();
     183            0 :             async move {
     184            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     185            0 :                 tokio::select! {
     186            0 :                     _ = cancel.cancelled() => return Ok(()),
     187            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     188            0 :                 };
     189            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     190            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     191            0 :                 tenant_housekeeping_loop(tenant, cancel)
     192            0 :                     .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     193            0 :                     .await;
     194            0 :                 Ok(())
     195            0 :             }
     196            0 :         },
     197            0 :     );
     198            0 : }
     199              : 
     200              : /// Compaction task's main loop.
     201            0 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     202              :     const BASE_BACKOFF_SECS: f64 = 1.0;
     203              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     204              :     const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
     205              : 
     206            0 :     let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     207            0 :     let mut period = tenant.get_compaction_period();
     208            0 :     let mut error_run = 0; // consecutive errors
     209            0 : 
     210            0 :     // Stagger the compaction loop across tenants.
     211            0 :     if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     212            0 :         return;
     213            0 :     }
     214            0 :     if sleep_random(period, &cancel).await.is_err() {
     215            0 :         return;
     216            0 :     }
     217              : 
     218              :     loop {
     219              :         // Recheck that we're still active.
     220            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     221            0 :             return;
     222            0 :         }
     223            0 : 
     224            0 :         // Refresh the period. If compaction is disabled, check again in a bit.
     225            0 :         period = tenant.get_compaction_period();
     226            0 :         if period == Duration::ZERO {
     227              :             #[cfg(not(feature = "testing"))]
     228              :             info!("automatic compaction is disabled");
     229            0 :             tokio::select! {
     230            0 :                 _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
     231            0 :                 _ = cancel.cancelled() => return,
     232              :             }
     233            0 :             continue;
     234            0 :         }
     235            0 : 
     236            0 :         // Wait for the next compaction run.
     237            0 :         let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     238            0 :         tokio::select! {
     239            0 :             _ = tokio::time::sleep(backoff), if error_run > 0 => {},
     240            0 :             _ = tokio::time::sleep(period), if error_run == 0 => {},
     241            0 :             _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
     242            0 :             _ = cancel.cancelled() => return,
     243              :         }
     244              : 
     245              :         // Run compaction.
     246            0 :         let iteration = Iteration {
     247            0 :             started_at: Instant::now(),
     248            0 :             period,
     249            0 :             kind: BackgroundLoopKind::Compaction,
     250            0 :         };
     251            0 :         let IterationResult { output, elapsed } = iteration
     252            0 :             .run(tenant.compaction_iteration(&cancel, &ctx))
     253            0 :             .await;
     254              : 
     255            0 :         match output {
     256            0 :             Ok(outcome) => {
     257            0 :                 error_run = 0;
     258            0 :                 // If there's more compaction work, L0 or not, schedule an immediate run.
     259            0 :                 match outcome {
     260            0 :                     CompactionOutcome::Done => {}
     261            0 :                     CompactionOutcome::Skipped => {}
     262            0 :                     CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
     263            0 :                     CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
     264              :                 }
     265              :             }
     266              : 
     267            0 :             Err(err) => {
     268            0 :                 error_run += 1;
     269            0 :                 let backoff =
     270            0 :                     exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     271            0 :                 log_compaction_error(&err, Some((error_run, backoff)), cancel.is_cancelled());
     272            0 :                 continue;
     273              :             }
     274              :         }
     275              : 
     276              :         // NB: this log entry is recorded by performance tests.
     277            0 :         debug!(
     278            0 :             elapsed_ms = elapsed.as_millis(),
     279            0 :             "compaction iteration complete"
     280              :         );
     281              :     }
     282            0 : }
     283              : 
     284            0 : pub(crate) fn log_compaction_error(
     285            0 :     err: &CompactionError,
     286            0 :     retry_info: Option<(u32, Duration)>,
     287            0 :     task_cancelled: bool,
     288            0 : ) {
     289              :     use CompactionError::*;
     290              : 
     291              :     use crate::tenant::PageReconstructError;
     292              :     use crate::tenant::upload_queue::NotInitialized;
     293              : 
     294            0 :     let level = match err {
     295            0 :         e if e.is_cancel() => return,
     296            0 :         ShuttingDown => return,
     297            0 :         Offload(_) => Level::ERROR,
     298            0 :         AlreadyRunning(_) => Level::ERROR,
     299            0 :         CollectKeySpaceError(_) => Level::ERROR,
     300            0 :         _ if task_cancelled => Level::INFO,
     301            0 :         Other(err) => {
     302            0 :             let root_cause = err.root_cause();
     303            0 : 
     304            0 :             let upload_queue = root_cause
     305            0 :                 .downcast_ref::<NotInitialized>()
     306            0 :                 .is_some_and(|e| e.is_stopping());
     307            0 :             let timeline = root_cause
     308            0 :                 .downcast_ref::<PageReconstructError>()
     309            0 :                 .is_some_and(|e| e.is_stopping());
     310            0 :             let is_stopping = upload_queue || timeline;
     311              : 
     312            0 :             if is_stopping {
     313            0 :                 Level::INFO
     314              :             } else {
     315            0 :                 Level::ERROR
     316              :             }
     317              :         }
     318              :     };
     319              : 
     320            0 :     if let Some((error_count, sleep_duration)) = retry_info {
     321            0 :         match level {
     322              :             Level::ERROR => {
     323            0 :                 error!(
     324            0 :                     "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
     325              :                 )
     326              :             }
     327              :             Level::INFO => {
     328            0 :                 info!(
     329            0 :                     "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
     330              :                 )
     331              :             }
     332            0 :             level => unimplemented!("unexpected level {level:?}"),
     333              :         }
     334              :     } else {
     335            0 :         match level {
     336            0 :             Level::ERROR => error!("Compaction failed: {err:#}"),
     337            0 :             Level::INFO => info!("Compaction failed: {err:#}"),
     338            0 :             level => unimplemented!("unexpected level {level:?}"),
     339              :         }
     340              :     }
     341            0 : }
     342              : 
     343              : /// GC task's main loop.
     344            0 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     345              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     346            0 :     let mut error_run = 0; // consecutive errors
     347            0 : 
     348            0 :     // GC might require downloading, to find the cutoff LSN that corresponds to the
     349            0 :     // cutoff specified as a time.
     350            0 :     let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     351            0 :     let mut first = true;
     352              : 
     353              :     loop {
     354            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     355            0 :             return;
     356            0 :         }
     357            0 : 
     358            0 :         let period = tenant.get_gc_period();
     359            0 : 
     360            0 :         if first {
     361            0 :             first = false;
     362            0 :             if sleep_random(period, &cancel).await.is_err() {
     363            0 :                 break;
     364            0 :             }
     365            0 :         }
     366              : 
     367            0 :         let gc_horizon = tenant.get_gc_horizon();
     368            0 :         let sleep_duration;
     369            0 :         if period == Duration::ZERO || gc_horizon == 0 {
     370            0 :             #[cfg(not(feature = "testing"))]
     371            0 :             info!("automatic GC is disabled");
     372            0 :             // check again in 10 seconds, in case it's been enabled again.
     373            0 :             sleep_duration = Duration::from_secs(10);
     374            0 :         } else {
     375            0 :             let iteration = Iteration {
     376            0 :                 started_at: Instant::now(),
     377            0 :                 period,
     378            0 :                 kind: BackgroundLoopKind::Gc,
     379            0 :             };
     380              :             // Run gc
     381            0 :             let IterationResult { output, elapsed: _ } = iteration
     382            0 :                 .run(tenant.gc_iteration(
     383            0 :                     None,
     384            0 :                     gc_horizon,
     385            0 :                     tenant.get_pitr_interval(),
     386            0 :                     &cancel,
     387            0 :                     &ctx,
     388            0 :                 ))
     389            0 :                 .await;
     390            0 :             match output {
     391            0 :                 Ok(_) => {
     392            0 :                     error_run = 0;
     393            0 :                     sleep_duration = period;
     394            0 :                 }
     395              :                 Err(crate::tenant::GcError::TenantCancelled) => {
     396            0 :                     return;
     397              :                 }
     398            0 :                 Err(e) => {
     399            0 :                     error_run += 1;
     400            0 :                     let wait_duration =
     401            0 :                         exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
     402              : 
     403            0 :                     if matches!(e, crate::tenant::GcError::TimelineCancelled) {
     404              :                         // Timeline was cancelled during gc. We might either be in an event
     405              :                         // that affects the entire tenant (tenant deletion, pageserver shutdown),
     406              :                         // or in one that affects the timeline only (timeline deletion).
     407              :                         // Therefore, don't exit the loop.
     408            0 :                         info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     409              :                     } else {
     410            0 :                         error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     411              :                     }
     412              : 
     413            0 :                     sleep_duration = wait_duration;
     414              :                 }
     415              :             }
     416              :         };
     417              : 
     418            0 :         if tokio::time::timeout(sleep_duration, cancel.cancelled())
     419            0 :             .await
     420            0 :             .is_ok()
     421              :         {
     422            0 :             break;
     423            0 :         }
     424              :     }
     425            0 : }
     426              : 
     427              : /// Tenant housekeeping's main loop.
     428            0 : async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     429            0 :     let mut last_throttle_flag_reset_at = Instant::now();
     430              :     loop {
     431            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     432            0 :             return;
     433            0 :         }
     434              : 
     435              :         // Use the same period as compaction; it's not worth a separate setting. But if it's set to
     436              :         // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
     437            0 :         let period = match tenant.get_compaction_period() {
     438            0 :             Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
     439            0 :             period => period,
     440              :         };
     441              : 
     442            0 :         let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
     443            0 :             break;
     444              :         };
     445              : 
     446              :         // Do tenant housekeeping.
     447            0 :         let iteration = Iteration {
     448            0 :             started_at: Instant::now(),
     449            0 :             period,
     450            0 :             kind: BackgroundLoopKind::TenantHouseKeeping,
     451            0 :         };
     452            0 :         iteration.run(tenant.housekeeping()).await;
     453              : 
     454              :         // Log any getpage throttling.
     455            0 :         info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
     456            0 :             let now = Instant::now();
     457            0 :             let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
     458            0 :             let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
     459            0 :             if count_throttled == 0 {
     460            0 :                 return;
     461            0 :             }
     462            0 :             let allowed_rps = tenant.pagestream_throttle.steady_rps();
     463            0 :             let delta = now - prev;
     464            0 :             info!(
     465            0 :                 n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
     466              :                 count_accounted = count_accounted_finish,  // don't break existing log scraping
     467              :                 count_throttled,
     468              :                 sum_throttled_usecs,
     469              :                 count_accounted_start, // log after pre-existing fields to not break existing log scraping
     470            0 :                 allowed_rps=%format_args!("{allowed_rps:.0}"),
     471            0 :                 "shard was throttled in the last n_seconds"
     472              :             );
     473            0 :         });
     474              :     }
     475            0 : }
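
Worked example for the jitter above (the concrete period is illustrative, not a claim about the configured default): with compaction period P, the jitter is P * 5 / 100, so sleep_jitter sleeps for a duration drawn uniformly from [0.95 * P, 1.05 * P]; for P = 20s that is somewhere between 19s and 21s before each housekeeping pass.
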
     476              : 
     477              : /// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
     478            0 : async fn wait_for_active_tenant(
     479            0 :     tenant: &Arc<Tenant>,
     480            0 :     cancel: &CancellationToken,
     481            0 : ) -> ControlFlow<()> {
     482            0 :     if tenant.current_state() == TenantState::Active {
     483            0 :         return ControlFlow::Continue(());
     484            0 :     }
     485            0 : 
     486            0 :     let mut update_rx = tenant.subscribe_for_state_updates();
     487            0 :     tokio::select! {
     488            0 :         result = update_rx.wait_for(|s| s == &TenantState::Active) => {
     489            0 :             if result.is_err() {
     490            0 :                 return ControlFlow::Break(());
     491            0 :             }
     492            0 :             debug!("Tenant state changed to active, continuing the task loop");
     493            0 :             ControlFlow::Continue(())
     494              :         },
     495            0 :         _ = cancel.cancelled() => ControlFlow::Break(()),
     496              :     }
     497            0 : }
     498              : 
     499              : #[derive(thiserror::Error, Debug)]
     500              : #[error("cancelled")]
     501              : pub(crate) struct Cancelled;
     502              : 
     503              : /// Sleeps for a random interval up to the given max value.
     504              : ///
     505              : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
     506              : /// different periods for more stable load.
     507            0 : pub(crate) async fn sleep_random(
     508            0 :     max: Duration,
     509            0 :     cancel: &CancellationToken,
     510            0 : ) -> Result<Duration, Cancelled> {
     511            0 :     sleep_random_range(Duration::ZERO..=max, cancel).await
     512            0 : }
     513              : 
     514              : /// Sleeps for a random interval in the given range. Returns the duration.
     515            0 : pub(crate) async fn sleep_random_range(
     516            0 :     interval: RangeInclusive<Duration>,
     517            0 :     cancel: &CancellationToken,
     518            0 : ) -> Result<Duration, Cancelled> {
     519            0 :     let delay = rand::thread_rng().gen_range(interval);
     520            0 :     if delay == Duration::ZERO {
     521            0 :         return Ok(delay);
     522            0 :     }
     523            0 :     tokio::select! {
     524            0 :         _ = cancel.cancelled() => Err(Cancelled),
     525            0 :         _ = tokio::time::sleep(delay) => Ok(delay),
     526              :     }
     527            0 : }
     528              : 
     529              : /// Sleeps for an interval with a random jitter.
     530            0 : pub(crate) async fn sleep_jitter(
     531            0 :     duration: Duration,
     532            0 :     jitter: Duration,
     533            0 :     cancel: &CancellationToken,
     534            0 : ) -> Result<Duration, Cancelled> {
     535            0 :     let from = duration.saturating_sub(jitter);
     536            0 :     let to = duration.saturating_add(jitter);
     537            0 :     sleep_random_range(from..=to, cancel).await
     538            0 : }
     539              : 
     540              : struct Iteration {
     541              :     started_at: Instant,
     542              :     period: Duration,
     543              :     kind: BackgroundLoopKind,
     544              : }
     545              : 
     546              : struct IterationResult<O> {
     547              :     output: O,
     548              :     elapsed: Duration,
     549              : }
     550              : 
     551              : impl Iteration {
     552              :     #[instrument(skip_all)]
     553              :     pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
     554              :         let mut fut = pin!(fut);
     555              : 
     556              :         // Wrap `fut` into a future that logs a message every `period` so that we get a
     557              :         // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
     558              :         let output = loop {
     559              :             match tokio::time::timeout(self.period, &mut fut).await {
     560              :                 Ok(r) => break r,
     561              :                 Err(_) => info!("still running"),
     562              :             }
     563              :         };
     564              :         let elapsed = self.started_at.elapsed();
     565              :         warn_when_period_overrun(elapsed, self.period, self.kind);
     566              : 
     567              :         IterationResult { output, elapsed }
     568              :     }
     569              : }
     570              : 
     571              : // NB: the `task` and `period` are used for metrics labels.
     572            0 : pub(crate) fn warn_when_period_overrun(
     573            0 :     elapsed: Duration,
     574            0 :     period: Duration,
     575            0 :     task: BackgroundLoopKind,
     576            0 : ) {
     577            0 :     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     578            0 :     if elapsed >= period && period != Duration::ZERO {
     579              :         // humantime does no significant digits clamping whereas Duration's debug is a bit more
     580              :         // intelligent. However, it makes sense to keep the "configuration format" for period, even
     581              :         // though there's no way to output the actual config value.
     582            0 :         info!(
     583              :             ?elapsed,
     584            0 :             period = %humantime::format_duration(period),
     585            0 :             ?task,
     586            0 :             "task iteration took longer than the configured period"
     587              :         );
     588            0 :         metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     589            0 :             .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
     590            0 :             .inc();
     591            0 :     }
     592            0 : }
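
Worked example (values are illustrative): with a 20-second compaction period, an iteration that takes 25s satisfies elapsed >= period, so the overrun is logged at info level and BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT is incremented with label values ("compaction", "20"); the task label comes from the strum snake_case conversion and the period label from period.as_secs(). A period of Duration::ZERO (the "disable this background task" value) never counts as an overrun.
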
        

Generated by: LCOV version 2.1-beta