LCOV - code coverage report
Current view: top level - pageserver/src/tenant - tasks.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 7.4 % 406 30
Test Date: 2024-09-24 13:57:57 Functions: 11.1 % 36 4

            Line data    Source code
       1              : //! This module contains functions to serve per-tenant background processes,
       2              : //! such as compaction and GC
       3              : 
       4              : use std::ops::ControlFlow;
       5              : use std::str::FromStr;
       6              : use std::sync::Arc;
       7              : use std::time::{Duration, Instant};
       8              : 
       9              : use crate::context::{DownloadBehavior, RequestContext};
      10              : use crate::metrics::TENANT_TASK_EVENTS;
      11              : use crate::task_mgr;
      12              : use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
      13              : use crate::tenant::throttle::Stats;
      14              : use crate::tenant::timeline::CompactionError;
      15              : use crate::tenant::{Tenant, TenantState};
      16              : use rand::Rng;
      17              : use tokio_util::sync::CancellationToken;
      18              : use tracing::*;
      19              : use utils::{backoff, completion, pausable_failpoint};
      20              : 
      21              : static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
      22           60 :     once_cell::sync::Lazy::new(|| {
      23           60 :         let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
      24           60 :         let permits = usize::max(
      25           60 :             1,
      26           60 :             // while a lot of the work is done on spawn_blocking, we still do
      27           60 :             // repartitioning in the async context. this should give leave us some workers
      28           60 :             // unblocked to be blocked on other work, hopefully easing any outside visible
      29           60 :             // effects of restarts.
      30           60 :             //
      31           60 :             // 6/8 is a guess; previously we ran with unlimited 8 and more from
      32           60 :             // spawn_blocking.
      33           60 :             (total_threads * 3).checked_div(4).unwrap_or(0),
      34           60 :         );
      35           60 :         assert_ne!(permits, 0, "we will not be adding in permits later");
      36           60 :         assert!(
      37           60 :             permits < total_threads,
      38            0 :             "need threads avail for shorter work"
      39              :         );
      40           60 :         tokio::sync::Semaphore::new(permits)
      41           60 :     });
      42              : 
      43         1080 : #[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr, enum_map::Enum)]
      44              : #[strum(serialize_all = "snake_case")]
      45              : pub(crate) enum BackgroundLoopKind {
      46              :     Compaction,
      47              :     Gc,
      48              :     Eviction,
      49              :     IngestHouseKeeping,
      50              :     ConsumptionMetricsCollectMetrics,
      51              :     ConsumptionMetricsSyntheticSizeWorker,
      52              :     InitialLogicalSizeCalculation,
      53              :     HeatmapUpload,
      54              :     SecondaryDownload,
      55              : }
      56              : 
      57              : impl BackgroundLoopKind {
      58            0 :     fn as_static_str(&self) -> &'static str {
      59            0 :         self.into()
      60            0 :     }
      61              : }
      62              : 
      63              : /// Cancellation safe.
      64         1092 : pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
      65         1092 :     loop_kind: BackgroundLoopKind,
      66         1092 :     _ctx: &RequestContext,
      67         1092 : ) -> tokio::sync::SemaphorePermit<'static> {
      68         1092 :     let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.measure_acquisition(loop_kind);
      69         1092 : 
      70         1092 :     pausable_failpoint!(
      71              :         "initial-size-calculation-permit-pause",
      72         1092 :         loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
      73              :     );
      74              : 
      75              :     // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
      76         1092 :     match CONCURRENT_BACKGROUND_TASKS.acquire().await {
      77         1092 :         Ok(permit) => permit,
      78            0 :         Err(_closed) => unreachable!("we never close the semaphore"),
      79              :     }
      80         1092 : }
      81              : 
      82              : /// Start per tenant background loops: compaction and gc.
      83            0 : pub fn start_background_loops(
      84            0 :     tenant: &Arc<Tenant>,
      85            0 :     background_jobs_can_start: Option<&completion::Barrier>,
      86            0 : ) {
      87            0 :     let tenant_shard_id = tenant.tenant_shard_id;
      88            0 :     task_mgr::spawn(
      89            0 :         BACKGROUND_RUNTIME.handle(),
      90            0 :         TaskKind::Compaction,
      91            0 :         tenant_shard_id,
      92            0 :         None,
      93            0 :         &format!("compactor for tenant {tenant_shard_id}"),
      94            0 :         {
      95            0 :             let tenant = Arc::clone(tenant);
      96            0 :             let background_jobs_can_start = background_jobs_can_start.cloned();
      97            0 :             async move {
      98            0 :                 let cancel = task_mgr::shutdown_token();
      99            0 :                 tokio::select! {
     100            0 :                     _ = cancel.cancelled() => { return Ok(()) },
     101            0 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     102            0 :                 };
     103            0 :                 compaction_loop(tenant, cancel)
     104            0 :                     // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
     105            0 :                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     106            0 :                     .await;
     107            0 :                 Ok(())
     108            0 :             }
     109            0 :         },
     110            0 :     );
     111            0 :     task_mgr::spawn(
     112            0 :         BACKGROUND_RUNTIME.handle(),
     113            0 :         TaskKind::GarbageCollector,
     114            0 :         tenant_shard_id,
     115            0 :         None,
     116            0 :         &format!("garbage collector for tenant {tenant_shard_id}"),
     117            0 :         {
     118            0 :             let tenant = Arc::clone(tenant);
     119            0 :             let background_jobs_can_start = background_jobs_can_start.cloned();
     120            0 :             async move {
     121            0 :                 let cancel = task_mgr::shutdown_token();
     122            0 :                 tokio::select! {
     123            0 :                     _ = cancel.cancelled() => { return Ok(()) },
     124            0 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     125            0 :                 };
     126            0 :                 gc_loop(tenant, cancel)
     127            0 :                     .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     128            0 :                     .await;
     129            0 :                 Ok(())
     130            0 :             }
     131            0 :         },
     132            0 :     );
     133            0 : 
     134            0 :     task_mgr::spawn(
     135            0 :         BACKGROUND_RUNTIME.handle(),
     136            0 :         TaskKind::IngestHousekeeping,
     137            0 :         tenant_shard_id,
     138            0 :         None,
     139            0 :         &format!("ingest housekeeping for tenant {tenant_shard_id}"),
     140            0 :         {
     141            0 :             let tenant = Arc::clone(tenant);
     142            0 :             let background_jobs_can_start = background_jobs_can_start.cloned();
     143            0 :             async move {
     144            0 :                 let cancel = task_mgr::shutdown_token();
     145            0 :                 tokio::select! {
     146            0 :                     _ = cancel.cancelled() => { return Ok(()) },
     147            0 :                     _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
     148            0 :                 };
     149            0 :                 ingest_housekeeping_loop(tenant, cancel)
     150            0 :                     .instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
     151            0 :                     .await;
     152            0 :                 Ok(())
     153            0 :             }
     154            0 :         },
     155            0 :     );
     156            0 : }
     157              : 
     158              : ///
     159              : /// Compaction task's main loop
     160              : ///
     161            0 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     162              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     163              :     // How many errors we have seen consequtively
     164            0 :     let mut error_run_count = 0;
     165            0 : 
     166            0 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     167            0 :     async {
     168            0 :         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
     169            0 :         let mut first = true;
     170              :         loop {
     171            0 :             tokio::select! {
     172            0 :                 _ = cancel.cancelled() => {
     173            0 :                     return;
     174              :                 },
     175            0 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     176            0 :                     ControlFlow::Break(()) => return,
     177            0 :                     ControlFlow::Continue(()) => (),
     178            0 :                 },
     179            0 :             }
     180            0 : 
     181            0 :             let period = tenant.get_compaction_period();
     182            0 : 
     183            0 :             // TODO: we shouldn't need to await to find tenant and this could be moved outside of
     184            0 :             // loop, #3501. There are also additional "allowed_errors" in tests.
     185            0 :             if first {
     186            0 :                 first = false;
     187            0 :                 if random_init_delay(period, &cancel).await.is_err() {
     188            0 :                     break;
     189            0 :                 }
     190            0 :             }
     191              : 
     192              :             let sleep_duration;
     193            0 :             if period == Duration::ZERO {
     194              :                 #[cfg(not(feature = "testing"))]
     195              :                 info!("automatic compaction is disabled");
     196              :                 // check again in 10 seconds, in case it's been enabled again.
     197            0 :                 sleep_duration = Duration::from_secs(10)
     198              :             } else {
     199            0 :                 let iteration = Iteration {
     200            0 :                     started_at: Instant::now(),
     201            0 :                     period,
     202            0 :                     kind: BackgroundLoopKind::Compaction,
     203            0 :                 };
     204              : 
     205              :                 // Run compaction
     206            0 :                 let IterationResult { output, elapsed } = iteration
     207            0 :                     .run(tenant.compaction_iteration(&cancel, &ctx))
     208            0 :                     .await;
     209            0 :                 match output {
     210            0 :                     Ok(has_pending_task) => {
     211            0 :                         error_run_count = 0;
     212            0 :                         // schedule the next compaction immediately in case there is a pending compaction task
     213            0 :                         sleep_duration = if has_pending_task {
     214            0 :                             Duration::ZERO
     215              :                         } else {
     216            0 :                             period
     217              :                         };
     218              :                     }
     219            0 :                     Err(e) => {
     220            0 :                         let wait_duration = backoff::exponential_backoff_duration_seconds(
     221            0 :                             error_run_count + 1,
     222            0 :                             1.0,
     223            0 :                             MAX_BACKOFF_SECS,
     224            0 :                         );
     225            0 :                         error_run_count += 1;
     226            0 :                         let wait_duration = Duration::from_secs_f64(wait_duration);
     227            0 :                         log_compaction_error(
     228            0 :                             &e,
     229            0 :                             error_run_count,
     230            0 :                             &wait_duration,
     231            0 :                             cancel.is_cancelled(),
     232            0 :                         );
     233            0 :                         sleep_duration = wait_duration;
     234            0 :                     }
     235              :                 }
     236              : 
     237              :                 // the duration is recorded by performance tests by enabling debug in this function
     238            0 :                 tracing::debug!(
     239            0 :                     elapsed_ms = elapsed.as_millis(),
     240            0 :                     "compaction iteration complete"
     241              :                 );
     242              :             };
     243              : 
     244              :             // Perhaps we did no work and the walredo process has been idle for some time:
     245              :             // give it a chance to shut down to avoid leaving walredo process running indefinitely.
     246              :             // TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
     247              :             // so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
     248            0 :             if let Some(walredo_mgr) = &tenant.walredo_mgr {
     249            0 :                 walredo_mgr.maybe_quiesce(period * 10);
     250            0 :             }
     251              : 
     252              :             // Sleep
     253            0 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     254            0 :                 .await
     255            0 :                 .is_ok()
     256              :             {
     257            0 :                 break;
     258            0 :             }
     259              :         }
     260            0 :     }
     261            0 :     .await;
     262            0 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     263            0 : }
     264              : 
     265            0 : fn log_compaction_error(
     266            0 :     e: &CompactionError,
     267            0 :     error_run_count: u32,
     268            0 :     sleep_duration: &std::time::Duration,
     269            0 :     task_cancelled: bool,
     270            0 : ) {
     271              :     use crate::tenant::upload_queue::NotInitialized;
     272              :     use crate::tenant::PageReconstructError;
     273              :     use CompactionError::*;
     274              : 
     275              :     enum LooksLike {
     276              :         Info,
     277              :         Error,
     278              :     }
     279              : 
     280            0 :     let decision = match e {
     281            0 :         ShuttingDown => None,
     282            0 :         _ if task_cancelled => Some(LooksLike::Info),
     283            0 :         Other(e) => {
     284            0 :             let root_cause = e.root_cause();
     285              : 
     286            0 :             let is_stopping = {
     287            0 :                 let upload_queue = root_cause
     288            0 :                     .downcast_ref::<NotInitialized>()
     289            0 :                     .is_some_and(|e| e.is_stopping());
     290            0 : 
     291            0 :                 let timeline = root_cause
     292            0 :                     .downcast_ref::<PageReconstructError>()
     293            0 :                     .is_some_and(|e| e.is_stopping());
     294            0 : 
     295            0 :                 upload_queue || timeline
     296              :             };
     297              : 
     298            0 :             if is_stopping {
     299            0 :                 Some(LooksLike::Info)
     300              :             } else {
     301            0 :                 Some(LooksLike::Error)
     302              :             }
     303              :         }
     304              :     };
     305              : 
     306            0 :     match decision {
     307            0 :         Some(LooksLike::Info) => info!(
     308            0 :             "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
     309              :         ),
     310            0 :         Some(LooksLike::Error) => error!(
     311            0 :             "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
     312              :         ),
     313            0 :         None => {}
     314              :     }
     315            0 : }
     316              : 
     317              : ///
     318              : /// GC task's main loop
     319              : ///
     320            0 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     321              :     const MAX_BACKOFF_SECS: f64 = 300.0;
     322              :     // How many errors we have seen consequtively
     323            0 :     let mut error_run_count = 0;
     324            0 : 
     325            0 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     326            0 :     async {
     327            0 :         // GC might require downloading, to find the cutoff LSN that corresponds to the
     328            0 :         // cutoff specified as time.
     329            0 :         let ctx =
     330            0 :             RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     331            0 : 
     332            0 :         let mut first = true;
     333            0 :         tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
     334              :         loop {
     335            0 :             tokio::select! {
     336            0 :                 _ = cancel.cancelled() => {
     337            0 :                     return;
     338              :                 },
     339            0 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     340            0 :                     ControlFlow::Break(()) => return,
     341            0 :                     ControlFlow::Continue(()) => (),
     342            0 :                 },
     343            0 :             }
     344            0 : 
     345            0 :             let period = tenant.get_gc_period();
     346            0 : 
     347            0 :             if first {
     348            0 :                 first = false;
     349            0 : 
     350            0 :                 let delays = async {
     351            0 :                     random_init_delay(period, &cancel).await?;
     352            0 :                     Ok::<_, Cancelled>(())
     353            0 :                 };
     354              : 
     355            0 :                 if delays.await.is_err() {
     356            0 :                     break;
     357            0 :                 }
     358            0 :             }
     359              : 
     360            0 :             let gc_horizon = tenant.get_gc_horizon();
     361            0 :             let sleep_duration;
     362            0 :             if period == Duration::ZERO || gc_horizon == 0 {
     363            0 :                 #[cfg(not(feature = "testing"))]
     364            0 :                 info!("automatic GC is disabled");
     365            0 :                 // check again in 10 seconds, in case it's been enabled again.
     366            0 :                 sleep_duration = Duration::from_secs(10);
     367            0 :             } else {
     368            0 :                 let iteration = Iteration {
     369            0 :                     started_at: Instant::now(),
     370            0 :                     period,
     371            0 :                     kind: BackgroundLoopKind::Gc,
     372            0 :                 };
     373              :                 // Run gc
     374            0 :                 let IterationResult { output, elapsed: _ } =
     375            0 :                     iteration.run(tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx))
     376            0 :                     .await;
     377            0 :                 match output {
     378            0 :                     Ok(_) => {
     379            0 :                         error_run_count = 0;
     380            0 :                         sleep_duration = period;
     381            0 :                     }
     382              :                     Err(crate::tenant::GcError::TenantCancelled) => {
     383            0 :                         return;
     384              :                     }
     385            0 :                     Err(e) => {
     386            0 :                         let wait_duration = backoff::exponential_backoff_duration_seconds(
     387            0 :                             error_run_count + 1,
     388            0 :                             1.0,
     389            0 :                             MAX_BACKOFF_SECS,
     390            0 :                         );
     391            0 :                         error_run_count += 1;
     392            0 :                         let wait_duration = Duration::from_secs_f64(wait_duration);
     393              : 
     394            0 :                         if matches!(e, crate::tenant::GcError::TimelineCancelled) {
     395              :                             // Timeline was cancelled during gc. We might either be in an event
     396              :                             // that affects the entire tenant (tenant deletion, pageserver shutdown),
     397              :                             // or in one that affects the timeline only (timeline deletion).
     398              :                             // Therefore, don't exit the loop.
     399            0 :                             info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
     400              :                         } else {
     401            0 :                             error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
     402              :                         }
     403              : 
     404            0 :                         sleep_duration = wait_duration;
     405              :                     }
     406              :                 }
     407              :             };
     408              : 
     409            0 :             if tokio::time::timeout(sleep_duration, cancel.cancelled())
     410            0 :                 .await
     411            0 :                 .is_ok()
     412              :             {
     413            0 :                 break;
     414            0 :             }
     415              :         }
     416            0 :     }
     417            0 :     .await;
     418            0 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     419            0 : }
     420              : 
     421            0 : async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     422            0 :     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     423            0 :     async {
     424            0 :     let mut last_throttle_flag_reset_at = Instant::now();
     425              :         loop {
     426            0 :             tokio::select! {
     427            0 :                 _ = cancel.cancelled() => {
     428            0 :                     return;
     429              :                 },
     430            0 :                 tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
     431            0 :                     ControlFlow::Break(()) => return,
     432            0 :                     ControlFlow::Continue(()) => (),
     433            0 :                 },
     434            0 :             }
     435            0 : 
     436            0 :             // We run ingest housekeeping with the same frequency as compaction: it is not worth
     437            0 :             // having a distinct setting.  But we don't run it in the same task, because compaction
     438            0 :             // blocks on acquiring the background job semaphore.
     439            0 :             let period = tenant.get_compaction_period();
     440              : 
     441              :             // If compaction period is set to zero (to disable it), then we will use a reasonable default
     442            0 :             let period = if period == Duration::ZERO {
     443            0 :                 humantime::Duration::from_str(
     444            0 :                     pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
     445            0 :                 )
     446            0 :                 .unwrap()
     447            0 :                 .into()
     448              :             } else {
     449            0 :                 period
     450              :             };
     451              : 
     452              :             // Jitter the period by +/- 5%
     453            0 :             let period =
     454            0 :                 rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100);
     455            0 : 
     456            0 :             // Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
     457            0 :             // a tenant, since it won't have started writing any ephemeral files yet.
     458            0 :             if tokio::time::timeout(period, cancel.cancelled())
     459            0 :                 .await
     460            0 :                 .is_ok()
     461              :             {
     462            0 :                 break;
     463            0 :             }
     464            0 : 
     465            0 :             let iteration = Iteration {
     466            0 :                 started_at: Instant::now(),
     467            0 :                 period,
     468            0 :                 kind: BackgroundLoopKind::IngestHouseKeeping,
     469            0 :             };
     470            0 :             iteration.run(tenant.ingest_housekeeping()).await;
     471              : 
     472              :             // TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
     473              :             // Or just spawn another background loop for this throttle, it's not like it's super costly.
     474            0 :             info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
     475            0 :                 let now = Instant::now();
     476            0 :                 let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
     477            0 :                 let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.timeline_get_throttle.reset_stats();
     478            0 :                 if count_throttled == 0 {
     479            0 :                     return;
     480            0 :                 }
     481            0 :                 let allowed_rps = tenant.timeline_get_throttle.steady_rps();
     482            0 :                 let delta = now - prev;
     483            0 :                 info!(
     484            0 :                     n_seconds=%format_args!("{:.3}",
     485            0 :                     delta.as_secs_f64()),
     486              :                     count_accounted = count_accounted_finish,  // don't break existing log scraping
     487              :                     count_throttled,
     488              :                     sum_throttled_usecs,
     489              :                     count_accounted_start, // log after pre-existing fields to not break existing log scraping
     490            0 :                     allowed_rps=%format_args!("{allowed_rps:.0}"),
     491            0 :                     "shard was throttled in the last n_seconds"
     492              :                 );
     493            0 :             });
     494              :         }
     495            0 :     }
     496            0 :     .await;
     497            0 :     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
     498            0 : }
     499              : 
     500            0 : async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
     501            0 :     // if the tenant has a proper status already, no need to wait for anything
     502            0 :     if tenant.current_state() == TenantState::Active {
     503            0 :         ControlFlow::Continue(())
     504              :     } else {
     505            0 :         let mut tenant_state_updates = tenant.subscribe_for_state_updates();
     506              :         loop {
     507            0 :             match tenant_state_updates.changed().await {
     508              :                 Ok(()) => {
     509            0 :                     let new_state = &*tenant_state_updates.borrow();
     510            0 :                     match new_state {
     511              :                         TenantState::Active => {
     512            0 :                             debug!("Tenant state changed to active, continuing the task loop");
     513            0 :                             return ControlFlow::Continue(());
     514              :                         }
     515            0 :                         state => {
     516            0 :                             debug!("Not running the task loop, tenant is not active: {state:?}");
     517            0 :                             continue;
     518              :                         }
     519              :                     }
     520              :                 }
     521            0 :                 Err(_sender_dropped_error) => {
     522            0 :                     return ControlFlow::Break(());
     523              :                 }
     524              :             }
     525              :         }
     526              :     }
     527            0 : }
     528              : 
     529            0 : #[derive(thiserror::Error, Debug)]
     530              : #[error("cancelled")]
     531              : pub(crate) struct Cancelled;
     532              : 
     533              : /// Provide a random delay for background task initialization.
     534              : ///
     535              : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
     536              : /// different periods for more stable load.
     537            0 : pub(crate) async fn random_init_delay(
     538            0 :     period: Duration,
     539            0 :     cancel: &CancellationToken,
     540            0 : ) -> Result<(), Cancelled> {
     541            0 :     if period == Duration::ZERO {
     542            0 :         return Ok(());
     543            0 :     }
     544            0 : 
     545            0 :     let d = {
     546            0 :         let mut rng = rand::thread_rng();
     547            0 :         rng.gen_range(Duration::ZERO..=period)
     548            0 :     };
     549            0 :     match tokio::time::timeout(d, cancel.cancelled()).await {
     550            0 :         Ok(_) => Err(Cancelled),
     551            0 :         Err(_) => Ok(()),
     552              :     }
     553            0 : }
     554              : 
     555              : struct Iteration {
     556              :     started_at: Instant,
     557              :     period: Duration,
     558              :     kind: BackgroundLoopKind,
     559              : }
     560              : 
     561              : struct IterationResult<O> {
     562              :     output: O,
     563              :     elapsed: Duration,
     564              : }
     565              : 
     566              : impl Iteration {
     567            0 :     #[instrument(skip_all)]
     568              :     pub(crate) async fn run<Fut, O>(self, fut: Fut) -> IterationResult<O>
     569              :     where
     570              :         Fut: std::future::Future<Output = O>,
     571              :     {
     572              :         let Self {
     573              :             started_at,
     574              :             period,
     575              :             kind,
     576              :         } = self;
     577              : 
     578              :         let mut fut = std::pin::pin!(fut);
     579              : 
     580              :         // Wrap `fut` into a future that logs a message every `period` so that we get a
     581              :         // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
     582            0 :         let liveness_logger = async move {
     583              :             loop {
     584            0 :                 match tokio::time::timeout(period, &mut fut).await {
     585            0 :                     Ok(x) => return x,
     586              :                     Err(_) => {
     587              :                         // info level as per the same rationale why warn_when_period_overrun is info
     588              :                         // =>  https://github.com/neondatabase/neon/pull/5724
     589            0 :                         info!("still running");
     590              :                     }
     591              :                 }
     592              :             }
     593            0 :         };
     594              : 
     595              :         let output = liveness_logger.await;
     596              : 
     597              :         let elapsed = started_at.elapsed();
     598              :         warn_when_period_overrun(elapsed, period, kind);
     599              : 
     600              :         IterationResult { output, elapsed }
     601              :     }
     602              : }
     603              : /// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
     604            0 : pub(crate) fn warn_when_period_overrun(
     605            0 :     elapsed: Duration,
     606            0 :     period: Duration,
     607            0 :     task: BackgroundLoopKind,
     608            0 : ) {
     609            0 :     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     610            0 :     if elapsed >= period && period != Duration::ZERO {
     611              :         // humantime does no significant digits clamping whereas Duration's debug is a bit more
     612              :         // intelligent. however it makes sense to keep the "configuration format" for period, even
     613              :         // though there's no way to output the actual config value.
     614            0 :         info!(
     615              :             ?elapsed,
     616            0 :             period = %humantime::format_duration(period),
     617            0 :             ?task,
     618            0 :             "task iteration took longer than the configured period"
     619              :         );
     620            0 :         crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
     621            0 :             .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
     622            0 :             .inc();
     623            0 :     }
     624            0 : }
        

Generated by: LCOV version 2.1-beta