//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC

use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion};

static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
    once_cell::sync::Lazy::new(|| {
        let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
        let permits = usize::max(
            1,
            // While a lot of the work is done on spawn_blocking, we still do
            // repartitioning in the async context. This should leave some workers
            // free to pick up other work, hopefully easing any externally visible
            // effects of restarts.
            //
            // 6/8 is a guess; previously we ran with an unlimited 8, plus more from
            // spawn_blocking.
            (total_threads * 3).checked_div(4).unwrap_or(0),
        );
        assert_ne!(permits, 0, "we will not be adding in permits later");
        assert!(
            permits < total_threads,
            "need threads avail for shorter work"
        );
        tokio::sync::Semaphore::new(permits)
    });

#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
    Compaction,
    Gc,
    Eviction,
    ConsumptionMetricsCollectMetrics,
    ConsumptionMetricsSyntheticSizeWorker,
    InitialLogicalSizeCalculation,
    HeatmapUpload,
    SecondaryDownload,
}

impl BackgroundLoopKind {
    fn as_static_str(&self) -> &'static str {
        let s: &'static str = self.into();
        s
    }
}

/// Cancellation safe.
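///
/// A sketch of the intended call pattern; the surrounding iteration body is
/// hypothetical, not taken from a real caller:
///
/// ```ignore
/// let permit = concurrent_background_tasks_rate_limit_permit(
///     BackgroundLoopKind::Compaction,
///     &ctx,
/// )
/// .await;
/// // ... run one expensive background iteration while holding `permit` ...
/// drop(permit); // the permit returns to the semaphore on drop
/// ```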
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
    loop_kind: BackgroundLoopKind,
    _ctx: &RequestContext,
) -> impl Drop {
    let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
        .with_label_values(&[loop_kind.as_static_str()])
        .guard();

    pausable_failpoint!(
        "initial-size-calculation-permit-pause",
        loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
    );

    // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
    match CONCURRENT_BACKGROUND_TASKS.acquire().await {
        Ok(permit) => permit,
        Err(_closed) => unreachable!("we never close the semaphore"),
    }
}

/// Start per-tenant background loops: compaction and GC.
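///
/// A hypothetical call-site sketch; the barrier plumbing shown here is an
/// assumption for illustration, not copied from a real caller:
///
/// ```ignore
/// // Hold background jobs back until startup is complete:
/// let (init_done, barrier) = utils::completion::channel();
/// start_background_loops(&tenant, Some(&barrier));
/// // ... finish startup work ...
/// drop(init_done); // releases the barrier; the compaction and GC loops begin
/// ```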
pub fn start_background_loops(
    tenant: &Arc<Tenant>,
    background_jobs_can_start: Option<&completion::Barrier>,
) {
    let tenant_shard_id = tenant.tenant_shard_id;
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::Compaction,
        Some(tenant_shard_id),
        None,
        &format!("compactor for tenant {tenant_shard_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                compaction_loop(tenant, cancel)
                    // If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::GarbageCollector,
        Some(tenant_shard_id),
        None,
        &format!("garbage collector for tenant {tenant_shard_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                gc_loop(tenant, cancel)
                    .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );
}

///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
    let mut error_run_count = 0;

    let mut last_throttle_flag_reset_at = Instant::now();

    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
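    // The surrounding async block exists so that every exit path (`return` on
    // cancellation as well as `break`) falls through to the "stop" counter
    // increment after the `.await` below.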
    async {
        let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_compaction_period();

            // TODO: we shouldn't need to await to find the tenant, and this could be moved
            // outside of the loop, #3501. There are also additional "allowed_errors" in tests.
            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let sleep_duration = if period == Duration::ZERO {
                #[cfg(not(feature = "testing"))]
                info!("automatic compaction is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run compaction
                if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
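                    // Back off exponentially on consecutive failures, capped at
                    // MAX_BACKOFF_SECS; error_run_count is reset to zero as soon
                    // as an iteration succeeds again.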
                    let wait_duration = backoff::exponential_backoff_duration_seconds(
                        error_run_count + 1,
                        1.0,
                        MAX_BACKOFF_SECS,
                    );
                    error_run_count += 1;
                    let wait_duration = Duration::from_secs_f64(wait_duration);
                    log_compaction_error(
                        &e,
                        error_run_count,
                        &wait_duration,
                        cancel.is_cancelled(),
                    );
                    wait_duration
                } else {
                    error_run_count = 0;
                    period
                }
            };

            let elapsed = started_at.elapsed();
            warn_when_period_overrun(elapsed, period, BackgroundLoopKind::Compaction);

            // the duration is recorded by performance tests by enabling debug in this function
            tracing::debug!(elapsed_ms = elapsed.as_millis(), "compaction iteration complete");

            // Perhaps we did no work and the walredo process has been idle for some time:
            // give it a chance to shut down to avoid leaving the walredo process running indefinitely.
            if let Some(walredo_mgr) = &tenant.walredo_mgr {
                walredo_mgr.maybe_quiesce(period * 10);
            }

            // TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
            // so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
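            // Reset the per-tenant timeline_get throttle stats and, if anything was
            // throttled since the last reset, emit a one-line summary in its own span.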
            info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
                let now = Instant::now();
                let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
                let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats();
                if count_throttled == 0 {
                    return;
                }
                let allowed_rps = tenant.timeline_get_throttle.steady_rps();
                let delta = now - prev;
                info!(
                    n_seconds=%format_args!("{:.3}",
                    delta.as_secs_f64()),
                    count_accounted,
                    count_throttled,
                    sum_throttled_usecs,
                    allowed_rps=%format_args!("{allowed_rps:.0}"),
                    "shard was throttled in the last n_seconds")
            });

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
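
/// Decide how loudly to log a compaction failure: shutdown-related errors
/// (task cancelled, queues stopping) are logged at `info`, anything else at
/// `error`, and `ShuttingDown` itself is not logged at all.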
fn log_compaction_error(
    e: &CompactionError,
    error_run_count: u32,
    sleep_duration: &std::time::Duration,
    task_cancelled: bool,
) {
    use crate::tenant::upload_queue::NotInitialized;
    use crate::tenant::PageReconstructError;
    use CompactionError::*;

    enum LooksLike {
        Info,
        Error,
    }

    let decision = match e {
        ShuttingDown => None,
        _ if task_cancelled => Some(LooksLike::Info),
        Other(e) => {
            let root_cause = e.root_cause();

            let is_stopping = {
                let upload_queue = root_cause
                    .downcast_ref::<NotInitialized>()
                    .is_some_and(|e| e.is_stopping());

                let timeline = root_cause
                    .downcast_ref::<PageReconstructError>()
                    .is_some_and(|e| e.is_stopping());

                upload_queue || timeline
            };

            if is_stopping {
                Some(LooksLike::Info)
            } else {
                Some(LooksLike::Error)
            }
        }
    };

    match decision {
        Some(LooksLike::Info) => info!(
            "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
        ),
        Some(LooksLike::Error) => error!(
            "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
        ),
        None => {}
    }
}

///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
    let mut error_run_count = 0;

    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        // GC might require downloading, to find the cutoff LSN that corresponds to the
        // cutoff specified as time.
        let ctx =
            RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_gc_period();

            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let gc_horizon = tenant.get_gc_horizon();
            let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
                #[cfg(not(feature = "testing"))]
                info!("automatic GC is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run gc
                let res = tenant
                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
                    .await;
                if let Err(e) = res {
                    let wait_duration = backoff::exponential_backoff_duration_seconds(
                        error_run_count + 1,
                        1.0,
                        MAX_BACKOFF_SECS,
                    );
                    error_run_count += 1;
                    let wait_duration = Duration::from_secs_f64(wait_duration);
                    error!(
                        "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
                    );
                    wait_duration
                } else {
                    error_run_count = 0;
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
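
/// Waits for the tenant to reach [`TenantState::Active`]. Returns
/// `ControlFlow::Continue(())` once the tenant is active, or
/// `ControlFlow::Break(())` if the state-update sender is dropped,
/// i.e. the tenant is going away.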
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
    // if the tenant has a proper status already, no need to wait for anything
    if tenant.current_state() == TenantState::Active {
        ControlFlow::Continue(())
    } else {
        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
        loop {
            match tenant_state_updates.changed().await {
                Ok(()) => {
                    let new_state = &*tenant_state_updates.borrow();
                    match new_state {
                        TenantState::Active => {
                            debug!("Tenant state changed to active, continuing the task loop");
                            return ControlFlow::Continue(());
                        }
                        state => {
                            debug!("Not running the task loop, tenant is not active: {state:?}");
                            continue;
                        }
                    }
                }
                Err(_sender_dropped_error) => {
                    return ControlFlow::Break(());
                }
            }
        }
    }
}

#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;

/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
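///
/// The delay is drawn uniformly from `0..=period`; for example, with a 20s
/// period a task starts at some point within the first 20 seconds. A zero
/// period skips the delay entirely. Returns `Err(Cancelled)` if cancellation
/// wins the race against the delay.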
pub(crate) async fn random_init_delay(
    period: Duration,
    cancel: &CancellationToken,
) -> Result<(), Cancelled> {
    use rand::Rng;

    if period == Duration::ZERO {
        return Ok(());
    }

    let d = {
        let mut rng = rand::thread_rng();
        rng.gen_range(Duration::ZERO..=period)
    };
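
    // timeout() resolves Ok if the inner future (cancellation) completes before
    // the deadline, so Ok here means we were cancelled; Err means the full
    // delay was served.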
    match tokio::time::timeout(d, cancel.cancelled()).await {
        Ok(_) => Err(Cancelled),
        Err(_) => Ok(()),
    }
}

/// Attention: the `task` and `period` become labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(
    elapsed: Duration,
    period: Duration,
    task: BackgroundLoopKind,
) {
    // Duration::ZERO will happen because it's the "disable [bgtask]" value.
    if elapsed >= period && period != Duration::ZERO {
        // humantime does no significant digits clamping whereas Duration's debug is a bit more
        // intelligent. however it makes sense to keep the "configuration format" for period, even
        // though there's no way to output the actual config value.
        info!(
            ?elapsed,
            period = %humantime::format_duration(period),
            ?task,
            "task iteration took longer than the configured period"
        );
        crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
            .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
            .inc();
    }
}