//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC

use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion};

static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
    once_cell::sync::Lazy::new(|| {
        let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
        let permits = usize::max(
            1,
            // While a lot of the work is done on spawn_blocking, we still do
            // repartitioning in the async context. This should leave some workers
            // available to be blocked on other work, hopefully easing any externally
            // visible effects of restarts.
            //
            // 6/8 is a guess; previously we ran with an unlimited 8 and more from
            // spawn_blocking.
            (total_threads * 3).checked_div(4).unwrap_or(0),
        );
        assert_ne!(permits, 0, "we will not be adding in permits later");
        assert!(
            permits < total_threads,
            "need threads avail for shorter work"
        );
        tokio::sync::Semaphore::new(permits)
    });
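
// Worked example (the thread count is an assumed illustration, not the actual
// configured value): with 8 background worker threads, the sizing above yields
// max(1, 8 * 3 / 4) = 6 permits, leaving 2 workers free for shorter work.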

#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
    Compaction,
    Gc,
    Eviction,
    ConsumptionMetricsCollectMetrics,
    ConsumptionMetricsSyntheticSizeWorker,
    InitialLogicalSizeCalculation,
    HeatmapUpload,
    SecondaryDownload,
}

impl BackgroundLoopKind {
    fn as_static_str(&self) -> &'static str {
        let s: &'static str = self.into();
        s
    }
}

/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
    loop_kind: BackgroundLoopKind,
    _ctx: &RequestContext,
) -> impl Drop {
    let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
        .with_label_values(&[loop_kind.as_static_str()])
        .guard();

    match CONCURRENT_BACKGROUND_TASKS.acquire().await {
        Ok(permit) => permit,
        Err(_closed) => unreachable!("we never close the semaphore"),
    }
}
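
// Illustration only, not part of the original module: a hypothetical caller that
// holds the concurrency permit for the duration of one background iteration. The
// returned value is an RAII guard; the permit is released when `_permit` is
// dropped at the end of the scope.
#[allow(dead_code)]
async fn example_rate_limited_iteration(ctx: &RequestContext) {
    let _permit =
        concurrent_background_tasks_rate_limit_permit(BackgroundLoopKind::Compaction, ctx).await;
    // ... perform one iteration of rate-limited background work here ...
}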

/// Start per-tenant background loops: compaction and gc.
pub fn start_background_loops(
    tenant: &Arc<Tenant>,
    background_jobs_can_start: Option<&completion::Barrier>,
) {
    let tenant_shard_id = tenant.tenant_shard_id;
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::Compaction,
        Some(tenant_shard_id),
        None,
        &format!("compactor for tenant {tenant_shard_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                compaction_loop(tenant, cancel)
                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::GarbageCollector,
        Some(tenant_shard_id),
        None,
        &format!("garbage collector for tenant {tenant_shard_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                gc_loop(tenant, cancel)
                    .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
                    .await;
                Ok(())
            }
        },
    );
}

///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
    let mut error_run_count = 0;

    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_compaction_period();

            // TODO: we shouldn't need to await to find tenant and this could be moved outside of
            // loop, #3501. There are also additional "allowed_errors" in tests.
            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let sleep_duration = if period == Duration::ZERO {
                #[cfg(not(feature = "testing"))]
                info!("automatic compaction is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run compaction
                if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
                    let wait_duration = backoff::exponential_backoff_duration_seconds(
                        error_run_count + 1,
                        1.0,
                        MAX_BACKOFF_SECS,
                    );
                    error_run_count += 1;
                    let wait_duration = Duration::from_secs_f64(wait_duration);
                    error!(
                        "Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
                    );
                    wait_duration
                } else {
                    error_run_count = 0;
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);

            // Perhaps we did no work and the walredo process has been idle for some time:
            // give it a chance to shut down to avoid leaving the walredo process running indefinitely.
            tenant.walredo_mgr.maybe_quiesce(period * 10);

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
    let mut error_run_count = 0;

    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        // GC might require downloading, to find the cutoff LSN that corresponds to the
        // cutoff specified as time.
        let ctx =
            RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_gc_period();

            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let gc_horizon = tenant.get_gc_horizon();
            let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
                #[cfg(not(feature = "testing"))]
                info!("automatic GC is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run gc
                let res = tenant
                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
                    .await;
                if let Err(e) = res {
                    let wait_duration = backoff::exponential_backoff_duration_seconds(
                        error_run_count + 1,
                        1.0,
                        MAX_BACKOFF_SECS,
                    );
                    error_run_count += 1;
                    let wait_duration = Duration::from_secs_f64(wait_duration);
                    error!(
                        "Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
                    );
                    wait_duration
                } else {
                    error_run_count = 0;
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
    // if the tenant has a proper status already, no need to wait for anything
    if tenant.current_state() == TenantState::Active {
        ControlFlow::Continue(())
    } else {
        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
        loop {
            match tenant_state_updates.changed().await {
                Ok(()) => {
                    let new_state = &*tenant_state_updates.borrow();
                    match new_state {
                        TenantState::Active => {
                            debug!("Tenant state changed to active, continuing the task loop");
                            return ControlFlow::Continue(());
                        }
                        state => {
                            debug!("Not running the task loop, tenant is not active: {state:?}");
                            continue;
                        }
                    }
                }
                Err(_sender_dropped_error) => {
                    return ControlFlow::Break(());
                }
            }
        }
    }
}

#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;

/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
pub(crate) async fn random_init_delay(
    period: Duration,
    cancel: &CancellationToken,
) -> Result<(), Cancelled> {
    use rand::Rng;

    if period == Duration::ZERO {
        return Ok(());
    }

    let d = {
        let mut rng = rand::thread_rng();
        rng.gen_range(Duration::ZERO..=period)
    };

    match tokio::time::timeout(d, cancel.cancelled()).await {
        Ok(_) => Err(Cancelled),
        Err(_) => Ok(()),
    }
}
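
// A minimal sketch, not part of the original module: how a hypothetical new periodic
// task could combine `random_init_delay` (to jitter its start) with
// `warn_when_period_overrun` (to report slow iterations). The `BackgroundLoopKind`
// used here is only a placeholder.
#[allow(dead_code)]
async fn example_periodic_task(period: Duration, cancel: CancellationToken) {
    // Spread start times over one period to avoid a thundering herd at startup.
    if random_init_delay(period, &cancel).await.is_err() {
        return; // cancelled during the initial delay
    }
    loop {
        let started_at = Instant::now();
        // ... do one iteration of work here ...
        warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Eviction);
        // Sleep until the next iteration, waking early on cancellation.
        if tokio::time::timeout(period, cancel.cancelled()).await.is_ok() {
            return;
        }
    }
}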

/// Attention: the `task` and `period` become labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(
    elapsed: Duration,
    period: Duration,
    task: BackgroundLoopKind,
) {
    // Duration::ZERO will happen because it's the "disable [bgtask]" value.
    if elapsed >= period && period != Duration::ZERO {
        // humantime does no significant-digits clamping whereas Duration's debug is a bit more
        // intelligent. However, it makes sense to keep the "configuration format" for period, even
        // though there's no way to output the actual config value.
        info!(
            ?elapsed,
            period = %humantime::format_duration(period),
            ?task,
            "task iteration took longer than the configured period"
        );
        crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
            .with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
            .inc();
    }
}