LCOV - code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 95.1 % 184 175
Test Date: 2023-09-06 10:18:01 Functions: 77.8 % 27 21

            Line data    Source code
       1              : //! This module contains functions to serve per-tenant background processes,
       2              : //! such as compaction and GC
       3              : 
       4              : use std::ops::ControlFlow;
       5              : use std::sync::Arc;
       6              : use std::time::{Duration, Instant};
       7              : 
       8              : use crate::context::{DownloadBehavior, RequestContext};
       9              : use crate::metrics::TENANT_TASK_EVENTS;
      10              : use crate::task_mgr;
      11              : use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
      12              : use crate::tenant::{Tenant, TenantState};
      13              : use tokio_util::sync::CancellationToken;
      14              : use tracing::*;
      15              : use utils::completion;
      16              : 
      17              : /// Start per tenant background loops: compaction and gc.
      18          698 : pub fn start_background_loops(
      19          698 :     tenant: &Arc<Tenant>,
      20          698 :     background_jobs_can_start: Option<&completion::Barrier>,
      21          698 : ) {
      22          698 :     let tenant_id = tenant.tenant_id;
      23          698 :     task_mgr::spawn(
      24          698 :         BACKGROUND_RUNTIME.handle(),
      25          698 :         TaskKind::Compaction,
      26          698 :         Some(tenant_id),
      27          698 :         None,
      28          698 :         &format!("compactor for tenant {tenant_id}"),
      29          698 :         false,
      30          698 :         {
      31          698 :             let tenant = Arc::clone(tenant);
      32          698 :             let background_jobs_can_start = background_jobs_can_start.cloned();
      33          698 :             async move {
      34          698 :                 let cancel = task_mgr::shutdown_token();
      35          813 :                 tokio::select! {
      36          813 :                     _ = cancel.cancelled() => { return Ok(()) },
      37          813 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
      38          813 :                 };
      39          634 :                 compaction_loop(tenant, cancel)
      40          634 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
      41       820792 :                     .await;
      42          252 :                 Ok(())
      43          698 :             }
      44          698 :         },
      45          698 :     );
      46          698 :     task_mgr::spawn(
      47          698 :         BACKGROUND_RUNTIME.handle(),
      48          698 :         TaskKind::GarbageCollector,
      49          698 :         Some(tenant_id),
      50          698 :         None,
      51          698 :         &format!("garbage collector for tenant {tenant_id}"),
      52          698 :         false,
      53          698 :         {
      54          698 :             let tenant = Arc::clone(tenant);
      55          698 :             let background_jobs_can_start = background_jobs_can_start.cloned();
      56          698 :             async move {
      57          698 :                 let cancel = task_mgr::shutdown_token();
      58          814 :                 tokio::select! {
      59          814 :                     _ = cancel.cancelled() => { return Ok(()) },
      60          814 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
      61          814 :                 };
      62          633 :                 gc_loop(tenant, cancel)
      63          633 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
      64       213342 :                     .await;
      65          251 :                 Ok(())
      66          698 :             }
      67          698 :         },
      68          698 :     );
      69          698 : }
      70              : 
      71              : ///
      72              : /// Compaction task's main loop
      73              : ///
      74          634 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
      75          634 :     let wait_duration = Duration::from_secs(2);
      76          634 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
      77          634 :     async {
      78          634 :         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
      79          634 :         let mut first = true;
      80              :         loop {
      81          850 :             tokio::select! {
      82              :                 _ = cancel.cancelled() => {
      83              :                     return;
      84              :                 },
      85          840 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
      86              :                     ControlFlow::Break(()) => return,
      87              :                     ControlFlow::Continue(()) => (),
      88              :                 },
      89              :             }
      90              : 
      91          840 :             let period = tenant.get_compaction_period();
      92          840 : 
      93          840 :             // TODO: we shouldn't need to await to find tenant and this could be moved outside of
      94          840 :             // loop, #3501. There are also additional "allowed_errors" in tests.
      95          840 :             if first {
      96          624 :                 first = false;
      97          624 :                 if random_init_delay(period, &cancel).await.is_err() {
      98           97 :                     break;
      99          318 :                 }
     100          216 :             }
     101              : 
     102          534 :             let started_at = Instant::now();
     103              : 
     104          534 :             let sleep_duration = if period == Duration::ZERO {
     105          271 :                 info!("automatic compaction is disabled");
     106              :                 // check again in 10 seconds, in case it's been enabled again.
     107          271 :                 Duration::from_secs(10)
     108              :             } else {
     109              :                 // Run compaction
     110       820178 :                 if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
     111            0 :                     error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
     112            0 :                     wait_duration
     113              :                 } else {
     114          253 :                     period
     115              :                 }
     116              :             };
     117              : 
     118          524 :             warn_when_period_overrun(started_at.elapsed(), period, "compaction");
     119          524 : 
     120          524 :             // Sleep
     121          524 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     122          361 :                 .await
     123          361 :                 .is_ok()
     124              :             {
     125          145 :                 break;
     126          216 :             }
     127              :         }
     128          252 :     }
     129       820792 :     .await;
     130          252 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     131          252 : }
     132              : 
     133              : ///
     134              : /// GC task's main loop
     135              : ///
     136          633 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     137          633 :     let wait_duration = Duration::from_secs(2);
     138          633 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     139          633 :     async {
     140          633 :         // GC might require downloading, to find the cutoff LSN that corresponds to the
     141          633 :         // cutoff specified as time.
     142          633 :         let ctx =
     143          633 :             RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     144          633 :         let mut first = true;
     145              :         loop {
     146          751 :             tokio::select! {
     147              :                 _ = cancel.cancelled() => {
     148              :                     return;
     149              :                 },
     150          743 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     151              :                     ControlFlow::Break(()) => return,
     152              :                     ControlFlow::Continue(()) => (),
     153              :                 },
     154              :             }
     155              : 
     156          743 :             let period = tenant.get_gc_period();
     157          743 : 
     158          743 :             if first {
     159          625 :                 first = false;
     160          625 :                 if random_init_delay(period, &cancel).await.is_err() {
     161          114 :                     break;
     162          198 :                 }
     163          118 :             }
     164              : 
     165          316 :             let started_at = Instant::now();
     166          316 : 
     167          316 :             let gc_horizon = tenant.get_gc_horizon();
     168          316 :             let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
     169          280 :                 info!("automatic GC is disabled");
     170              :                 // check again in 10 seconds, in case it's been enabled again.
     171          280 :                 Duration::from_secs(10)
     172              :             } else {
     173              :                 // Run gc
     174           36 :                 let res = tenant
     175           36 :                     .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
     176       212956 :                     .await;
     177           31 :                 if let Err(e) = res {
     178            0 :                     error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
     179            0 :                     wait_duration
     180              :                 } else {
     181           31 :                     period
     182              :                 }
     183              :             };
     184              : 
     185          311 :             warn_when_period_overrun(started_at.elapsed(), period, "gc");
     186          311 : 
     187          311 :             // Sleep
     188          311 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     189          247 :                 .await
     190          247 :                 .is_ok()
     191              :             {
     192          129 :                 break;
     193          118 :             }
     194              :         }
     195          251 :     }
     196       213342 :     .await;
     197          251 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     198          251 : }
     199              : 
     200         1601 : async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
     201         1598 :     // if the tenant has a proper status already, no need to wait for anything
     202         1598 :     if tenant.current_state() == TenantState::Active {
     203         1559 :         ControlFlow::Continue(())
     204              :     } else {
     205           39 :         let mut tenant_state_updates = tenant.subscribe_for_state_updates();
     206              :         loop {
     207           40 :             match tenant_state_updates.changed().await {
     208              :                 Ok(()) => {
     209           25 :                     let new_state = &*tenant_state_updates.borrow();
     210           25 :                     match new_state {
     211              :                         TenantState::Active => {
     212            0 :                             debug!("Tenant state changed to active, continuing the task loop");
     213           24 :                             return ControlFlow::Continue(());
     214              :                         }
     215            1 :                         state => {
     216            0 :                             debug!("Not running the task loop, tenant is not active: {state:?}");
     217            1 :                             continue;
     218              :                         }
     219              :                     }
     220              :                 }
     221            0 :                 Err(_sender_dropped_error) => {
     222            0 :                     return ControlFlow::Break(());
     223              :                 }
     224              :             }
     225              :         }
     226              :     }
     227         1583 : }
     228              : 
     229            0 : #[derive(thiserror::Error, Debug)]
     230              : #[error("cancelled")]
     231              : pub(crate) struct Cancelled;
     232              : 
     233              : /// Provide a random delay for background task initialization.
     234              : ///
     235              : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
     236              : /// different periods for more stable load.
     237         2349 : pub(crate) async fn random_init_delay(
     238         2349 :     period: Duration,
     239         2349 :     cancel: &CancellationToken,
     240         2349 : ) -> Result<(), Cancelled> {
     241         2349 :     use rand::Rng;
     242         2349 : 
     243         2349 :     if period == Duration::ZERO {
     244          372 :         return Ok(());
     245         1977 :     }
     246         1977 : 
     247         1977 :     let d = {
     248         1977 :         let mut rng = rand::thread_rng();
     249         1977 :         rng.gen_range(Duration::ZERO..=period)
     250         1977 :     };
     251         1977 : 
     252         1977 :     match tokio::time::timeout(d, cancel.cancelled()).await {
     253          474 :         Ok(_) => Err(Cancelled),
     254          693 :         Err(_) => Ok(()),
     255              :     }
     256         1539 : }
     257              : 
     258              : /// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
     259          922 : pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
     260          922 :     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     261          922 :     if elapsed >= period && period != Duration::ZERO {
     262              :         // humantime does no significant digits clamping whereas Duration's debug is a bit more
     263              :         // intelligent. however it makes sense to keep the "configuration format" for period, even
     264              :         // though there's no way to output the actual config value.
     265            9 :         warn!(
     266            9 :             ?elapsed,
     267            9 :             period = %humantime::format_duration(period),
     268            9 :             task,
     269            9 :             "task iteration took longer than the configured period"
     270            9 :         );
     271            9 :         crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     272            9 :             .with_label_values(&[task, &format!("{}", period.as_secs())])
     273            9 :             .inc();
     274          913 :     }
     275          922 : }
        

Generated by: LCOV version 2.1-beta