Line data Source code
1 : //! This module contains per-tenant background processes, e.g. compaction and GC.
2 :
3 : use std::cmp::max;
4 : use std::future::Future;
5 : use std::ops::{ControlFlow, RangeInclusive};
6 : use std::pin::pin;
7 : use std::sync::Arc;
8 : use std::time::{Duration, Instant};
9 :
10 : use once_cell::sync::Lazy;
11 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
12 : use rand::Rng;
13 : use scopeguard::defer;
14 : use tokio::sync::{Semaphore, SemaphorePermit};
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::*;
17 : use utils::backoff::exponential_backoff_duration;
18 : use utils::completion::Barrier;
19 : use utils::pausable_failpoint;
20 :
21 : use crate::context::{DownloadBehavior, RequestContext};
22 : use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
23 : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
24 : use crate::tenant::throttle::Stats;
25 : use crate::tenant::timeline::CompactionError;
26 : use crate::tenant::timeline::compaction::CompactionOutcome;
27 : use crate::tenant::{TenantShard, TenantState};
28 :
29 : /// Semaphore limiting concurrent background tasks (across all tenants).
30 : ///
31 : /// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
32 10 : static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
33 10 : let total_threads = TOKIO_WORKER_THREADS.get();
34 :
35 : /*BEGIN_HADRON*/
36 : // ideally we should run at least one compaction task per tenant in order to (1) maximize
37 : // compaction throughput (2) avoid head-of-line blocking of large compactions. However doing
38 : // that may create too many compaction tasks with lots of memory overheads. So we limit the
39 : // number of compaction tasks based on the available CPU core count.
40 : // Need to revisit.
41 : // let tasks_per_thread = std::env::var("BG_TASKS_PER_THREAD")
42 : // .ok()
43 : // .and_then(|s| s.parse().ok())
44 : // .unwrap_or(4);
45 : // let permits = usize::max(1, total_threads * tasks_per_thread);
46 : // // assert!(permits < total_threads, "need threads for other work");
47 : /*END_HADRON*/
48 :
49 10 : let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
50 10 : assert_ne!(permits, 0, "we will not be adding in permits later");
51 10 : assert!(permits < total_threads, "need threads for other work");
52 10 : Semaphore::new(permits)
53 10 : });
54 :
55 : /// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
56 : /// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
57 : ///
58 : /// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
59 : /// to avoid high read amp during heavy write workloads. Regular image/GC compaction is less
60 : /// important (e.g. due to page images in delta layers) and can wait for other background tasks.
61 : ///
62 : /// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work. Note
63 : /// that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`, and shares the same
64 : /// thread pool.
65 0 : static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
66 0 : let total_threads = TOKIO_WORKER_THREADS.get();
67 0 : let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
68 0 : assert_ne!(permits, 0, "we will not be adding in permits later");
69 0 : assert!(permits < total_threads, "need threads for other work");
70 0 : Semaphore::new(permits)
71 0 : });
72 :
73 : /// Background jobs.
74 : ///
75 : /// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
76 : /// do any significant IO or CPU work.
77 : #[derive(
78 : Debug,
79 : PartialEq,
80 : Eq,
81 : Clone,
82 : Copy,
83 : strum_macros::IntoStaticStr,
84 : strum_macros::Display,
85 : enum_map::Enum,
86 : )]
87 : #[strum(serialize_all = "snake_case")]
88 : pub(crate) enum BackgroundLoopKind {
89 : /// L0Compaction runs as a separate pass within the Compaction loop, not a separate loop. It is
90 : /// used to request the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and associated metrics.
91 : L0Compaction,
92 : Compaction,
93 : Gc,
94 : Eviction,
95 : TenantHouseKeeping,
96 : ConsumptionMetricsCollectMetrics,
97 : ConsumptionMetricsSyntheticSizeWorker,
98 : InitialLogicalSizeCalculation,
99 : HeatmapUpload,
100 : SecondaryDownload,
101 : }
102 :
103 : pub struct BackgroundLoopSemaphorePermit<'a> {
104 : _permit: SemaphorePermit<'static>,
105 : _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
106 : }
107 :
108 : /// Acquires a semaphore permit, to limit concurrent background jobs.
109 192 : pub(crate) async fn acquire_concurrency_permit(
110 192 : loop_kind: BackgroundLoopKind,
111 192 : _ctx: &RequestContext,
112 192 : ) -> BackgroundLoopSemaphorePermit<'static> {
113 192 : let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
114 :
115 192 : if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
116 0 : pausable_failpoint!("initial-size-calculation-permit-pause");
117 192 : }
118 :
119 : // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
120 192 : let semaphore = match loop_kind {
121 0 : BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
122 192 : _ => &CONCURRENT_BACKGROUND_TASKS,
123 : };
124 192 : let permit = semaphore.acquire().await.expect("should never close");
125 :
126 192 : recorder.acquired();
127 :
128 192 : BackgroundLoopSemaphorePermit {
129 192 : _permit: permit,
130 192 : _recorder: recorder,
131 192 : }
132 192 : }
133 :
134 : /// Start per tenant background loops: compaction, GC, and ingest housekeeping.
135 0 : pub fn start_background_loops(tenant: &Arc<TenantShard>, can_start: Option<&Barrier>) {
136 0 : let tenant_shard_id = tenant.tenant_shard_id;
137 :
138 0 : task_mgr::spawn(
139 0 : BACKGROUND_RUNTIME.handle(),
140 0 : TaskKind::Compaction,
141 0 : tenant_shard_id,
142 0 : None,
143 0 : &format!("compactor for tenant {tenant_shard_id}"),
144 : {
145 0 : let tenant = Arc::clone(tenant);
146 0 : let can_start = can_start.cloned();
147 0 : async move {
148 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
149 0 : tokio::select! {
150 0 : _ = cancel.cancelled() => return Ok(()),
151 0 : _ = Barrier::maybe_wait(can_start) => {}
152 : };
153 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
154 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
155 0 : compaction_loop(tenant, cancel)
156 : // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
157 0 : .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
158 0 : .await;
159 0 : Ok(())
160 0 : }
161 : },
162 : );
163 :
164 0 : task_mgr::spawn(
165 0 : BACKGROUND_RUNTIME.handle(),
166 0 : TaskKind::GarbageCollector,
167 0 : tenant_shard_id,
168 0 : None,
169 0 : &format!("garbage collector for tenant {tenant_shard_id}"),
170 : {
171 0 : let tenant = Arc::clone(tenant);
172 0 : let can_start = can_start.cloned();
173 0 : async move {
174 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
175 0 : tokio::select! {
176 0 : _ = cancel.cancelled() => return Ok(()),
177 0 : _ = Barrier::maybe_wait(can_start) => {}
178 : };
179 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
180 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
181 0 : gc_loop(tenant, cancel)
182 0 : .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
183 0 : .await;
184 0 : Ok(())
185 0 : }
186 : },
187 : );
188 :
189 0 : task_mgr::spawn(
190 0 : BACKGROUND_RUNTIME.handle(),
191 0 : TaskKind::TenantHousekeeping,
192 0 : tenant_shard_id,
193 0 : None,
194 0 : &format!("housekeeping for tenant {tenant_shard_id}"),
195 : {
196 0 : let tenant = Arc::clone(tenant);
197 0 : let can_start = can_start.cloned();
198 0 : async move {
199 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
200 0 : tokio::select! {
201 0 : _ = cancel.cancelled() => return Ok(()),
202 0 : _ = Barrier::maybe_wait(can_start) => {}
203 : };
204 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
205 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
206 0 : tenant_housekeeping_loop(tenant, cancel)
207 0 : .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
208 0 : .await;
209 0 : Ok(())
210 0 : }
211 : },
212 : );
213 0 : }
214 :
215 : /// Compaction task's main loop.
216 0 : async fn compaction_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
217 : const BASE_BACKOFF_SECS: f64 = 1.0;
218 : const MAX_BACKOFF_SECS: f64 = 300.0;
219 : const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
220 :
221 0 : let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
222 0 : let mut period = tenant.get_compaction_period();
223 0 : let mut error_run = 0; // consecutive errors
224 :
225 : // Stagger the compaction loop across tenants.
226 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
227 0 : return;
228 0 : }
229 0 : if sleep_random(period, &cancel).await.is_err() {
230 0 : return;
231 0 : }
232 :
233 : loop {
234 : // Recheck that we're still active.
235 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
236 0 : return;
237 0 : }
238 :
239 : // Refresh the period. If compaction is disabled, check again in a bit.
240 0 : period = tenant.get_compaction_period();
241 0 : if period == Duration::ZERO {
242 : #[cfg(not(feature = "testing"))]
243 : info!("automatic compaction is disabled");
244 0 : tokio::select! {
245 0 : _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
246 0 : _ = cancel.cancelled() => return,
247 : }
248 0 : continue;
249 0 : }
250 :
251 : // Wait for the next compaction run.
252 0 : let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
253 0 : tokio::select! {
254 0 : _ = tokio::time::sleep(backoff), if error_run > 0 => {},
255 0 : _ = tokio::time::sleep(period), if error_run == 0 => {},
256 0 : _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
257 0 : _ = cancel.cancelled() => return,
258 : }
259 :
260 : // Run compaction.
261 0 : let iteration = Iteration {
262 0 : started_at: Instant::now(),
263 0 : period,
264 0 : kind: BackgroundLoopKind::Compaction,
265 0 : };
266 0 : let IterationResult { output, elapsed } = iteration
267 0 : .run(tenant.compaction_iteration(&cancel, &ctx))
268 0 : .await;
269 :
270 0 : match output {
271 0 : Ok(outcome) => {
272 0 : error_run = 0;
273 : // If there's more compaction work, L0 or not, schedule an immediate run.
274 0 : match outcome {
275 0 : CompactionOutcome::Done => {}
276 0 : CompactionOutcome::Skipped => {}
277 0 : CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
278 0 : CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
279 : }
280 : }
281 :
282 0 : Err(err) => {
283 0 : error_run += 1;
284 0 : let backoff =
285 0 : exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
286 0 : log_compaction_error(
287 0 : &err,
288 0 : Some((error_run, backoff)),
289 0 : cancel.is_cancelled(),
290 : false,
291 : );
292 0 : continue;
293 : }
294 : }
295 :
296 : // NB: this log entry is recorded by performance tests.
297 0 : debug!(
298 0 : elapsed_ms = elapsed.as_millis(),
299 0 : "compaction iteration complete"
300 : );
301 : }
302 0 : }
303 :
304 0 : pub(crate) fn log_compaction_error(
305 0 : err: &CompactionError,
306 0 : retry_info: Option<(u32, Duration)>,
307 0 : task_cancelled: bool,
308 0 : degrade_to_warning: bool,
309 0 : ) {
310 0 : let is_cancel = err.is_cancel();
311 :
312 0 : let level = if is_cancel || task_cancelled {
313 0 : Level::INFO
314 : } else {
315 0 : Level::ERROR
316 : };
317 :
318 0 : if let Some((error_count, sleep_duration)) = retry_info {
319 0 : match level {
320 : Level::ERROR => {
321 0 : error!(
322 0 : "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
323 : )
324 : }
325 : Level::INFO => {
326 0 : info!(
327 0 : "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
328 : )
329 : }
330 0 : level => unimplemented!("unexpected level {level:?}"),
331 : }
332 : } else {
333 0 : match level {
334 0 : Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
335 0 : Level::ERROR => error!("Compaction failed: {err:?}"),
336 0 : Level::INFO => info!("Compaction failed: {err:#}"),
337 0 : level => unimplemented!("unexpected level {level:?}"),
338 : }
339 : }
340 0 : }
341 :
342 : /// GC task's main loop.
343 0 : async fn gc_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
344 : const MAX_BACKOFF_SECS: f64 = 300.0;
345 0 : let mut error_run = 0; // consecutive errors
346 :
347 : // GC might require downloading, to find the cutoff LSN that corresponds to the
348 : // cutoff specified as time.
349 0 : let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
350 0 : let mut first = true;
351 :
352 : loop {
353 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
354 0 : return;
355 0 : }
356 :
357 0 : let period = tenant.get_gc_period();
358 :
359 0 : if first {
360 0 : first = false;
361 0 : if sleep_random(period, &cancel).await.is_err() {
362 0 : break;
363 0 : }
364 0 : }
365 :
366 0 : let gc_horizon = tenant.get_gc_horizon();
367 : let sleep_duration;
368 0 : if period == Duration::ZERO || gc_horizon == 0 {
369 0 : #[cfg(not(feature = "testing"))]
370 0 : info!("automatic GC is disabled");
371 0 : // check again in 10 seconds, in case it's been enabled again.
372 0 : sleep_duration = Duration::from_secs(10);
373 0 : } else {
374 0 : let iteration = Iteration {
375 0 : started_at: Instant::now(),
376 0 : period,
377 0 : kind: BackgroundLoopKind::Gc,
378 0 : };
379 : // Run gc
380 0 : let IterationResult { output, elapsed: _ } = iteration
381 0 : .run(tenant.gc_iteration(
382 0 : None,
383 0 : gc_horizon,
384 0 : tenant.get_pitr_interval(),
385 0 : &cancel,
386 0 : &ctx,
387 0 : ))
388 0 : .await;
389 0 : match output {
390 0 : Ok(_) => {
391 0 : error_run = 0;
392 0 : sleep_duration = period;
393 0 : }
394 : Err(crate::tenant::GcError::TenantCancelled) => {
395 0 : return;
396 : }
397 0 : Err(e) => {
398 0 : error_run += 1;
399 0 : let wait_duration =
400 0 : exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
401 :
402 0 : if matches!(e, crate::tenant::GcError::TimelineCancelled) {
403 : // Timeline was cancelled during gc. We might either be in an event
404 : // that affects the entire tenant (tenant deletion, pageserver shutdown),
405 : // or in one that affects the timeline only (timeline deletion).
406 : // Therefore, don't exit the loop.
407 0 : info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
408 : } else {
409 0 : error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
410 : }
411 :
412 0 : sleep_duration = wait_duration;
413 : }
414 : }
415 : };
416 :
417 0 : if tokio::time::timeout(sleep_duration, cancel.cancelled())
418 0 : .await
419 0 : .is_ok()
420 : {
421 0 : break;
422 0 : }
423 : }
424 0 : }
425 :
426 : /// Tenant housekeeping's main loop.
427 0 : async fn tenant_housekeeping_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
428 0 : let mut last_throttle_flag_reset_at = Instant::now();
429 : loop {
430 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
431 0 : return;
432 0 : }
433 :
434 : // Use the same period as compaction; it's not worth a separate setting. But if it's set to
435 : // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
436 0 : let period = match tenant.get_compaction_period() {
437 0 : Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
438 0 : period => period,
439 : };
440 :
441 0 : let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
442 0 : break;
443 : };
444 :
445 : // Do tenant housekeeping.
446 0 : let iteration = Iteration {
447 0 : started_at: Instant::now(),
448 0 : period,
449 0 : kind: BackgroundLoopKind::TenantHouseKeeping,
450 0 : };
451 0 : iteration.run(tenant.housekeeping()).await;
452 :
453 : // Log any getpage throttling.
454 0 : info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
455 0 : let now = Instant::now();
456 0 : let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
457 0 : let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
458 0 : if count_throttled == 0 {
459 0 : return;
460 0 : }
461 0 : let allowed_rps = tenant.pagestream_throttle.steady_rps();
462 0 : let delta = now - prev;
463 0 : info!(
464 0 : n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
465 : count_accounted = count_accounted_finish, // don't break existing log scraping
466 : count_throttled,
467 : sum_throttled_usecs,
468 : count_accounted_start, // log after pre-existing fields to not break existing log scraping
469 0 : allowed_rps=%format_args!("{allowed_rps:.0}"),
470 0 : "shard was throttled in the last n_seconds"
471 : );
472 0 : });
473 : }
474 0 : }
475 :
476 : /// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
477 0 : async fn wait_for_active_tenant(
478 0 : tenant: &Arc<TenantShard>,
479 0 : cancel: &CancellationToken,
480 0 : ) -> ControlFlow<()> {
481 0 : if tenant.current_state() == TenantState::Active {
482 0 : return ControlFlow::Continue(());
483 0 : }
484 :
485 0 : let mut update_rx = tenant.subscribe_for_state_updates();
486 0 : tokio::select! {
487 0 : result = update_rx.wait_for(|s| s == &TenantState::Active) => {
488 0 : if result.is_err() {
489 0 : return ControlFlow::Break(());
490 0 : }
491 0 : debug!("Tenant state changed to active, continuing the task loop");
492 0 : ControlFlow::Continue(())
493 : },
494 0 : _ = cancel.cancelled() => ControlFlow::Break(()),
495 : }
496 0 : }
497 :
498 : #[derive(thiserror::Error, Debug)]
499 : #[error("cancelled")]
500 : pub(crate) struct Cancelled;
501 :
502 : /// Sleeps for a random interval up to the given max value.
503 : ///
504 : /// This delay prevents a thundering herd of background tasks and will likely keep them running on
505 : /// different periods for more stable load.
506 0 : pub(crate) async fn sleep_random(
507 0 : max: Duration,
508 0 : cancel: &CancellationToken,
509 0 : ) -> Result<Duration, Cancelled> {
510 0 : sleep_random_range(Duration::ZERO..=max, cancel).await
511 0 : }
512 :
513 : /// Sleeps for a random interval in the given range. Returns the duration.
514 0 : pub(crate) async fn sleep_random_range(
515 0 : interval: RangeInclusive<Duration>,
516 0 : cancel: &CancellationToken,
517 0 : ) -> Result<Duration, Cancelled> {
518 0 : let delay = rand::thread_rng().gen_range(interval);
519 0 : if delay == Duration::ZERO {
520 0 : return Ok(delay);
521 0 : }
522 0 : tokio::select! {
523 0 : _ = cancel.cancelled() => Err(Cancelled),
524 0 : _ = tokio::time::sleep(delay) => Ok(delay),
525 : }
526 0 : }
527 :
528 : /// Sleeps for an interval with a random jitter.
529 0 : pub(crate) async fn sleep_jitter(
530 0 : duration: Duration,
531 0 : jitter: Duration,
532 0 : cancel: &CancellationToken,
533 0 : ) -> Result<Duration, Cancelled> {
534 0 : let from = duration.saturating_sub(jitter);
535 0 : let to = duration.saturating_add(jitter);
536 0 : sleep_random_range(from..=to, cancel).await
537 0 : }
538 :
539 : struct Iteration {
540 : started_at: Instant,
541 : period: Duration,
542 : kind: BackgroundLoopKind,
543 : }
544 :
545 : struct IterationResult<O> {
546 : output: O,
547 : elapsed: Duration,
548 : }
549 :
550 : impl Iteration {
551 : #[instrument(skip_all)]
552 : pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
553 : let mut fut = pin!(fut);
554 :
555 : // Wrap `fut` into a future that logs a message every `period` so that we get a
556 : // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
557 : let output = loop {
558 : match tokio::time::timeout(self.period, &mut fut).await {
559 : Ok(r) => break r,
560 : Err(_) => info!("still running"),
561 : }
562 : };
563 : let elapsed = self.started_at.elapsed();
564 : warn_when_period_overrun(elapsed, self.period, self.kind);
565 :
566 : IterationResult { output, elapsed }
567 : }
568 : }
569 :
570 : // NB: the `task` and `period` are used for metrics labels.
571 0 : pub(crate) fn warn_when_period_overrun(
572 0 : elapsed: Duration,
573 0 : period: Duration,
574 0 : task: BackgroundLoopKind,
575 0 : ) {
576 : // Duration::ZERO will happen because it's the "disable [bgtask]" value.
577 0 : if elapsed >= period && period != Duration::ZERO {
578 : // humantime does no significant digits clamping whereas Duration's debug is a bit more
579 : // intelligent. however it makes sense to keep the "configuration format" for period, even
580 : // though there's no way to output the actual config value.
581 0 : info!(
582 : ?elapsed,
583 0 : period = %humantime::format_duration(period),
584 : ?task,
585 0 : "task iteration took longer than the configured period"
586 : );
587 0 : metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
588 0 : .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
589 0 : .inc();
590 0 : }
591 0 : }
|