LCOV - code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions)
Test: 17080b14f46954d6812ea0a7dad4b2247e0840a8.info
Test Date: 2025-07-08 18:30:10
Coverage: Lines: 6.7 % (23 of 343 hit) | Functions: 9.7 % (3 of 31 hit)

            Line data    Source code
       1              : //! This module contains per-tenant background processes, e.g. compaction and GC.
       2              : 
       3              : use std::cmp::max;
       4              : use std::future::Future;
       5              : use std::ops::{ControlFlow, RangeInclusive};
       6              : use std::pin::pin;
       7              : use std::sync::Arc;
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use once_cell::sync::Lazy;
      11              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
      12              : use rand::Rng;
      13              : use scopeguard::defer;
      14              : use tokio::sync::{Semaphore, SemaphorePermit};
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::*;
      17              : use utils::backoff::exponential_backoff_duration;
      18              : use utils::completion::Barrier;
      19              : use utils::pausable_failpoint;
      20              : use utils::sync::gate::GateError;
      21              : 
      22              : use crate::context::{DownloadBehavior, RequestContext};
      23              : use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
      24              : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
      25              : use crate::tenant::blob_io::WriteBlobError;
      26              : use crate::tenant::throttle::Stats;
      27              : use crate::tenant::timeline::CompactionError;
      28              : use crate::tenant::timeline::compaction::CompactionOutcome;
      29              : use crate::tenant::{TenantShard, TenantState};
      30              : use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
      31              : 
      32              : /// Semaphore limiting concurrent background tasks (across all tenants).
      33              : ///
      34              : /// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
      35           10 : static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      36           10 :     let total_threads = TOKIO_WORKER_THREADS.get();
      37           10 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      38           10 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      39           10 :     assert!(permits < total_threads, "need threads for other work");
      40           10 :     Semaphore::new(permits)
      41           10 : });
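                       : // Illustrative arithmetic (assuming a hypothetical 8-thread worker pool): the
                       : // formula above yields max(1, (8 * 3) / 4) == 6 permits, leaving two worker
                       : // threads free for non-background work.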
      42              : 
      43              : /// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
      44              : /// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
      45              : ///
      46              : /// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
      47              : /// to avoid high read amp during heavy write workloads. Regular image/GC compaction is less
      48              : /// important (e.g. due to page images in delta layers) and can wait for other background tasks.
      49              : ///
      50              : /// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work. Note
      51              : /// that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`, and shares the same
      52              : /// thread pool.
      53            0 : static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
      54            0 :     let total_threads = TOKIO_WORKER_THREADS.get();
      55            0 :     let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
      56            0 :     assert_ne!(permits, 0, "we will not be adding in permits later");
      57            0 :     assert!(permits < total_threads, "need threads for other work");
      58            0 :     Semaphore::new(permits)
      59            0 : });
      60              : 
      61              : /// Background jobs.
      62              : ///
      63              : /// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
      64              : /// do any significant IO or CPU work.
      65              : #[derive(
      66              :     Debug,
      67              :     PartialEq,
      68              :     Eq,
      69              :     Clone,
      70              :     Copy,
      71              :     strum_macros::IntoStaticStr,
      72              :     strum_macros::Display,
      73              :     enum_map::Enum,
      74              : )]
      75              : #[strum(serialize_all = "snake_case")]
      76              : pub(crate) enum BackgroundLoopKind {
      77              :     /// L0Compaction runs as a separate pass within the Compaction loop, not a separate loop. It is
      78              :     /// used to request the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and associated metrics.
      79              :     L0Compaction,
      80              :     Compaction,
      81              :     Gc,
      82              :     Eviction,
      83              :     TenantHouseKeeping,
      84              :     ConsumptionMetricsCollectMetrics,
      85              :     ConsumptionMetricsSyntheticSizeWorker,
      86              :     InitialLogicalSizeCalculation,
      87              :     HeatmapUpload,
      88              :     SecondaryDownload,
      89              : }
      90              : 
      91              : pub struct BackgroundLoopSemaphorePermit<'a> {
      92              :     _permit: SemaphorePermit<'static>,
      93              :     _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
      94              : }
      95              : 
      96              : /// Acquires a semaphore permit, to limit concurrent background jobs.
      97          192 : pub(crate) async fn acquire_concurrency_permit(
      98          192 :     loop_kind: BackgroundLoopKind,
      99          192 :     _ctx: &RequestContext,
     100          192 : ) -> BackgroundLoopSemaphorePermit<'static> {
     101          192 :     let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
     102              : 
     103          192 :     if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
     104            0 :         pausable_failpoint!("initial-size-calculation-permit-pause");
     105          192 :     }
     106              : 
     107              :     // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
     108          192 :     let semaphore = match loop_kind {
     109            0 :         BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
     110          192 :         _ => &CONCURRENT_BACKGROUND_TASKS,
     111              :     };
     112          192 :     let permit = semaphore.acquire().await.expect("should never close");
     113              : 
     114          192 :     recorder.acquired();
     115              : 
     116          192 :     BackgroundLoopSemaphorePermit {
     117          192 :         _permit: permit,
     118          192 :         _recorder: recorder,
     119          192 :     }
     120          192 : }
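                       : // Hedged usage sketch (hypothetical call site, not taken from this file): the
                       : // guard is held for the whole iteration, and both the semaphore permit and the
                       : // metrics recorder are released when it is dropped.
                       : //
                       : //     let _permit = acquire_concurrency_permit(BackgroundLoopKind::Compaction, &ctx).await;
                       : //     // ... run the iteration while the permit is held ...
                       : //     // dropping `_permit` returns the semaphore permit and finalizes the metrics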
     121              : 
     122              : /// Start per tenant background loops: compaction, GC, and ingest housekeeping.
     123            0 : pub fn start_background_loops(tenant: &Arc<TenantShard>, can_start: Option<&Barrier>) {
     124            0 :     let tenant_shard_id = tenant.tenant_shard_id;
     125              : 
     126            0 :     task_mgr::spawn(
     127            0 :         BACKGROUND_RUNTIME.handle(),
     128            0 :         TaskKind::Compaction,
     129            0 :         tenant_shard_id,
     130            0 :         None,
     131            0 :         &format!("compactor for tenant {tenant_shard_id}"),
     132              :         {
     133            0 :             let tenant = Arc::clone(tenant);
     134            0 :             let can_start = can_start.cloned();
     135            0 :             async move {
     136            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     137            0 :                 tokio::select! {
     138            0 :                     _ = cancel.cancelled() => return Ok(()),
     139            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     140              :                 };
     141            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     142            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     143            0 :                 compaction_loop(tenant, cancel)
     144              :                     // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
     145            0 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     146            0 :                     .await;
     147            0 :                 Ok(())
     148            0 :             }
     149              :         },
     150              :     );
     151              : 
     152            0 :     task_mgr::spawn(
     153            0 :         BACKGROUND_RUNTIME.handle(),
     154            0 :         TaskKind::GarbageCollector,
     155            0 :         tenant_shard_id,
     156            0 :         None,
     157            0 :         &format!("garbage collector for tenant {tenant_shard_id}"),
     158              :         {
     159            0 :             let tenant = Arc::clone(tenant);
     160            0 :             let can_start = can_start.cloned();
     161            0 :             async move {
     162            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     163            0 :                 tokio::select! {
     164            0 :                     _ = cancel.cancelled() => return Ok(()),
     165            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     166              :                 };
     167            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     168            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     169            0 :                 gc_loop(tenant, cancel)
     170            0 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     171            0 :                     .await;
     172            0 :                 Ok(())
     173            0 :             }
     174              :         },
     175              :     );
     176              : 
     177            0 :     task_mgr::spawn(
     178            0 :         BACKGROUND_RUNTIME.handle(),
     179            0 :         TaskKind::TenantHousekeeping,
     180            0 :         tenant_shard_id,
     181            0 :         None,
     182            0 :         &format!("housekeeping for tenant {tenant_shard_id}"),
     183              :         {
     184            0 :             let tenant = Arc::clone(tenant);
     185            0 :             let can_start = can_start.cloned();
     186            0 :             async move {
     187            0 :                 let cancel = task_mgr::shutdown_token(); // NB: must be in async context
     188            0 :                 tokio::select! {
     189            0 :                     _ = cancel.cancelled() => return Ok(()),
     190            0 :                     _ = Barrier::maybe_wait(can_start) => {}
     191              :                 };
     192            0 :                 TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     193            0 :                 defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
     194            0 :                 tenant_housekeeping_loop(tenant, cancel)
     195            0 :                     .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     196            0 :                     .await;
     197            0 :                 Ok(())
     198            0 :             }
     199              :         },
     200              :     );
     201            0 : }
     202              : 
     203              : /// Compaction task's main loop.
     204            0 : async fn compaction_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
     205              :     const BASE_BACKOFF_SECS: f64 = 1.0;
     206              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     207              :     const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
     208              : 
     209            0 :     let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     210            0 :     let mut period = tenant.get_compaction_period();
     211            0 :     let mut error_run = 0; // consecutive errors
     212              : 
     213              :     // Stagger the compaction loop across tenants.
     214            0 :     if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     215            0 :         return;
     216            0 :     }
     217            0 :     if sleep_random(period, &cancel).await.is_err() {
     218            0 :         return;
     219            0 :     }
     220              : 
     221              :     loop {
     222              :         // Recheck that we're still active.
     223            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     224            0 :             return;
     225            0 :         }
     226              : 
     227              :         // Refresh the period. If compaction is disabled, check again in a bit.
     228            0 :         period = tenant.get_compaction_period();
     229            0 :         if period == Duration::ZERO {
     230              :             #[cfg(not(feature = "testing"))]
     231              :             info!("automatic compaction is disabled");
     232            0 :             tokio::select! {
     233            0 :                 _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
     234            0 :                 _ = cancel.cancelled() => return,
     235              :             }
     236            0 :             continue;
     237            0 :         }
     238              : 
     239              :         // Wait for the next compaction run.
     240            0 :         let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     241            0 :         tokio::select! {
     242            0 :             _ = tokio::time::sleep(backoff), if error_run > 0 => {},
     243            0 :             _ = tokio::time::sleep(period), if error_run == 0 => {},
     244            0 :             _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
     245            0 :             _ = cancel.cancelled() => return,
     246              :         }
     247              : 
     248              :         // Run compaction.
     249            0 :         let iteration = Iteration {
     250            0 :             started_at: Instant::now(),
     251            0 :             period,
     252            0 :             kind: BackgroundLoopKind::Compaction,
     253            0 :         };
     254            0 :         let IterationResult { output, elapsed } = iteration
     255            0 :             .run(tenant.compaction_iteration(&cancel, &ctx))
     256            0 :             .await;
     257              : 
     258            0 :         match output {
     259            0 :             Ok(outcome) => {
     260            0 :                 error_run = 0;
     261              :                 // If there's more compaction work, L0 or not, schedule an immediate run.
     262            0 :                 match outcome {
     263            0 :                     CompactionOutcome::Done => {}
     264            0 :                     CompactionOutcome::Skipped => {}
     265            0 :                     CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
     266            0 :                     CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
     267              :                 }
     268              :             }
     269              : 
     270            0 :             Err(err) => {
     271            0 :                 error_run += 1;
     272            0 :                 let backoff =
     273            0 :                     exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
     274            0 :                 log_compaction_error(
     275            0 :                     &err,
     276            0 :                     Some((error_run, backoff)),
     277            0 :                     cancel.is_cancelled(),
     278              :                     false,
     279              :                 );
     280            0 :                 continue;
     281              :             }
     282              :         }
     283              : 
     284              :         // NB: this log entry is recorded by performance tests.
     285            0 :         debug!(
     286            0 :             elapsed_ms = elapsed.as_millis(),
     287            0 :             "compaction iteration complete"
     288              :         );
     289              :     }
     290            0 : }
     291              : 
     292            0 : pub(crate) fn log_compaction_error(
     293            0 :     err: &CompactionError,
     294            0 :     retry_info: Option<(u32, Duration)>,
     295            0 :     task_cancelled: bool,
     296            0 :     degrade_to_warning: bool,
     297            0 : ) {
     298              :     use CompactionError::*;
     299              : 
     300              :     use crate::tenant::PageReconstructError;
     301              :     use crate::tenant::upload_queue::NotInitialized;
     302              : 
     303            0 :     let level = match err {
     304            0 :         e if e.is_cancel() => return,
     305            0 :         ShuttingDown => return,
     306            0 :         CollectKeySpaceError(_) => Level::ERROR,
     307            0 :         _ if task_cancelled => Level::INFO,
     308            0 :         Other(err) => {
     309            0 :             let root_cause = err.root_cause();
     310              : 
     311            0 :             let upload_queue = root_cause
     312            0 :                 .downcast_ref::<NotInitialized>()
     313            0 :                 .is_some_and(|e| e.is_stopping());
     314            0 :             let timeline = root_cause
     315            0 :                 .downcast_ref::<PageReconstructError>()
     316            0 :                 .is_some_and(|e| e.is_stopping());
      317            0 :             let buffered_writer_flush_task_cancelled = root_cause
     318            0 :                 .downcast_ref::<FlushTaskError>()
     319            0 :                 .is_some_and(|e| e.is_cancel());
     320            0 :             let write_blob_cancelled = root_cause
     321            0 :                 .downcast_ref::<WriteBlobError>()
     322            0 :                 .is_some_and(|e| e.is_cancel());
     323            0 :             let gate_closed = root_cause
     324            0 :                 .downcast_ref::<GateError>()
     325            0 :                 .is_some_and(|e| e.is_cancel());
     326            0 :             let is_stopping = upload_queue
     327            0 :                 || timeline
      328            0 :                 || buffered_writer_flush_task_cancelled
     329            0 :                 || write_blob_cancelled
     330            0 :                 || gate_closed;
     331              : 
     332            0 :             if is_stopping {
     333            0 :                 Level::INFO
     334              :             } else {
     335            0 :                 Level::ERROR
     336              :             }
     337              :         }
     338              :     };
     339              : 
     340            0 :     if let Some((error_count, sleep_duration)) = retry_info {
     341            0 :         match level {
     342              :             Level::ERROR => {
     343            0 :                 error!(
     344            0 :                     "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
     345              :                 )
     346              :             }
     347              :             Level::INFO => {
     348            0 :                 info!(
     349            0 :                     "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
     350              :                 )
     351              :             }
     352            0 :             level => unimplemented!("unexpected level {level:?}"),
     353              :         }
     354              :     } else {
     355            0 :         match level {
     356            0 :             Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
     357            0 :             Level::ERROR => error!("Compaction failed: {err:?}"),
     358            0 :             Level::INFO => info!("Compaction failed: {err:#}"),
     359            0 :             level => unimplemented!("unexpected level {level:?}"),
     360              :         }
     361              :     }
     362            0 : }
     363              : 
     364              : /// GC task's main loop.
     365            0 : async fn gc_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
     366              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     367            0 :     let mut error_run = 0; // consecutive errors
     368              : 
      369              :     // GC might require downloading to find the cutoff LSN that corresponds to the
      370              :     // cutoff specified as a time.
     371            0 :     let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     372            0 :     let mut first = true;
     373              : 
     374              :     loop {
     375            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     376            0 :             return;
     377            0 :         }
     378              : 
     379            0 :         let period = tenant.get_gc_period();
     380              : 
     381            0 :         if first {
     382            0 :             first = false;
     383            0 :             if sleep_random(period, &cancel).await.is_err() {
     384            0 :                 break;
     385            0 :             }
     386            0 :         }
     387              : 
     388            0 :         let gc_horizon = tenant.get_gc_horizon();
     389              :         let sleep_duration;
     390            0 :         if period == Duration::ZERO || gc_horizon == 0 {
     391            0 :             #[cfg(not(feature = "testing"))]
     392            0 :             info!("automatic GC is disabled");
     393            0 :             // check again in 10 seconds, in case it's been enabled again.
     394            0 :             sleep_duration = Duration::from_secs(10);
     395            0 :         } else {
     396            0 :             let iteration = Iteration {
     397            0 :                 started_at: Instant::now(),
     398            0 :                 period,
     399            0 :                 kind: BackgroundLoopKind::Gc,
     400            0 :             };
     401              :             // Run gc
     402            0 :             let IterationResult { output, elapsed: _ } = iteration
     403            0 :                 .run(tenant.gc_iteration(
     404            0 :                     None,
     405            0 :                     gc_horizon,
     406            0 :                     tenant.get_pitr_interval(),
     407            0 :                     &cancel,
     408            0 :                     &ctx,
     409            0 :                 ))
     410            0 :                 .await;
     411            0 :             match output {
     412            0 :                 Ok(_) => {
     413            0 :                     error_run = 0;
     414            0 :                     sleep_duration = period;
     415            0 :                 }
     416              :                 Err(crate::tenant::GcError::TenantCancelled) => {
     417            0 :                     return;
     418              :                 }
     419            0 :                 Err(e) => {
     420            0 :                     error_run += 1;
     421            0 :                     let wait_duration =
     422            0 :                         exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
     423              : 
     424            0 :                     if matches!(e, crate::tenant::GcError::TimelineCancelled) {
     425              :                         // Timeline was cancelled during gc. We might either be in an event
     426              :                         // that affects the entire tenant (tenant deletion, pageserver shutdown),
     427              :                         // or in one that affects the timeline only (timeline deletion).
     428              :                         // Therefore, don't exit the loop.
     429            0 :                         info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     430              :                     } else {
     431            0 :                         error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
     432              :                     }
     433              : 
     434            0 :                     sleep_duration = wait_duration;
     435              :                 }
     436              :             }
     437              :         };
     438              : 
     439            0 :         if tokio::time::timeout(sleep_duration, cancel.cancelled())
     440            0 :             .await
     441            0 :             .is_ok()
     442              :         {
     443            0 :             break;
     444            0 :         }
     445              :     }
     446            0 : }
     447              : 
     448              : /// Tenant housekeeping's main loop.
     449            0 : async fn tenant_housekeeping_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
     450            0 :     let mut last_throttle_flag_reset_at = Instant::now();
     451              :     loop {
     452            0 :         if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
     453            0 :             return;
     454            0 :         }
     455              : 
     456              :         // Use the same period as compaction; it's not worth a separate setting. But if it's set to
     457              :         // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
     458            0 :         let period = match tenant.get_compaction_period() {
     459            0 :             Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
     460            0 :             period => period,
     461              :         };
     462              : 
     463            0 :         let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
     464            0 :             break;
     465              :         };
     466              : 
     467              :         // Do tenant housekeeping.
     468            0 :         let iteration = Iteration {
     469            0 :             started_at: Instant::now(),
     470            0 :             period,
     471            0 :             kind: BackgroundLoopKind::TenantHouseKeeping,
     472            0 :         };
     473            0 :         iteration.run(tenant.housekeeping()).await;
     474              : 
     475              :         // Log any getpage throttling.
     476            0 :         info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
     477            0 :             let now = Instant::now();
     478            0 :             let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
     479            0 :             let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
     480            0 :             if count_throttled == 0 {
     481            0 :                 return;
     482            0 :             }
     483            0 :             let allowed_rps = tenant.pagestream_throttle.steady_rps();
     484            0 :             let delta = now - prev;
     485            0 :             info!(
     486            0 :                 n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
     487              :                 count_accounted = count_accounted_finish,  // don't break existing log scraping
     488              :                 count_throttled,
     489              :                 sum_throttled_usecs,
     490              :                 count_accounted_start, // log after pre-existing fields to not break existing log scraping
     491            0 :                 allowed_rps=%format_args!("{allowed_rps:.0}"),
     492            0 :                 "shard was throttled in the last n_seconds"
     493              :             );
     494            0 :         });
     495              :     }
     496            0 : }
     497              : 
     498              : /// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
     499            0 : async fn wait_for_active_tenant(
     500            0 :     tenant: &Arc<TenantShard>,
     501            0 :     cancel: &CancellationToken,
     502            0 : ) -> ControlFlow<()> {
     503            0 :     if tenant.current_state() == TenantState::Active {
     504            0 :         return ControlFlow::Continue(());
     505            0 :     }
     506              : 
     507            0 :     let mut update_rx = tenant.subscribe_for_state_updates();
     508            0 :     tokio::select! {
     509            0 :         result = update_rx.wait_for(|s| s == &TenantState::Active) => {
     510            0 :             if result.is_err() {
     511            0 :                 return ControlFlow::Break(());
     512            0 :             }
     513            0 :             debug!("Tenant state changed to active, continuing the task loop");
     514            0 :             ControlFlow::Continue(())
     515              :         },
     516            0 :         _ = cancel.cancelled() => ControlFlow::Break(()),
     517              :     }
     518            0 : }
     519              : 
     520              : #[derive(thiserror::Error, Debug)]
     521              : #[error("cancelled")]
     522              : pub(crate) struct Cancelled;
     523              : 
     524              : /// Sleeps for a random interval up to the given max value.
     525              : ///
     526              : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
     527              : /// different periods for more stable load.
     528            0 : pub(crate) async fn sleep_random(
     529            0 :     max: Duration,
     530            0 :     cancel: &CancellationToken,
     531            0 : ) -> Result<Duration, Cancelled> {
     532            0 :     sleep_random_range(Duration::ZERO..=max, cancel).await
     533            0 : }
     534              : 
     535              : /// Sleeps for a random interval in the given range. Returns the duration.
     536            0 : pub(crate) async fn sleep_random_range(
     537            0 :     interval: RangeInclusive<Duration>,
     538            0 :     cancel: &CancellationToken,
     539            0 : ) -> Result<Duration, Cancelled> {
     540            0 :     let delay = rand::thread_rng().gen_range(interval);
     541            0 :     if delay == Duration::ZERO {
     542            0 :         return Ok(delay);
     543            0 :     }
     544            0 :     tokio::select! {
     545            0 :         _ = cancel.cancelled() => Err(Cancelled),
     546            0 :         _ = tokio::time::sleep(delay) => Ok(delay),
     547              :     }
     548            0 : }
     549              : 
     550              : /// Sleeps for an interval with a random jitter.
     551            0 : pub(crate) async fn sleep_jitter(
     552            0 :     duration: Duration,
     553            0 :     jitter: Duration,
     554            0 :     cancel: &CancellationToken,
     555            0 : ) -> Result<Duration, Cancelled> {
     556            0 :     let from = duration.saturating_sub(jitter);
     557            0 :     let to = duration.saturating_add(jitter);
     558            0 :     sleep_random_range(from..=to, cancel).await
     559            0 : }
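                       : // Worked example (assumed numbers): sleep_jitter(Duration::from_secs(20), Duration::from_secs(1), &cancel)
                       : // sleeps for a uniformly random duration in the inclusive range [19s, 21s] and returns it;
                       : // tenant_housekeeping_loop above uses this with jitter = period * 5 / 100, i.e. period ± 5%.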
     560              : 
     561              : struct Iteration {
     562              :     started_at: Instant,
     563              :     period: Duration,
     564              :     kind: BackgroundLoopKind,
     565              : }
     566              : 
     567              : struct IterationResult<O> {
     568              :     output: O,
     569              :     elapsed: Duration,
     570              : }
     571              : 
     572              : impl Iteration {
     573              :     #[instrument(skip_all)]
     574              :     pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
     575              :         let mut fut = pin!(fut);
     576              : 
     577              :         // Wrap `fut` into a future that logs a message every `period` so that we get a
     578              :         // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
     579              :         let output = loop {
     580              :             match tokio::time::timeout(self.period, &mut fut).await {
     581              :                 Ok(r) => break r,
     582              :                 Err(_) => info!("still running"),
     583              :             }
     584              :         };
     585              :         let elapsed = self.started_at.elapsed();
     586              :         warn_when_period_overrun(elapsed, self.period, self.kind);
     587              : 
     588              :         IterationResult { output, elapsed }
     589              :     }
     590              : }
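                       : // Hedged walk-through (assumed numbers): with period = 20s and an iteration that takes 65s,
                       : // the timeout loop above logs "still running" at roughly 20s, 40s and 60s, the future then
                       : // completes inside the fourth window, and warn_when_period_overrun reports the ~65s overrun
                       : // against the 20s period.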
     591              : 
     592              : // NB: the `task` and `period` are used for metrics labels.
     593            0 : pub(crate) fn warn_when_period_overrun(
     594            0 :     elapsed: Duration,
     595            0 :     period: Duration,
     596            0 :     task: BackgroundLoopKind,
     597            0 : ) {
     598              :     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     599            0 :     if elapsed >= period && period != Duration::ZERO {
      600              :         // humantime does no significant-digit clamping, whereas Duration's Debug output is a bit
      601              :         // more intelligent. However, it makes sense to keep the "configuration format" for the
      602              :         // period, even though there's no way to output the actual config value.
     603            0 :         info!(
     604              :             ?elapsed,
     605            0 :             period = %humantime::format_duration(period),
     606              :             ?task,
     607            0 :             "task iteration took longer than the configured period"
     608              :         );
     609            0 :         metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     610            0 :             .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
     611            0 :             .inc();
     612            0 :     }
     613            0 : }
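                       : // Label note (derived from the code above): `task.into()` uses the snake_case strum
                       : // serialization, so e.g. an overrun of BackgroundLoopKind::Compaction with a 20s period
                       : // increments BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT with label values ("compaction", "20").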
        

Generated by: LCOV version 2.1-beta