Line data Source code
1 : use futures::Future;
2 : use rand::Rng;
3 : use std::{
4 : collections::HashMap,
5 : marker::PhantomData,
6 : pin::Pin,
7 : time::{Duration, Instant},
8 : };
9 :
10 : use pageserver_api::shard::TenantShardId;
11 : use tokio::task::JoinSet;
12 : use tokio_util::sync::CancellationToken;
13 : use utils::{completion::Barrier, yielding_loop::yielding_loop};
14 :
15 : use super::{CommandRequest, CommandResponse};
16 :
17 : /// Scheduling interval is the time between calls to JobGenerator::schedule.
18 : /// When we schedule jobs, the job generator may provide a hint of its preferred
19 : /// interval, which we will respect within these bounds.
20 : const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
21 : const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
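
// Illustrative sketch (not part of the original source): how a generator's interval
// hint relates to these bounds. `schedule_iteration` below performs an equivalent
// clamp at second granularity; this helper name is hypothetical.
#[allow(dead_code)]
fn example_clamp_interval_hint(want_interval: Duration) -> Duration {
    // Duration implements Ord, so Ord::clamp applies the MIN/MAX bounds directly.
    want_interval.clamp(MIN_SCHEDULING_INTERVAL, MAX_SCHEDULING_INTERVAL)
}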
22 :
23 : /// Jitter a Duration by an integer percentage. Returned values are uniform
24 : /// in the range (100-pct)%..(100+pct)% of `d` (i.e. a 5% jitter is 5% either way: a ~10% range)
25 0 : pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
26 0 : if d == Duration::ZERO {
27 0 : d
28 : } else {
29 0 : rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
30 : }
31 0 : }
32 :
33 : /// When a periodic task first starts, it should wait for some time in the range 0..period, so
34 : /// that starting many such tasks at the same time spreads them across the time range.
35 0 : pub(super) fn period_warmup(period: Duration) -> Duration {
36 0 : if period == Duration::ZERO {
37 0 : period
38 : } else {
39 0 : rand::thread_rng().gen_range(Duration::ZERO..period)
40 : }
41 0 : }
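
// Illustrative sketch (not part of the original source): how a periodic background
// task might combine `period_warmup` and `period_jitter`. The work closure and the
// 5% jitter value are hypothetical.
#[allow(dead_code)]
async fn example_periodic_task<F: FnMut()>(period: Duration, cancel: CancellationToken, mut work: F) {
    // Spread the first wake-up of many tasks uniformly across 0..period.
    tokio::time::sleep(period_warmup(period)).await;
    while !cancel.is_cancelled() {
        work();
        // Re-jitter every iteration so concurrently started tasks stay de-synchronized.
        tokio::select! {
            _ = cancel.cancelled() => break,
            _ = tokio::time::sleep(period_jitter(period, 5)) => {}
        }
    }
}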
42 :
43 : /// Scheduling helper for background work across many tenants.
44 : ///
45 : /// Systems that need to run background work across many tenants may use this type
46 : /// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
47 : /// implementation to provide the work to execute. This is a simple scheduler that just
48 : /// polls the generator for outstanding work, replacing its queue of pending work with
49 : /// what the generator yields on each call: the job generator can change its mind about
50 : /// the order of jobs between calls. The job generator is notified when jobs complete,
51 : /// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
52 : /// admin APIs).
53 : ///
54 : /// For an example see [`crate::tenant::secondary::heatmap_uploader`]
55 : ///
56 : /// G: A JobGenerator that this scheduler will poll to find pending jobs
57 : /// PJ: 'Pending Job': type for job descriptors that are ready to run
58 : /// RJ: 'Running Job': type for jobs that have been spawned
59 : /// C: 'Completion' type that spawned jobs will send when they finish
60 : /// CMD: 'Command' type that the job generator will accept to create jobs on-demand
61 : pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
62 : where
63 : G: JobGenerator<PJ, RJ, C, CMD>,
64 : C: Completion,
65 : PJ: PendingJob,
66 : RJ: RunningJob,
67 : {
68 : generator: G,
69 :
70 : /// Ready to run. Will progress to `running` once the concurrency limit is satisfied, or
71 : /// be removed on next scheduling pass.
72 : pending: std::collections::VecDeque<PJ>,
73 :
74 : /// Tasks currently running in Self::tasks for these tenants. Check this map
75 : /// before pushing more work into pending for the same tenant.
76 : running: HashMap<TenantShardId, RJ>,
77 :
78 : tasks: JoinSet<C>,
79 :
80 : concurrency: usize,
81 :
82 : /// How often we would like `schedule_iteration` to be called.
83 : pub(super) scheduling_interval: Duration,
84 :
85 : _phantom: PhantomData<(PJ, RJ, C, CMD)>,
86 : }
87 :
88 : pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
89 : where
90 : C: Completion,
91 : PJ: PendingJob,
92 : RJ: RunningJob,
93 : {
94 : /// Called at each scheduling interval. Return a list of jobs to run, most urgent first.
95 : ///
96 : /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
97 : /// Implementations should take care to yield the executor periodically if running
98 : /// very long loops.
99 : ///
100 : /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
101 : /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
102 : /// and re-generated.
103 : async fn schedule(&mut self) -> SchedulingResult<PJ>;
104 :
105 : /// Called when a pending job is ready to be run.
106 : ///
107 : /// The job generator provides a future, and an RJ (Running Job) descriptor that tracks it.
108 : fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);
109 :
110 : /// Called when a job previously spawned with spawn() transmits its completion
111 : fn on_completion(&mut self, completion: C);
112 :
113 : /// Called when a command is received. A job will be spawned immediately if the return
114 : /// value is Ok (unless one is already running for that tenant), bypassing concurrency limits and the pending queue.
115 : fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
116 : }
117 :
118 : /// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
119 : pub(super) struct SchedulingResult<PJ> {
120 : pub(super) jobs: Vec<PJ>,
121 : /// The job generator would like to be called again this soon
122 : pub(super) want_interval: Option<Duration>,
123 : }
124 :
125 : /// See [`TenantBackgroundJobs`].
126 : pub(super) trait PendingJob {
127 : fn get_tenant_shard_id(&self) -> &TenantShardId;
128 : }
129 :
130 : /// See [`TenantBackgroundJobs`].
131 : pub(super) trait Completion: Send + 'static {
132 : fn get_tenant_shard_id(&self) -> &TenantShardId;
133 : }
134 :
135 : /// See [`TenantBackgroundJobs`].
136 : pub(super) trait RunningJob {
137 : fn get_barrier(&self) -> Barrier;
138 : }
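
// Illustrative sketch (not part of the original source): a minimal set of types wiring
// the scheduler's generics together. All names here are hypothetical, and the
// `utils::completion::channel()` constructor used to obtain a Barrier is an assumption
// about the utils crate's API (dropping the returned Completion releases the Barrier).
#[allow(dead_code)]
mod example_generator {
    use super::*;

    struct ExamplePending(TenantShardId);
    impl PendingJob for ExamplePending {
        fn get_tenant_shard_id(&self) -> &TenantShardId {
            &self.0
        }
    }

    struct ExampleRunning {
        barrier: Barrier,
    }
    impl RunningJob for ExampleRunning {
        fn get_barrier(&self) -> Barrier {
            self.barrier.clone()
        }
    }

    struct ExampleComplete(TenantShardId);
    impl Completion for ExampleComplete {
        fn get_tenant_shard_id(&self) -> &TenantShardId {
            &self.0
        }
    }

    /// A toy generator that schedules one job per known tenant and accepts a
    /// TenantShardId directly as its on-demand command type.
    struct ExampleGenerator {
        tenants: Vec<TenantShardId>,
    }

    impl JobGenerator<ExamplePending, ExampleRunning, ExampleComplete, TenantShardId> for ExampleGenerator {
        async fn schedule(&mut self) -> SchedulingResult<ExamplePending> {
            SchedulingResult {
                jobs: self.tenants.iter().map(|t| ExamplePending(*t)).collect(),
                want_interval: None,
            }
        }

        fn spawn(
            &mut self,
            job: ExamplePending,
        ) -> (
            ExampleRunning,
            Pin<Box<dyn Future<Output = ExampleComplete> + Send>>,
        ) {
            // Assumed API: the Barrier is released when `completion` is dropped.
            let (completion, barrier) = utils::completion::channel();
            let fut = Box::pin(async move {
                // ... the actual background work for job.0 would go here ...
                drop(completion);
                ExampleComplete(job.0)
            });
            (ExampleRunning { barrier }, fut)
        }

        fn on_completion(&mut self, _completion: ExampleComplete) {
            // A real generator might record last-completion times here.
        }

        fn on_command(&mut self, cmd: TenantShardId) -> anyhow::Result<ExamplePending> {
            Ok(ExamplePending(cmd))
        }
    }
}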
139 :
140 : impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
141 : where
142 : C: Completion,
143 : PJ: PendingJob,
144 : RJ: RunningJob,
145 : G: JobGenerator<PJ, RJ, C, CMD>,
146 : {
147 0 : pub(super) fn new(generator: G, concurrency: usize) -> Self {
148 0 : Self {
149 0 : generator,
150 0 : pending: std::collections::VecDeque::new(),
151 0 : running: HashMap::new(),
152 0 : tasks: JoinSet::new(),
153 0 : concurrency,
154 0 : scheduling_interval: MAX_SCHEDULING_INTERVAL,
155 0 : _phantom: PhantomData,
156 0 : }
157 0 : }
158 :
159 0 : pub(super) async fn run(
160 0 : &mut self,
161 0 : mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
162 0 : background_jobs_can_start: Barrier,
163 0 : cancel: CancellationToken,
164 0 : ) {
165 0 : tracing::info!("Waiting for background_jobs_can start...");
166 0 : background_jobs_can_start.wait().await;
167 0 : tracing::info!("background_jobs_can is ready, proceeding.");
168 :
169 0 : while !cancel.is_cancelled() {
170 : // Look for new work: this is relatively expensive because we have to go acquire the lock on
171 : // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
172 : // require work.
173 0 : self.schedule_iteration(&cancel).await;
174 :
175 0 : if cancel.is_cancelled() {
176 0 : return;
177 0 : }
178 0 :
179 0 : // Schedule some work, if concurrency limit permits it
180 0 : self.spawn_pending();
181 0 :
182 0 : // This message is printed every scheduling iteration as proof of liveness when looking at logs
183 0 : tracing::info!(
184 0 : "Status: {} tasks running, {} pending",
185 0 : self.running.len(),
186 0 : self.pending.len()
187 : );
188 :
189 : // Between scheduling iterations, we will:
190 : // - Drain any complete tasks and spawn pending tasks
191 : // - Handle incoming administrative commands
192 : // - Check our cancellation token
193 0 : let next_scheduling_iteration = Instant::now()
194 0 : .checked_add(self.scheduling_interval)
195 0 : .unwrap_or_else(|| {
196 0 : tracing::warn!(
197 0 : "Scheduling interval invalid ({}s)",
198 0 : self.scheduling_interval.as_secs_f64()
199 : );
200 : // unwrap(): this constant is small, cannot fail to add to time unless
201 : // we are close to the end of the universe.
202 0 : Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
203 0 : });
204 : loop {
205 0 : tokio::select! {
206 0 : _ = cancel.cancelled() => {
207 0 : tracing::info!("joining tasks");
208 : // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
209 : // It is the caller's responsibility to make sure that the tasks they scheduled
210 : // respect an appropriate cancellation token, to shut down promptly. It is only
211 : // safe to wait on joining these tasks because we can see the cancellation token
212 : // has been set.
213 0 : while let Some(_r) = self.tasks.join_next().await {}
214 0 : tracing::info!("terminating on cancellation token.");
215 :
216 0 : break;
217 : },
218 0 : _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
219 0 : tracing::debug!("woke for scheduling interval");
220 0 : break;},
221 0 : cmd = command_queue.recv() => {
222 0 : tracing::debug!("woke for command queue");
223 0 : let cmd = match cmd {
224 0 : Some(c) => c,
225 : None => {
226 : // SecondaryController was destroyed, and this has raced with
227 : // our CancellationToken
228 0 : tracing::info!("terminating on command queue destruction");
229 0 : cancel.cancel();
230 0 : break;
231 : }
232 : };
233 :
234 : let CommandRequest{
235 0 : response_tx,
236 0 : payload
237 0 : } = cmd;
238 0 : self.handle_command(payload, response_tx);
239 : },
240 0 : _ = async {
241 0 : let completion = self.process_next_completion().await;
242 0 : match completion {
243 0 : Some(c) => {
244 0 : self.generator.on_completion(c);
245 0 : if !cancel.is_cancelled() {
246 0 : self.spawn_pending();
247 0 : }
248 : },
249 : None => {
250 : // Nothing is running, so just wait: expect that this future
251 : // will be dropped when something in the outer select! fires.
252 0 : cancel.cancelled().await;
253 : }
254 : }
255 :
256 0 : } => {}
257 : }
258 : }
259 : }
260 0 : }
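
// Illustrative sketch (not part of the original source): how a caller might construct
// and drive the scheduler to completion. The concurrency limit (8) is an arbitrary
// illustrative value; the command receiver would normally be created alongside a
// SecondaryController-style handle that keeps the matching sender.
#[allow(dead_code)]
async fn example_drive<G, PJ, RJ, C, CMD>(
    generator: G,
    command_rx: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) where
    G: JobGenerator<PJ, RJ, C, CMD>,
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
{
    let mut scheduler: TenantBackgroundJobs<G, PJ, RJ, C, CMD> =
        TenantBackgroundJobs::new(generator, 8);
    // Blocks until `cancel` fires or every sender for `command_rx` has been dropped.
    scheduler
        .run(command_rx, background_jobs_can_start, cancel)
        .await;
}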
261 :
262 0 : fn do_spawn(&mut self, job: PJ) {
263 0 : let tenant_shard_id = *job.get_tenant_shard_id();
264 0 : let (in_progress, fut) = self.generator.spawn(job);
265 0 :
266 0 : self.tasks.spawn(fut);
267 0 :
268 0 : let replaced = self.running.insert(tenant_shard_id, in_progress);
269 0 : debug_assert!(replaced.is_none());
270 0 : if replaced.is_some() {
271 0 : tracing::warn!(%tenant_shard_id, "Unexpectedly spawned a task when one was already running")
272 0 : }
273 0 : }
274 :
275 : /// For all pending tenants that are eligible for execution, spawn their task.
276 : ///
277 : /// Caller provides the spawn operation, we track the resulting execution.
278 0 : fn spawn_pending(&mut self) {
279 0 : while !self.pending.is_empty() && self.running.len() < self.concurrency {
280 : // unwrap: loop condition includes !is_empty()
281 0 : let pending = self.pending.pop_front().unwrap();
282 0 : if !self.running.contains_key(pending.get_tenant_shard_id()) {
283 0 : self.do_spawn(pending);
284 0 : }
285 : }
286 0 : }
287 :
288 : /// For administrative commands: skip the pending queue, ignore concurrency limits
289 0 : fn spawn_now(&mut self, job: PJ) -> &RJ {
290 0 : let tenant_shard_id = *job.get_tenant_shard_id();
291 0 : self.do_spawn(job);
292 0 : self.running
293 0 : .get(&tenant_shard_id)
294 0 : .expect("We just inserted this")
295 0 : }
296 :
297 : /// Wait until the next task completes, and handle its completion
298 : ///
299 : /// Cancellation: this method is cancel-safe.
300 0 : async fn process_next_completion(&mut self) -> Option<C> {
301 0 : match self.tasks.join_next().await {
302 0 : Some(r) => {
303 0 : // Completions are driven directly from the JoinSet: draining it here
304 0 : // both yields the completion value and prevents completed tasks from
305 0 : // accumulating. These calls are 1:1 because every task we spawn into
306 0 : // this joinset yields its result when it finishes.
307 0 : let completion = r.expect("Panic in background task");
308 0 :
309 0 : self.running.remove(completion.get_tenant_shard_id());
310 0 : Some(completion)
311 : }
312 : None => {
313 : // Nothing is running, so we have nothing to wait for. We may drop out: the
314 : // main event loop will call us again after the next time it has run something.
315 0 : None
316 : }
317 : }
318 0 : }
319 :
320 : /// Convert the command into a pending job, spawn it, and when the spawned
321 : /// job completes, send the result down `response_tx`.
322 0 : fn handle_command(
323 0 : &mut self,
324 0 : cmd: CMD,
325 0 : response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
326 0 : ) {
327 0 : let job = match self.generator.on_command(cmd) {
328 0 : Ok(j) => j,
329 0 : Err(e) => {
330 0 : response_tx.send(CommandResponse { result: Err(e) }).ok();
331 0 : return;
332 : }
333 : };
334 :
335 0 : let tenant_shard_id = job.get_tenant_shard_id();
336 0 : let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
337 0 : tracing::info!(
338 : tenant_id=%tenant_shard_id.tenant_id,
339 0 : shard_id=%tenant_shard_id.shard_slug(),
340 0 : "Command already running, waiting for it"
341 : );
342 0 : barrier
343 : } else {
344 0 : let running = self.spawn_now(job);
345 0 : running.get_barrier().clone()
346 : };
347 :
348 : // This task does no I/O: it only listens for a barrier's completion and then
349 : // sends to the command response channel. It is therefore safe to spawn this without
350 : // any gates/task_mgr hooks.
351 0 : tokio::task::spawn(async move {
352 0 : barrier.wait().await;
353 :
354 0 : response_tx.send(CommandResponse { result: Ok(()) }).ok();
355 0 : });
356 0 : }
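
// Illustrative sketch (not part of the original source): how a caller on the other end
// of the command channel might issue an on-demand job and await its result.
// `example_send_command` is a hypothetical helper; CommandRequest/CommandResponse come
// from the parent module.
#[allow(dead_code)]
async fn example_send_command<CMD>(
    command_tx: &tokio::sync::mpsc::Sender<CommandRequest<CMD>>,
    payload: CMD,
) -> anyhow::Result<()> {
    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
    command_tx
        .send(CommandRequest { payload, response_tx })
        .await
        .map_err(|_| anyhow::anyhow!("scheduler shut down"))?;
    // The scheduler replies once the spawned (or already-running) job finishes.
    response_rx
        .await
        .map_err(|_| anyhow::anyhow!("scheduler dropped the response channel"))?
        .result
}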
357 :
358 0 : fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
359 0 : self.running.get(tenant_shard_id).map(|r| r.get_barrier())
360 0 : }
361 :
362 : /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
363 : ///
364 : /// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
365 : ///
366 : /// This function resets the pending list: it is assumed that the caller may change their mind about
367 : /// which tenants need work between calls to schedule_iteration.
368 0 : async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
369 : let SchedulingResult {
370 0 : jobs,
371 0 : want_interval,
372 0 : } = self.generator.schedule().await;
373 :
374 : // Adjust interval based on feedback from the job generator
375 0 : if let Some(want_interval) = want_interval {
376 0 : // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
377 0 : self.scheduling_interval = Duration::from_secs(std::cmp::min(
378 0 : std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
379 0 : MAX_SCHEDULING_INTERVAL.as_secs(),
380 0 : ));
381 0 : }
382 :
383 : // The priority order of previously scheduled work may be invalidated by current state: drop
384 : // all pending work (it will be re-scheduled if still needed)
385 0 : self.pending.clear();
386 0 :
387 0 : // While iterating over the potentially-long list of tenants, we will periodically yield
388 0 : // to avoid blocking the executor.
389 0 : yielding_loop(1000, cancel, jobs.into_iter(), |job| {
390 0 : // Skip tenants that already have a job in flight
391 0 : if !self.running.contains_key(job.get_tenant_shard_id()) {
392 0 : self.pending.push_back(job);
393 0 : }
394 0 : })
395 0 : .await
396 0 : .ok();
397 0 : }
398 : }