1 : use futures::Future;
2 : use std::{
3 : collections::HashMap,
4 : marker::PhantomData,
5 : pin::Pin,
6 : time::{Duration, Instant},
7 : };
8 :
9 : use pageserver_api::shard::TenantShardId;
10 : use tokio::task::JoinSet;
11 : use tokio_util::sync::CancellationToken;
12 : use utils::{completion::Barrier, yielding_loop::yielding_loop};
13 :
14 : use super::{CommandRequest, CommandResponse};
15 :
16 : /// Scheduling interval is the time between calls to JobGenerator::schedule.
17 : /// When we schedule jobs, the job generator may provide a hint of its preferred
18 : /// interval, which we will respect within these bounds.
19 : const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
20 : const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
21 :
22 : /// Scheduling helper for background work across many tenants.
23 : ///
24 : /// Systems that need to run background work across many tenants may use this type
25 : /// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
26 : /// implementation to provide the work to execute. This is a simple scheduler that just
27 : /// polls the generator for outstanding work, replacing its queue of pending work with
28 : /// what the generator yields on each call: the job generator can change its mind about
29 : /// the order of jobs between calls. The job generator is notified when jobs complete,
30 : /// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
31 : /// admin APIs).
32 : ///
33 : /// For an example see [`crate::tenant::secondary::heatmap_uploader`]
34 : ///
35 : /// G: A JobGenerator that this scheduler will poll to find pending jobs
36 : /// PJ: 'Pending Job': type for job descriptors that are ready to run
37 : /// RJ: 'Running Job': type for jobs that have been spawned
38 : /// C: 'Completion' type that spawned jobs will send when they finish
39 : /// CMD: 'Command' type that the job generator will accept to create jobs on-demand
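///
/// As a rough illustration only (not taken from the heatmap uploader; every name below is
/// hypothetical), a job generator plugs into this scheduler roughly like so:
///
/// ```ignore
/// struct UploadGenerator { /* per-tenant state */ }
///
/// // UploadPending, UploadRunning and UploadComplete must implement the PendingJob,
/// // RunningJob and Completion traits below; UploadCommand is the admin command type.
/// impl JobGenerator<UploadPending, UploadRunning, UploadComplete, UploadCommand> for UploadGenerator {
///     async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
///         // Walk tenants (no I/O) and emit pending jobs, most urgent first.
///         SchedulingResult { jobs: Vec::new(), want_interval: None }
///     }
///
///     fn spawn(&mut self, job: UploadPending) -> (UploadRunning, Pin<Box<dyn Future<Output = UploadComplete> + Send>>) {
///         // Return a descriptor (carrying a Barrier) plus the future that does the work.
///         todo!()
///     }
///
///     fn on_completion(&mut self, completion: UploadComplete) {
///         // Update per-tenant state, e.g. record when the last job finished.
///     }
///
///     fn on_command(&mut self, cmd: UploadCommand) -> anyhow::Result<UploadPending> {
///         // Translate an admin command into a job that will be spawned immediately.
///         todo!()
///     }
/// }
///
/// // The owning task constructs and drives the scheduler:
/// // let mut scheduler = TenantBackgroundJobs::new(UploadGenerator::default(), 8);
/// // scheduler.run(command_rx, background_jobs_can_start, cancel).await;
/// ```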
40 : pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
41 : where
42 : G: JobGenerator<PJ, RJ, C, CMD>,
43 : C: Completion,
44 : PJ: PendingJob,
45 : RJ: RunningJob,
46 : {
47 : generator: G,
48 :
49 : /// Ready to run. Will progress to `running` once the concurrency limit allows, or
50 : /// be removed on the next scheduling pass.
51 : pending: std::collections::VecDeque<PJ>,
52 :
53 : /// Tasks currently running in Self::tasks for these tenants. Check this map
54 : /// before pushing more work into pending for the same tenant.
55 : running: HashMap<TenantShardId, RJ>,
56 :
57 : tasks: JoinSet<C>,
58 :
59 : concurrency: usize,
60 :
61 : /// How often we would like `schedule_iteration` to be called.
62 : pub(super) scheduling_interval: Duration,
63 :
64 : _phantom: PhantomData<(PJ, RJ, C, CMD)>,
65 : }
66 :
67 : pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
68 : where
69 : C: Completion,
70 : PJ: PendingJob,
71 : RJ: RunningJob,
72 : {
73 : /// Called at each scheduling interval. Return a list of jobs to run, most urgent first.
74 : ///
75 : /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
76 : /// Implementations should take care to yield to the executor periodically if running
77 : /// very long loops.
78 : ///
79 : /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
80 : /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
81 : /// and re-generated.
82 : async fn schedule(&mut self) -> SchedulingResult<PJ>;
83 :
84 : /// Called when a pending job is ready to be run.
85 : ///
86 : /// The job generator provides a future, and an RJ (Running Job) descriptor that tracks it.
87 : fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);
88 :
89 : /// Called when a job previously spawned with spawn() transmits its completion
90 : fn on_completion(&mut self, completion: C);
91 :
92 : /// Called when a command is received. A job will be spawned immediately if the return
93 : /// value is Ok, ignoring concurrency limits and the pending queue (see the command-path sketch after this trait).
94 : fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
95 : }
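// As a minimal sketch of the on-demand command path mentioned above (assuming a hypothetical
// `MyCommand` payload and a `command_tx: tokio::sync::mpsc::Sender<CommandRequest<MyCommand>>`
// held by whichever controller owns the scheduler):
//
//   let (response_tx, response_rx) = tokio::sync::oneshot::channel();
//   command_tx.send(CommandRequest { payload: MyCommand { /* ... */ }, response_tx }).await?;
//   // Resolves once the job spawned by on_command() signals its Barrier.
//   response_rx.await?.result?;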
96 :
97 : /// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
98 : pub(super) struct SchedulingResult<PJ> {
99 : pub(super) jobs: Vec<PJ>,
100 : /// The job generator would like to be called again this soon
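/// (the scheduler clamps this between MIN_SCHEDULING_INTERVAL and MAX_SCHEDULING_INTERVAL, using whole-second granularity)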
101 : pub(super) want_interval: Option<Duration>,
102 : }
103 :
104 : /// See [`TenantBackgroundJobs`].
105 : pub(super) trait PendingJob {
106 : fn get_tenant_shard_id(&self) -> &TenantShardId;
107 : }
108 :
109 : /// See [`TenantBackgroundJobs`].
110 : pub(super) trait Completion: Send + 'static {
111 : fn get_tenant_shard_id(&self) -> &TenantShardId;
112 : }
113 :
114 : /// See [`TenantBackgroundJobs`].
115 : pub(super) trait RunningJob {
116 : fn get_barrier(&self) -> Barrier;
117 : }
118 :
119 : impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
120 : where
121 : C: Completion,
122 : PJ: PendingJob,
123 : RJ: RunningJob,
124 : G: JobGenerator<PJ, RJ, C, CMD>,
125 : {
126 1208 : pub(super) fn new(generator: G, concurrency: usize) -> Self {
127 1208 : Self {
128 1208 : generator,
129 1208 : pending: std::collections::VecDeque::new(),
130 1208 : running: HashMap::new(),
131 1208 : tasks: JoinSet::new(),
132 1208 : concurrency,
133 1208 : scheduling_interval: MAX_SCHEDULING_INTERVAL,
134 1208 : _phantom: PhantomData,
135 1208 : }
136 1208 : }
137 :
138 1208 : pub(super) async fn run(
139 1208 : &mut self,
140 1208 : mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
141 1208 : background_jobs_can_start: Barrier,
142 1208 : cancel: CancellationToken,
143 1208 : ) {
144 1208 : tracing::info!("Waiting for background_jobs_can start...");
145 1208 : background_jobs_can_start.wait().await;
146 1194 : tracing::info!("background_jobs_can is ready, proceeding.");
147 :
148 2704 : while !cancel.is_cancelled() {
149 : // Look for new work: this is relatively expensive because we have to go acquire the lock on
150 : // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
151 : // require an upload.
152 2360 : self.schedule_iteration(&cancel).await;
153 :
154 2360 : if cancel.is_cancelled() {
155 0 : return;
156 2360 : }
157 2360 :
158 2360 : // Schedule some work, if concurrency limit permits it
159 2360 : self.spawn_pending();
160 2360 :
161 2360 : // Between scheduling iterations, we will:
162 2360 : // - Drain any complete tasks and spawn pending tasks
163 2360 : // - Handle incoming administrative commands
164 2360 : // - Check our cancellation token
165 2360 : let next_scheduling_iteration = Instant::now()
166 2360 : .checked_add(self.scheduling_interval)
167 2360 : .unwrap_or_else(|| {
168 0 : tracing::warn!(
169 0 : "Scheduling interval invalid ({}s)",
170 0 : self.scheduling_interval.as_secs_f64()
171 0 : );
172 : // unwrap(): this constant is small, cannot fail to add to time unless
173 : // we are close to the end of the universe.
174 0 : Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
175 2360 : });
176 3405 : loop {
177 4947 : tokio::select! {
178 : _ = cancel.cancelled() => {
179 344 : tracing::info!("joining tasks");
180 : // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
182 : // It is the caller's responsibility to make sure that the tasks they scheduled
182 : // respect an appropriate cancellation token, to shut down promptly. It is only
183 : // safe to wait on joining these tasks because we can see the cancellation token
184 : // has been set.
185 : while let Some(_r) = self.tasks.join_next().await {}
186 344 : tracing::info!("terminating on cancellation token.");
187 :
188 : break;
189 : },
190 : _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
191 0 : tracing::debug!("woke for scheduling interval");
192 : break;},
193 14 : cmd = command_queue.recv() => {
194 0 : tracing::debug!("woke for command queue");
195 : let cmd = match cmd {
196 : Some(c) =>c,
197 : None => {
198 : // SecondaryController was destroyed, and this has raced with
199 : // our CancellationToken
200 0 : tracing::info!("terminating on command queue destruction");
201 : cancel.cancel();
202 : break;
203 : }
204 : };
205 :
206 : let CommandRequest{
207 : response_tx,
208 : payload
209 : } = cmd;
210 : self.handle_command(payload, response_tx);
211 : },
212 3138 : _ = async {
213 3138 : let completion = self.process_next_completion().await;
214 3135 : match completion {
215 18 : Some(c) => {
216 18 : self.generator.on_completion(c);
217 18 : if !cancel.is_cancelled() {
218 18 : self.spawn_pending();
219 18 : }
220 : },
221 : None => {
222 : // Nothing is running, so just wait: expect that this future
223 : // will be dropped when something in the outer select! fires.
224 3117 : cancel.cancelled().await;
225 : }
226 : }
227 :
228 1031 : } => {}
229 3405 : }
230 3405 : }
231 : }
232 344 : }
233 :
234 18 : fn do_spawn(&mut self, job: PJ) {
235 18 : let tenant_shard_id = *job.get_tenant_shard_id();
236 18 : let (in_progress, fut) = self.generator.spawn(job);
237 18 :
238 18 : self.tasks.spawn(fut);
239 18 :
240 18 : self.running.insert(tenant_shard_id, in_progress);
241 18 : }
242 :
243 : /// For all pending tenants that are eligible for execution, spawn their task.
244 : ///
245 : /// The generator provides the spawn operation; we track the resulting execution.
246 2378 : fn spawn_pending(&mut self) {
247 2382 : while !self.pending.is_empty() && self.running.len() < self.concurrency {
248 4 : // unwrap: loop condition includes !is_empty()
249 4 : let pending = self.pending.pop_front().unwrap();
250 4 : self.do_spawn(pending);
251 4 : }
252 2378 : }
253 :
254 : /// For administrative commands: skip the pending queue, ignore concurrency limits
255 14 : fn spawn_now(&mut self, job: PJ) -> &RJ {
256 14 : let tenant_shard_id = *job.get_tenant_shard_id();
257 14 : self.do_spawn(job);
258 14 : self.running
259 14 : .get(&tenant_shard_id)
260 14 : .expect("We just inserted this")
261 14 : }
262 :
263 : /// Wait until the next task completes, and handle its completion
264 : ///
265 : /// Cancellation: this method is cancel-safe.
266 3138 : async fn process_next_completion(&mut self) -> Option<C> {
267 3138 : match self.tasks.join_next().await {
268 18 : Some(r) => {
269 18 : // Each task spawned into this JoinSet resolves to its Completion,
270 18 : // so join_next() both yields that result and drains the set, keeping
271 18 : // finished tasks from accumulating. These are 1:1 because every task
272 18 : // we spawn into this joinset yields exactly one result here.
273 18 : let completion = r.expect("Panic in background task");
274 18 :
275 18 : self.running.remove(completion.get_tenant_shard_id());
276 18 : Some(completion)
277 : }
278 : None => {
279 : // Nothing is running, so we have nothing to wait for. We may drop out: the
280 : // main event loop will call us again after the next time it has run something.
281 3117 : None
282 : }
283 : }
284 3135 : }
285 :
286 : /// Convert the command into a pending job, spawn it, and when the spawned
287 : /// job completes, send the result down `response_tx`.
288 14 : fn handle_command(
289 14 : &mut self,
290 14 : cmd: CMD,
291 14 : response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
292 14 : ) {
293 14 : let job = match self.generator.on_command(cmd) {
294 14 : Ok(j) => j,
295 0 : Err(e) => {
296 0 : response_tx.send(CommandResponse { result: Err(e) }).ok();
297 0 : return;
298 : }
299 : };
300 :
301 14 : let tenant_shard_id = job.get_tenant_shard_id();
302 14 : let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
303 0 : barrier
304 : } else {
305 14 : let running = self.spawn_now(job);
306 14 : running.get_barrier().clone()
307 : };
308 :
309 : // This task does no I/O: it only listens for a barrier's completion and then
310 : // sends to the command response channel. It is therefore safe to spawn this without
311 : // any gates/task_mgr hooks.
312 14 : tokio::task::spawn(async move {
313 14 : barrier.wait().await;
314 :
315 14 : response_tx.send(CommandResponse { result: Ok(()) }).ok();
316 14 : });
317 14 : }
318 :
319 14 : fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
320 14 : self.running.get(tenant_shard_id).map(|r| r.get_barrier())
321 14 : }
322 :
323 : /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
324 : ///
325 : /// The generator will typically iterate over tenant-like structures, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`].
326 : ///
327 : /// This function resets the pending list: it is assumed that the caller may change their mind about
328 : /// which tenants need work between calls to schedule_iteration.
329 2360 : async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
330 : let SchedulingResult {
331 2360 : jobs,
332 2360 : want_interval,
333 2360 : } = self.generator.schedule().await;
334 :
335 : // Adjust interval based on feedback from the job generator
336 2360 : if let Some(want_interval) = want_interval {
337 0 : // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
338 0 : self.scheduling_interval = Duration::from_secs(std::cmp::min(
339 0 : std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
340 0 : MAX_SCHEDULING_INTERVAL.as_secs(),
341 0 : ));
342 2360 : }
343 :
344 : // The priority order of previously scheduled work may be invalidated by current state: drop
345 : // all pending work (it will be re-scheduled if still needed)
346 2360 : self.pending.clear();
347 2360 :
348 2360 : // While iterating over the potentially-long list of tenants, we will periodically yield
349 2360 : // to avoid blocking the executor.
350 2360 : yielding_loop(1000, cancel, jobs.into_iter(), |job| {
351 7 : // Skip tenants that already have a job in flight
352 7 : if !self.running.contains_key(job.get_tenant_shard_id()) {
353 4 : self.pending.push_back(job);
354 4 : }
355 2360 : })
356 0 : .await
357 2360 : .ok();
358 2360 : }
359 : }