LCOV - code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 7.3 % 357 26
Test Date: 2025-03-12 00:01:28 Functions: 10.7 % 28 3

            Line data    Source code
       1              : //! This module contains per-tenant background processes, e.g. compaction and GC.
       2              : 
       3              : use std::cmp::max;
       4              : use std::future::Future;
       5              : use std::ops::{ControlFlow, RangeInclusive};
       6              : use std::pin::pin;
       7              : use std::sync::Arc;
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use once_cell::sync::Lazy;
      11              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
      12              : use rand::Rng;
      13              : use scopeguard::defer;
      14              : use tokio::sync::{Semaphore, SemaphorePermit};
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::*;
      17              : use utils::backoff::exponential_backoff_duration;
      18              : use utils::completion::Barrier;
      19              : use utils::pausable_failpoint;
      20              : 
      21              : use crate::context::{DownloadBehavior, RequestContext};
      22              : use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
      23              : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
      24              : use crate::tenant::throttle::Stats;
      25              : use crate::tenant::timeline::CompactionError;
      26              : use crate::tenant::timeline::compaction::CompactionOutcome;
      27              : use crate::tenant::{Tenant, TenantState};
      28              : 
      29              : /// Semaphore limiting concurrent background tasks (across all tenants).
      30              : ///
      31              : /// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
      32           40 : static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      33           40 :     let total_threads = TOKIO_WORKER_THREADS.get();
      34           40 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      35           40 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      36           40 :     assert!(permits < total_threads, "need threads for other work");
      37           40 :     Semaphore::new(permits)
      38           40 : });
      39              : 
      40              : /// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
      41              : /// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
      42              : ///
      43              : /// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
      44              : /// to avoid high read amp during heavy write workloads. Regular image/GC compaction is less
      45              : /// important (e.g. due to page images in delta layers) and can wait for other background tasks.
      46              : ///
      47              : /// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work. Note
      48              : /// that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`, and shares the same
      49              : /// thread pool.
      50            0 : static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      51            0 :     let total_threads = TOKIO_WORKER_THREADS.get();
      52            0 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      53            0 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      54            0 :     assert!(permits < total_threads, "need threads for other work");
      55            0 :     Semaphore::new(permits)
      56            0 : });
      57              : 
      58              : /// Background jobs.
      59              : ///
      60              : /// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
      61              : /// do any significant IO or CPU work.
      62              : #[derive(
      63              :     Debug,
      64              :     PartialEq,
      65              :     Eq,
      66              :     Clone,
      67              :     Copy,
      68              :     strum_macros::IntoStaticStr,
      69              :     strum_macros::Display,
      70              :     enum_map::Enum,
      71              : )]
      72              : #[strum(serialize_all = "snake_case")]
      73              : pub(crate) enum BackgroundLoopKind {
      74              :     /// L0Compaction runs as a separate pass within the Compaction loop, not a separate loop. It is
      75              :     /// used to request the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and associated metrics.
      76              :     L0Compaction,
      77              :     Compaction,
      78              :     Gc,
      79              :     Eviction,
      80              :     TenantHouseKeeping,
      81              :     ConsumptionMetricsCollectMetrics,
      82              :     ConsumptionMetricsSyntheticSizeWorker,
      83              :     InitialLogicalSizeCalculation,
      84              :     HeatmapUpload,
      85              :     SecondaryDownload,
      86              : }
      87              : 
      88              : pub struct BackgroundLoopSemaphorePermit<'a> {
      89              :     _permit: SemaphorePermit<'static>,
      90              :     _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
      91              : }
      92              : 
      93              : /// Acquires a semaphore permit, to limit concurrent background jobs.
      94          728 : pub(crate) async fn acquire_concurrency_permit(
      95          728 :     loop_kind: BackgroundLoopKind,
      96          728 :     _ctx: &RequestContext,
      97          728 : ) -> BackgroundLoopSemaphorePermit<'static> {
      98          728 :     let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
      99          728 : 
     100          728 :     if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
     101            0 :         pausable_failpoint!("initial-size-calculation-permit-pause");
     102          728 :     }
     103              : 
     104              :     // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
     105          728 :     let semaphore = match loop_kind {
     106            0 :         BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
     107          728 :         _ => &CONCURRENT_BACKGROUND_TASKS,
     108              :     };
     109          728 :     let permit = semaphore.acquire().await.expect("should never close");
     110          728 : 
     111          728 :     recorder.acquired();
     112          728 : 
     113          728 :     BackgroundLoopSemaphorePermit {
     114          728 :         _permit: permit,
     115          728 :         _recorder: recorder,
     116          728 :     }
     117          728 : }
     118              : 
     119              : /// Start per tenant background loops: compaction, GC, and ingest housekeeping.
     120            0 : pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
     121            0 :     let tenant_shard_id = tenant.tenant_shard_id;
     122            0 : 
     123            0 :     task_mgr::spawn(
     124            0 :         BACKGROUND_RUNTIME.handle(),
     125            0 :         TaskKind::Compaction,
     126            0 :         tenant_shard_id,
     127            0 :         None,
     128            0 :         &format!("compactor for tenant {tenant_shard_id}"),
     129            0 :         {
     130            0 :             let tenant = Arc::clone(tenant);
     131            0 :             let can_start = can_start.cloned();
     132            0 :             async move {
     133            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     134            0 :                 tokio::select! {
     135            0 :                     _ = cancel.cancelled() => return Ok(()),
     136            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     137            0 :                 };
     138            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     139            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     140            0 :                 compaction_loop(tenant, cancel)
     141            0 :                     // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
     142            0 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     143            0 :                     .await;
     144            0 :                 Ok(())
     145            0 :             }
     146            0 :         },
     147            0 :     );
     148            0 : 
     149            0 :     task_mgr::spawn(
     150            0 :         BACKGROUND_RUNTIME.handle(),
     151            0 :         TaskKind::GarbageCollector,
     152            0 :         tenant_shard_id,
     153            0 :         None,
     154            0 :         &format!("garbage collector for tenant {tenant_shard_id}"),
     155            0 :         {
     156            0 :             let tenant = Arc::clone(tenant);
     157            0 :             let can_start = can_start.cloned();
     158            0 :             async move {
     159            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     160            0 :                 tokio::select! {
     161            0 :                     _ = cancel.cancelled() => return Ok(()),
     162            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     163            0 :                 };
     164            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     165            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     166            0 :                 gc_loop(tenant, cancel)
     167            0 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     168            0 :                     .await;
     169            0 :                 Ok(())
     170            0 :             }
     171            0 :         },
     172            0 :     );
     173            0 : 
     174            0 :     task_mgr::spawn(
     175            0 :         BACKGROUND_RUNTIME.handle(),
     176            0 :         TaskKind::TenantHousekeeping,
     177            0 :         tenant_shard_id,
     178            0 :         None,
     179            0 :         &format!("housekeeping for tenant {tenant_shard_id}"),
     180            0 :         {
     181            0 :             let tenant = Arc::clone(tenant);
     182            0 :             let can_start = can_start.cloned();
     183            0 :             async move {
     184            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     185            0 :                 tokio::select! {
     186            0 :                     _ = cancel.cancelled() => return Ok(()),
     187            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     188            0 :                 };
     189            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     190            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     191            0 :                 tenant_housekeeping_loop(tenant, cancel)
     192            0 :                     .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     193            0 :                     .await;
     194            0 :                 Ok(())
     195            0 :             }
     196            0 :         },
     197            0 :     );
     198            0 : }
     199              : 
     200              : /// Compaction task's main loop.
     201            0 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     202              :     const BASE_BACKOFF_SECS: f64 = 1.0;
     203              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     204              :     const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
     205              : 
     206            0 :     let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     207            0 :     let mut period = tenant.get_compaction_period();
     208            0 :     let mut error_run = 0; // consecutive errors
     209            0 : 
     210            0 :     // Stagger the compaction loop across tenants.
     211            0 :     if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     212            0 :         return;
     213            0 :     }
     214            0 :     if sleep_random(period, &cancel).await.is_err() {
     215            0 :         return;
     216            0 :     }
     217              : 
     218              :     loop {
     219              :         // Recheck that we're still active.
     220            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     221            0 :             return;
     222            0 :         }
     223            0 : 
     224            0 :         // Refresh the period. If compaction is disabled, check again in a bit.
     225            0 :         period = tenant.get_compaction_period();
     226            0 :         if period == Duration::ZERO {
     227              :             #[cfg(not(feature = "testing"))]
     228              :             info!("automatic compaction is disabled");
     229            0 :             tokio::select! {
     230            0 :                 _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
     231            0 :                 _ = cancel.cancelled() => return,
     232              :             }
     233            0 :             continue;
     234            0 :         }
     235            0 : 
     236            0 :         // Wait for the next compaction run.
     237            0 :         let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     238            0 :         tokio::select! {
     239            0 :             _ = tokio::time::sleep(backoff), if error_run > 0 => {},
     240            0 :             _ = tokio::time::sleep(period), if error_run == 0 => {},
     241            0 :             _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
     242            0 :             _ = cancel.cancelled() => return,
     243              :         }
     244              : 
     245              :         // Run compaction.
     246            0 :         let iteration = Iteration {
     247            0 :             started_at: Instant::now(),
     248            0 :             period,
     249            0 :             kind: BackgroundLoopKind::Compaction,
     250            0 :         };
     251            0 :         let IterationResult { output, elapsed } = iteration
     252            0 :             .run(tenant.compaction_iteration(&cancel, &ctx))
     253            0 :             .await;
     254              : 
     255            0 :         match output {
     256            0 :             Ok(outcome) => {
     257            0 :                 error_run = 0;
     258            0 :                 // If there's more compaction work, L0 or not, schedule an immediate run.
     259            0 :                 match outcome {
     260            0 :                     CompactionOutcome::Done => {}
     261            0 :                     CompactionOutcome::Skipped => {}
     262            0 :                     CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
     263            0 :                     CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
     264              :                 }
     265              :             }
     266              : 
     267            0 :             Err(err) => {
     268            0 :                 error_run += 1;
     269            0 :                 let backoff =
     270            0 :                     exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     271            0 :                 log_compaction_error(&err, error_run, backoff, cancel.is_cancelled());
     272            0 :                 continue;
     273              :             }
     274              :         }
     275              : 
     276              :         // NB: this log entry is recorded by performance tests.
     277            0 :         debug!(
     278            0 :             elapsed_ms = elapsed.as_millis(),
     279            0 :             "compaction iteration complete"
     280              :         );
     281              :     }
     282            0 : }
     283              : 
     284            0 : fn log_compaction_error(
     285            0 :     err: &CompactionError,
     286            0 :     error_count: u32,
     287            0 :     sleep_duration: Duration,
     288            0 :     task_cancelled: bool,
     289            0 : ) {
     290              :     use CompactionError::*;
     291              : 
     292              :     use crate::tenant::PageReconstructError;
     293              :     use crate::tenant::upload_queue::NotInitialized;
     294              : 
     295            0 :     let level = match err {
     296            0 :         e if e.is_cancel() => return,
     297            0 :         ShuttingDown => return,
     298            0 :         Offload(_) => Level::ERROR,
     299            0 :         AlreadyRunning(_) => Level::ERROR,
     300            0 :         CollectKeySpaceError(_) => Level::ERROR,
     301            0 :         _ if task_cancelled => Level::INFO,
     302            0 :         Other(err) => {
     303            0 :             let root_cause = err.root_cause();
     304            0 : 
     305            0 :             let upload_queue = root_cause
     306            0 :                 .downcast_ref::<NotInitialized>()
     307            0 :                 .is_some_and(|e| e.is_stopping());
     308            0 :             let timeline = root_cause
     309            0 :                 .downcast_ref::<PageReconstructError>()
     310            0 :                 .is_some_and(|e| e.is_stopping());
     311            0 :             let is_stopping = upload_queue || timeline;
     312              : 
     313            0 :             if is_stopping {
     314            0 :                 Level::INFO
     315              :             } else {
     316            0 :                 Level::ERROR
     317              :             }
     318              :         }
     319              :     };
     320              : 
     321            0 :     match level {
     322              :         Level::ERROR => {
     323            0 :             error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
     324              :         }
     325              :         Level::INFO => {
     326            0 :             info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
     327              :         }
     328            0 :         level => unimplemented!("unexpected level {level:?}"),
     329              :     }
     330            0 : }
     331              : 
     332              : /// GC task's main loop.
     333            0 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     334              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     335            0 :     let mut error_run = 0; // consecutive errors
     336            0 : 
     337            0 :     // GC might require downloading, to find the cutoff LSN that corresponds to the
     338            0 :     // cutoff specified as time.
     339            0 :     let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     340            0 :     let mut first = true;
     341              : 
     342              :     loop {
     343            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     344            0 :             return;
     345            0 :         }
     346            0 : 
     347            0 :         let period = tenant.get_gc_period();
     348            0 : 
     349            0 :         if first {
     350            0 :             first = false;
     351            0 :             if sleep_random(period, &cancel).await.is_err() {
     352            0 :                 break;
     353            0 :             }
     354            0 :         }
     355              : 
     356            0 :         let gc_horizon = tenant.get_gc_horizon();
     357            0 :         let sleep_duration;
     358            0 :         if period == Duration::ZERO || gc_horizon == 0 {
     359            0 :             #[cfg(not(feature = "testing"))]
     360            0 :             info!("automatic GC is disabled");
     361            0 :             // check again in 10 seconds, in case it's been enabled again.
     362            0 :             sleep_duration = Duration::from_secs(10);
     363            0 :         } else {
     364            0 :             let iteration = Iteration {
     365            0 :                 started_at: Instant::now(),
     366            0 :                 period,
     367            0 :                 kind: BackgroundLoopKind::Gc,
     368            0 :             };
     369              :             // Run gc
     370            0 :             let IterationResult { output, elapsed: _ } = iteration
     371            0 :                 .run(tenant.gc_iteration(
     372            0 :                     None,
     373            0 :                     gc_horizon,
     374            0 :                     tenant.get_pitr_interval(),
     375            0 :                     &cancel,
     376            0 :                     &ctx,
     377            0 :                 ))
     378            0 :                 .await;
     379            0 :             match output {
     380            0 :                 Ok(_) => {
     381            0 :                     error_run = 0;
     382            0 :                     sleep_duration = period;
     383            0 :                 }
     384              :                 Err(crate::tenant::GcError::TenantCancelled) => {
     385            0 :                     return;
     386              :                 }
     387            0 :                 Err(e) => {
     388            0 :                     error_run += 1;
     389            0 :                     let wait_duration =
     390            0 :                         exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
     391              : 
     392            0 :                     if matches!(e, crate::tenant::GcError::TimelineCancelled) {
     393              :                         // Timeline was cancelled during gc. We might either be in an event
     394              :                         // that affects the entire tenant (tenant deletion, pageserver shutdown),
     395              :                         // or in one that affects the timeline only (timeline deletion).
     396              :                         // Therefore, don't exit the loop.
     397            0 :                         info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     398              :                     } else {
     399            0 :                         error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     400              :                     }
     401              : 
     402            0 :                     sleep_duration = wait_duration;
     403              :                 }
     404              :             }
     405              :         };
     406              : 
     407            0 :         if tokio::time::timeout(sleep_duration, cancel.cancelled())
     408            0 :             .await
     409            0 :             .is_ok()
     410              :         {
     411            0 :             break;
     412            0 :         }
     413              :     }
     414            0 : }
     415              : 
     416              : /// Tenant housekeeping's main loop.
     417            0 : async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     418            0 :     let mut last_throttle_flag_reset_at = Instant::now();
     419              :     loop {
     420            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     421            0 :             return;
     422            0 :         }
     423              : 
     424              :         // Use the same period as compaction; it's not worth a separate setting. But if it's set to
     425              :         // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
     426            0 :         let period = match tenant.get_compaction_period() {
     427            0 :             Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
     428            0 :             period => period,
     429              :         };
     430              : 
     431            0 :         let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
     432            0 :             break;
     433              :         };
     434              : 
     435              :         // Do tenant housekeeping.
     436            0 :         let iteration = Iteration {
     437            0 :             started_at: Instant::now(),
     438            0 :             period,
     439            0 :             kind: BackgroundLoopKind::TenantHouseKeeping,
     440            0 :         };
     441            0 :         iteration.run(tenant.housekeeping()).await;
     442              : 
     443              :         // Log any getpage throttling.
     444            0 :         info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
     445            0 :             let now = Instant::now();
     446            0 :             let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
     447            0 :             let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
     448            0 :             if count_throttled == 0 {
     449            0 :                 return;
     450            0 :             }
     451            0 :             let allowed_rps = tenant.pagestream_throttle.steady_rps();
     452            0 :             let delta = now - prev;
     453            0 :             info!(
     454            0 :                 n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
     455              :                 count_accounted = count_accounted_finish,  // don't break existing log scraping
     456              :                 count_throttled,
     457              :                 sum_throttled_usecs,
     458              :                 count_accounted_start, // log after pre-existing fields to not break existing log scraping
     459            0 :                 allowed_rps=%format_args!("{allowed_rps:.0}"),
     460            0 :                 "shard was throttled in the last n_seconds"
     461              :             );
     462            0 :         });
     463              :     }
     464            0 : }
     465              : 
     466              : /// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
     467            0 : async fn wait_for_active_tenant(
     468            0 :     tenant: &Arc<Tenant>,
     469            0 :     cancel: &CancellationToken,
     470            0 : ) -> ControlFlow<()> {
     471            0 :     if tenant.current_state() == TenantState::Active {
     472            0 :         return ControlFlow::Continue(());
     473            0 :     }
     474            0 : 
     475            0 :     let mut update_rx = tenant.subscribe_for_state_updates();
     476            0 :     tokio::select! {
     477            0 :         result = update_rx.wait_for(|s| s == &TenantState::Active) => {
     478            0 :             if result.is_err() {
     479            0 :                 return ControlFlow::Break(());
     480            0 :             }
     481            0 :             debug!("Tenant state changed to active, continuing the task loop");
     482            0 :             ControlFlow::Continue(())
     483              :         },
     484            0 :         _ = cancel.cancelled() => ControlFlow::Break(()),
     485              :     }
     486            0 : }
     487              : 
     488              : #[derive(thiserror::Error, Debug)]
     489              : #[error("cancelled")]
     490              : pub(crate) struct Cancelled;
     491              : 
     492              : /// Sleeps for a random interval up to the given max value.
     493              : ///
     494              : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
     495              : /// different periods for more stable load.
     496            0 : pub(crate) async fn sleep_random(
     497            0 :     max: Duration,
     498            0 :     cancel: &CancellationToken,
     499            0 : ) -> Result<Duration, Cancelled> {
     500            0 :     sleep_random_range(Duration::ZERO..=max, cancel).await
     501            0 : }
     502              : 
     503              : /// Sleeps for a random interval in the given range. Returns the duration.
     504            0 : pub(crate) async fn sleep_random_range(
     505            0 :     interval: RangeInclusive<Duration>,
     506            0 :     cancel: &CancellationToken,
     507            0 : ) -> Result<Duration, Cancelled> {
     508            0 :     let delay = rand::thread_rng().gen_range(interval);
     509            0 :     if delay == Duration::ZERO {
     510            0 :         return Ok(delay);
     511            0 :     }
     512            0 :     tokio::select! {
     513            0 :         _ = cancel.cancelled() => Err(Cancelled),
     514            0 :         _ = tokio::time::sleep(delay) => Ok(delay),
     515              :     }
     516            0 : }
     517              : 
     518              : /// Sleeps for an interval with a random jitter.
     519            0 : pub(crate) async fn sleep_jitter(
     520            0 :     duration: Duration,
     521            0 :     jitter: Duration,
     522            0 :     cancel: &CancellationToken,
     523            0 : ) -> Result<Duration, Cancelled> {
     524            0 :     let from = duration.saturating_sub(jitter);
     525            0 :     let to = duration.saturating_add(jitter);
     526            0 :     sleep_random_range(from..=to, cancel).await
     527            0 : }
     528              : 
     529              : struct Iteration {
     530              :     started_at: Instant,
     531              :     period: Duration,
     532              :     kind: BackgroundLoopKind,
     533              : }
     534              : 
     535              : struct IterationResult<O> {
     536              :     output: O,
     537              :     elapsed: Duration,
     538              : }
     539              : 
     540              : impl Iteration {
     541              :     #[instrument(skip_all)]
     542              :     pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
     543              :         let mut fut = pin!(fut);
     544              : 
     545              :         // Wrap `fut` into a future that logs a message every `period` so that we get a
     546              :         // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
     547              :         let output = loop {
     548              :             match tokio::time::timeout(self.period, &mut fut).await {
     549              :                 Ok(r) => break r,
     550              :                 Err(_) => info!("still running"),
     551              :             }
     552              :         };
     553              :         let elapsed = self.started_at.elapsed();
     554              :         warn_when_period_overrun(elapsed, self.period, self.kind);
     555              : 
     556              :         IterationResult { output, elapsed }
     557              :     }
     558              : }
     559              : 
     560              : // NB: the `task` and `period` are used for metrics labels.
     561            0 : pub(crate) fn warn_when_period_overrun(
     562            0 :     elapsed: Duration,
     563            0 :     period: Duration,
     564            0 :     task: BackgroundLoopKind,
     565            0 : ) {
     566            0 :     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     567            0 :     if elapsed >= period && period != Duration::ZERO {
     568              :         // humantime does no significant digits clamping whereas Duration's debug is a bit more
     569              :         // intelligent. however it makes sense to keep the "configuration format" for period, even
     570              :         // though there's no way to output the actual config value.
     571            0 :         info!(
     572              :             ?elapsed,
     573            0 :             period = %humantime::format_duration(period),
     574            0 :             ?task,
     575            0 :             "task iteration took longer than the configured period"
     576              :         );
     577            0 :         metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     578            0 :             .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
     579            0 :             .inc();
     580            0 :     }
     581            0 : }
        

Generated by: LCOV version 2.1-beta