//! This module contains per-tenant background processes, e.g. compaction and GC.

use std::cmp::max;
use std::future::Future;
use std::ops::{ControlFlow, RangeInclusive};
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
use rand::Rng;
use scopeguard::defer;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff::exponential_backoff_duration;
use utils::completion::Barrier;
use utils::pausable_failpoint;

use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::{Tenant, TenantState};

/// Semaphore limiting concurrent background tasks (across all tenants).
///
/// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any
/// CPU-heavy work.
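///
/// For example, with 8 worker threads this yields max(1, (8 * 3) / 4) = 6 permits, leaving two
/// threads free for other work.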
static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
    let total_threads = TOKIO_WORKER_THREADS.get();
    let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
    assert_ne!(permits, 0, "we will not be adding in permits later");
    assert!(permits < total_threads, "need threads for other work");
    Semaphore::new(permits)
});

/// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
/// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
///
/// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
/// to avoid high read amplification during heavy write workloads. Regular image/GC compaction is
/// less important (e.g. due to page images in delta layers) and can wait for other background tasks.
///
/// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any
/// CPU-heavy work. Note that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`,
/// and shares the same thread pool.
static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
    let total_threads = TOKIO_WORKER_THREADS.get();
    let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
    assert_ne!(permits, 0, "we will not be adding in permits later");
    assert!(permits < total_threads, "need threads for other work");
    Semaphore::new(permits)
});

/// Background jobs.
///
/// NB: not all of these acquire a `CONCURRENT_BACKGROUND_TASKS` semaphore permit, only the ones
/// that do any significant IO or CPU work.
#[derive(
    Debug,
    PartialEq,
    Eq,
    Clone,
    Copy,
    strum_macros::IntoStaticStr,
    strum_macros::Display,
    enum_map::Enum,
)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
    /// L0Compaction runs as a separate pass within the Compaction loop, not a separate loop. It is
    /// used to request the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and associated metrics.
    L0Compaction,
    Compaction,
    Gc,
    Eviction,
    TenantHouseKeeping,
    ConsumptionMetricsCollectMetrics,
    ConsumptionMetricsSyntheticSizeWorker,
    InitialLogicalSizeCalculation,
    HeatmapUpload,
    SecondaryDownload,
}

pub struct BackgroundLoopSemaphorePermit<'a> {
    _permit: SemaphorePermit<'static>,
    _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
}

/// Acquires a semaphore permit to limit concurrent background jobs.
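///
/// L0 compaction acquires from the dedicated `CONCURRENT_L0_COMPACTION_TASKS` semaphore; all other
/// loop kinds share `CONCURRENT_BACKGROUND_TASKS`.
///
/// A minimal usage sketch (the permit is released when the returned guard is dropped):
///
/// ```ignore
/// let _permit = acquire_concurrency_permit(BackgroundLoopKind::Compaction, &ctx).await;
/// // ... run the CPU/IO-heavy part of the iteration while holding the permit ...
/// ```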
pub(crate) async fn acquire_concurrency_permit(
    loop_kind: BackgroundLoopKind,
    _ctx: &RequestContext,
) -> BackgroundLoopSemaphorePermit<'static> {
    let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);

    if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
        pausable_failpoint!("initial-size-calculation-permit-pause");
    }

    // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
    let semaphore = match loop_kind {
        BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
        _ => &CONCURRENT_BACKGROUND_TASKS,
    };
    let permit = semaphore.acquire().await.expect("should never close");

    recorder.acquired();

    BackgroundLoopSemaphorePermit {
        _permit: permit,
        _recorder: recorder,
    }
}

/// Start per-tenant background loops: compaction, GC, and tenant housekeeping.
pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
    let tenant_shard_id = tenant.tenant_shard_id;

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::Compaction,
        tenant_shard_id,
        None,
        &format!("compactor for tenant {tenant_shard_id}"),
        {
            let tenant = Arc::clone(tenant);
            let can_start = can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token(); // NB: must be in async context
                tokio::select! {
                    _ = cancel.cancelled() => return Ok(()),
                    _ = Barrier::maybe_wait(can_start) => {}
                };
                TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
                defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
                compaction_loop(tenant, cancel)
                    // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::GarbageCollector,
        tenant_shard_id,
        None,
        &format!("garbage collector for tenant {tenant_shard_id}"),
        {
            let tenant = Arc::clone(tenant);
            let can_start = can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token(); // NB: must be in async context
                tokio::select! {
                    _ = cancel.cancelled() => return Ok(()),
                    _ = Barrier::maybe_wait(can_start) => {}
                };
                TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
                defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
                gc_loop(tenant, cancel)
                    .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::TenantHousekeeping,
        tenant_shard_id,
        None,
        &format!("housekeeping for tenant {tenant_shard_id}"),
        {
            let tenant = Arc::clone(tenant);
            let can_start = can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token(); // NB: must be in async context
                tokio::select! {
                    _ = cancel.cancelled() => return Ok(()),
                    _ = Barrier::maybe_wait(can_start) => {}
                };
                TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
                defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
                tenant_housekeeping_loop(tenant, cancel)
                    .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );
}

/// Compaction task's main loop.
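///
/// Each iteration waits for the configured compaction period (or for the L0 compaction trigger),
/// runs `compaction_iteration`, and retries failed iterations with exponential backoff between
/// `BASE_BACKOFF_SECS` and `MAX_BACKOFF_SECS`. When more work is pending, the next run is
/// scheduled immediately.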
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const BASE_BACKOFF_SECS: f64 = 1.0;
    const MAX_BACKOFF_SECS: f64 = 300.0;
    const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);

    let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
    let mut period = tenant.get_compaction_period();
    let mut error_run = 0; // consecutive errors

    // Stagger the compaction loop across tenants.
    if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
        return;
    }
    if sleep_random(period, &cancel).await.is_err() {
        return;
    }

    loop {
        // Recheck that we're still active.
        if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
            return;
        }

        // Refresh the period. If compaction is disabled, check again in a bit.
        period = tenant.get_compaction_period();
        if period == Duration::ZERO {
            #[cfg(not(feature = "testing"))]
            info!("automatic compaction is disabled");
            tokio::select! {
                _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
                _ = cancel.cancelled() => return,
            }
            continue;
        }

        // Wait for the next compaction run.
        let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
        tokio::select! {
            _ = tokio::time::sleep(backoff), if error_run > 0 => {},
            _ = tokio::time::sleep(period), if error_run == 0 => {},
            _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
            _ = cancel.cancelled() => return,
        }

        // Run compaction.
        let iteration = Iteration {
            started_at: Instant::now(),
            period,
            kind: BackgroundLoopKind::Compaction,
        };
        let IterationResult { output, elapsed } = iteration
            .run(tenant.compaction_iteration(&cancel, &ctx))
            .await;

        match output {
            Ok(outcome) => {
                error_run = 0;
                // If there's more compaction work, L0 or not, schedule an immediate run.
                match outcome {
                    CompactionOutcome::Done => {}
                    CompactionOutcome::Skipped => {}
                    CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
                    CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
                }
            }

            Err(err) => {
                error_run += 1;
                let backoff =
                    exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
                log_compaction_error(&err, error_run, backoff, cancel.is_cancelled());
                continue;
            }
        }

        // NB: this log entry is recorded by performance tests.
        debug!(
            elapsed_ms = elapsed.as_millis(),
            "compaction iteration complete"
        );
    }
}

fn log_compaction_error(
    err: &CompactionError,
    error_count: u32,
    sleep_duration: Duration,
    task_cancelled: bool,
) {
    use CompactionError::*;

    use crate::tenant::PageReconstructError;
    use crate::tenant::upload_queue::NotInitialized;

    let level = match err {
        e if e.is_cancel() => return,
        ShuttingDown => return,
        Offload(_) => Level::ERROR,
        AlreadyRunning(_) => Level::ERROR,
        CollectKeySpaceError(_) => Level::ERROR,
        _ if task_cancelled => Level::INFO,
        Other(err) => {
            let root_cause = err.root_cause();

            let upload_queue = root_cause
                .downcast_ref::<NotInitialized>()
                .is_some_and(|e| e.is_stopping());
            let timeline = root_cause
                .downcast_ref::<PageReconstructError>()
                .is_some_and(|e| e.is_stopping());
            let is_stopping = upload_queue || timeline;

            if is_stopping {
                Level::INFO
            } else {
                Level::ERROR
            }
        }
    };

    match level {
        Level::ERROR => {
            error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
        }
        Level::INFO => {
            info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
        }
        level => unimplemented!("unexpected level {level:?}"),
    }
}

/// GC task's main loop.
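///
/// The first iteration is staggered by a random delay of up to one GC period. When GC is disabled
/// (zero period or zero horizon), the loop re-checks the configuration every 10 seconds. Failed
/// iterations are retried with exponential backoff of up to `MAX_BACKOFF_SECS`.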
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    let mut error_run = 0; // consecutive errors

    // GC might require downloading, to find the cutoff LSN that corresponds to the
    // time-based cutoff.
    let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
    let mut first = true;

    loop {
        if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
            return;
        }

        let period = tenant.get_gc_period();

        if first {
            first = false;
            if sleep_random(period, &cancel).await.is_err() {
                break;
            }
        }

        let gc_horizon = tenant.get_gc_horizon();
        let sleep_duration;
        if period == Duration::ZERO || gc_horizon == 0 {
            #[cfg(not(feature = "testing"))]
            info!("automatic GC is disabled");
            // Check again in 10 seconds, in case it's been enabled again.
            sleep_duration = Duration::from_secs(10);
        } else {
            let iteration = Iteration {
                started_at: Instant::now(),
                period,
                kind: BackgroundLoopKind::Gc,
            };
            // Run GC.
            let IterationResult { output, elapsed: _ } = iteration
                .run(tenant.gc_iteration(
                    None,
                    gc_horizon,
                    tenant.get_pitr_interval(),
                    &cancel,
                    &ctx,
                ))
                .await;
            match output {
                Ok(_) => {
                    error_run = 0;
                    sleep_duration = period;
                }
                Err(crate::tenant::GcError::TenantCancelled) => {
                    return;
                }
                Err(e) => {
                    error_run += 1;
                    let wait_duration =
                        exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);

                    if matches!(e, crate::tenant::GcError::TimelineCancelled) {
                        // Timeline was cancelled during gc. We might either be in an event
                        // that affects the entire tenant (tenant deletion, pageserver shutdown),
                        // or in one that affects the timeline only (timeline deletion).
                        // Therefore, don't exit the loop.
                        info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
                    } else {
                        error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
                    }

                    sleep_duration = wait_duration;
                }
            }
        };

        if tokio::time::timeout(sleep_duration, cancel.cancelled())
            .await
            .is_ok()
        {
            break;
        }
    }
}

/// Tenant housekeeping's main loop.
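///
/// Housekeeping reuses the compaction period (falling back to `DEFAULT_COMPACTION_PERIOD` when
/// compaction is disabled), jittered by ±5%. Each iteration also logs any pagestream throttling
/// that occurred during the elapsed interval.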
async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    let mut last_throttle_flag_reset_at = Instant::now();
    loop {
        if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
            return;
        }

        // Use the same period as compaction; it's not worth a separate setting. But if it's set to
        // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
        let period = match tenant.get_compaction_period() {
            Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
            period => period,
        };

        let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
            break;
        };

        // Do tenant housekeeping.
        let iteration = Iteration {
            started_at: Instant::now(),
            period,
            kind: BackgroundLoopKind::TenantHouseKeeping,
        };
        iteration.run(tenant.housekeeping()).await;

        // Log any getpage throttling.
        info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
            let now = Instant::now();
            let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
            let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
            if count_throttled == 0 {
                return;
            }
            let allowed_rps = tenant.pagestream_throttle.steady_rps();
            let delta = now - prev;
            info!(
                n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
                count_accounted = count_accounted_finish, // don't break existing log scraping
                count_throttled,
                sum_throttled_usecs,
                count_accounted_start, // log after pre-existing fields to not break existing log scraping
                allowed_rps=%format_args!("{allowed_rps:.0}"),
                "shard was throttled in the last n_seconds"
            );
        });
    }
}

/// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
async fn wait_for_active_tenant(
    tenant: &Arc<Tenant>,
    cancel: &CancellationToken,
) -> ControlFlow<()> {
    if tenant.current_state() == TenantState::Active {
        return ControlFlow::Continue(());
    }

    let mut update_rx = tenant.subscribe_for_state_updates();
    tokio::select! {
        result = update_rx.wait_for(|s| s == &TenantState::Active) => {
            if result.is_err() {
                return ControlFlow::Break(());
            }
            debug!("Tenant state changed to active, continuing the task loop");
            ControlFlow::Continue(())
        },
        _ = cancel.cancelled() => ControlFlow::Break(()),
    }
}

#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;

/// Sleeps for a random interval up to the given max value.
///
/// This delay prevents a thundering herd of background tasks and keeps their schedules staggered
/// over time, for a more stable load.
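///
/// For example, staggering a 20-second compaction period sleeps for a uniformly random duration
/// between 0 and 20 seconds.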
pub(crate) async fn sleep_random(
    max: Duration,
    cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
    sleep_random_range(Duration::ZERO..=max, cancel).await
}

/// Sleeps for a random interval in the given range. Returns the duration.
pub(crate) async fn sleep_random_range(
    interval: RangeInclusive<Duration>,
    cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
    let delay = rand::thread_rng().gen_range(interval);
    if delay == Duration::ZERO {
        return Ok(delay);
    }
    tokio::select! {
        _ = cancel.cancelled() => Err(Cancelled),
        _ = tokio::time::sleep(delay) => Ok(delay),
    }
}

/// Sleeps for an interval with a random jitter.
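///
/// For example, a 20-second duration with a 1-second jitter sleeps for a uniformly random duration
/// between 19 and 21 seconds (the lower bound saturates at zero if the jitter exceeds the duration).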
pub(crate) async fn sleep_jitter(
    duration: Duration,
    jitter: Duration,
    cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
    let from = duration.saturating_sub(jitter);
    let to = duration.saturating_add(jitter);
    sleep_random_range(from..=to, cancel).await
}

struct Iteration {
    started_at: Instant,
    period: Duration,
    kind: BackgroundLoopKind,
}

struct IterationResult<O> {
    output: O,
    elapsed: Duration,
}

impl Iteration {
    #[instrument(skip_all)]
    pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
        let mut fut = pin!(fut);

        // Wrap `fut` into a future that logs a message every `period` so that we get a
        // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
        let output = loop {
            match tokio::time::timeout(self.period, &mut fut).await {
                Ok(r) => break r,
                Err(_) => info!("still running"),
            }
        };
        let elapsed = self.started_at.elapsed();
        warn_when_period_overrun(elapsed, self.period, self.kind);

        IterationResult { output, elapsed }
    }
}

// NB: the `task` and `period` are used for metrics labels.
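// For example, a compaction iteration with a 20-second period that runs for 45 seconds logs an
// overrun and increments the counter with labels ("compaction", "20").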
pub(crate) fn warn_when_period_overrun(
    elapsed: Duration,
    period: Duration,
    task: BackgroundLoopKind,
) {
    // A period of Duration::ZERO means the background task is disabled, so don't warn about it.
    if elapsed >= period && period != Duration::ZERO {
        // humantime does no significant-digit clamping, whereas Duration's Debug output is a bit
        // more intelligent. However, it makes sense to keep the "configuration format" for the
        // period, even though there's no way to output the actual config value.
        info!(
            ?elapsed,
            period = %humantime::format_duration(period),
            ?task,
            "task iteration took longer than the configured period"
        );
        metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
            .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
            .inc();
    }
}