//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC.

use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion};

static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
    once_cell::sync::Lazy::new(|| {
        let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
        let permits = usize::max(
            1,
            // while a lot of the work is done on spawn_blocking, we still do
            // repartitioning in the async context. this should leave us some workers
            // unblocked to pick up other work, hopefully easing any externally visible
            // effects of restarts.
            //
            // 6/8 is a guess; previously we ran with an unlimited 8 and more from
            // spawn_blocking.
            (total_threads * 3).checked_div(4).unwrap_or(0),
        );
        assert_ne!(permits, 0, "we will not be adding in permits later");
        assert!(
            permits < total_threads,
            "need threads available for shorter work"
        );
        tokio::sync::Semaphore::new(permits)
    });
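
// Worked example of the permit math above (illustrative, assuming a hypothetical
// background runtime of 8 worker threads): permits = max(1, 8 * 3 / 4) = 6, leaving
// two workers free for short-lived async work; the asserts then check 6 != 0 and 6 < 8.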

#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
    Compaction,
    Gc,
    Eviction,
    ConsumptionMetricsCollectMetrics,
    ConsumptionMetricsSyntheticSizeWorker,
    InitialLogicalSizeCalculation,
    HeatmapUpload,
    SecondaryDownload,
}

impl BackgroundLoopKind {
    fn as_static_str(&self) -> &'static str {
        let s: &'static str = self.into();
        s
    }
}
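
// For reference (an illustration, not part of the original file): the strum derive
// with serialize_all = "snake_case" turns variant names into snake_case strings, so
// BackgroundLoopKind::InitialLogicalSizeCalculation.as_static_str() yields
// "initial_logical_size_calculation", which is what ends up in metric labels.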

/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
    loop_kind: BackgroundLoopKind,
    _ctx: &RequestContext,
) -> impl Drop {
    let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
        .with_label_values(&[loop_kind.as_static_str()])
        .guard();

    pausable_failpoint!(
        "initial-size-calculation-permit-pause",
        loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
    );

    match CONCURRENT_BACKGROUND_TASKS.acquire().await {
        Ok(permit) => permit,
        Err(_closed) => unreachable!("we never close the semaphore"),
    }
}
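
// Hypothetical usage sketch (not from this module): a background loop acquires a
// permit before a heavy iteration and releases it when the returned guard is dropped.
//
//     let _permit = concurrent_background_tasks_rate_limit_permit(
//         BackgroundLoopKind::Compaction,
//         &ctx,
//     )
//     .await;
//     // ... run one compaction iteration while holding the permit ...
//     // dropping `_permit` returns it to CONCURRENT_BACKGROUND_TASKS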

/// Start per-tenant background loops: compaction and GC.
pub fn start_background_loops(
    tenant: &Arc<Tenant>,
    background_jobs_can_start: Option<&completion::Barrier>,
) {
    let tenant_shard_id = tenant.tenant_shard_id;
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::Compaction,
        Some(tenant_shard_id),
        None,
        &format!("compactor for tenant {tenant_shard_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                compaction_loop(tenant, cancel)
                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::GarbageCollector,
        Some(tenant_shard_id),
        None,
        &format!("garbage collector for tenant {tenant_shard_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                gc_loop(tenant, cancel)
                    .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );
}

///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
    let mut error_run_count = 0;

    let mut last_throttle_flag_reset_at = Instant::now();

    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_compaction_period();

            // TODO: we shouldn't need to await to find the tenant, and this could be moved
            // outside of the loop, #3501. There are also additional "allowed_errors" in tests.
            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let sleep_duration = if period == Duration::ZERO {
                #[cfg(not(feature = "testing"))]
                info!("automatic compaction is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run compaction
                if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
                    let wait_duration = backoff::exponential_backoff_duration_seconds(
                        error_run_count + 1,
                        1.0,
                        MAX_BACKOFF_SECS,
                    );
                    error_run_count += 1;
                    let wait_duration = Duration::from_secs_f64(wait_duration);
                    log_compaction_error(
                        &e,
                        error_run_count,
                        &wait_duration,
                        cancel.is_cancelled(),
                    );
                    wait_duration
                } else {
                    error_run_count = 0;
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);

            // Perhaps we did no work and the walredo process has been idle for some time:
            // give it a chance to shut down to avoid leaving the walredo process running indefinitely.
            if let Some(walredo_mgr) = &tenant.walredo_mgr {
                walredo_mgr.maybe_quiesce(period * 10);
            }

            // TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
            // so we get some upper bound guarantee on when walredo quiesce / this throttling reporting happens.
            info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
                let now = Instant::now();
                let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
                let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats();
                if count_throttled == 0 {
                    return;
                }
                let allowed_rps = tenant.timeline_get_throttle.steady_rps();
                let delta = now - prev;
                warn!(
                    n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
                    count_accounted,
                    count_throttled,
                    sum_throttled_usecs,
                    allowed_rps=%format_args!("{allowed_rps:.0}"),
                    "shard was throttled in the last n_seconds")
            });

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

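/// Decide how loudly to log a compaction error: `ShuttingDown` is not logged at
/// all; errors seen after the task was cancelled, or whose root cause is a
/// "stopping" `NotInitialized` or `PageReconstructError`, are logged at info
/// level; everything else is logged as an error.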
fn log_compaction_error(
    e: &CompactionError,
    error_run_count: u32,
    sleep_duration: &std::time::Duration,
    task_cancelled: bool,
) {
    use crate::tenant::upload_queue::NotInitialized;
    use crate::tenant::PageReconstructError;
    use CompactionError::*;

    enum LooksLike {
        Info,
        Error,
    }

    let decision = match e {
        ShuttingDown => None,
        _ if task_cancelled => Some(LooksLike::Info),
        Other(e) => {
            let root_cause = e.root_cause();

            let is_stopping = {
                let upload_queue = root_cause
                    .downcast_ref::<NotInitialized>()
                    .is_some_and(|e| e.is_stopping());

                let timeline = root_cause
                    .downcast_ref::<PageReconstructError>()
                    .is_some_and(|e| e.is_stopping());

                upload_queue || timeline
            };

            if is_stopping {
                Some(LooksLike::Info)
            } else {
                Some(LooksLike::Error)
            }
        }
    };

    match decision {
        Some(LooksLike::Info) => info!(
            "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
        ),
        Some(LooksLike::Error) => error!(
            "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
        ),
        None => {}
    }
}

///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
    let mut error_run_count = 0;

    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        // GC might require downloading, to find the cutoff LSN that corresponds to the
        // cutoff specified as time.
        let ctx =
            RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_gc_period();

            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let gc_horizon = tenant.get_gc_horizon();
            let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
                #[cfg(not(feature = "testing"))]
                info!("automatic GC is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run gc
                let res = tenant
                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
                    .await;
                if let Err(e) = res {
                    let wait_duration = backoff::exponential_backoff_duration_seconds(
                        error_run_count + 1,
                        1.0,
                        MAX_BACKOFF_SECS,
                    );
                    error_run_count += 1;
                    let wait_duration = Duration::from_secs_f64(wait_duration);
                    error!(
                        "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
                    );
                    wait_duration
                } else {
                    error_run_count = 0;
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

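/// Wait until the tenant becomes `Active`. Returns `ControlFlow::Break` if the
/// state-update channel closes (its sender was dropped), meaning the tenant is
/// gone and the task loop should exit.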
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
    // if the tenant has a proper status already, no need to wait for anything
    if tenant.current_state() == TenantState::Active {
        ControlFlow::Continue(())
    } else {
        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
        loop {
            match tenant_state_updates.changed().await {
                Ok(()) => {
                    let new_state = &*tenant_state_updates.borrow();
                    match new_state {
                        TenantState::Active => {
                            debug!("Tenant state changed to active, continuing the task loop");
                            return ControlFlow::Continue(());
                        }
                        state => {
                            debug!("Not running the task loop, tenant is not active: {state:?}");
                            continue;
                        }
                    }
                }
                Err(_sender_dropped_error) => {
                    return ControlFlow::Break(());
                }
            }
        }
    }
}

#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;

/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them
/// spread out over their periods for a more stable load.
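///
/// # Example
///
/// An illustrative (non-doctest) sketch, assuming a tokio runtime and a caller
/// that treats `Err(Cancelled)` as a request to shut down:
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// // Sleeps for a uniformly random duration in 0..=period; returns Err(Cancelled)
/// // if the token fires first.
/// if random_init_delay(Duration::from_secs(60), &cancel).await.is_err() {
///     return; // shutting down
/// }
/// ```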
pub(crate) async fn random_init_delay(
    period: Duration,
    cancel: &CancellationToken,
) -> Result<(), Cancelled> {
    use rand::Rng;

    if period == Duration::ZERO {
        return Ok(());
    }

    let d = {
        let mut rng = rand::thread_rng();
        rng.gen_range(Duration::ZERO..=period)
    };

    match tokio::time::timeout(d, cancel.cancelled()).await {
        Ok(_) => Err(Cancelled),
        Err(_) => Ok(()),
    }
}

/// Attention: the `task` and `period` become labels of a pageserver-wide Prometheus metric.
pub(crate) fn warn_when_period_overrun(
    elapsed: Duration,
    period: Duration,
    task: BackgroundLoopKind,
) {
    // Duration::ZERO will happen because it's the "disable [bgtask]" value.
    if elapsed >= period && period != Duration::ZERO {
        // humantime does no significant digits clamping whereas Duration's debug is a bit more
        // intelligent. however it makes sense to keep the "configuration format" for period, even
        // though there's no way to output the actual config value.
        info!(
            ?elapsed,
            period = %humantime::format_duration(period),
            ?task,
            "task iteration took longer than the configured period"
        );
        crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
            .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
            .inc();
    }
}
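
// A hypothetical illustration (not in the original file) of the label values the
// overrun counter above receives: for BackgroundLoopKind::Compaction and a 20s
// period, the labels are ["compaction", "20"] — the task label comes from the
// strum snake_case derive and the period label is whole seconds.
#[cfg(test)]
mod label_examples {
    use super::*;

    #[test]
    fn overrun_metric_labels() {
        let task = BackgroundLoopKind::Compaction;
        let period = Duration::from_secs(20);
        assert_eq!(task.as_static_str(), "compaction");
        assert_eq!(format!("{}", period.as_secs()), "20");
    }
}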