Line data Source code
1 : //! This module contains per-tenant background processes, e.g. compaction and GC.
2 :
3 : use std::cmp::max;
4 : use std::future::Future;
5 : use std::ops::{ControlFlow, RangeInclusive};
6 : use std::pin::pin;
7 : use std::sync::Arc;
8 : use std::time::{Duration, Instant};
9 :
10 : use once_cell::sync::Lazy;
11 : use rand::Rng;
12 : use scopeguard::defer;
13 : use tokio::sync::{Semaphore, SemaphorePermit};
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::*;
16 :
17 : use crate::context::{DownloadBehavior, RequestContext};
18 : use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
19 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS};
20 : use crate::tenant::throttle::Stats;
21 : use crate::tenant::timeline::compaction::CompactionOutcome;
22 : use crate::tenant::timeline::CompactionError;
23 : use crate::tenant::{Tenant, TenantState};
24 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
25 : use utils::backoff::exponential_backoff_duration;
26 : use utils::completion::Barrier;
27 : use utils::pausable_failpoint;
28 :
29 : /// Semaphore limiting concurrent background tasks (across all tenants).
30 : ///
31 : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work.
32 40 : static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
33 40 : let total_threads = TOKIO_WORKER_THREADS.get();
34 40 : let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
35 40 : assert_ne!(permits, 0, "we will not be adding in permits later");
36 40 : assert!(permits < total_threads, "need threads for other work");
37 40 : Semaphore::new(permits)
38 40 : });
39 :
40 : /// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
41 : /// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
42 : ///
43 : /// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
44 : /// to avoid high read amp during heavy write workloads. Regular image/GC compaction is less
45 : /// important (e.g. due to page images in delta layers) and can wait for other background tasks.
46 : ///
47 : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work. Note
48 : /// that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`, and shares the same
49 : /// thread pool.
50 0 : static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
51 0 : let total_threads = TOKIO_WORKER_THREADS.get();
52 0 : let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
53 0 : assert_ne!(permits, 0, "we will not be adding in permits later");
54 0 : assert!(permits < total_threads, "need threads for other work");
55 0 : Semaphore::new(permits)
56 0 : });
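// Worked example of the permit math used by both semaphores above (illustrative,
// not part of this module): with N worker threads we grant max(1, 3*N/4) permits,
// so e.g. 8 threads yield 6 permits and 2 threads yield 1 permit, leaving headroom
// for other work whenever there are at least two worker threads.
fn example_permits(total_threads: usize) -> usize {
    max(1, total_threads * 3 / 4)
}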
57 :
58 : /// Background jobs.
59 : ///
60 : /// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
61 : /// do any significant IO or CPU work.
62 : #[derive(
63 : Debug,
64 : PartialEq,
65 : Eq,
66 : Clone,
67 : Copy,
68 : strum_macros::IntoStaticStr,
69 : strum_macros::Display,
70 : enum_map::Enum,
71 : )]
72 : #[strum(serialize_all = "snake_case")]
73 : pub(crate) enum BackgroundLoopKind {
74 : /// L0Compaction runs as a separate pass within the Compaction loop, not a separate loop. It is
75 : /// used to request the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and associated metrics.
76 : L0Compaction,
77 : Compaction,
78 : Gc,
79 : Eviction,
80 : TenantHouseKeeping,
81 : ConsumptionMetricsCollectMetrics,
82 : ConsumptionMetricsSyntheticSizeWorker,
83 : InitialLogicalSizeCalculation,
84 : HeatmapUpload,
85 : SecondaryDownload,
86 : }
87 :
88 : pub struct BackgroundLoopSemaphorePermit<'a> {
89 : _permit: SemaphorePermit<'static>,
90 : _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
91 : }
92 :
93 : /// Acquires a semaphore permit, to limit concurrent background jobs.
94 728 : pub(crate) async fn acquire_concurrency_permit(
95 728 : loop_kind: BackgroundLoopKind,
96 728 : _ctx: &RequestContext,
97 728 : ) -> BackgroundLoopSemaphorePermit<'static> {
98 728 : let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
99 728 :
100 728 : if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
101 0 : pausable_failpoint!("initial-size-calculation-permit-pause");
102 728 : }
103 :
104 : // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
105 728 : let semaphore = match loop_kind {
106 0 : BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
107 728 : _ => &CONCURRENT_BACKGROUND_TASKS,
108 : };
109 728 : let permit = semaphore.acquire().await.expect("should never close");
110 728 :
111 728 : recorder.acquired();
112 728 :
113 728 : BackgroundLoopSemaphorePermit {
114 728 : _permit: permit,
115 728 : _recorder: recorder,
116 728 : }
117 728 : }
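// A minimal sketch of the pattern implemented above: a global semaphore gates
// background work, and the permit is held in a guard value for the duration of
// the job. `EXAMPLE_TASKS` and `run_gated` are illustrative names, not part of
// this crate; the real permit is bundled with a metrics recorder as well.
static EXAMPLE_TASKS: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(4));

async fn run_gated<T>(job: impl Future<Output = T>) -> T {
    let _permit: SemaphorePermit<'static> =
        EXAMPLE_TASKS.acquire().await.expect("semaphore is never closed");
    job.await // the permit is released when `_permit` is dropped here
}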
118 :
119 : /// Start per-tenant background loops: compaction, GC, and tenant housekeeping.
120 0 : pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
121 0 : let tenant_shard_id = tenant.tenant_shard_id;
122 0 :
123 0 : task_mgr::spawn(
124 0 : BACKGROUND_RUNTIME.handle(),
125 0 : TaskKind::Compaction,
126 0 : tenant_shard_id,
127 0 : None,
128 0 : &format!("compactor for tenant {tenant_shard_id}"),
129 0 : {
130 0 : let tenant = Arc::clone(tenant);
131 0 : let can_start = can_start.cloned();
132 0 : async move {
133 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
134 0 : tokio::select! {
135 0 : _ = cancel.cancelled() => return Ok(()),
136 0 : _ = Barrier::maybe_wait(can_start) => {}
137 0 : };
138 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
139 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
140 0 : compaction_loop(tenant, cancel)
141 0 : // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
142 0 : .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
143 0 : .await;
144 0 : Ok(())
145 0 : }
146 0 : },
147 0 : );
148 0 :
149 0 : task_mgr::spawn(
150 0 : BACKGROUND_RUNTIME.handle(),
151 0 : TaskKind::GarbageCollector,
152 0 : tenant_shard_id,
153 0 : None,
154 0 : &format!("garbage collector for tenant {tenant_shard_id}"),
155 0 : {
156 0 : let tenant = Arc::clone(tenant);
157 0 : let can_start = can_start.cloned();
158 0 : async move {
159 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
160 0 : tokio::select! {
161 0 : _ = cancel.cancelled() => return Ok(()),
162 0 : _ = Barrier::maybe_wait(can_start) => {}
163 0 : };
164 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
165 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
166 0 : gc_loop(tenant, cancel)
167 0 : .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
168 0 : .await;
169 0 : Ok(())
170 0 : }
171 0 : },
172 0 : );
173 0 :
174 0 : task_mgr::spawn(
175 0 : BACKGROUND_RUNTIME.handle(),
176 0 : TaskKind::TenantHousekeeping,
177 0 : tenant_shard_id,
178 0 : None,
179 0 : &format!("housekeeping for tenant {tenant_shard_id}"),
180 0 : {
181 0 : let tenant = Arc::clone(tenant);
182 0 : let can_start = can_start.cloned();
183 0 : async move {
184 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
185 0 : tokio::select! {
186 0 : _ = cancel.cancelled() => return Ok(()),
187 0 : _ = Barrier::maybe_wait(can_start) => {}
188 0 : };
189 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
190 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
191 0 : tenant_housekeeping_loop(tenant, cancel)
192 0 : .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
193 0 : .await;
194 0 : Ok(())
195 0 : }
196 0 : },
197 0 : );
198 0 : }
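// Illustrative sketch of the startup handshake each spawn above performs: the
// task races its shutdown token against an optional "can start" gate, so a task
// whose tenant is shut down before the gate opens returns without ever entering
// its loop. The oneshot-based gate and `example_task_body` are stand-ins for the
// actual task_mgr / Barrier machinery.
async fn example_task_body(
    cancel: CancellationToken,
    can_start: Option<tokio::sync::oneshot::Receiver<()>>,
) {
    tokio::select! {
        _ = cancel.cancelled() => return, // shut down before we ever started
        _ = async { if let Some(rx) = can_start { let _ = rx.await; } } => {}
    }
    // ... run the background loop here, checking `cancel` on every iteration ...
}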
199 :
200 : /// Compaction task's main loop.
201 0 : async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
202 : const BASE_BACKOFF_SECS: f64 = 1.0;
203 : const MAX_BACKOFF_SECS: f64 = 300.0;
204 : const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
205 :
206 0 : let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
207 0 : let mut period = tenant.get_compaction_period();
208 0 : let mut error_run = 0; // consecutive errors
209 0 :
210 0 : // Stagger the compaction loop across tenants.
211 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
212 0 : return;
213 0 : }
214 0 : if sleep_random(period, &cancel).await.is_err() {
215 0 : return;
216 0 : }
217 :
218 : loop {
219 : // Recheck that we're still active.
220 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
221 0 : return;
222 0 : }
223 0 :
224 0 : // Refresh the period. If compaction is disabled, check again in a bit.
225 0 : period = tenant.get_compaction_period();
226 0 : if period == Duration::ZERO {
227 : #[cfg(not(feature = "testing"))]
228 : info!("automatic compaction is disabled");
229 0 : tokio::select! {
230 0 : _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
231 0 : _ = cancel.cancelled() => return,
232 : }
233 0 : continue;
234 0 : }
235 0 :
236 0 : // Wait for the next compaction run.
237 0 : let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
238 0 : tokio::select! {
239 0 : _ = tokio::time::sleep(backoff), if error_run > 0 => {},
240 0 : _ = tokio::time::sleep(period), if error_run == 0 => {},
241 0 : _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
242 0 : _ = cancel.cancelled() => return,
243 : }
244 :
245 : // Run compaction.
246 0 : let iteration = Iteration {
247 0 : started_at: Instant::now(),
248 0 : period,
249 0 : kind: BackgroundLoopKind::Compaction,
250 0 : };
251 0 : let IterationResult { output, elapsed } = iteration
252 0 : .run(tenant.compaction_iteration(&cancel, &ctx))
253 0 : .await;
254 :
255 0 : match output {
256 0 : Ok(outcome) => {
257 0 : error_run = 0;
258 0 : // If there's more compaction work, L0 or not, schedule an immediate run.
259 0 : match outcome {
260 0 : CompactionOutcome::Done => {}
261 0 : CompactionOutcome::Skipped => {}
262 0 : CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
263 0 : CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
264 : }
265 : }
266 :
267 0 : Err(err) => {
268 0 : error_run += 1;
269 0 : let backoff =
270 0 : exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
271 0 : log_compaction_error(&err, error_run, backoff, cancel.is_cancelled());
272 0 : continue;
273 : }
274 : }
275 :
276 : // NB: this log entry is recorded by performance tests.
277 0 : debug!(
278 0 : elapsed_ms = elapsed.as_millis(),
279 0 : "compaction iteration complete"
280 : );
281 : }
282 0 : }
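// Sketch of the retry bookkeeping in the loop above: consecutive failures bump
// `error_run`, and the sleep between attempts grows exponentially up to a cap.
// This helper is a plausible stand-in for
// `utils::backoff::exponential_backoff_duration`, not its actual implementation.
fn example_backoff(error_run: u32, base_secs: f64, max_secs: f64) -> Duration {
    if error_run == 0 {
        return Duration::ZERO;
    }
    let secs = (base_secs * 2f64.powi(error_run as i32 - 1)).min(max_secs);
    Duration::from_secs_f64(secs)
}
// With base 1.0s and cap 300s this sketch yields 1s, 2s, 4s, ... and caps at
// 300s from the tenth consecutive error onwards.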
283 :
284 0 : fn log_compaction_error(
285 0 : err: &CompactionError,
286 0 : error_count: u32,
287 0 : sleep_duration: Duration,
288 0 : task_cancelled: bool,
289 0 : ) {
290 : use crate::pgdatadir_mapping::CollectKeySpaceError;
291 : use crate::tenant::upload_queue::NotInitialized;
292 : use crate::tenant::PageReconstructError;
293 : use CompactionError::*;
294 :
295 0 : let level = match err {
296 0 : ShuttingDown => return,
297 0 : Offload(_) => Level::ERROR,
298 0 : CollectKeySpaceError(CollectKeySpaceError::Cancelled) => Level::INFO,
299 0 : CollectKeySpaceError(_) => Level::ERROR,
300 0 : _ if task_cancelled => Level::INFO,
301 0 : Other(err) => {
302 0 : let root_cause = err.root_cause();
303 0 :
304 0 : let upload_queue = root_cause
305 0 : .downcast_ref::<NotInitialized>()
306 0 : .is_some_and(|e| e.is_stopping());
307 0 : let timeline = root_cause
308 0 : .downcast_ref::<PageReconstructError>()
309 0 : .is_some_and(|e| e.is_stopping());
310 0 : let is_stopping = upload_queue || timeline;
311 :
312 0 : if is_stopping {
313 0 : Level::INFO
314 : } else {
315 0 : Level::ERROR
316 : }
317 : }
318 : };
319 :
320 0 : match level {
321 : Level::ERROR => {
322 0 : error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
323 : }
324 : Level::INFO => {
325 0 : info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
326 : }
327 0 : level => unimplemented!("unexpected level {level:?}"),
328 : }
329 0 : }
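// Self-contained sketch of the classification trick above: walk an
// `anyhow::Error` chain to its root cause and downcast it to a concrete type to
// pick the log level. `ExampleStopping` is a made-up error type; the real code
// checks `NotInitialized` and `PageReconstructError` instead.
#[derive(thiserror::Error, Debug)]
#[error("shutting down")]
struct ExampleStopping;

fn example_is_stopping(err: &anyhow::Error) -> bool {
    err.root_cause().downcast_ref::<ExampleStopping>().is_some()
}
// `example_is_stopping` returns true for
// `anyhow::Error::from(ExampleStopping).context("compaction failed")`, even
// though the outer context layer is not itself an `ExampleStopping`.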
330 :
331 : /// GC task's main loop.
332 0 : async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
333 : const MAX_BACKOFF_SECS: f64 = 300.0;
334 0 : let mut error_run = 0; // consecutive errors
335 0 :
336 0 : // GC might require downloading, to find the cutoff LSN that corresponds to a
337 0 : // cutoff specified as a point in time.
338 0 : let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
339 0 : let mut first = true;
340 :
341 : loop {
342 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
343 0 : return;
344 0 : }
345 0 :
346 0 : let period = tenant.get_gc_period();
347 0 :
348 0 : if first {
349 0 : first = false;
350 0 : if sleep_random(period, &cancel).await.is_err() {
351 0 : break;
352 0 : }
353 0 : }
354 :
355 0 : let gc_horizon = tenant.get_gc_horizon();
356 0 : let sleep_duration;
357 0 : if period == Duration::ZERO || gc_horizon == 0 {
358 0 : #[cfg(not(feature = "testing"))]
359 0 : info!("automatic GC is disabled");
360 0 : // check again in 10 seconds, in case it's been enabled again.
361 0 : sleep_duration = Duration::from_secs(10);
362 0 : } else {
363 0 : let iteration = Iteration {
364 0 : started_at: Instant::now(),
365 0 : period,
366 0 : kind: BackgroundLoopKind::Gc,
367 0 : };
368 : // Run gc
369 0 : let IterationResult { output, elapsed: _ } = iteration
370 0 : .run(tenant.gc_iteration(
371 0 : None,
372 0 : gc_horizon,
373 0 : tenant.get_pitr_interval(),
374 0 : &cancel,
375 0 : &ctx,
376 0 : ))
377 0 : .await;
378 0 : match output {
379 0 : Ok(_) => {
380 0 : error_run = 0;
381 0 : sleep_duration = period;
382 0 : }
383 : Err(crate::tenant::GcError::TenantCancelled) => {
384 0 : return;
385 : }
386 0 : Err(e) => {
387 0 : error_run += 1;
388 0 : let wait_duration =
389 0 : exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
390 :
391 0 : if matches!(e, crate::tenant::GcError::TimelineCancelled) {
392 : // Timeline was cancelled during gc. We might either be in an event
393 : // that affects the entire tenant (tenant deletion, pageserver shutdown),
394 : // or in one that affects the timeline only (timeline deletion).
395 : // Therefore, don't exit the loop.
396 0 : info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
397 : } else {
398 0 : error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
399 : }
400 :
401 0 : sleep_duration = wait_duration;
402 : }
403 : }
404 : };
405 :
406 0 : if tokio::time::timeout(sleep_duration, cancel.cancelled())
407 0 : .await
408 0 : .is_ok()
409 : {
410 0 : break;
411 0 : }
412 : }
413 0 : }
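// Sketch of the cancellable-sleep idiom at the bottom of the GC loop above:
// wrapping `cancel.cancelled()` in a timeout means `Ok(())` signals cancellation
// and `Err(Elapsed)` signals that the full sleep elapsed, so `is_ok()` is the
// "stop the loop" case. `example_sleep_or_cancel` is illustrative only.
async fn example_sleep_or_cancel(dur: Duration, cancel: &CancellationToken) -> bool {
    // Returns true if we were cancelled before the sleep finished.
    tokio::time::timeout(dur, cancel.cancelled()).await.is_ok()
}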
414 :
415 : /// Tenant housekeeping's main loop.
416 0 : async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
417 0 : let mut last_throttle_flag_reset_at = Instant::now();
418 : loop {
419 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
420 0 : return;
421 0 : }
422 :
423 : // Use the same period as compaction; it's not worth a separate setting. But if it's set to
424 : // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
425 0 : let period = match tenant.get_compaction_period() {
426 0 : Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
427 0 : period => period,
428 : };
429 :
430 0 : let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
431 0 : break;
432 : };
433 :
434 : // Do tenant housekeeping.
435 0 : let iteration = Iteration {
436 0 : started_at: Instant::now(),
437 0 : period,
438 0 : kind: BackgroundLoopKind::TenantHouseKeeping,
439 0 : };
440 0 : iteration.run(tenant.housekeeping()).await;
441 :
442 : // Log any getpage throttling.
443 0 : info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
444 0 : let now = Instant::now();
445 0 : let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
446 0 : let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
447 0 : if count_throttled == 0 {
448 0 : return;
449 0 : }
450 0 : let allowed_rps = tenant.pagestream_throttle.steady_rps();
451 0 : let delta = now - prev;
452 0 : info!(
453 0 : n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
454 : count_accounted = count_accounted_finish, // don't break existing log scraping
455 : count_throttled,
456 : sum_throttled_usecs,
457 : count_accounted_start, // log after pre-existing fields to not break existing log scraping
458 0 : allowed_rps=%format_args!("{allowed_rps:.0}"),
459 0 : "shard was throttled in the last n_seconds"
460 : );
461 0 : });
462 : }
463 0 : }
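// Sketch of the throttle-reporting pattern above: a detached span
// (`parent: None`) carries the shard identity, so the periodic summary line is
// attributed to the tenant shard rather than to whatever span happens to be
// current. The parameter and field names below are placeholders.
fn example_report_throttle(shard: &str, count_throttled: u64, n_seconds: f64) {
    info_span!(parent: None, "pagestream_throttle", shard_id = %shard).in_scope(|| {
        if count_throttled == 0 {
            return; // nothing to report for this period
        }
        info!(count_throttled, n_seconds = %format_args!("{n_seconds:.3}"),
            "shard was throttled in the last n_seconds");
    });
}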
464 :
465 : /// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
466 0 : async fn wait_for_active_tenant(
467 0 : tenant: &Arc<Tenant>,
468 0 : cancel: &CancellationToken,
469 0 : ) -> ControlFlow<()> {
470 0 : if tenant.current_state() == TenantState::Active {
471 0 : return ControlFlow::Continue(());
472 0 : }
473 0 :
474 0 : let mut update_rx = tenant.subscribe_for_state_updates();
475 : loop {
476 0 : tokio::select! {
477 0 : _ = cancel.cancelled() => return ControlFlow::Break(()),
478 0 : result = update_rx.changed() => if result.is_err() {
479 0 : return ControlFlow::Break(());
480 0 : }
481 0 : }
482 0 :
483 0 : match &*update_rx.borrow() {
484 : TenantState::Active => {
485 0 : debug!("Tenant state changed to active, continuing the task loop");
486 0 : return ControlFlow::Continue(());
487 : }
488 0 : state => debug!("Not running the task loop, tenant is not active: {state:?}"),
489 : }
490 : }
491 0 : }
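// Minimal sketch of the state-watching pattern above, using a plain
// `tokio::sync::watch` channel: wait until the observed value becomes `true`,
// or give up when the sender is dropped or the token fires. The real code
// watches `TenantState` rather than a boolean.
async fn example_wait_until_true(
    mut rx: tokio::sync::watch::Receiver<bool>,
    cancel: &CancellationToken,
) -> ControlFlow<()> {
    loop {
        if *rx.borrow() {
            return ControlFlow::Continue(());
        }
        tokio::select! {
            _ = cancel.cancelled() => return ControlFlow::Break(()),
            changed = rx.changed() => if changed.is_err() {
                return ControlFlow::Break(()); // sender dropped, no more updates
            }
        }
    }
}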
492 :
493 : #[derive(thiserror::Error, Debug)]
494 : #[error("cancelled")]
495 : pub(crate) struct Cancelled;
496 :
497 : /// Sleeps for a random interval up to the given max value.
498 : ///
499 : /// This delay prevents a thundering herd of background tasks, and keeps their iterations spread
500 : /// out over time for more stable load.
501 0 : pub(crate) async fn sleep_random(
502 0 : max: Duration,
503 0 : cancel: &CancellationToken,
504 0 : ) -> Result<Duration, Cancelled> {
505 0 : sleep_random_range(Duration::ZERO..=max, cancel).await
506 0 : }
507 :
508 : /// Sleeps for a random interval in the given range. Returns the duration slept, or `Cancelled` if the token fires first.
509 0 : pub(crate) async fn sleep_random_range(
510 0 : interval: RangeInclusive<Duration>,
511 0 : cancel: &CancellationToken,
512 0 : ) -> Result<Duration, Cancelled> {
513 0 : let delay = rand::thread_rng().gen_range(interval);
514 0 : if delay == Duration::ZERO {
515 0 : return Ok(delay);
516 0 : }
517 0 : tokio::select! {
518 0 : _ = cancel.cancelled() => Err(Cancelled),
519 0 : _ = tokio::time::sleep(delay) => Ok(delay),
520 : }
521 0 : }
522 :
523 : /// Sleeps for an interval with a random jitter.
524 0 : pub(crate) async fn sleep_jitter(
525 0 : duration: Duration,
526 0 : jitter: Duration,
527 0 : cancel: &CancellationToken,
528 0 : ) -> Result<Duration, Cancelled> {
529 0 : let from = duration.saturating_sub(jitter);
530 0 : let to = duration.saturating_add(jitter);
531 0 : sleep_random_range(from..=to, cancel).await
532 0 : }
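// Worked example of the jitter arithmetic used by the housekeeping loop: a 20s
// period with 5% jitter turns into a uniform sleep in the 19s..=21s range.
// `example_jitter_bounds` is illustrative only.
fn example_jitter_bounds(period: Duration) -> RangeInclusive<Duration> {
    let jitter = period * 5 / 100;
    period.saturating_sub(jitter)..=period.saturating_add(jitter)
}
// example_jitter_bounds(Duration::from_secs(20)) == Duration::from_secs(19)..=Duration::from_secs(21)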
533 :
534 : struct Iteration {
535 : started_at: Instant,
536 : period: Duration,
537 : kind: BackgroundLoopKind,
538 : }
539 :
540 : struct IterationResult<O> {
541 : output: O,
542 : elapsed: Duration,
543 : }
544 :
545 : impl Iteration {
546 : #[instrument(skip_all)]
547 : pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
548 : let mut fut = pin!(fut);
549 :
550 : // Poll `fut` in a loop, logging a message every `period`, so that we get a
551 : // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
552 : let output = loop {
553 : match tokio::time::timeout(self.period, &mut fut).await {
554 : Ok(r) => break r,
555 : Err(_) => info!("still running"),
556 : }
557 : };
558 : let elapsed = self.started_at.elapsed();
559 : warn_when_period_overrun(elapsed, self.period, self.kind);
560 :
561 : IterationResult { output, elapsed }
562 : }
563 : }
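// Sketch of the progress-breadcrumb pattern in `Iteration::run` above: the
// future is pinned once and then repeatedly driven through
// `tokio::time::timeout`, so a log line appears every `period` while a single
// slow iteration is still in flight. `example_run_with_breadcrumbs` is
// illustrative only.
async fn example_run_with_breadcrumbs<O>(fut: impl Future<Output = O>, period: Duration) -> O {
    let mut fut = pin!(fut);
    loop {
        match tokio::time::timeout(period, &mut fut).await {
            Ok(output) => return output,
            Err(_elapsed) => info!("still running"),
        }
    }
}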
564 :
565 : // NB: the `task` and `period` are used for metrics labels.
566 0 : pub(crate) fn warn_when_period_overrun(
567 0 : elapsed: Duration,
568 0 : period: Duration,
569 0 : task: BackgroundLoopKind,
570 0 : ) {
571 0 : // Duration::ZERO will happen because it's the "disable [bgtask]" value.
572 0 : if elapsed >= period && period != Duration::ZERO {
573 : // humantime does no significant digits clamping whereas Duration's debug is a bit more
574 : // intelligent. however it makes sense to keep the "configuration format" for period, even
575 : // though there's no way to output the actual config value.
576 0 : info!(
577 : ?elapsed,
578 0 : period = %humantime::format_duration(period),
579 0 : ?task,
580 0 : "task iteration took longer than the configured period"
581 : );
582 0 : metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
583 0 : .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
584 0 : .inc();
585 0 : }
586 0 : }