Line data Source code
1 : //! This module contains per-tenant background processes, e.g. compaction and GC.
2 :
3 : use std::cmp::max;
4 : use std::future::Future;
5 : use std::ops::{ControlFlow, RangeInclusive};
6 : use std::pin::pin;
7 : use std::sync::Arc;
8 : use std::time::{Duration, Instant};
9 :
10 : use once_cell::sync::Lazy;
11 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
12 : use rand::Rng;
13 : use scopeguard::defer;
14 : use tokio::sync::{Semaphore, SemaphorePermit};
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::*;
17 : use utils::backoff::exponential_backoff_duration;
18 : use utils::completion::Barrier;
19 : use utils::pausable_failpoint;
20 : use utils::sync::gate::GateError;
21 :
22 : use crate::context::{DownloadBehavior, RequestContext};
23 : use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
24 : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
25 : use crate::tenant::blob_io::WriteBlobError;
26 : use crate::tenant::throttle::Stats;
27 : use crate::tenant::timeline::CompactionError;
28 : use crate::tenant::timeline::compaction::CompactionOutcome;
29 : use crate::tenant::{TenantShard, TenantState};
30 : use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
31 :
32 : /// Semaphore limiting concurrent background tasks (across all tenants).
33 : ///
34 : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work.
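 : /// For example, with 8 worker threads this yields `max(1, 8 * 3 / 4) = 6` permits.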
35 10 : static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
36 10 : let total_threads = TOKIO_WORKER_THREADS.get();
37 10 : let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
38 10 : assert_ne!(permits, 0, "we will not be adding in permits later");
39 10 : assert!(permits < total_threads, "need threads for other work");
40 10 : Semaphore::new(permits)
41 10 : });
42 :
43 : /// Semaphore limiting concurrent L0 compaction tasks (across all tenants). This is only used if
44 : /// both `compaction_l0_semaphore` and `compaction_l0_first` are enabled.
45 : ///
46 : /// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
47 : /// to avoid high read amp during heavy write workloads. Regular image/GC compaction is less
48 : /// important (e.g. due to page images in delta layers) and can wait for other background tasks.
49 : ///
50 : /// We use 3/4 of the Tokio worker threads, to avoid blocking all threads in case we do any CPU-heavy work. Note
51 : /// that this runs on the same Tokio runtime as `CONCURRENT_BACKGROUND_TASKS`, and shares the same
52 : /// thread pool.
53 0 : static CONCURRENT_L0_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
54 0 : let total_threads = TOKIO_WORKER_THREADS.get();
55 0 : let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
56 0 : assert_ne!(permits, 0, "we will not be adding in permits later");
57 0 : assert!(permits < total_threads, "need threads for other work");
58 0 : Semaphore::new(permits)
59 0 : });
60 :
61 : /// Background jobs.
62 : ///
63 : /// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
64 : /// do any significant IO or CPU work.
65 : #[derive(
66 : Debug,
67 : PartialEq,
68 : Eq,
69 : Clone,
70 : Copy,
71 : strum_macros::IntoStaticStr,
72 : strum_macros::Display,
73 : enum_map::Enum,
74 : )]
75 : #[strum(serialize_all = "snake_case")]
76 : pub(crate) enum BackgroundLoopKind {
77 : /// L0Compaction runs as a separate pass within the Compaction loop, not as its own loop. It is
78 : /// used to acquire the `CONCURRENT_L0_COMPACTION_TASKS` semaphore and its associated metrics.
79 : L0Compaction,
80 : Compaction,
81 : Gc,
82 : Eviction,
83 : TenantHouseKeeping,
84 : ConsumptionMetricsCollectMetrics,
85 : ConsumptionMetricsSyntheticSizeWorker,
86 : InitialLogicalSizeCalculation,
87 : HeatmapUpload,
88 : SecondaryDownload,
89 : }
90 :
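 : /// Bundles the acquired semaphore permit with its metrics recorder; dropping this releases the
 : /// concurrency slot back to the semaphore.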
91 : pub struct BackgroundLoopSemaphorePermit<'a> {
92 : _permit: SemaphorePermit<'static>,
93 : _recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
94 : }
95 :
96 : /// Acquires a semaphore permit, to limit concurrent background jobs.
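 : ///
 : /// Callers hold the returned permit for as long as the heavy work runs; it is released on drop.
 : /// An illustrative (not verbatim) call site:
 : ///
 : /// ```ignore
 : /// let _permit = acquire_concurrency_permit(BackgroundLoopKind::Compaction, &ctx).await;
 : /// // ... perform the CPU/IO-heavy iteration while the permit is held ...
 : /// ```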
97 192 : pub(crate) async fn acquire_concurrency_permit(
98 192 : loop_kind: BackgroundLoopKind,
99 192 : _ctx: &RequestContext,
100 192 : ) -> BackgroundLoopSemaphorePermit<'static> {
101 192 : let mut recorder = metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
102 :
103 192 : if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
104 0 : pausable_failpoint!("initial-size-calculation-permit-pause");
105 192 : }
106 :
107 : // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
108 192 : let semaphore = match loop_kind {
109 0 : BackgroundLoopKind::L0Compaction => &CONCURRENT_L0_COMPACTION_TASKS,
110 192 : _ => &CONCURRENT_BACKGROUND_TASKS,
111 : };
112 192 : let permit = semaphore.acquire().await.expect("should never close");
113 :
114 192 : recorder.acquired();
115 :
116 192 : BackgroundLoopSemaphorePermit {
117 192 : _permit: permit,
118 192 : _recorder: recorder,
119 192 : }
120 192 : }
121 :
122 : /// Start per-tenant background loops: compaction, GC, and tenant housekeeping.
123 0 : pub fn start_background_loops(tenant: &Arc<TenantShard>, can_start: Option<&Barrier>) {
124 0 : let tenant_shard_id = tenant.tenant_shard_id;
125 :
126 0 : task_mgr::spawn(
127 0 : BACKGROUND_RUNTIME.handle(),
128 0 : TaskKind::Compaction,
129 0 : tenant_shard_id,
130 0 : None,
131 0 : &format!("compactor for tenant {tenant_shard_id}"),
132 : {
133 0 : let tenant = Arc::clone(tenant);
134 0 : let can_start = can_start.cloned();
135 0 : async move {
136 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
137 0 : tokio::select! {
138 0 : _ = cancel.cancelled() => return Ok(()),
139 0 : _ = Barrier::maybe_wait(can_start) => {}
140 : };
141 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
142 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
143 0 : compaction_loop(tenant, cancel)
144 : // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
145 0 : .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
146 0 : .await;
147 0 : Ok(())
148 0 : }
149 : },
150 : );
151 :
152 0 : task_mgr::spawn(
153 0 : BACKGROUND_RUNTIME.handle(),
154 0 : TaskKind::GarbageCollector,
155 0 : tenant_shard_id,
156 0 : None,
157 0 : &format!("garbage collector for tenant {tenant_shard_id}"),
158 : {
159 0 : let tenant = Arc::clone(tenant);
160 0 : let can_start = can_start.cloned();
161 0 : async move {
162 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
163 0 : tokio::select! {
164 0 : _ = cancel.cancelled() => return Ok(()),
165 0 : _ = Barrier::maybe_wait(can_start) => {}
166 : };
167 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
168 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
169 0 : gc_loop(tenant, cancel)
170 0 : .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
171 0 : .await;
172 0 : Ok(())
173 0 : }
174 : },
175 : );
176 :
177 0 : task_mgr::spawn(
178 0 : BACKGROUND_RUNTIME.handle(),
179 0 : TaskKind::TenantHousekeeping,
180 0 : tenant_shard_id,
181 0 : None,
182 0 : &format!("housekeeping for tenant {tenant_shard_id}"),
183 : {
184 0 : let tenant = Arc::clone(tenant);
185 0 : let can_start = can_start.cloned();
186 0 : async move {
187 0 : let cancel = task_mgr::shutdown_token(); // NB: must be in async context
188 0 : tokio::select! {
189 0 : _ = cancel.cancelled() => return Ok(()),
190 0 : _ = Barrier::maybe_wait(can_start) => {}
191 : };
192 0 : TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
193 0 : defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
194 0 : tenant_housekeeping_loop(tenant, cancel)
195 0 : .instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
196 0 : .await;
197 0 : Ok(())
198 0 : }
199 : },
200 : );
201 0 : }
202 :
203 : /// Compaction task's main loop.
204 0 : async fn compaction_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
205 : const BASE_BACKOFF_SECS: f64 = 1.0;
206 : const MAX_BACKOFF_SECS: f64 = 300.0;
207 : const RECHECK_CONFIG_INTERVAL: Duration = Duration::from_secs(10);
208 :
209 0 : let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
210 0 : let mut period = tenant.get_compaction_period();
211 0 : let mut error_run = 0; // consecutive errors
212 :
213 : // Stagger the compaction loop across tenants.
214 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
215 0 : return;
216 0 : }
217 0 : if sleep_random(period, &cancel).await.is_err() {
218 0 : return;
219 0 : }
220 :
221 : loop {
222 : // Recheck that we're still active.
223 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
224 0 : return;
225 0 : }
226 :
227 : // Refresh the period. If compaction is disabled, check again in a bit.
228 0 : period = tenant.get_compaction_period();
229 0 : if period == Duration::ZERO {
230 : #[cfg(not(feature = "testing"))]
231 : info!("automatic compaction is disabled");
232 0 : tokio::select! {
233 0 : _ = tokio::time::sleep(RECHECK_CONFIG_INTERVAL) => {},
234 0 : _ = cancel.cancelled() => return,
235 : }
236 0 : continue;
237 0 : }
238 :
239 : // Wait for the next compaction run.
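 : // After consecutive errors we only wait out the exponential backoff; otherwise we wait for the
 : // configured period or an explicit L0 compaction wake-up, whichever comes first.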
240 0 : let backoff = exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
241 0 : tokio::select! {
242 0 : _ = tokio::time::sleep(backoff), if error_run > 0 => {},
243 0 : _ = tokio::time::sleep(period), if error_run == 0 => {},
244 0 : _ = tenant.l0_compaction_trigger.notified(), if error_run == 0 => {},
245 0 : _ = cancel.cancelled() => return,
246 : }
247 :
248 : // Run compaction.
249 0 : let iteration = Iteration {
250 0 : started_at: Instant::now(),
251 0 : period,
252 0 : kind: BackgroundLoopKind::Compaction,
253 0 : };
254 0 : let IterationResult { output, elapsed } = iteration
255 0 : .run(tenant.compaction_iteration(&cancel, &ctx))
256 0 : .await;
257 :
258 0 : match output {
259 0 : Ok(outcome) => {
260 0 : error_run = 0;
261 : // If there's more compaction work, L0 or not, schedule an immediate run.
262 0 : match outcome {
263 0 : CompactionOutcome::Done => {}
264 0 : CompactionOutcome::Skipped => {}
265 0 : CompactionOutcome::YieldForL0 => tenant.l0_compaction_trigger.notify_one(),
266 0 : CompactionOutcome::Pending => tenant.l0_compaction_trigger.notify_one(),
267 : }
268 : }
269 :
270 0 : Err(err) => {
271 0 : error_run += 1;
272 0 : let backoff =
273 0 : exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
274 0 : log_compaction_error(
275 0 : &err,
276 0 : Some((error_run, backoff)),
277 0 : cancel.is_cancelled(),
278 : false,
279 : );
280 0 : continue;
281 : }
282 : }
283 :
284 : // NB: this log entry is recorded by performance tests.
285 0 : debug!(
286 0 : elapsed_ms = elapsed.as_millis(),
287 0 : "compaction iteration complete"
288 : );
289 : }
290 0 : }
291 :
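 : /// Logs a compaction error at an appropriate level: cancellation-like failures are skipped or
 : /// logged at INFO, everything else at ERROR. When `retry_info` is `None` and `degrade_to_warning`
 : /// is set, an ERROR-level failure is logged as a warning instead.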
292 0 : pub(crate) fn log_compaction_error(
293 0 : err: &CompactionError,
294 0 : retry_info: Option<(u32, Duration)>,
295 0 : task_cancelled: bool,
296 0 : degrade_to_warning: bool,
297 0 : ) {
298 : use CompactionError::*;
299 :
300 : use crate::tenant::PageReconstructError;
301 : use crate::tenant::upload_queue::NotInitialized;
302 :
303 0 : let level = match err {
304 0 : e if e.is_cancel() => return,
305 0 : ShuttingDown => return,
306 0 : Offload(_) => Level::ERROR,
307 0 : AlreadyRunning(_) => Level::ERROR,
308 0 : CollectKeySpaceError(_) => Level::ERROR,
309 0 : _ if task_cancelled => Level::INFO,
310 0 : Other(err) => {
311 0 : let root_cause = err.root_cause();
312 :
313 0 : let upload_queue = root_cause
314 0 : .downcast_ref::<NotInitialized>()
315 0 : .is_some_and(|e| e.is_stopping());
316 0 : let timeline = root_cause
317 0 : .downcast_ref::<PageReconstructError>()
318 0 : .is_some_and(|e| e.is_stopping());
319 0 : let buffered_writer_flush_task_cancelled = root_cause
320 0 : .downcast_ref::<FlushTaskError>()
321 0 : .is_some_and(|e| e.is_cancel());
322 0 : let write_blob_cancelled = root_cause
323 0 : .downcast_ref::<WriteBlobError>()
324 0 : .is_some_and(|e| e.is_cancel());
325 0 : let gate_closed = root_cause
326 0 : .downcast_ref::<GateError>()
327 0 : .is_some_and(|e| e.is_cancel());
328 0 : let is_stopping = upload_queue
329 0 : || timeline
330 0 : || buffered_writer_flush_task_cancelled
331 0 : || write_blob_cancelled
332 0 : || gate_closed;
333 :
334 0 : if is_stopping {
335 0 : Level::INFO
336 : } else {
337 0 : Level::ERROR
338 : }
339 : }
340 : };
341 :
342 0 : if let Some((error_count, sleep_duration)) = retry_info {
343 0 : match level {
344 : Level::ERROR => {
345 0 : error!(
346 0 : "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
347 : )
348 : }
349 : Level::INFO => {
350 0 : info!(
351 0 : "Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}"
352 : )
353 : }
354 0 : level => unimplemented!("unexpected level {level:?}"),
355 : }
356 : } else {
357 0 : match level {
358 0 : Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
359 0 : Level::ERROR => error!("Compaction failed: {err:?}"),
360 0 : Level::INFO => info!("Compaction failed: {err:#}"),
361 0 : level => unimplemented!("unexpected level {level:?}"),
362 : }
363 : }
364 0 : }
365 :
366 : /// GC task's main loop.
367 0 : async fn gc_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
368 : const MAX_BACKOFF_SECS: f64 = 300.0;
369 0 : let mut error_run = 0; // consecutive errors
370 :
371 : // GC might require downloading, to find the cutoff LSN that corresponds to the
372 : // cutoff specified as a time.
373 0 : let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
374 0 : let mut first = true;
375 :
376 : loop {
377 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
378 0 : return;
379 0 : }
380 :
381 0 : let period = tenant.get_gc_period();
382 :
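 : // Stagger the first GC run across tenants, like the compaction loop does.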
383 0 : if first {
384 0 : first = false;
385 0 : if sleep_random(period, &cancel).await.is_err() {
386 0 : break;
387 0 : }
388 0 : }
389 :
390 0 : let gc_horizon = tenant.get_gc_horizon();
391 : let sleep_duration;
392 0 : if period == Duration::ZERO || gc_horizon == 0 {
393 0 : #[cfg(not(feature = "testing"))]
394 0 : info!("automatic GC is disabled");
395 0 : // check again in 10 seconds, in case it's been enabled again.
396 0 : sleep_duration = Duration::from_secs(10);
397 0 : } else {
398 0 : let iteration = Iteration {
399 0 : started_at: Instant::now(),
400 0 : period,
401 0 : kind: BackgroundLoopKind::Gc,
402 0 : };
403 : // Run gc
404 0 : let IterationResult { output, elapsed: _ } = iteration
405 0 : .run(tenant.gc_iteration(
406 0 : None,
407 0 : gc_horizon,
408 0 : tenant.get_pitr_interval(),
409 0 : &cancel,
410 0 : &ctx,
411 0 : ))
412 0 : .await;
413 0 : match output {
414 0 : Ok(_) => {
415 0 : error_run = 0;
416 0 : sleep_duration = period;
417 0 : }
418 : Err(crate::tenant::GcError::TenantCancelled) => {
419 0 : return;
420 : }
421 0 : Err(e) => {
422 0 : error_run += 1;
423 0 : let wait_duration =
424 0 : exponential_backoff_duration(error_run, 1.0, MAX_BACKOFF_SECS);
425 :
426 0 : if matches!(e, crate::tenant::GcError::TimelineCancelled) {
427 : // Timeline was cancelled during gc. We might either be in an event
428 : // that affects the entire tenant (tenant deletion, pageserver shutdown),
429 : // or in one that affects the timeline only (timeline deletion).
430 : // Therefore, don't exit the loop.
431 0 : info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
432 : } else {
433 0 : error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
434 : }
435 :
436 0 : sleep_duration = wait_duration;
437 : }
438 : }
439 : };
440 :
441 0 : if tokio::time::timeout(sleep_duration, cancel.cancelled())
442 0 : .await
443 0 : .is_ok()
444 : {
445 0 : break;
446 0 : }
447 : }
448 0 : }
449 :
450 : /// Tenant housekeeping's main loop.
451 0 : async fn tenant_housekeeping_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
452 0 : let mut last_throttle_flag_reset_at = Instant::now();
453 : loop {
454 0 : if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
455 0 : return;
456 0 : }
457 :
458 : // Use the same period as compaction; it's not worth a separate setting. But if it's set to
459 : // zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
460 0 : let period = match tenant.get_compaction_period() {
461 0 : Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
462 0 : period => period,
463 : };
464 :
465 0 : let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
466 0 : break;
467 : };
468 :
469 : // Do tenant housekeeping.
470 0 : let iteration = Iteration {
471 0 : started_at: Instant::now(),
472 0 : period,
473 0 : kind: BackgroundLoopKind::TenantHouseKeeping,
474 0 : };
475 0 : iteration.run(tenant.housekeeping()).await;
476 :
477 : // Log any getpage throttling.
478 0 : info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
479 0 : let now = Instant::now();
480 0 : let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
481 0 : let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
482 0 : if count_throttled == 0 {
483 0 : return;
484 0 : }
485 0 : let allowed_rps = tenant.pagestream_throttle.steady_rps();
486 0 : let delta = now - prev;
487 0 : info!(
488 0 : n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
489 : count_accounted = count_accounted_finish, // don't break existing log scraping
490 : count_throttled,
491 : sum_throttled_usecs,
492 : count_accounted_start, // log after pre-existing fields to not break existing log scraping
493 0 : allowed_rps=%format_args!("{allowed_rps:.0}"),
494 0 : "shard was throttled in the last n_seconds"
495 : );
496 0 : });
497 : }
498 0 : }
499 :
500 : /// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
501 0 : async fn wait_for_active_tenant(
502 0 : tenant: &Arc<TenantShard>,
503 0 : cancel: &CancellationToken,
504 0 : ) -> ControlFlow<()> {
505 0 : if tenant.current_state() == TenantState::Active {
506 0 : return ControlFlow::Continue(());
507 0 : }
508 :
509 0 : let mut update_rx = tenant.subscribe_for_state_updates();
510 0 : tokio::select! {
511 0 : result = update_rx.wait_for(|s| s == &TenantState::Active) => {
512 0 : if result.is_err() {
513 0 : return ControlFlow::Break(());
514 0 : }
515 0 : debug!("Tenant state changed to active, continuing the task loop");
516 0 : ControlFlow::Continue(())
517 : },
518 0 : _ = cancel.cancelled() => ControlFlow::Break(()),
519 : }
520 0 : }
521 :
522 : #[derive(thiserror::Error, Debug)]
523 : #[error("cancelled")]
524 : pub(crate) struct Cancelled;
525 :
526 : /// Sleeps for a random interval up to the given max value.
527 : ///
528 : /// This delay prevents a thundering herd of background tasks and will likely keep them staggered
529 : /// relative to each other, for more stable load.
530 0 : pub(crate) async fn sleep_random(
531 0 : max: Duration,
532 0 : cancel: &CancellationToken,
533 0 : ) -> Result<Duration, Cancelled> {
534 0 : sleep_random_range(Duration::ZERO..=max, cancel).await
535 0 : }
536 :
537 : /// Sleeps for a random interval in the given range. Returns the duration.
538 0 : pub(crate) async fn sleep_random_range(
539 0 : interval: RangeInclusive<Duration>,
540 0 : cancel: &CancellationToken,
541 0 : ) -> Result<Duration, Cancelled> {
542 0 : let delay = rand::thread_rng().gen_range(interval);
543 0 : if delay == Duration::ZERO {
544 0 : return Ok(delay);
545 0 : }
546 0 : tokio::select! {
547 0 : _ = cancel.cancelled() => Err(Cancelled),
548 0 : _ = tokio::time::sleep(delay) => Ok(delay),
549 : }
550 0 : }
551 :
552 : /// Sleeps for an interval with a random jitter.
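 : ///
 : /// For example, `sleep_jitter(period, period / 20, &cancel)` sleeps for `period` ± 5%.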
553 0 : pub(crate) async fn sleep_jitter(
554 0 : duration: Duration,
555 0 : jitter: Duration,
556 0 : cancel: &CancellationToken,
557 0 : ) -> Result<Duration, Cancelled> {
558 0 : let from = duration.saturating_sub(jitter);
559 0 : let to = duration.saturating_add(jitter);
560 0 : sleep_random_range(from..=to, cancel).await
561 0 : }
562 :
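 : /// One iteration of a background loop: `run` logs a "still running" breadcrumb every `period`
 : /// while the wrapped future is pending, and records an overrun once it completes.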
563 : struct Iteration {
564 : started_at: Instant,
565 : period: Duration,
566 : kind: BackgroundLoopKind,
567 : }
568 :
569 : struct IterationResult<O> {
570 : output: O,
571 : elapsed: Duration,
572 : }
573 :
574 : impl Iteration {
575 : #[instrument(skip_all)]
576 : pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
577 : let mut fut = pin!(fut);
578 :
579 : // Wrap `fut` into a future that logs a message every `period` so that we get a
580 : // very obvious breadcrumb in the logs _while_ a slow iteration is happening.
581 : let output = loop {
582 : match tokio::time::timeout(self.period, &mut fut).await {
583 : Ok(r) => break r,
584 : Err(_) => info!("still running"),
585 : }
586 : };
587 : let elapsed = self.started_at.elapsed();
588 : warn_when_period_overrun(elapsed, self.period, self.kind);
589 :
590 : IterationResult { output, elapsed }
591 : }
592 : }
593 :
594 : // NB: the `task` and `period` are used for metrics labels.
595 0 : pub(crate) fn warn_when_period_overrun(
596 0 : elapsed: Duration,
597 0 : period: Duration,
598 0 : task: BackgroundLoopKind,
599 0 : ) {
600 : // Duration::ZERO will happen because it's the "disable [bgtask]" value.
601 0 : if elapsed >= period && period != Duration::ZERO {
602 : // humantime does no clamping of significant digits, whereas Duration's Debug output is a bit
603 : // more intelligent. However, it makes sense to keep the "configuration format" for the period,
604 : // even though there's no way to output the actual config value.
605 0 : info!(
606 : ?elapsed,
607 0 : period = %humantime::format_duration(period),
608 : ?task,
609 0 : "task iteration took longer than the configured period"
610 : );
611 0 : metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
612 0 : .with_label_values(&[task.into(), &format!("{}", period.as_secs())])
613 0 : .inc();
614 0 : }
615 0 : }