//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
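//!
//! Both loops are spawned onto the background runtime through `task_mgr::spawn`,
//! honor the task manager's shutdown token, and optionally wait on a
//! `completion::Barrier` before doing any work.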

use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;

/// Start per-tenant background loops: compaction and GC.
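///
/// A minimal sketch of a call site; the `tenant` handle and the optional startup
/// barrier are assumed to be provided by the tenant manager (illustrative only,
/// not compiled):
///
/// ```ignore
/// // tenant: Arc<Tenant>, barrier: Option<&completion::Barrier>
/// start_background_loops(&tenant, barrier);
/// ```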
pub fn start_background_loops(
    tenant: &Arc<Tenant>,
    background_jobs_can_start: Option<&completion::Barrier>,
) {
    let tenant_id = tenant.tenant_id;
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::Compaction,
        Some(tenant_id),
        None,
        &format!("compactor for tenant {tenant_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                compaction_loop(tenant, cancel)
                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
                    .await;
                Ok(())
            }
        },
    );
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::GarbageCollector,
        Some(tenant_id),
        None,
        &format!("garbage collector for tenant {tenant_id}"),
        false,
        {
            let tenant = Arc::clone(tenant);
            let background_jobs_can_start = background_jobs_can_start.cloned();
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()) },
                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
                };
                gc_loop(tenant, cancel)
                    .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
                    .await;
                Ok(())
            }
        },
    );
}

///
/// Compaction task's main loop
///
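/// Waits for the tenant to become active, applies a random initial delay to spread
/// tenants apart, then runs `compaction_iteration` once per compaction period.
/// While compaction is disabled (period is zero) it re-checks every 10 seconds,
/// and on error it retries after a short fixed wait.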
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    let wait_duration = Duration::from_secs(2);
    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_compaction_period();

            // TODO: we shouldn't need to await to find the tenant, and this could be moved
            // outside of the loop, #3501. There are also additional "allowed_errors" in tests.
            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let sleep_duration = if period == Duration::ZERO {
                info!("automatic compaction is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run compaction
                if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
                    error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
                    wait_duration
                } else {
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, "compaction");

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

///
/// GC task's main loop
///
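/// Mirrors the compaction loop: waits for an active tenant, jitters the first
/// iteration, then runs `gc_iteration` once per GC period. GC is skipped (and
/// re-checked every 10 seconds) while the period is zero or the GC horizon is 0.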
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
    let wait_duration = Duration::from_secs(2);
    TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
    async {
        // GC might require downloading, to find the cutoff LSN that corresponds to the
        // cutoff specified as time.
        let ctx =
            RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
        let mut first = true;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    return;
                },
                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
                    ControlFlow::Break(()) => return,
                    ControlFlow::Continue(()) => (),
                },
            }

            let period = tenant.get_gc_period();

            if first {
                first = false;
                if random_init_delay(period, &cancel).await.is_err() {
                    break;
                }
            }

            let started_at = Instant::now();

            let gc_horizon = tenant.get_gc_horizon();
            let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
                info!("automatic GC is disabled");
                // check again in 10 seconds, in case it's been enabled again.
                Duration::from_secs(10)
            } else {
                // Run gc
                let res = tenant
                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
                    .await;
                if let Err(e) = res {
                    error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
                    wait_duration
                } else {
                    period
                }
            };

            warn_when_period_overrun(started_at.elapsed(), period, "gc");

            // Sleep
            if tokio::time::timeout(sleep_duration, cancel.cancelled())
                .await
                .is_ok()
            {
                break;
            }
        }
    }
    .await;
    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}

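/// Block until the tenant reaches the `Active` state. Returns `ControlFlow::Break`
/// if the state-update channel is closed (its sender was dropped), so the calling
/// loop can exit instead of waiting forever.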
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
    // if the tenant has a proper status already, no need to wait for anything
    if tenant.current_state() == TenantState::Active {
        ControlFlow::Continue(())
    } else {
        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
        loop {
            match tenant_state_updates.changed().await {
                Ok(()) => {
                    let new_state = &*tenant_state_updates.borrow();
                    match new_state {
                        TenantState::Active => {
                            debug!("Tenant state changed to active, continuing the task loop");
                            return ControlFlow::Continue(());
                        }
                        state => {
                            debug!("Not running the task loop, tenant is not active: {state:?}");
                            continue;
                        }
                    }
                }
                Err(_sender_dropped_error) => {
                    return ControlFlow::Break(());
                }
            }
        }
    }
}

#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;

/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
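///
/// A minimal usage sketch, mirroring the call sites in this module; `period` and
/// `cancel` are assumed to come from the caller (illustrative only, not compiled):
///
/// ```ignore
/// if random_init_delay(period, &cancel).await.is_err() {
///     return; // cancelled while waiting out the initial jitter
/// }
/// ```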
pub(crate) async fn random_init_delay(
    period: Duration,
    cancel: &CancellationToken,
) -> Result<(), Cancelled> {
    use rand::Rng;

    if period == Duration::ZERO {
        return Ok(());
    }

    let d = {
        let mut rng = rand::thread_rng();
        rng.gen_range(Duration::ZERO..=period)
    };

    match tokio::time::timeout(d, cancel.cancelled()).await {
        Ok(_) => Err(Cancelled),
        Err(_) => Ok(()),
    }
}

/// Attention: the `task` and `period` become labels of a pageserver-wide Prometheus metric.
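///
/// Typical use, as in the loops above; `started_at` and `period` are assumed to be
/// set up by the caller (illustrative only, not compiled):
///
/// ```ignore
/// let started_at = Instant::now();
/// // ... run one compaction or GC iteration ...
/// warn_when_period_overrun(started_at.elapsed(), period, "compaction");
/// ```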
pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
    // Duration::ZERO is expected here: it's the "disable [bgtask]" value.
    if elapsed >= period && period != Duration::ZERO {
        // humantime does no significant-digit clamping, whereas Duration's Debug output is a bit
        // more intelligent. However, it makes sense to keep the "configuration format" for period,
        // even though there's no way to output the actual config value.
        warn!(
            ?elapsed,
            period = %humantime::format_duration(period),
            task,
            "task iteration took longer than the configured period"
        );
        crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
            .with_label_values(&[task, &format!("{}", period.as_secs())])
            .inc();
    }
}