LCOV - code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions)
Test:      07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02

Coverage summary:   Coverage    Total    Hit
  Lines:               7.2 %      359     26
  Functions:          11.1 %       27      3

            Line data    Source code
       1              : //! This module contains per-tenant background processes, e.g. compaction and GC.
       2              : 
       3              : use std::cmp::max;
       4              : use std::future::Future;
       5              : use std::ops::{ControlFlow, RangeInclusive};
       6              : use std::pin::pin;
       7              : use std::sync::Arc;
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use once_cell::sync::Lazy;
      11              : use rand::Rng;
      12              : use scopeguard::defer;
      13              : use tokio::sync::{Semaphore, SemaphorePermit};
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::*;
      16              : 
      17              : use crate::context::{DownloadBehavior, RequestContext};
      18              : use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
      19              : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS};
      20              : use crate::tenant::throttle::Stats;
      21              : use crate::tenant::timeline::compaction::CompactionOutcome;
      22              : use crate::tenant::timeline::CompactionError;
      23              : use crate::tenant::{Tenant, TenantState};
      24              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
      25              : use utils::backoff::exponential_backoff_duration;
      26              : use utils::completion::Barrier;
      27              : use utils::pausable_failpoint;
      28              : 
      29              : /// Semaphore limiting concurrent background tasks (across all tenants).
      30              : ///
       31              : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work.
      32           40 : static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      33           40 :     let total_threads = TOKIO_WORKER_THREADS.get();
      34           40 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      35           40 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      36           40 :     assert!(permits < total_threads, "need threads for other work");
      37           40 :     Semaphore::new(permits)
      38           40 : });
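
As a rough illustration of the permit arithmetic above, here is a minimal standalone sketch (the thread counts are made up; the real count comes from TOKIO_WORKER_THREADS):

    use std::cmp::max;

    fn main() {
        // Hypothetical worker-thread counts; the real value comes from TOKIO_WORKER_THREADS.
        for total_threads in [2usize, 4, 8, 12] {
            // Same arithmetic as the static above: 3/4 of the threads, but at least one permit.
            let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
            println!("{total_threads} worker threads -> {permits} background permits");
        }
    }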
      39              : 
      40              : /// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
      41              : /// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
      42              : ///
      43              : /// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
      44              : /// to avoid high read amp during heavy write workloads. Regular image/GC compaction is less
      45              : /// important (e.g. due to page images in delta layers) and can wait for other background tasks.
      46              : ///
       47              : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work. Note
      48              : /// that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`, and shares the same
      49              : /// thread pool.
      50            0 : static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      51            0 :     let total_threads = TOKIO_WORKER_THREADS.get();
      52            0 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      53            0 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      54            0 :     assert!(permits < total_threads, "need threads for other work");
      55            0 :     Semaphore::new(permits)
      56            0 : });
      57              : 
      58              : /// Background jobs.
      59              : ///
      60              : /// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
      61              : /// do any significant IO or CPU work.
      62              : #[derive(
      63              :     Debug,
      64              :     PartialEq,
      65              :     Eq,
      66              :     Clone,
      67              :     Copy,
      68              :     strum_macros::IntoStaticStr,
      69              :     strum_macros::Display,
      70              :     enum_map::Enum,
      71              : )]
      72              : #[strum(serialize_all = "snake_case")]
      73              : pub(crate) enum BackgroundLoopKind {
      74              :     /// L0Compaction runs as a separate pass within the Compaction loop, not a separate loop. It is
      75              :     /// used to request the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and associated metrics.
      76              :     L0Compaction,
      77              :     Compaction,
      78              :     Gc,
      79              :     Eviction,
      80              :     TenantHouseKeeping,
      81              :     ConsumptionMetricsCollectMetrics,
      82              :     ConsumptionMetricsSyntheticSizeWorker,
      83              :     InitialLogicalSizeCalculation,
      84              :     HeatmapUpload,
      85              :     SecondaryDownload,
      86              : }
      87              : 
      88              : pub struct BackgroundLoopSemaphorePermit<'a> {
      89              :     _permit: SemaphorePermit<'static>,
      90              :     _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
      91              : }
      92              : 
      93              : /// Acquires a semaphore permit, to limit concurrent background jobs.
      94          728 : pub(crate) async fn acquire_concurrency_permit(
      95          728 :     loop_kind: BackgroundLoopKind,
      96          728 :     _ctx: &RequestContext,
      97          728 : ) -> BackgroundLoopSemaphorePermit<'static> {
      98          728 :     let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
      99          728 : 
     100          728 :     if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
     101            0 :         pausable_failpoint!("initial-size-calculation-permit-pause");
     102          728 :     }
     103              : 
     104              :     // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
     105          728 :     let semaphore = match loop_kind {
     106            0 :         BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
     107          728 :         _ => &CONCURRENT_BACKGROUND_TASKS,
     108              :     };
     109          728 :     let permit = semaphore.acquire().await.expect("should never close");
     110          728 : 
     111          728 :     recorder.acquired();
     112          728 : 
     113          728 :     BackgroundLoopSemaphorePermit {
     114          728 :         _permit: permit,
     115          728 :         _recorder: recorder,
     116          728 :     }
     117          728 : }
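
For readers unfamiliar with the underlying mechanism, here is a standalone sketch of the same bounded-concurrency pattern using a plain tokio::sync::Semaphore (without the crate's metrics recorder); the job count and limit are made up:

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        // Illustrative limit; the real limit is derived from the worker-thread count above.
        let semaphore = Arc::new(Semaphore::new(2));
        let mut handles = Vec::new();
        for job in 0..4 {
            let semaphore = Arc::clone(&semaphore);
            handles.push(tokio::spawn(async move {
                // Jobs queue here until a permit frees up, like acquire_concurrency_permit.
                let _permit = semaphore.acquire().await.expect("semaphore is never closed");
                println!("job {job} running");
                tokio::time::sleep(Duration::from_millis(50)).await;
                // The permit is released when `_permit` is dropped at the end of this scope.
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    }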
     118              : 
     119              : /// Start per tenant background loops: compaction, GC, and ingest housekeeping.
     120            0 : pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
     121            0 :     let tenant_shard_id = tenant.tenant_shard_id;
     122            0 : 
     123            0 :     task_mgr::spawn(
     124            0 :         BACKGROUND_RUNTIME.handle(),
     125            0 :         TaskKind::Compaction,
     126            0 :         tenant_shard_id,
     127            0 :         None,
     128            0 :         &format!("compactor for tenant {tenant_shard_id}"),
     129            0 :         {
     130            0 :             let tenant = Arc::clone(tenant);
     131            0 :             let can_start = can_start.cloned();
     132            0 :             async move {
     133            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     134            0 :                 tokio::select! {
     135            0 :                     _ = cancel.cancelled() => return Ok(()),
     136            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     137            0 :                 };
     138            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     139            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     140            0 :                 compaction_loop(tenant, cancel)
     141            0 :                     // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
     142            0 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     143            0 :                     .await;
     144            0 :                 Ok(())
     145            0 :             }
     146            0 :         },
     147            0 :     );
     148            0 : 
     149            0 :     task_mgr::spawn(
     150            0 :         BACKGROUND_RUNTIME.handle(),
     151            0 :         TaskKind::GarbageCollector,
     152            0 :         tenant_shard_id,
     153            0 :         None,
     154            0 :         &format!("garbage collector for tenant {tenant_shard_id}"),
     155            0 :         {
     156            0 :             let tenant = Arc::clone(tenant);
     157            0 :             let can_start = can_start.cloned();
     158            0 :             async move {
     159            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     160            0 :                 tokio::select! {
     161            0 :                     _ = cancel.cancelled() => return Ok(()),
     162            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     163            0 :                 };
     164            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     165            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     166            0 :                 gc_loop(tenant, cancel)
     167            0 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     168            0 :                     .await;
     169            0 :                 Ok(())
     170            0 :             }
     171            0 :         },
     172            0 :     );
     173            0 : 
     174            0 :     task_mgr::spawn(
     175            0 :         BACKGROUND_RUNTIME.handle(),
     176            0 :         TaskKind::TenantHousekeeping,
     177            0 :         tenant_shard_id,
     178            0 :         None,
     179            0 :         &format!("housekeeping for tenant {tenant_shard_id}"),
     180            0 :         {
     181            0 :             let tenant = Arc::clone(tenant);
     182            0 :             let can_start = can_start.cloned();
     183            0 :             async move {
     184            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     185            0 :                 tokio::select! {
     186            0 :                     _ = cancel.cancelled() => return Ok(()),
     187            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     188            0 :                 };
     189            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     190            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     191            0 :                 tenant_housekeeping_loop(tenant, cancel)
     192            0 :                     .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     193            0 :                     .await;
     194            0 :                 Ok(())
     195            0 :             }
     196            0 :         },
     197            0 :     );
     198            0 : }
     199              : 
     200              : /// Compaction task's main loop.
     201            0 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     202              :     const BASE_BACKOFF_SECS: f64 = 1.0;
     203              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     204              :     const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
     205              : 
     206            0 :     let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     207            0 :     let mut period = tenant.get_compaction_period();
     208            0 :     let mut error_run = 0; // consecutive errors
     209            0 : 
     210            0 :     // Stagger the compaction loop across tenants.
     211            0 :     if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     212            0 :         return;
     213            0 :     }
     214            0 :     if sleep_random(period, &cancel).await.is_err() {
     215            0 :         return;
     216            0 :     }
     217              : 
     218              :     loop {
     219              :         // Recheck that we're still active.
     220            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     221            0 :             return;
     222            0 :         }
     223            0 : 
     224            0 :         // Refresh the period. If compaction is disabled, check again in a bit.
     225            0 :         period = tenant.get_compaction_period();
     226            0 :         if period == Duration::ZERO {
     227              :             #[cfg(not(feature = "testing"))]
     228              :             info!("automatic compaction is disabled");
     229            0 :             tokio::select! {
     230            0 :                 _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
     231            0 :                 _ = cancel.cancelled() => return,
     232              :             }
     233            0 :             continue;
     234            0 :         }
     235            0 : 
     236            0 :         // Wait for the next compaction run.
     237            0 :         let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     238            0 :         tokio::select! {
     239            0 :             _ = tokio::time::sleep(backoff), if error_run > 0 => {},
     240            0 :             _ = tokio::time::sleep(period), if error_run == 0 => {},
     241            0 :             _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
     242            0 :             _ = cancel.cancelled() => return,
     243              :         }
     244              : 
     245              :         // Run compaction.
     246            0 :         let iteration = Iteration {
     247            0 :             started_at: Instant::now(),
     248            0 :             period,
     249            0 :             kind: BackgroundLoopKind::Compaction,
     250            0 :         };
     251            0 :         let IterationResult { output, elapsed } = iteration
     252            0 :             .run(tenant.compaction_iteration(&cancel, &ctx))
     253            0 :             .await;
     254              : 
     255            0 :         match output {
     256            0 :             Ok(outcome) => {
     257            0 :                 error_run = 0;
     258            0 :                 // If there's more compaction work, L0 or not, schedule an immediate run.
     259            0 :                 match outcome {
     260            0 :                     CompactionOutcome::Done => {}
     261            0 :                     CompactionOutcome::Skipped => {}
     262            0 :                     CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
     263            0 :                     CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
     264              :                 }
     265              :             }
     266              : 
     267            0 :             Err(err) => {
     268            0 :                 error_run += 1;
     269            0 :                 let backoff =
     270            0 :                     exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     271            0 :                 log_compaction_error(&err, error_run, backoff, cancel.is_cancelled());
     272            0 :                 continue;
     273              :             }
     274              :         }
     275              : 
     276              :         // NB: this log entry is recorded by performance tests.
     277            0 :         debug!(
     278            0 :             elapsed_ms = elapsed.as_millis(),
     279            0 :             "compaction iteration complete"
     280              :         );
     281              :     }
     282            0 : }
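
The `, if ...` guards on the select! above choose between the regular period, the error backoff, and the L0 trigger; a minimal standalone sketch of that guarded-branch pattern (the durations and the trigger are hypothetical):

    use std::time::Duration;
    use tokio::sync::Notify;

    #[tokio::main]
    async fn main() {
        let l0_trigger = Notify::new();
        let error_run: u32 = 0; // pretend the previous iteration succeeded
        let period = Duration::from_millis(50);
        let backoff = Duration::from_millis(200);

        // Simulate another task requesting an immediate compaction run.
        l0_trigger.notify_one();

        tokio::select! {
            _ = tokio::time::sleep(backoff), if error_run > 0 => println!("woke after error backoff"),
            _ = tokio::time::sleep(period), if error_run == 0 => println!("woke after regular period"),
            _ = l0_trigger.notified(), if error_run == 0 => println!("woke early: compaction requested"),
        }
    }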
     283              : 
     284            0 : fn log_compaction_error(
     285            0 :     err: &CompactionError,
     286            0 :     error_count: u32,
     287            0 :     sleep_duration: Duration,
     288            0 :     task_cancelled: bool,
     289            0 : ) {
     290              :     use crate::pgdatadir_mapping::CollectKeySpaceError;
     291              :     use crate::tenant::upload_queue::NotInitialized;
     292              :     use crate::tenant::PageReconstructError;
     293              :     use CompactionError::*;
     294              : 
     295            0 :     let level = match err {
     296            0 :         ShuttingDown => return,
     297            0 :         Offload(_) => Level::ERROR,
     298            0 :         CollectKeySpaceError(CollectKeySpaceError::Cancelled) => Level::INFO,
     299            0 :         CollectKeySpaceError(_) => Level::ERROR,
     300            0 :         _ if task_cancelled => Level::INFO,
     301            0 :         Other(err) => {
     302            0 :             let root_cause = err.root_cause();
     303            0 : 
     304            0 :             let upload_queue = root_cause
     305            0 :                 .downcast_ref::<NotInitialized>()
     306            0 :                 .is_some_and(|e| e.is_stopping());
     307            0 :             let timeline = root_cause
     308            0 :                 .downcast_ref::<PageReconstructError>()
     309            0 :                 .is_some_and(|e| e.is_stopping());
     310            0 :             let is_stopping = upload_queue || timeline;
     311              : 
     312            0 :             if is_stopping {
     313            0 :                 Level::INFO
     314              :             } else {
     315            0 :                 Level::ERROR
     316              :             }
     317              :         }
     318              :     };
     319              : 
     320            0 :     match level {
     321              :         Level::ERROR => {
     322            0 :             error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
     323              :         }
     324              :         Level::INFO => {
     325            0 :             info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
     326              :         }
     327            0 :         level => unimplemented!("unexpected level {level:?}"),
     328              :     }
     329            0 : }
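
The downcasts above peel anyhow context layers back to a typed root cause; a self-contained sketch of that technique with a made-up error type:

    #[derive(Debug, thiserror::Error)]
    #[error("upload queue is stopping")]
    struct QueueStopping;

    fn main() {
        // Wrap a typed error in anyhow context layers, as compaction errors often are.
        let err = anyhow::Error::new(QueueStopping)
            .context("flush layer")
            .context("compaction iteration");

        // Walk back to the root cause and check its concrete type, like log_compaction_error does.
        let is_stopping = err.root_cause().downcast_ref::<QueueStopping>().is_some();
        println!("is_stopping = {is_stopping}");
    }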
     330              : 
     331              : /// GC task's main loop.
     332            0 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     333              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     334            0 :     let mut error_run = 0; // consecutive errors
     335            0 : 
     336            0 :     // GC might require downloading, to find the cutoff LSN that corresponds to the
      337            0 :     // cutoff specified in terms of time.
     338            0 :     let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     339            0 :     let mut first = true;
     340              : 
     341              :     loop {
     342            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     343            0 :             return;
     344            0 :         }
     345            0 : 
     346            0 :         let period = tenant.get_gc_period();
     347            0 : 
     348            0 :         if first {
     349            0 :             first = false;
     350            0 :             if sleep_random(period, &cancel).await.is_err() {
     351            0 :                 break;
     352            0 :             }
     353            0 :         }
     354              : 
     355            0 :         let gc_horizon = tenant.get_gc_horizon();
     356            0 :         let sleep_duration;
     357            0 :         if period == Duration::ZERO || gc_horizon == 0 {
     358            0 :             #[cfg(not(feature = "testing"))]
     359            0 :             info!("automatic GC is disabled");
     360            0 :             // check again in 10 seconds, in case it's been enabled again.
     361            0 :             sleep_duration = Duration::from_secs(10);
     362            0 :         } else {
     363            0 :             let iteration = Iteration {
     364            0 :                 started_at: Instant::now(),
     365            0 :                 period,
     366            0 :                 kind: BackgroundLoopKind::Gc,
     367            0 :             };
     368              :             // Run gc
     369            0 :             let IterationResult { output, elapsed: _ } = iteration
     370            0 :                 .run(tenant.gc_iteration(
     371            0 :                     None,
     372            0 :                     gc_horizon,
     373            0 :                     tenant.get_pitr_interval(),
     374            0 :                     &cancel,
     375            0 :                     &ctx,
     376            0 :                 ))
     377            0 :                 .await;
     378            0 :             match output {
     379            0 :                 Ok(_) => {
     380            0 :                     error_run = 0;
     381            0 :                     sleep_duration = period;
     382            0 :                 }
     383              :                 Err(crate::tenant::GcError::TenantCancelled) => {
     384            0 :                     return;
     385              :                 }
     386            0 :                 Err(e) => {
     387            0 :                     error_run += 1;
     388            0 :                     let wait_duration =
     389            0 :                         exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
     390              : 
     391            0 :                     if matches!(e, crate::tenant::GcError::TimelineCancelled) {
     392              :                         // Timeline was cancelled during gc. We might either be in an event
     393              :                         // that affects the entire tenant (tenant deletion, pageserver shutdown),
     394              :                         // or in one that affects the timeline only (timeline deletion).
     395              :                         // Therefore, don't exit the loop.
     396            0 :                         info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     397              :                     } else {
     398            0 :                         error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     399              :                     }
     400              : 
     401            0 :                     sleep_duration = wait_duration;
     402              :                 }
     403              :             }
     404              :         };
     405              : 
     406            0 :         if tokio::time::timeout(sleep_duration, cancel.cancelled())
     407            0 :             .await
     408            0 :             .is_ok()
     409              :         {
     410            0 :             break;
     411            0 :         }
     412              :     }
     413            0 : }
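
The sleep at the bottom of the loop doubles as a shutdown check: the timeout wins if the full interval elapses, cancelled() wins if the token fires first. A standalone sketch of that cancellable sleep (the interval and the cancel delay are made up):

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();

        // Cancel from another task shortly after we start sleeping.
        let canceller = cancel.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            canceller.cancel();
        });

        // Ok(()) means cancellation fired before the interval elapsed, so the loop should break.
        let cancelled_early = tokio::time::timeout(Duration::from_secs(10), cancel.cancelled())
            .await
            .is_ok();
        println!("cancelled_early = {cancelled_early}");
    }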
     414              : 
     415              : /// Tenant housekeeping's main loop.
     416            0 : async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     417            0 :     let mut last_throttle_flag_reset_at = Instant::now();
     418              :     loop {
     419            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     420            0 :             return;
     421            0 :         }
     422              : 
     423              :         // Use the same period as compaction; it's not worth a separate setting. But if it's set to
     424              :         // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
     425            0 :         let period = match tenant.get_compaction_period() {
     426            0 :             Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
     427            0 :             period => period,
     428              :         };
     429              : 
     430            0 :         let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
     431            0 :             break;
     432              :         };
     433              : 
     434              :         // Do tenant housekeeping.
     435            0 :         let iteration = Iteration {
     436            0 :             started_at: Instant::now(),
     437            0 :             period,
     438            0 :             kind: BackgroundLoopKind::TenantHouseKeeping,
     439            0 :         };
     440            0 :         iteration.run(tenant.housekeeping()).await;
     441              : 
     442              :         // Log any getpage throttling.
     443            0 :         info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
     444            0 :             let now = Instant::now();
     445            0 :             let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
     446            0 :             let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
     447            0 :             if count_throttled == 0 {
     448            0 :                 return;
     449            0 :             }
     450            0 :             let allowed_rps = tenant.pagestream_throttle.steady_rps();
     451            0 :             let delta = now - prev;
     452            0 :             info!(
     453            0 :                 n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
     454              :                 count_accounted = count_accounted_finish,  // don't break existing log scraping
     455              :                 count_throttled,
     456              :                 sum_throttled_usecs,
     457              :                 count_accounted_start, // log after pre-existing fields to not break existing log scraping
     458            0 :                 allowed_rps=%format_args!("{allowed_rps:.0}"),
     459            0 :                 "shard was throttled in the last n_seconds"
     460              :             );
     461            0 :         });
     462              :     }
     463            0 : }
     464              : 
     465              : /// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
     466            0 : async fn wait_for_active_tenant(
     467            0 :     tenant: &Arc<Tenant>,
     468            0 :     cancel: &CancellationToken,
     469            0 : ) -> ControlFlow<()> {
     470            0 :     if tenant.current_state() == TenantState::Active {
     471            0 :         return ControlFlow::Continue(());
     472            0 :     }
     473            0 : 
     474            0 :     let mut update_rx = tenant.subscribe_for_state_updates();
     475              :     loop {
     476            0 :         tokio::select! {
     477            0 :             _ = cancel.cancelled() => return ControlFlow::Break(()),
     478            0 :             result = update_rx.changed() => if result.is_err() {
     479            0 :                 return ControlFlow::Break(());
     480            0 :             }
     481            0 :         }
     482            0 : 
     483            0 :         match &*update_rx.borrow() {
     484              :             TenantState::Active => {
     485            0 :                 debug!("Tenant state changed to active, continuing the task loop");
     486            0 :                 return ControlFlow::Continue(());
     487              :             }
     488            0 :             state => debug!("Not running the task loop, tenant is not active: {state:?}"),
     489              :         }
     490              :     }
     491            0 : }
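
The state subscription above is a tokio watch channel; a self-contained sketch of the same wait-for-state pattern with a made-up two-state enum:

    use std::time::Duration;
    use tokio::sync::watch;

    #[derive(Debug, PartialEq)]
    enum State {
        Attaching,
        Active,
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = watch::channel(State::Attaching);

        // Simulate the tenant becoming active a little later.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            let _ = tx.send(State::Active);
        });

        loop {
            if *rx.borrow() == State::Active {
                break; // like returning ControlFlow::Continue(())
            }
            if rx.changed().await.is_err() {
                return; // sender dropped: treat as shutdown
            }
        }
        println!("tenant is active, continuing the task loop");
    }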
     492              : 
     493              : #[derive(thiserror::Error, Debug)]
     494              : #[error("cancelled")]
     495              : pub(crate) struct Cancelled;
     496              : 
     497              : /// Sleeps for a random interval up to the given max value.
     498              : ///
      499              : /// This delay prevents a thundering herd of background tasks and will likely keep their runs out
      500              : /// of phase with each other, for a more stable load.
     501            0 : pub(crate) async fn sleep_random(
     502            0 :     max: Duration,
     503            0 :     cancel: &CancellationToken,
     504            0 : ) -> Result<Duration, Cancelled> {
     505            0 :     sleep_random_range(Duration::ZERO..=max, cancel).await
     506            0 : }
     507              : 
     508              : /// Sleeps for a random interval in the given range. Returns the duration.
     509            0 : pub(crate) async fn sleep_random_range(
     510            0 :     interval: RangeInclusive<Duration>,
     511            0 :     cancel: &CancellationToken,
     512            0 : ) -> Result<Duration, Cancelled> {
     513            0 :     let delay = rand::thread_rng().gen_range(interval);
     514            0 :     if delay == Duration::ZERO {
     515            0 :         return Ok(delay);
     516            0 :     }
     517            0 :     tokio::select! {
     518            0 :         _ = cancel.cancelled() => Err(Cancelled),
     519            0 :         _ = tokio::time::sleep(delay) => Ok(delay),
     520              :     }
     521            0 : }
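
rand can sample a Duration range directly; a tiny standalone sketch of the startup stagger (the period value is made up):

    use std::time::Duration;
    use rand::Rng;

    fn main() {
        let period = Duration::from_secs(20); // hypothetical compaction period
        // Uniformly sample a delay in 0..=period, as sleep_random does for the first run.
        let delay = rand::thread_rng().gen_range(Duration::ZERO..=period);
        println!("staggering the first run by {delay:?}");
    }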
     522              : 
     523              : /// Sleeps for an interval with a random jitter.
     524            0 : pub(crate) async fn sleep_jitter(
     525            0 :     duration: Duration,
     526            0 :     jitter: Duration,
     527            0 :     cancel: &CancellationToken,
     528            0 : ) -> Result<Duration, Cancelled> {
     529            0 :     let from = duration.saturating_sub(jitter);
     530            0 :     let to = duration.saturating_add(jitter);
     531            0 :     sleep_random_range(from..=to, cancel).await
     532            0 : }
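
A worked example of the jitter bounds, using the 5% jitter that the housekeeping loop applies (the 20-second period is hypothetical):

    use std::time::Duration;

    fn main() {
        let period = Duration::from_secs(20); // hypothetical period
        let jitter = period * 5 / 100;        // 1s, as in tenant_housekeeping_loop
        let from = period.saturating_sub(jitter);
        let to = period.saturating_add(jitter);
        // sleep_jitter would then sample uniformly from 19s..=21s.
        println!("sleeping somewhere in {from:?}..={to:?}");
    }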
     533              : 
     534              : struct Iteration {
     535              :     started_at: Instant,
     536              :     period: Duration,
     537              :     kind: BackgroundLoopKind,
     538              : }
     539              : 
     540              : struct IterationResult<O> {
     541              :     output: O,
     542              :     elapsed: Duration,
     543              : }
     544              : 
     545              : impl Iteration {
     546              :     #[instrument(skip_all)]
     547              :     pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
     548              :         let mut fut = pin!(fut);
     549              : 
     550              :         // Wrap `fut` into a future that logs a message every `period` so that we get a
     551              :         // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
     552              :         let output = loop {
     553              :             match tokio::time::timeout(self.period, &mut fut).await {
     554              :                 Ok(r) => break r,
     555              :                 Err(_) => info!("still running"),
     556              :             }
     557              :         };
     558              :         let elapsed = self.started_at.elapsed();
     559              :         warn_when_period_overrun(elapsed, self.period, self.kind);
     560              : 
     561              :         IterationResult { output, elapsed }
     562              :     }
     563              : }
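
A self-contained sketch of the re-armed timeout trick used in run() above, with made-up durations: the timeout expires twice, logging a breadcrumb each time, before the slow future finally completes:

    use std::pin::pin;
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // A deliberately slow "iteration".
        let slow_work = async {
            tokio::time::sleep(Duration::from_millis(250)).await;
            42
        };
        let mut fut = pin!(slow_work);

        // Re-arm the timeout each period so we log while the work is still in progress.
        let period = Duration::from_millis(100);
        let output = loop {
            match tokio::time::timeout(period, &mut fut).await {
                Ok(result) => break result,
                Err(_elapsed) => println!("still running"),
            }
        };
        println!("output = {output}");
    }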
     564              : 
     565              : // NB: the `task` and `period` are used for metrics labels.
     566            0 : pub(crate) fn warn_when_period_overrun(
     567            0 :     elapsed: Duration,
     568            0 :     period: Duration,
     569            0 :     task: BackgroundLoopKind,
     570            0 : ) {
     571            0 :     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     572            0 :     if elapsed >= period && period != Duration::ZERO {
     573              :         // humantime does no significant digits clamping whereas Duration's debug is a bit more
      574              :         // intelligent. However, it makes sense to keep the "configuration format" for period, even
     575              :         // though there's no way to output the actual config value.
     576            0 :         info!(
     577              :             ?elapsed,
     578            0 :             period = %humantime::format_duration(period),
     579            0 :             ?task,
     580            0 :             "task iteration took longer than the configured period"
     581              :         );
     582            0 :         metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     583            0 :             .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
     584            0 :             .inc();
     585            0 :     }
     586            0 : }
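
The comment above contrasts the two duration renderings; a tiny standalone comparison (the values are made up):

    use std::time::Duration;

    fn main() {
        let elapsed = Duration::from_millis(20_123);
        let period = Duration::from_secs(20);
        // Debug formatting of the measured duration, e.g. "20.123s".
        println!("elapsed = {elapsed:?}");
        // humantime renders the period in the configuration style, e.g. "20s".
        println!("period  = {}", humantime::format_duration(period));
    }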
        

Generated by: LCOV version 2.1-beta