use async_trait;
use futures::Future;
use std::{
    collections::HashMap,
    marker::PhantomData,
    pin::Pin,
    time::{Duration, Instant},
};

use pageserver_api::shard::TenantShardId;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::{completion::Barrier, yielding_loop::yielding_loop};

use super::{CommandRequest, CommandResponse};

/// Scheduling interval is the time between calls to JobGenerator::schedule.
/// When we schedule jobs, the job generator may provide a hint of its preferred
/// interval, which we will respect within these bounds.
const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);

/// Scheduling helper for background work across many tenants.
///
/// Systems that need to run background work across many tenants may use this type
/// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
/// implementation to provide the work to execute. This is a simple scheduler that just
/// polls the generator for outstanding work, replacing its queue of pending work with
/// what the generator yields on each call: the job generator can change its mind about
/// the order of jobs between calls. The job generator is notified when jobs complete,
/// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
/// admin APIs).
///
/// For an example see [`crate::tenant::secondary::heatmap_uploader`].
///
/// G: A JobGenerator that this scheduler will poll to find pending jobs
/// PJ: 'Pending Job': type for job descriptors that are ready to run
/// RJ: 'Running Job': type for jobs that have been spawned
/// C: 'Completion' type that spawned jobs will send when they finish
/// CMD: 'Command' type that the job generator will accept to create jobs on-demand
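///
/// A minimal usage sketch (illustrative only: `MyGenerator` and the surrounding wiring
/// are hypothetical, not items from this module):
///
/// ```ignore
/// // Construct the scheduler around a JobGenerator implementation, with a concurrency limit of 8.
/// let mut scheduler = TenantBackgroundJobs::new(MyGenerator::default(), 8);
/// let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
/// // `run` loops until the cancellation token fires, polling the generator for new jobs
/// // at each scheduling interval and spawning them within the concurrency limit.
/// scheduler.run(cmd_rx, background_jobs_can_start, cancel).await;
/// ```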
pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
where
    G: JobGenerator<PJ, RJ, C, CMD>,
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
{
    generator: G,

    /// Ready to run. Will progress to `running` once the concurrency limit is satisfied, or
    /// be removed on the next scheduling pass.
    pending: std::collections::VecDeque<PJ>,

    /// Tasks currently running in Self::tasks for these tenants. Check this map
    /// before pushing more work into pending for the same tenant.
    running: HashMap<TenantShardId, RJ>,

    tasks: JoinSet<C>,

    concurrency: usize,

    /// How often we would like `schedule_iteration` to be called.
    pub(super) scheduling_interval: Duration,

    _phantom: PhantomData<(PJ, RJ, C, CMD)>,
}

#[async_trait::async_trait]
pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
where
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
{
    /// Called at each scheduling interval. Return a list of jobs to run, most urgent first.
    ///
    /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
    /// Implementations should take care to yield the executor periodically if running
    /// very long loops.
    ///
    /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
    /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
    /// and re-generated.
    async fn schedule(&mut self) -> SchedulingResult<PJ>;

    /// Called when a pending job is ready to be run.
    ///
    /// The job generator provides a future, and an RJ (Running Job) descriptor that tracks it.
    fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);

    /// Called when a job previously spawned with spawn() transmits its completion.
    fn on_completion(&mut self, completion: C);

    /// Called when a command is received. A job will be spawned immediately if the return
    /// value is Ok, ignoring concurrency limits and the pending queue.
    fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
}

/// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
pub(super) struct SchedulingResult<PJ> {
    pub(super) jobs: Vec<PJ>,
    /// The job generator would like to be called again this soon
    pub(super) want_interval: Option<Duration>,
}

/// See [`TenantBackgroundJobs`].
pub(super) trait PendingJob {
    fn get_tenant_shard_id(&self) -> &TenantShardId;
}

/// See [`TenantBackgroundJobs`].
pub(super) trait Completion: Send + 'static {
    fn get_tenant_shard_id(&self) -> &TenantShardId;
}

/// See [`TenantBackgroundJobs`].
pub(super) trait RunningJob {
    fn get_barrier(&self) -> Barrier;
}
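
// Illustrative sketch (not part of this module) of how the marker traits above fit
// together: the generator hands the scheduler a pending-job descriptor, a running-job
// handle carrying a Barrier, and a completion carrying the tenant shard id. The names
// below are hypothetical; see the heatmap uploader for the real implementations.
//
//   struct UploadPending { tenant_shard_id: TenantShardId }
//   impl PendingJob for UploadPending {
//       fn get_tenant_shard_id(&self) -> &TenantShardId { &self.tenant_shard_id }
//   }
//
//   struct UploadRunning { barrier: Barrier }
//   impl RunningJob for UploadRunning {
//       fn get_barrier(&self) -> Barrier { self.barrier.clone() }
//   }
//
//   struct UploadComplete { tenant_shard_id: TenantShardId }
//   impl Completion for UploadComplete {
//       fn get_tenant_shard_id(&self) -> &TenantShardId { &self.tenant_shard_id }
//   }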

impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
where
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
    G: JobGenerator<PJ, RJ, C, CMD>,
{
    pub(super) fn new(generator: G, concurrency: usize) -> Self {
        Self {
            generator,
            pending: std::collections::VecDeque::new(),
            running: HashMap::new(),
            tasks: JoinSet::new(),
            concurrency,
            scheduling_interval: MAX_SCHEDULING_INTERVAL,
            _phantom: PhantomData,
        }
    }

    pub(super) async fn run(
        &mut self,
        mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
        background_jobs_can_start: Barrier,
        cancel: CancellationToken,
    ) {
        tracing::info!("Waiting for background_jobs_can_start...");
        background_jobs_can_start.wait().await;
        tracing::info!("background_jobs_can_start is ready, proceeding.");

        while !cancel.is_cancelled() {
            // Look for new work: this is relatively expensive because we have to acquire the lock on
            // the tenant manager to retrieve tenants, and then iterate over them to figure out which
            // ones require work.
            self.schedule_iteration(&cancel).await;

            if cancel.is_cancelled() {
                return;
            }

            // Schedule some work, if the concurrency limit permits it
            self.spawn_pending();

            // Between scheduling iterations, we will:
            // - Drain any complete tasks and spawn pending tasks
            // - Handle incoming administrative commands
            // - Check our cancellation token
            let next_scheduling_iteration = Instant::now()
                .checked_add(self.scheduling_interval)
                .unwrap_or_else(|| {
                    tracing::warn!(
                        "Scheduling interval invalid ({}s)",
                        self.scheduling_interval.as_secs_f64()
                    );
                    // unwrap(): this constant is small, cannot fail to add to time unless
                    // we are close to the end of the universe.
                    Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
                });
            loop {
                tokio::select! {
                    _ = cancel.cancelled() => {
                        tracing::info!("joining tasks");
                        // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
                        // It is the caller's responsibility to make sure that the tasks they scheduled
                        // respect an appropriate cancellation token, to shut down promptly. It is only
                        // safe to wait on joining these tasks because we can see the cancellation token
                        // has been set.
                        while let Some(_r) = self.tasks.join_next().await {}
                        tracing::info!("terminating on cancellation token.");

                        break;
                    },
                    _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
                        tracing::debug!("woke for scheduling interval");
                        break;
                    },
                    cmd = command_queue.recv() => {
                        tracing::debug!("woke for command queue");
                        let cmd = match cmd {
                            Some(c) => c,
                            None => {
                                // SecondaryController was destroyed, and this has raced with
                                // our CancellationToken
                                tracing::info!("terminating on command queue destruction");
                                cancel.cancel();
                                break;
                            }
                        };

                        let CommandRequest {
                            response_tx,
                            payload,
                        } = cmd;
                        self.handle_command(payload, response_tx);
                    },
                    _ = async {
                        let completion = self.process_next_completion().await;
                        match completion {
                            Some(c) => {
                                self.generator.on_completion(c);
                                if !cancel.is_cancelled() {
                                    self.spawn_pending();
                                }
                            },
                            None => {
                                // Nothing is running, so just wait: expect that this future
                                // will be dropped when something in the outer select! fires.
                                cancel.cancelled().await;
                            }
                        }

                    } => {}
                }
            }
        }
    }

    fn do_spawn(&mut self, job: PJ) {
        let tenant_shard_id = *job.get_tenant_shard_id();
        let (in_progress, fut) = self.generator.spawn(job);

        self.tasks.spawn(fut);

        self.running.insert(tenant_shard_id, in_progress);
    }

    /// For all pending tenants that are eligible for execution, spawn their task.
    ///
    /// The generator provides the spawn operation (via [`JobGenerator::spawn`]); we track
    /// the resulting execution.
    fn spawn_pending(&mut self) {
        while !self.pending.is_empty() && self.running.len() < self.concurrency {
            // unwrap: loop condition includes !is_empty()
            let pending = self.pending.pop_front().unwrap();
            self.do_spawn(pending);
        }
    }

    /// For administrative commands: skip the pending queue, ignore concurrency limits
    fn spawn_now(&mut self, job: PJ) -> &RJ {
        let tenant_shard_id = *job.get_tenant_shard_id();
        self.do_spawn(job);
        self.running
            .get(&tenant_shard_id)
            .expect("We just inserted this")
    }

    /// Wait until the next task completes, and handle its completion.
    ///
    /// Cancellation: this method is cancel-safe.
    async fn process_next_completion(&mut self) -> Option<C> {
        match self.tasks.join_next().await {
            Some(r) => {
                // Drain the JoinSet so that completed tasks do not accumulate. These calls
                // are 1:1 with spawned jobs, because every task we spawn into this joinset
                // returns its completion.
                let completion = r.expect("Panic in background task");

                self.running.remove(completion.get_tenant_shard_id());
                Some(completion)
            }
            None => {
                // Nothing is running, so we have nothing to wait for. We may drop out: the
                // main event loop will call us again after the next time it has run something.
                None
            }
        }
    }

    /// Convert the command into a pending job, spawn it, and when the spawned
    /// job completes, send the result down `response_tx`.
    fn handle_command(
        &mut self,
        cmd: CMD,
        response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
    ) {
        let job = match self.generator.on_command(cmd) {
            Ok(j) => j,
            Err(e) => {
                response_tx.send(CommandResponse { result: Err(e) }).ok();
                return;
            }
        };

        let tenant_shard_id = job.get_tenant_shard_id();
        let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
            barrier
        } else {
            let running = self.spawn_now(job);
            running.get_barrier().clone()
        };

        // This task does no I/O: it only listens for a barrier's completion and then
        // sends to the command response channel. It is therefore safe to spawn this without
        // any gates/task_mgr hooks.
        tokio::task::spawn(async move {
            barrier.wait().await;

            response_tx.send(CommandResponse { result: Ok(()) }).ok();
        });
    }

    fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
        self.running.get(tenant_shard_id).map(|r| r.get_barrier())
    }

    /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
    ///
    /// The job generator's [`JobGenerator::schedule`] is expected to walk tenant-like structures,
    /// e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`].
    ///
    /// This function resets the pending list: it is assumed that the caller may change their mind about
    /// which tenants need work between calls to schedule_iteration.
    async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
        let SchedulingResult {
            jobs,
            want_interval,
        } = self.generator.schedule().await;

        // Adjust interval based on feedback from the job generator
        if let Some(want_interval) = want_interval {
            // Calculation uses second granularity: this scheduler is not intended for high-frequency tasks
            self.scheduling_interval = Duration::from_secs(std::cmp::min(
                std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
                MAX_SCHEDULING_INTERVAL.as_secs(),
            ));
        }

        // The priority order of previously scheduled work may be invalidated by current state: drop
        // all pending work (it will be re-scheduled if still needed)
        self.pending.clear();

        // While iterating over the potentially-long list of tenants, we will periodically yield
        // to avoid blocking the executor.
        yielding_loop(1000, cancel, jobs.into_iter(), |job| {
            // Skip tenants that already have a job in flight
            if !self.running.contains_key(job.get_tenant_shard_id()) {
                self.pending.push_back(job);
            }
        })
        .await
        .ok();
    }
}