LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions)

                 Coverage    Total    Hit    LBC    UBC    CBC
Lines:           94.6 %        224    212      4      8    212
Functions:       75.0 %         36     27      1      8     27

Current:  f6946e90941b557c917ac98cd5a7e9506d180f3e.info (2023-10-19 02:04:12)
Baseline: c8637f37369098875162f194f92736355783b050.info (2023-10-18 20:25:20)
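TLA legend: CBC = Covered Baseline Code, UBC = Uncovered Baseline Code, LBC = Lost Baseline Coverage (covered in the baseline, uncovered now).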

           TLA  Line data    Source code
       1                 : //! This module contains functions to serve per-tenant background processes,
       2                 : //! such as compaction and GC
       3                 : 
       4                 : use std::ops::ControlFlow;
       5                 : use std::sync::Arc;
       6                 : use std::time::{Duration, Instant};
       7                 : 
       8                 : use crate::context::{DownloadBehavior, RequestContext};
       9                 : use crate::metrics::TENANT_TASK_EVENTS;
      10                 : use crate::task_mgr;
      11                 : use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
      12                 : use crate::tenant::{Tenant, TenantState};
      13                 : use tokio_util::sync::CancellationToken;
      14                 : use tracing::*;
      15                 : use utils::completion;
      16                 : 
      17                 : static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
      18 CBC         243 :     once_cell::sync::Lazy::new(|| {
      19             243 :         let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
      20             243 :         let permits = usize::max(
      21             243 :             1,
       22             243 :             // While a lot of the work is done on spawn_blocking, we still do
       23             243 :             // repartitioning in the async context. This should leave some
       24             243 :             // workers unblocked for other async work, hopefully easing any
       25             243 :             // externally visible effects of restarts.
       26             243 :             //
       27             243 :             // 6/8 is a guess; previously we ran with an unlimited 8, plus
       28             243 :             // more from spawn_blocking.
      29             243 :             (total_threads * 3).checked_div(4).unwrap_or(0),
      30             243 :         );
      31             243 :         assert_ne!(permits, 0, "we will not be adding in permits later");
      32             243 :         assert!(
      33             243 :             permits < total_threads,
      34 UBC           0 :             "need threads avail for shorter work"
      35                 :         );
      36 CBC         243 :         tokio::sync::Semaphore::new(permits)
      37             243 :     });
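                          : // For example, with 8 background runtime worker threads (the count the
                          : // "6/8" guess above refers to), this yields max(1, 8 * 3 / 4) = 6
                          : // permits, leaving 2 workers free for shorter work.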
      38                 : 
      39            2593 : #[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
      40                 : #[strum(serialize_all = "snake_case")]
      41                 : pub(crate) enum BackgroundLoopKind {
      42                 :     Compaction,
      43                 :     Gc,
      44                 :     Eviction,
      45                 :     ConsumptionMetricsCollectMetrics,
      46                 :     ConsumptionMetricsSyntheticSizeWorker,
      47                 : }
      48                 : 
      49                 : impl BackgroundLoopKind {
      50            2593 :     fn as_static_str(&self) -> &'static str {
      51            2593 :         let s: &'static str = self.into();
      52            2593 :         s
      53            2593 :     }
      54                 : }
      55                 : 
      56                 : pub(crate) enum RateLimitError {
      57                 :     Cancelled,
      58                 : }
      59                 : 
      60            1288 : pub(crate) async fn concurrent_background_tasks_rate_limit(
      61            1288 :     loop_kind: BackgroundLoopKind,
      62            1288 :     _ctx: &RequestContext,
      63            1288 :     cancel: &CancellationToken,
      64            1288 : ) -> Result<impl Drop, RateLimitError> {
      65            1288 :     crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT
      66            1288 :         .with_label_values(&[loop_kind.as_static_str()])
      67            1288 :         .inc();
      68            1288 :     scopeguard::defer!(
      69            1288 :         crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc();
      70            1288 :     );
      71            1288 :     tokio::select! {
      72            1288 :         permit = CONCURRENT_BACKGROUND_TASKS.acquire() => {
      73                 :             match permit {
      74                 :                 Ok(permit) => Ok(permit),
      75                 :                 Err(_closed) => unreachable!("we never close the semaphore"),
      76                 :             }
      77                 :         },
      78                 :         _ = cancel.cancelled() => {
      79                 :             Err(RateLimitError::Cancelled)
      80                 :         }
      81                 :     }
      82            1288 : }
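                          : // A hypothetical usage sketch (illustrative only, not a call site in
                          : // this module): the returned permit guard is held for one loop
                          : // iteration and releases the semaphore when dropped.
                          : //
                          : //     let _permit = concurrent_background_tasks_rate_limit(
                          : //         BackgroundLoopKind::Compaction, &ctx, &cancel).await?;
                          : //     // ... run one iteration while the permit is held ...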
      83                 : 
      84                 : /// Start per tenant background loops: compaction and gc.
      85             672 : pub fn start_background_loops(
      86             672 :     tenant: &Arc<Tenant>,
      87             672 :     background_jobs_can_start: Option<&completion::Barrier>,
      88             672 : ) {
      89             672 :     let tenant_id = tenant.tenant_id;
      90             672 :     task_mgr::spawn(
      91             672 :         BACKGROUND_RUNTIME.handle(),
      92             672 :         TaskKind::Compaction,
      93             672 :         Some(tenant_id),
      94             672 :         None,
      95             672 :         &format!("compactor for tenant {tenant_id}"),
      96             672 :         false,
      97             672 :         {
      98             672 :             let tenant = Arc::clone(tenant);
      99             672 :             let background_jobs_can_start = background_jobs_can_start.cloned();
     100             672 :             async move {
     101             672 :                 let cancel = task_mgr::shutdown_token();
     102             797 :                 tokio::select! {
     103             797 :                     _ = cancel.cancelled() => { return Ok(()) },
     104             797 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     105             797 :                 };
     106             616 :                 compaction_loop(tenant, cancel)
     107             616 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
     108         1040843 :                     .await;
     109             222 :                 Ok(())
     110             672 :             }
     111             672 :         },
     112             672 :     );
     113             672 :     task_mgr::spawn(
     114             672 :         BACKGROUND_RUNTIME.handle(),
     115             672 :         TaskKind::GarbageCollector,
     116             672 :         Some(tenant_id),
     117             672 :         None,
     118             672 :         &format!("garbage collector for tenant {tenant_id}"),
     119             672 :         false,
     120             672 :         {
     121             672 :             let tenant = Arc::clone(tenant);
     122             672 :             let background_jobs_can_start = background_jobs_can_start.cloned();
     123             672 :             async move {
     124             672 :                 let cancel = task_mgr::shutdown_token();
     125             798 :                 tokio::select! {
     126             798 :                     _ = cancel.cancelled() => { return Ok(()) },
     127             798 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     128             798 :                 };
     129             616 :                 gc_loop(tenant, cancel)
     130             616 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
     131          337113 :                     .await;
     132             222 :                 Ok(())
     133             672 :             }
     134             672 :         },
     135             672 :     );
     136             672 : }
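                          : // Both tasks above follow the same pattern: wait for either shutdown or
                          : // the `background_jobs_can_start` barrier, then run their loop until the
                          : // cancellation token fires.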
     137                 : 
     138                 : ///
     139                 : /// Compaction task's main loop
     140                 : ///
     141             616 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     142             616 :     let wait_duration = Duration::from_secs(2);
     143             616 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     144             616 :     async {
     145             616 :         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     146             616 :         let mut first = true;
     147                 :         loop {
     148             801 :             tokio::select! {
     149                 :                 _ = cancel.cancelled() => {
     150                 :                     return;
     151                 :                 },
     152             797 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     153                 :                     ControlFlow::Break(()) => return,
     154                 :                     ControlFlow::Continue(()) => (),
     155                 :                 },
     156                 :             }
     157                 : 
     158             797 :             let period = tenant.get_compaction_period();
     159             797 : 
     160             797 :             // TODO: we shouldn't need to await to find tenant and this could be moved outside of
     161             797 :             // loop, #3501. There are also additional "allowed_errors" in tests.
     162             797 :             if first {
     163             612 :                 first = false;
     164             612 :                 if random_init_delay(period, &cancel).await.is_err() {
     165              93 :                     break;
     166             304 :                 }
     167             185 :             }
     168                 : 
     169             489 :             let started_at = Instant::now();
     170                 : 
     171             489 :             let sleep_duration = if period == Duration::ZERO {
     172                 :                 #[cfg(not(feature = "testing"))]
     173                 :                 info!("automatic compaction is disabled");
     174                 :                 // check again in 10 seconds, in case it's been enabled again.
     175             248 :                 Duration::from_secs(10)
     176                 :             } else {
     177                 :                 // Run compaction
     178         1040272 :                 if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
     179 LBC         (1) :                     error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
     180             (1) :                     wait_duration
     181                 :                 } else {
     182 CBC         233 :                     period
     183                 :                 }
     184                 :             };
     185                 : 
     186             481 :             warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
     187             481 : 
     188             481 :             // Sleep
     189             481 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     190             310 :                 .await
     191             310 :                 .is_ok()
     192                 :             {
     193             125 :                 break;
     194             185 :             }
     195                 :         }
     196             222 :     }
     197         1040843 :     .await;
     198             222 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     199             222 : }
     200                 : 
     201                 : ///
     202                 : /// GC task's main loop
     203                 : ///
     204             616 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     205             616 :     let wait_duration = Duration::from_secs(2);
     206             616 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     207             616 :     async {
      208             616 :         // GC might require downloading, to find the cutoff LSN that corresponds to the
      209             616 :         // cutoff specified as a point in time.
     210             616 :         let ctx =
     211             616 :             RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     212             616 :         let mut first = true;
     213                 :         loop {
     214             737 :             tokio::select! {
     215                 :                 _ = cancel.cancelled() => {
     216                 :                     return;
     217                 :                 },
     218             733 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     219                 :                     ControlFlow::Break(()) => return,
     220                 :                     ControlFlow::Continue(()) => (),
     221                 :                 },
     222                 :             }
     223                 : 
     224             733 :             let period = tenant.get_gc_period();
     225             733 : 
     226             733 :             if first {
     227             612 :                 first = false;
     228             612 :                 if random_init_delay(period, &cancel).await.is_err() {
     229             119 :                     break;
     230             167 :                 }
     231             121 :             }
     232                 : 
     233             288 :             let started_at = Instant::now();
     234             288 : 
     235             288 :             let gc_horizon = tenant.get_gc_horizon();
     236             288 :             let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
     237                 :                 #[cfg(not(feature = "testing"))]
     238                 :                 info!("automatic GC is disabled");
     239                 :                 // check again in 10 seconds, in case it's been enabled again.
     240             256 :                 Duration::from_secs(10)
     241                 :             } else {
     242                 :                 // Run gc
     243              32 :                 let res = tenant
     244              32 :                     .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
     245          336749 :                     .await;
     246              27 :                 if let Err(e) = res {
     247 UBC           0 :                     error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
     248               0 :                     wait_duration
     249                 :                 } else {
     250 CBC          27 :                     period
     251                 :                 }
     252                 :             };
     253                 : 
     254             283 :             warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
     255             283 : 
     256             283 :             // Sleep
     257             283 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     258             220 :                 .await
     259             220 :                 .is_ok()
     260                 :             {
     261              99 :                 break;
     262             121 :             }
     263                 :         }
     264             222 :     }
     265          337113 :     .await;
     266             222 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     267             222 : }
     268                 : 
     269            1538 : async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
     270            1536 :     // if the tenant has a proper status already, no need to wait for anything
     271            1536 :     if tenant.current_state() == TenantState::Active {
     272            1495 :         ControlFlow::Continue(())
     273                 :     } else {
     274              41 :         let mut tenant_state_updates = tenant.subscribe_for_state_updates();
     275                 :         loop {
     276              41 :             match tenant_state_updates.changed().await {
     277                 :                 Ok(()) => {
     278              35 :                     let new_state = &*tenant_state_updates.borrow();
     279              35 :                     match new_state {
     280                 :                         TenantState::Active => {
     281 UBC           0 :                             debug!("Tenant state changed to active, continuing the task loop");
     282 CBC          35 :                             return ControlFlow::Continue(());
     283                 :                         }
     284 LBC         (1) :                         state => {
     285 UBC           0 :                             debug!("Not running the task loop, tenant is not active: {state:?}");
     286 LBC         (1) :                             continue;
     287                 :                         }
     288                 :                     }
     289                 :                 }
     290 UBC           0 :                 Err(_sender_dropped_error) => {
     291               0 :                     return ControlFlow::Break(());
     292                 :                 }
     293                 :             }
     294                 :         }
     295                 :     }
     296 CBC        1530 : }
     297                 : 
     298 UBC           0 : #[derive(thiserror::Error, Debug)]
     299                 : #[error("cancelled")]
     300                 : pub(crate) struct Cancelled;
     301                 : 
     302                 : /// Provide a random delay for background task initialization.
     303                 : ///
      304                 : /// This delay prevents a thundering herd of background tasks and will likely keep them running
      305                 : /// out of phase with each other, for more stable load.
     306 CBC        2254 : pub(crate) async fn random_init_delay(
     307            2254 :     period: Duration,
     308            2254 :     cancel: &CancellationToken,
     309            2254 : ) -> Result<(), Cancelled> {
     310            2254 :     use rand::Rng;
     311            2254 : 
     312            2254 :     if period == Duration::ZERO {
     313             318 :         return Ok(());
     314            1936 :     }
     315            1936 : 
     316            1936 :     let d = {
     317            1936 :         let mut rng = rand::thread_rng();
     318            1936 :         rng.gen_range(Duration::ZERO..=period)
     319            1936 :     };
     320            1936 : 
     321            1936 :     match tokio::time::timeout(d, cancel.cancelled()).await {
     322             466 :         Ok(_) => Err(Cancelled),
     323             647 :         Err(_) => Ok(()),
     324                 :     }
     325            1431 : }
     326                 : 
      327                 : /// Attention: the `task` and `period` become labels of a pageserver-wide prometheus metric.
     328             825 : pub(crate) fn warn_when_period_overrun(
     329             825 :     elapsed: Duration,
     330             825 :     period: Duration,
     331             825 :     task: BackgroundLoopKind,
     332             825 : ) {
      333             825 :     // A period of Duration::ZERO is the "disable [bgtask]" value, so don't warn about it.
     334             825 :     if elapsed >= period && period != Duration::ZERO {
      335                 :         // humantime does no significant-digit clamping, whereas Duration's Debug output is
      336                 :         // a bit more intelligent. However, it makes sense to keep the "configuration format"
      337                 :         // for period, even though there's no way to output the actual configured value.
     338              17 :         warn!(
     339              17 :             ?elapsed,
     340              17 :             period = %humantime::format_duration(period),
     341              17 :             ?task,
     342              17 :             "task iteration took longer than the configured period"
     343              17 :         );
     344              17 :         crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     345              17 :             .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
     346              17 :             .inc();
     347             808 :     }
     348             825 : }
        

Generated by: LCOV version 2.1-beta