LCOV - code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions)
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29

                  Coverage    Hit / Total
    Lines:          77.8 %    228 / 293
    Functions:      68.3 %     28 / 41

            Line data    Source code
       1              : //! This module contains functions to serve per-tenant background processes,
       2              : //! such as compaction and GC
       3              : 
       4              : use std::ops::ControlFlow;
       5              : use std::sync::Arc;
       6              : use std::time::{Duration, Instant};
       7              : 
       8              : use crate::context::{DownloadBehavior, RequestContext};
       9              : use crate::metrics::TENANT_TASK_EVENTS;
      10              : use crate::task_mgr;
      11              : use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
      12              : use crate::tenant::timeline::CompactionError;
      13              : use crate::tenant::{Tenant, TenantState};
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::*;
      16              : use utils::{backoff, completion};
      17              : 
      18              : static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
      19          374 :     once_cell::sync::Lazy::new(|| {
      20          374 :         let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
      21          374 :         let permits = usize::max(
      22          374 :             1,
      23          374 :             // while a lot of the work is done on spawn_blocking, we still do
      24          374 :             // repartitioning in the async context. this should leave some workers
      25          374 :             // free for other, shorter work, hopefully easing any outwardly visible
      26          374 :             // effects of restarts.
      27          374 :             //
      28          374 :             // 6/8 is a guess; previously we ran unlimited on all 8 workers, plus more
      29          374 :             // via spawn_blocking.
      30          374 :             (total_threads * 3).checked_div(4).unwrap_or(0),
      31          374 :         );
      32          374 :         assert_ne!(permits, 0, "we will not be adding in permits later");
      33          374 :         assert!(
      34          374 :             permits < total_threads,
      35            0 :             "need threads available for shorter work"
      36              :         );
      37          374 :         tokio::sync::Semaphore::new(permits)
      38          374 :     });
      39              : 
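A minimal sketch of the permit calculation above, with illustrative names
(`background_task_permits` is not part of the pageserver): three quarters of
the runtime's worker threads, clamped to at least one permit.

    fn background_task_permits(total_threads: usize) -> usize {
        // Reserve roughly a quarter of the workers for short-lived tasks;
        // clamp to at least one permit so background work can always proceed.
        usize::max(1, (total_threads * 3) / 4)
    }

    #[test]
    fn permits_leave_headroom() {
        assert_eq!(background_task_permits(8), 6); // the "6/8" guess above
        assert_eq!(background_task_permits(2), 1); // never zero
    }
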
      40         2144 : #[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
      41              : #[strum(serialize_all = "snake_case")]
      42              : pub(crate) enum BackgroundLoopKind {
      43              :     Compaction,
      44              :     Gc,
      45              :     Eviction,
      46              :     ConsumptionMetricsCollectMetrics,
      47              :     ConsumptionMetricsSyntheticSizeWorker,
      48              :     InitialLogicalSizeCalculation,
      49              :     HeatmapUpload,
      50              :     SecondaryDownload,
      51              : }
      52              : 
      53              : impl BackgroundLoopKind {
      54         2144 :     fn as_static_str(&self) -> &'static str {
      55         2144 :         let s: &'static str = self.into();
      56         2144 :         s
      57         2144 :     }
      58              : }
      59              : 
      60              : /// Cancellation safe.
      61         2187 : pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
      62         2187 :     loop_kind: BackgroundLoopKind,
      63         2187 :     _ctx: &RequestContext,
      64         2187 : ) -> impl Drop {
      65         2129 :     let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
      66         2129 :         .with_label_values(&[loop_kind.as_static_str()])
      67         2129 :         .guard();
      68         2129 : 
      69         2129 :     pausable_failpoint!(
      70          577 :         "initial-size-calculation-permit-pause",
      71          577 :         loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
      72          577 :     );
      73              : 
      74         1767 :     match CONCURRENT_BACKGROUND_TASKS.acquire().await {
      75         1767 :         Ok(permit) => permit,
      76            0 :         Err(_closed) => unreachable!("we never close the semaphore"),
      77              :     }
      78         1767 : }
      79              : 
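A self-contained sketch of the same rate-limiting pattern (`LIMIT` and
`run_limited` are illustrative names, not the pageserver's API): the permit
doubles as the returned guard, and the slot is released when it is dropped.

    use once_cell::sync::Lazy;
    use tokio::sync::Semaphore;

    static LIMIT: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(6));

    async fn run_limited<F: std::future::Future>(work: F) -> F::Output {
        // acquire() fails only if the semaphore is closed, which never happens here
        let _permit = LIMIT.acquire().await.expect("semaphore is never closed");
        work.await
        // _permit drops here, releasing the slot to the next waiter
    }
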
      80              : /// Start per tenant background loops: compaction and gc.
      81          830 : pub fn start_background_loops(
      82          830 :     tenant: &Arc<Tenant>,
      83          830 :     background_jobs_can_start: Option<&completion::Barrier>,
      84          830 : ) {
      85          830 :     let tenant_shard_id = tenant.tenant_shard_id;
      86          830 :     task_mgr::spawn(
      87          830 :         BACKGROUND_RUNTIME.handle(),
      88          830 :         TaskKind::Compaction,
      89          830 :         Some(tenant_shard_id),
      90          830 :         None,
      91          830 :         &format!("compactor for tenant {tenant_shard_id}"),
      92          830 :         false,
      93          830 :         {
      94          830 :             let tenant = Arc::clone(tenant);
      95          830 :             let background_jobs_can_start = background_jobs_can_start.cloned();
      96          830 :             async move {
      97          830 :                 let cancel = task_mgr::shutdown_token();
      98          830 :                 tokio::select! {
      99          830 :                     _ = cancel.cancelled() => { return Ok(()) },
     100          830 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     101          830 :                 };
     102          830 :                 compaction_loop(tenant, cancel)
     103          830 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     104       129483 :                     .await;
     105          385 :                 Ok(())
     106          830 :             }
     107          830 :         },
     108          830 :     );
     109          830 :     task_mgr::spawn(
     110          830 :         BACKGROUND_RUNTIME.handle(),
     111          830 :         TaskKind::GarbageCollector,
     112          830 :         Some(tenant_shard_id),
     113          830 :         None,
     114          830 :         &format!("garbage collector for tenant {tenant_shard_id}"),
     115          830 :         false,
     116          830 :         {
     117          830 :             let tenant = Arc::clone(tenant);
     118          830 :             let background_jobs_can_start = background_jobs_can_start.cloned();
     119          830 :             async move {
     120          830 :                 let cancel = task_mgr::shutdown_token();
     121          830 :                 tokio::select! {
     122          830 :                     _ = cancel.cancelled() => { return Ok(()) },
     123          830 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     124          830 :                 };
     125          830 :                 gc_loop(tenant, cancel)
     126          830 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     127       265112 :                     .await;
     128          385 :                 Ok(())
     129          830 :             }
     130          830 :         },
     131          830 :     );
     132          830 : }
     133              : 
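Both spawns above follow the same gating pattern: wait for the "background
jobs may start" signal, but bail out immediately if shutdown is requested
while waiting. A sketch with a tokio watch channel standing in for
`utils::completion::Barrier`:

    use tokio_util::sync::CancellationToken;

    async fn gated_task(
        mut can_start: tokio::sync::watch::Receiver<bool>,
        cancel: CancellationToken,
    ) {
        tokio::select! {
            _ = cancel.cancelled() => return, // shut down before ever starting
            _ = can_start.wait_for(|ok| *ok) => {}
        }
        // ... run the actual loop, checking `cancel` between iterations ...
    }
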
     134              : ///
     135              : /// Compaction task's main loop
     136              : ///
     137          830 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     138          830 :     const MAX_BACKOFF_SECS: f64 = 300.0;
     139          830 :     // How many consecutive errors we have seen
     140          830 :     let mut error_run_count = 0;
     141          830 : 
     142          830 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     143          830 :     async {
     144          830 :         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     145          830 :         let mut first = true;
     146         1212 :         loop {
     147         1424 :             tokio::select! {
     148              :                 _ = cancel.cancelled() => {
     149              :                     return;
     150              :                 },
     151         1205 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     152              :                     ControlFlow::Break(()) => return,
     153              :                     ControlFlow::Continue(()) => (),
     154              :                 },
     155         1212 :             }
     156              : 
     157         1205 :             let period = tenant.get_compaction_period();
     158         1205 : 
     159         1205 :             // TODO: we shouldn't need to await to find tenant and this could be moved outside of
     160         1205 :             // loop, #3501. There are also additional "allowed_errors" in tests.
     161         1205 :             if first {
     162          823 :                 first = false;
     163          823 :                 if random_init_delay(period, &cancel).await.is_err() {
     164          234 :                     break;
     165          395 :                 }
     166          382 :             }
     167              : 
     168          777 :             let started_at = Instant::now();
     169              : 
     170          777 :             let sleep_duration = if period == Duration::ZERO {
     171              :                 #[cfg(not(feature = "testing"))]
     172              :                 info!("automatic compaction is disabled");
     173              :                 // check again in 10 seconds, in case it's been enabled again.
     174          387 :                 Duration::from_secs(10)
     175              :             } else {
     176              :                 // Run compaction
     177       128298 :                 if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
     178            0 :                     let wait_duration = backoff::exponential_backoff_duration_seconds(
     179            0 :                         error_run_count + 1,
     180            0 :                         1.0,
     181            0 :                         MAX_BACKOFF_SECS,
     182            0 :                     );
     183            0 :                     error_run_count += 1;
     184            0 :                     let wait_duration = Duration::from_secs_f64(wait_duration);
     185            0 :                     log_compaction_error(
     186            0 :                         &e,
     187            0 :                         error_run_count,
     188            0 :                         &wait_duration,
     189            0 :                         cancel.is_cancelled(),
     190            0 :                     );
     191            0 :                     wait_duration
     192              :                 } else {
     193          388 :                     error_run_count = 0;
     194          388 :                     period
     195              :                 }
     196              :             };
     197              : 
     198          775 :             warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
     199              : 
     200              :             // Perhaps we did no work and the walredo process has been idle for some time:
     201              :             // give it a chance to shut down, to avoid leaving the walredo process running indefinitely.
     202          775 :             if let Some(walredo_mgr) = &tenant.walredo_mgr {
     203          775 :                 walredo_mgr.maybe_quiesce(period * 10);
     204          775 :             }
     205              : 
     206              :             // Sleep
     207          775 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     208          526 :                 .await
     209          526 :                 .is_ok()
     210              :             {
     211          144 :                 break;
     212          382 :             }
     213              :         }
     214          385 :     }
     215       129483 :     .await;
     216          385 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     217          385 : }
     218              : 
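On errors, the loop above sleeps with exponential backoff rather than
retrying at the configured period. A minimal sketch of the shape of that
computation, assuming base-2 growth clamped to MAX_BACKOFF_SECS; the exact
`utils::backoff` formula may differ:

    fn backoff_seconds(error_run_count: u32, base: f64, max: f64) -> f64 {
        (base * 2f64.powi(error_run_count as i32)).min(max)
    }

    fn main() {
        // 2s, 4s, 8s, ... clamped to 300s from the ninth consecutive error on
        for n in 1..=10 {
            println!("error run {n}: wait {:.0}s", backoff_seconds(n, 1.0, 300.0));
        }
    }
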
     219            0 : fn log_compaction_error(
     220            0 :     e: &CompactionError,
     221            0 :     error_run_count: u32,
     222            0 :     sleep_duration: &std::time::Duration,
     223            0 :     task_cancelled: bool,
     224            0 : ) {
     225              :     use crate::tenant::upload_queue::NotInitialized;
     226              :     use crate::tenant::PageReconstructError;
     227              :     use CompactionError::*;
     228              : 
     229              :     enum LooksLike {
     230              :         Info,
     231              :         Error,
     232              :     }
     233              : 
     234            0 :     let decision = match e {
     235            0 :         ShuttingDown => None,
     236            0 :         _ if task_cancelled => Some(LooksLike::Info),
     237            0 :         Other(e) => {
     238            0 :             let root_cause = e.root_cause();
     239              : 
     240            0 :             let is_stopping = {
     241            0 :                 let upload_queue = root_cause
     242            0 :                     .downcast_ref::<NotInitialized>()
     243            0 :                     .is_some_and(|e| e.is_stopping());
     244            0 : 
     245            0 :                 let timeline = root_cause
     246            0 :                     .downcast_ref::<PageReconstructError>()
     247            0 :                     .is_some_and(|e| e.is_stopping());
     248            0 : 
     249            0 :                 upload_queue || timeline
     250              :             };
     251              : 
     252            0 :             if is_stopping {
     253            0 :                 Some(LooksLike::Info)
     254              :             } else {
     255            0 :                 Some(LooksLike::Error)
     256              :             }
     257              :         }
     258              :     };
     259              : 
     260            0 :     match decision {
     261            0 :         Some(LooksLike::Info) => info!(
     262            0 :             "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
     263            0 :         ),
     264            0 :         Some(LooksLike::Error) => error!(
     265            0 :             "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
     266            0 :         ),
     267            0 :         None => {}
     268              :     }
     269            0 : }
     270              : 
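The classification above hinges on walking to the `anyhow` root cause and
downcasting it to known error types. A self-contained sketch, with
`Stopping` as an illustrative stand-in for the shutdown-flavored errors:

    use anyhow::anyhow;

    #[derive(Debug, thiserror::Error)]
    #[error("stopping")]
    struct Stopping;

    fn is_shutdown_noise(e: &anyhow::Error) -> bool {
        // root_cause() walks the source() chain to the innermost error
        e.root_cause().downcast_ref::<Stopping>().is_some()
    }

    fn main() {
        let err = anyhow!(Stopping).context("compaction failed");
        assert!(is_shutdown_noise(&err));
    }
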
     271              : ///
     272              : /// GC task's main loop
     273              : ///
     274          830 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     275          830 :     const MAX_BACKOFF_SECS: f64 = 300.0;
     276          830 :     // How many consecutive errors we have seen
     277          830 :     let mut error_run_count = 0;
     278          830 : 
     279          830 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     280          830 :     async {
     281          830 :         // GC might require downloading, to find the cutoff LSN that corresponds to the
     282          830 :         // cutoff specified as a point in time.
     283          830 :         let ctx =
     284          830 :             RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     285          830 :         let mut first = true;
     286         1074 :         loop {
     287         1259 :             tokio::select! {
     288              :                 _ = cancel.cancelled() => {
     289              :                     return;
     290              :                 },
     291         1071 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     292              :                     ControlFlow::Break(()) => return,
     293              :                     ControlFlow::Continue(()) => (),
     294              :                 },
     295         1074 :             }
     296              : 
     297         1071 :             let period = tenant.get_gc_period();
     298         1071 : 
     299         1071 :             if first {
     300          827 :                 first = false;
     301          827 :                 if random_init_delay(period, &cancel).await.is_err() {
     302          285 :                     break;
     303          190 :                 }
     304          244 :             }
     305              : 
     306          434 :             let started_at = Instant::now();
     307          434 : 
     308          434 :             let gc_horizon = tenant.get_gc_horizon();
     309          434 :             let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
     310              :                 #[cfg(not(feature = "testing"))]
     311              :                 info!("automatic GC is disabled");
     312              :                 // check again in 10 seconds, in case it's been enabled again.
     313          388 :                 Duration::from_secs(10)
     314              :             } else {
     315              :                 // Run gc
     316           46 :                 let res = tenant
     317           46 :                     .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
     318       264292 :                     .await;
     319           45 :                 if let Err(e) = res {
     320            0 :                     let wait_duration = backoff::exponential_backoff_duration_seconds(
     321            0 :                         error_run_count + 1,
     322            0 :                         1.0,
     323            0 :                         MAX_BACKOFF_SECS,
     324            0 :                     );
     325            0 :                     error_run_count += 1;
     326            0 :                     let wait_duration = Duration::from_secs_f64(wait_duration);
     327            0 :                     error!(
     328            0 :                         "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
     329            0 :                     );
     330            0 :                     wait_duration
     331              :                 } else {
     332           45 :                     error_run_count = 0;
     333           45 :                     period
     334              :                 }
     335              :             };
     336              : 
     337          433 :             warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
     338          433 : 
     339          433 :             // Sleep
     340          433 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     341          341 :                 .await
     342          341 :                 .is_ok()
     343              :             {
     344           97 :                 break;
     345          244 :             }
     346              :         }
     347          385 :     }
     348       265112 :     .await;
     349          385 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     350          385 : }
     351              : 
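Both loops end each iteration with the same cancellable sleep. A short
sketch of the pattern (`sleep_or_cancel` is an illustrative name):
`timeout` resolves to Ok(..) when `cancelled()` completes first, which is
exactly the "break out of the loop" case.

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    /// Returns true if the token was cancelled during the sleep.
    async fn sleep_or_cancel(d: Duration, cancel: &CancellationToken) -> bool {
        tokio::time::timeout(d, cancel.cancelled()).await.is_ok()
    }
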
     352         2286 : async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
     353         2286 :     // if the tenant has a proper status already, no need to wait for anything
     354         2286 :     if tenant.current_state() == TenantState::Active {
     355         1899 :         ControlFlow::Continue(())
     356              :     } else {
     357          387 :         let mut tenant_state_updates = tenant.subscribe_for_state_updates();
     358              :         loop {
     359          398 :             match tenant_state_updates.changed().await {
     360              :                 Ok(()) => {
     361          388 :                     let new_state = &*tenant_state_updates.borrow();
     362          388 :                     match new_state {
     363              :                         TenantState::Active => {
     364            0 :                             debug!("Tenant state changed to active, continuing the task loop");
     365          377 :                             return ControlFlow::Continue(());
     366              :                         }
     367           11 :                         state => {
     368            0 :                             debug!("Not running the task loop, tenant is not active: {state:?}");
     369           11 :                             continue;
     370              :                         }
     371              :                     }
     372              :                 }
     373            0 :                 Err(_sender_dropped_error) => {
     374            0 :                     return ControlFlow::Break(());
     375              :                 }
     376              :             }
     377              :         }
     378              :     }
     379         2276 : }
     380              : 
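A sketch of the same wait-for-state technique with a plain tokio watch
channel; `State` is an illustrative stand-in for `TenantState`:

    use std::ops::ControlFlow;

    #[derive(Clone, Copy, PartialEq, Debug)]
    enum State { Attaching, Active, Stopping }

    async fn wait_for_active(mut rx: tokio::sync::watch::Receiver<State>) -> ControlFlow<()> {
        loop {
            if *rx.borrow() == State::Active {
                return ControlFlow::Continue(());
            }
            // changed() errors only once the sender side has been dropped
            if rx.changed().await.is_err() {
                return ControlFlow::Break(());
            }
        }
    }
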
     381            0 : #[derive(thiserror::Error, Debug)]
     382              : #[error("cancelled")]
     383              : pub(crate) struct Cancelled;
     384              : 
     385              : /// Provide a random delay for background task initialization.
     386              : ///
     387              : /// This delay prevents a thundering herd of background tasks and will likely keep their
     388              : /// iterations out of phase with each other, for more stable load.
     389         2889 : pub(crate) async fn random_init_delay(
     390         2889 :     period: Duration,
     391         2889 :     cancel: &CancellationToken,
     392         2889 : ) -> Result<(), Cancelled> {
     393         2889 :     use rand::Rng;
     394         2889 : 
     395         2889 :     if period == Duration::ZERO {
     396          361 :         return Ok(());
     397         2528 :     }
     398         2528 : 
     399         2528 :     let d = {
     400         2528 :         let mut rng = rand::thread_rng();
     401         2528 :         rng.gen_range(Duration::ZERO..=period)
     402         2528 :     };
     403         2528 : 
     404         2528 :     match tokio::time::timeout(d, cancel.cancelled()).await {
     405          854 :         Ok(_) => Err(Cancelled),
     406          806 :         Err(_) => Ok(()),
     407              :     }
     408         2021 : }
     409              : 
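A minimal usage sketch for `random_init_delay`, jittering the first
iteration of a hypothetical periodic loop:

    async fn periodic_loop(period: Duration, cancel: CancellationToken) {
        if random_init_delay(period, &cancel).await.is_err() {
            return; // cancelled while waiting out the initial jitter
        }
        loop {
            // ... one iteration of work ...
            if tokio::time::timeout(period, cancel.cancelled()).await.is_ok() {
                return; // cancelled during the sleep
            }
        }
    }
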
     410              : /// Attention: the `task` and `period` become labels of a pageserver-wide Prometheus metric.
     411         1287 : pub(crate) fn warn_when_period_overrun(
     412         1287 :     elapsed: Duration,
     413         1287 :     period: Duration,
     414         1287 :     task: BackgroundLoopKind,
     415         1287 : ) {
     416         1287 :     // A period of Duration::ZERO is expected: it is the "disable [bgtask]" value.
     417         1287 :     if elapsed >= period && period != Duration::ZERO {
     418              :         // humantime does no clamping of significant digits, whereas Duration's Debug output
     419              :         // is a bit more intelligent. However, it makes sense to keep the "configuration
     420              :         // format" for the period, even though there's no way to output the actual config value.
     421           15 :         info!(
     422           15 :             ?elapsed,
     423           15 :             period = %humantime::format_duration(period),
     424           15 :             ?task,
     425           15 :             "task iteration took longer than the configured period"
     426           15 :         );
     427           15 :         crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     428           15 :             .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
     429           15 :             .inc();
     430         1272 :     }
     431         1287 : }
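A tiny sketch of the formatting trade-off noted above: `humantime::format_duration`
renders the period the way it reads in configuration, while Duration's Debug
output prints raw seconds.

    fn main() {
        let period = std::time::Duration::from_secs(3600);
        println!("{}", humantime::format_duration(period)); // "1h"
        println!("{:?}", period);                           // "3600s"
    }
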
        

Generated by: LCOV version 2.1-beta