LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 88.7 % 256 227 29 227
Current Date: 2024-01-09 02:06:09 Functions: 73.5 % 34 25 9 25
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! This module contains functions to serve per-tenant background processes,
       2                 : //! such as compaction and GC
       3                 : 
       4                 : use std::ops::ControlFlow;
       5                 : use std::sync::Arc;
       6                 : use std::time::{Duration, Instant};
       7                 : 
       8                 : use crate::context::{DownloadBehavior, RequestContext};
       9                 : use crate::metrics::TENANT_TASK_EVENTS;
      10                 : use crate::task_mgr;
      11                 : use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
      12                 : use crate::tenant::{Tenant, TenantState};
      13                 : use tokio_util::sync::CancellationToken;
      14                 : use tracing::*;
      15                 : use utils::{backoff, completion};
      16                 : 
      17                 : static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
      18 CBC         447 :     once_cell::sync::Lazy::new(|| {
      19             447 :         let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
      20             447 :         let permits = usize::max(
      21             447 :             1,
      22             447 :             // while a lot of the work is done on spawn_blocking, we still do
      23             447 :             // repartitioning in the async context. this should leave us some workers
      24             447 :             // unblocked to be blocked on other work, hopefully easing any outside visible
      25             447 :             // effects of restarts.
      26             447 :             //
      27             447 :             // 6/8 is a guess; previously we ran with unlimited 8 and more from
      28             447 :             // spawn_blocking.
      29             447 :             (total_threads * 3).checked_div(4).unwrap_or(0),
      30             447 :         );
      31             447 :         assert_ne!(permits, 0, "we will not be adding in permits later");
      32             447 :         assert!(
      33             447 :             permits < total_threads,
      34 UBC           0 :             "need threads avail for shorter work"
      35                 :         );
      36 CBC         447 :         tokio::sync::Semaphore::new(permits)
      37             447 :     });
      38                 : 
      39            1948 : #[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
      40                 : #[strum(serialize_all = "snake_case")]
      41                 : pub(crate) enum BackgroundLoopKind {
      42                 :     Compaction,
      43                 :     Gc,
      44                 :     Eviction,
      45                 :     ConsumptionMetricsCollectMetrics,
      46                 :     ConsumptionMetricsSyntheticSizeWorker,
      47                 :     InitialLogicalSizeCalculation,
      48                 :     HeatmapUpload,
      49                 :     SecondaryDownload,
      50                 : }
      51                 : 
      52                 : impl BackgroundLoopKind {
      53            1948 :     fn as_static_str(&self) -> &'static str {
      54            1948 :         let s: &'static str = self.into();
      55            1948 :         s
      56            1948 :     }
      57                 : }
      58                 : 
      59                 : /// Cancellation safe.
      60            1987 : pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
      61            1987 :     loop_kind: BackgroundLoopKind,
      62            1987 :     _ctx: &RequestContext,
      63            1987 : ) -> impl Drop {
      64            1927 :     let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
      65            1927 :         .with_label_values(&[loop_kind.as_static_str()])
      66            1927 :         .guard();
      67            1927 : 
      68            1927 :     match CONCURRENT_BACKGROUND_TASKS.acquire().await {
      69            1927 :         Ok(permit) => permit,
      70 UBC           0 :         Err(_closed) => unreachable!("we never close the semaphore"),
      71                 :     }
      72 CBC        1927 : }
      73                 : 
      74                 : /// Start per tenant background loops: compaction and gc.
      75             709 : pub fn start_background_loops(
      76             709 :     tenant: &Arc<Tenant>,
      77             709 :     background_jobs_can_start: Option<&completion::Barrier>,
      78             709 : ) {
      79             709 :     let tenant_shard_id = tenant.tenant_shard_id;
      80             709 :     task_mgr::spawn(
      81             709 :         BACKGROUND_RUNTIME.handle(),
      82             709 :         TaskKind::Compaction,
      83             709 :         Some(tenant_shard_id),
      84             709 :         None,
      85             709 :         &format!("compactor for tenant {tenant_shard_id}"),
      86             709 :         false,
      87             709 :         {
      88             709 :             let tenant = Arc::clone(tenant);
      89             709 :             let background_jobs_can_start = background_jobs_can_start.cloned();
      90             709 :             async move {
      91             709 :                 let cancel = task_mgr::shutdown_token();
      92             709 :                 tokio::select! {
      93             709 :                     _ = cancel.cancelled() => { return Ok(()) },
      94             709 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
      95             709 :                 };
      96             709 :                 compaction_loop(tenant, cancel)
      97             709 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
      98           91329 :                     .await;
      99             309 :                 Ok(())
     100             709 :             }
     101             709 :         },
     102             709 :     );
     103             709 :     task_mgr::spawn(
     104             709 :         BACKGROUND_RUNTIME.handle(),
     105             709 :         TaskKind::GarbageCollector,
     106             709 :         Some(tenant_shard_id),
     107             709 :         None,
     108             709 :         &format!("garbage collector for tenant {tenant_shard_id}"),
     109             709 :         false,
     110             709 :         {
     111             709 :             let tenant = Arc::clone(tenant);
     112             709 :             let background_jobs_can_start = background_jobs_can_start.cloned();
     113             709 :             async move {
     114             709 :                 let cancel = task_mgr::shutdown_token();
     115             709 :                 tokio::select! {
     116             709 :                     _ = cancel.cancelled() => { return Ok(()) },
     117             709 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     118             709 :                 };
     119             709 :                 gc_loop(tenant, cancel)
     120             709 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     121          236559 :                     .await;
     122             309 :                 Ok(())
     123             709 :             }
     124             709 :         },
     125             709 :     );
     126             709 : }
     127                 : 
     128                 : ///
     129                 : /// Compaction task's main loop
     130                 : ///
     131             709 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     132             709 :     const MAX_BACKOFF_SECS: f64 = 300.0;
     133             709 :     // How many errors we have seen consecutively
     134             709 :     let mut error_run_count = 0;
     135             709 : 
     136             709 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     137             709 :     async {
     138             709 :         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     139             709 :         let mut first = true;
     140            1079 :         loop {
     141            1258 :             tokio::select! {
     142                 :                 _ = cancel.cancelled() => {
     143                 :                     return;
     144                 :                 },
     145            1078 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     146                 :                     ControlFlow::Break(()) => return,
     147                 :                     ControlFlow::Continue(()) => (),
     148                 :                 },
     149            1079 :             }
     150                 : 
     151            1078 :             let period = tenant.get_compaction_period();
     152            1078 : 
     153            1078 :             // TODO: we shouldn't need to await to find tenant and this could be moved outside of
     154            1078 :             // loop, #3501. There are also additional "allowed_errors" in tests.
     155            1078 :             if first {
     156             709 :                 first = false;
     157             709 :                 if random_init_delay(period, &cancel).await.is_err() {
     158             166 :                     break;
     159             367 :                 }
     160             369 :             }
     161                 : 
     162             736 :             let started_at = Instant::now();
     163                 : 
     164             736 :             let sleep_duration = if period == Duration::ZERO {
     165                 :                 #[cfg(not(feature = "testing"))]
     166                 :                 info!("automatic compaction is disabled");
     167                 :                 // check again in 10 seconds, in case it's been enabled again.
     168             402 :                 Duration::from_secs(10)
     169                 :             } else {
     170                 :                 // Run compaction
     171           90270 :                 if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
     172 UBC           0 :                     let wait_duration = backoff::exponential_backoff_duration_seconds(
     173               0 :                         error_run_count + 1,
     174               0 :                         1.0,
     175               0 :                         MAX_BACKOFF_SECS,
     176               0 :                     );
     177               0 :                     error_run_count += 1;
     178               0 :                     let wait_duration = Duration::from_secs_f64(wait_duration);
     179               0 :                     error!(
     180               0 :                         "Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
     181               0 :                     );
     182               0 :                     wait_duration
     183                 :                 } else {
     184 CBC         325 :                     error_run_count = 0;
     185             325 :                     period
     186                 :                 }
     187                 :             };
     188                 : 
     189             727 :             warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
     190             727 : 
     191             727 :             // Perhaps we did no work and the walredo process has been idle for some time:
     192             727 :             // give it a chance to shut down to avoid leaving walredo process running indefinitely.
     193             727 :             tenant.walredo_mgr.maybe_quiesce(period * 10);
     194             727 : 
     195             727 :             // Sleep
     196             727 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     197             512 :                 .await
     198             512 :                 .is_ok()
     199                 :             {
     200             142 :                 break;
     201             370 :             }
     202                 :         }
     203             309 :     }
     204           91329 :     .await;
     205             309 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     206             309 : }
     207                 : 
     208                 : ///
     209                 : /// GC task's main loop
     210                 : ///
     211             709 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     212             709 :     const MAX_BACKOFF_SECS: f64 = 300.0;
     213             709 :     // How many errors we have seen consecutively
     214             709 :     let mut error_run_count = 0;
     215             709 : 
     216             709 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     217             709 :     async {
     218             709 :         // GC might require downloading, to find the cutoff LSN that corresponds to the
     219             709 :         // cutoff specified as time.
     220             709 :         let ctx =
     221             709 :             RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     222             709 :         let mut first = true;
     223             981 :         loop {
     224            1155 :             tokio::select! {
     225                 :                 _ = cancel.cancelled() => {
     226                 :                     return;
     227                 :                 },
     228             979 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     229                 :                     ControlFlow::Break(()) => return,
     230                 :                     ControlFlow::Continue(()) => (),
     231                 :                 },
     232             981 :             }
     233                 : 
     234             979 :             let period = tenant.get_gc_period();
     235             979 : 
     236             979 :             if first {
     237             708 :                 first = false;
     238             708 :                 if random_init_delay(period, &cancel).await.is_err() {
     239             216 :                     break;
     240             175 :                 }
     241             271 :             }
     242                 : 
     243             446 :             let started_at = Instant::now();
     244             446 : 
     245             446 :             let gc_horizon = tenant.get_gc_horizon();
     246             446 :             let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
     247                 :                 #[cfg(not(feature = "testing"))]
     248                 :                 info!("automatic GC is disabled");
     249                 :                 // check again in 10 seconds, in case it's been enabled again.
     250             405 :                 Duration::from_secs(10)
     251                 :             } else {
     252                 :                 // Run gc
     253              41 :                 let res = tenant
     254              41 :                     .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
     255          235795 :                     .await;
     256              35 :                 if let Err(e) = res {
     257 UBC           0 :                     let wait_duration = backoff::exponential_backoff_duration_seconds(
     258               0 :                         error_run_count + 1,
     259               0 :                         1.0,
     260               0 :                         MAX_BACKOFF_SECS,
     261               0 :                     );
     262               0 :                     error_run_count += 1;
     263               0 :                     let wait_duration = Duration::from_secs_f64(wait_duration);
     264               0 :                     error!(
     265               0 :                         "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
     266               0 :                     );
     267               0 :                     wait_duration
     268                 :                 } else {
     269 CBC          35 :                     error_run_count = 0;
     270              35 :                     period
     271                 :                 }
     272                 :             };
     273                 : 
     274             440 :             warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
     275             440 : 
     276             440 :             // Sleep
     277             440 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     278             363 :                 .await
     279             363 :                 .is_ok()
     280                 :             {
     281              91 :                 break;
     282             272 :             }
     283                 :         }
     284             309 :     }
     285          236559 :     .await;
     286             309 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     287             309 : }
     288                 : 
     289            2060 : async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
     290            2060 :     // if the tenant has a proper status already, no need to wait for anything
     291            2060 :     if tenant.current_state() == TenantState::Active {
     292            1708 :         ControlFlow::Continue(())
     293                 :     } else {
     294             352 :         let mut tenant_state_updates = tenant.subscribe_for_state_updates();
     295                 :         loop {
     296             353 :             match tenant_state_updates.changed().await {
     297                 :                 Ok(()) => {
     298             350 :                     let new_state = &*tenant_state_updates.borrow();
     299             350 :                     match new_state {
     300                 :                         TenantState::Active => {
     301 UBC           0 :                             debug!("Tenant state changed to active, continuing the task loop");
     302 CBC         349 :                             return ControlFlow::Continue(());
     303                 :                         }
     304               1 :                         state => {
     305 UBC           0 :                             debug!("Not running the task loop, tenant is not active: {state:?}");
     306 CBC           1 :                             continue;
     307                 :                         }
     308                 :                     }
     309                 :                 }
     310 UBC           0 :                 Err(_sender_dropped_error) => {
     311               0 :                     return ControlFlow::Break(());
     312                 :                 }
     313                 :             }
     314                 :         }
     315                 :     }
     316 CBC        2057 : }
     317                 : 
     318 UBC           0 : #[derive(thiserror::Error, Debug)]
     319                 : #[error("cancelled")]
     320                 : pub(crate) struct Cancelled;
     321                 : 
     322                 : /// Provide a random delay for background task initialization.
     323                 : ///
     324                 : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
     325                 : /// different periods for more stable load.
     326 CBC        2522 : pub(crate) async fn random_init_delay(
     327            2522 :     period: Duration,
     328            2522 :     cancel: &CancellationToken,
     329            2522 : ) -> Result<(), Cancelled> {
     330            2522 :     use rand::Rng;
     331            2522 : 
     332            2522 :     if period == Duration::ZERO {
     333             329 :         return Ok(());
     334            2193 :     }
     335            2193 : 
     336            2193 :     let d = {
     337            2193 :         let mut rng = rand::thread_rng();
     338            2193 :         rng.gen_range(Duration::ZERO..=period)
     339            2193 :     };
     340            2193 : 
     341            2193 :     match tokio::time::timeout(d, cancel.cancelled()).await {
     342             613 :         Ok(_) => Err(Cancelled),
     343             787 :         Err(_) => Ok(()),
     344                 :     }
     345            1729 : }
     346                 : 
     347                 : /// Attention: the `task` and `period` become labels of a pageserver-wide prometheus metric.
     348            1263 : pub(crate) fn warn_when_period_overrun(
     349            1263 :     elapsed: Duration,
     350            1263 :     period: Duration,
     351            1263 :     task: BackgroundLoopKind,
     352            1263 : ) {
     353            1263 :     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     354            1263 :     if elapsed >= period && period != Duration::ZERO {
     355                 :         // humantime does no significant digits clamping whereas Duration's debug is a bit more
     356                 :         // intelligent. however it makes sense to keep the "configuration format" for period, even
     357                 :         // though there's no way to output the actual config value.
     358              21 :         info!(
     359              21 :             ?elapsed,
     360              21 :             period = %humantime::format_duration(period),
     361              21 :             ?task,
     362              21 :             "task iteration took longer than the configured period"
     363              21 :         );
     364              21 :         crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     365              21 :             .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
     366              21 :             .inc();
     367            1242 :     }
     368            1263 : }
        

Generated by: LCOV version 2.1-beta