Line data Source code
1 : use futures::Future;
2 : use rand::Rng;
3 : use std::{
4 : collections::HashMap,
5 : marker::PhantomData,
6 : pin::Pin,
7 : time::{Duration, Instant},
8 : };
9 :
10 : use pageserver_api::shard::TenantShardId;
11 : use tokio::task::JoinSet;
12 : use tokio_util::sync::CancellationToken;
13 : use utils::{completion::Barrier, yielding_loop::yielding_loop};
14 :
15 : use super::{CommandRequest, CommandResponse};
16 :
/// Lower bound on the time between calls to `JobGenerator::schedule`.
///
/// A job generator may hint at its preferred interval (via
/// `SchedulingResult::want_interval`); that hint is only honoured within
/// `MIN_SCHEDULING_INTERVAL..=MAX_SCHEDULING_INTERVAL`.
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);

/// Upper bound on the time between calls to `JobGenerator::schedule`. A freshly
/// constructed scheduler also starts out using this interval.
const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
22 :
23 : /// Jitter a Duration by an integer percentage. Returned values are uniform
24 : /// in the range 100-pct..100+pct (i.e. a 5% jitter is 5% either way: a ~10% range)
25 0 : pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
26 0 : if d == Duration::ZERO {
27 0 : d
28 : } else {
29 0 : rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
30 : }
31 0 : }
32 :
33 : /// When a periodic task first starts, it should wait for some time in the range 0..period, so
34 : /// that starting many such tasks at the same time spreads them across the time range.
35 0 : pub(super) fn period_warmup(period: Duration) -> Duration {
36 0 : if period == Duration::ZERO {
37 0 : period
38 : } else {
39 0 : rand::thread_rng().gen_range(Duration::ZERO..period)
40 : }
41 0 : }
42 :
43 : /// Scheduling helper for background work across many tenants.
44 : ///
45 : /// Systems that need to run background work across many tenants may use this type
46 : /// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
47 : /// implementation to provide the work to execute. This is a simple scheduler that just
48 : /// polls the generator for outstanding work, replacing its queue of pending work with
49 : /// what the generator yields on each call: the job generator can change its mind about
50 : /// the order of jobs between calls. The job generator is notified when jobs complete,
51 : /// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
52 : /// admin APIs).
53 : ///
54 : /// For an example see [`crate::tenant::secondary::heatmap_uploader`]
55 : ///
56 : /// G: A JobGenerator that this scheduler will poll to find pending jobs
57 : /// PJ: 'Pending Job': type for job descriptors that are ready to run
58 : /// RJ: 'Running Job' type' for jobs that have been spawned
59 : /// C : 'Completion' type that spawned jobs will send when they finish
60 : /// CMD: 'Command' type that the job generator will accept to create jobs on-demand
61 : pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
62 : where
63 : G: JobGenerator<PJ, RJ, C, CMD>,
64 : C: Completion,
65 : PJ: PendingJob,
66 : RJ: RunningJob,
67 : {
68 : generator: G,
69 :
70 : /// Ready to run. Will progress to `running` once concurrent limit is satisfied, or
71 : /// be removed on next scheduling pass.
72 : pending: std::collections::VecDeque<PJ>,
73 :
74 : /// Tasks currently running in Self::tasks for these tenants. Check this map
75 : /// before pushing more work into pending for the same tenant.
76 : running: HashMap<TenantShardId, RJ>,
77 :
78 : tasks: JoinSet<C>,
79 :
80 : concurrency: usize,
81 :
82 : /// How often we would like schedule_interval to be called.
83 : pub(super) scheduling_interval: Duration,
84 :
85 : _phantom: PhantomData<(PJ, RJ, C, CMD)>,
86 : }
87 :
88 : pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
89 : where
90 : C: Completion,
91 : PJ: PendingJob,
92 : RJ: RunningJob,
93 : {
94 : /// Called at each scheduling interval. Return a list of jobs to run, most urgent first.
95 : ///
96 : /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
97 : /// Implementations should take care to yield the executor periodically if running
98 : /// very long loops.
99 : ///
100 : /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
101 : /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
102 : /// and re-generated.
103 : async fn schedule(&mut self) -> SchedulingResult<PJ>;
104 :
105 : /// Called when a pending job is ready to be run.
106 : ///
107 : /// The job generation provides a future, and a RJ (Running Job) descriptor that tracks it.
108 : fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);
109 :
110 : /// Called when a job previously spawned with spawn() transmits its completion
111 : fn on_completion(&mut self, completion: C);
112 :
113 : /// Called when a command is received. A job will be spawned immediately if the return
114 : /// value is Some, ignoring concurrency limits and the pending queue.
115 : fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
116 : }
117 :
118 : /// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
119 : pub(super) struct SchedulingResult<PJ> {
120 : pub(super) jobs: Vec<PJ>,
121 : /// The job generator would like to be called again this soon
122 : pub(super) want_interval: Option<Duration>,
123 : }
124 :
125 : /// See [`TenantBackgroundJobs`].
126 : pub(super) trait PendingJob {
127 : fn get_tenant_shard_id(&self) -> &TenantShardId;
128 : }
129 :
130 : /// See [`TenantBackgroundJobs`].
131 : pub(super) trait Completion: Send + 'static {
132 : fn get_tenant_shard_id(&self) -> &TenantShardId;
133 : }
134 :
135 : /// See [`TenantBackgroundJobs`].
136 : pub(super) trait RunningJob {
137 : fn get_barrier(&self) -> Barrier;
138 : }
139 :
140 : impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
141 : where
142 : C: Completion,
143 : PJ: PendingJob,
144 : RJ: RunningJob,
145 : G: JobGenerator<PJ, RJ, C, CMD>,
146 : {
147 0 : pub(super) fn new(generator: G, concurrency: usize) -> Self {
148 0 : Self {
149 0 : generator,
150 0 : pending: std::collections::VecDeque::new(),
151 0 : running: HashMap::new(),
152 0 : tasks: JoinSet::new(),
153 0 : concurrency,
154 0 : scheduling_interval: MAX_SCHEDULING_INTERVAL,
155 0 : _phantom: PhantomData,
156 0 : }
157 0 : }
158 :
159 0 : pub(super) async fn run(
160 0 : &mut self,
161 0 : mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
162 0 : background_jobs_can_start: Barrier,
163 0 : cancel: CancellationToken,
164 0 : ) {
165 0 : tracing::info!("Waiting for background_jobs_can start...");
166 0 : background_jobs_can_start.wait().await;
167 0 : tracing::info!("background_jobs_can is ready, proceeding.");
168 :
169 0 : while !cancel.is_cancelled() {
170 : // Look for new work: this is relatively expensive because we have to go acquire the lock on
171 : // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
172 : // require an upload.
173 0 : self.schedule_iteration(&cancel).await;
174 :
175 0 : if cancel.is_cancelled() {
176 0 : return;
177 0 : }
178 0 :
179 0 : // Schedule some work, if concurrency limit permits it
180 0 : self.spawn_pending();
181 0 :
182 0 : // Between scheduling iterations, we will:
183 0 : // - Drain any complete tasks and spawn pending tasks
184 0 : // - Handle incoming administrative commands
185 0 : // - Check our cancellation token
186 0 : let next_scheduling_iteration = Instant::now()
187 0 : .checked_add(self.scheduling_interval)
188 0 : .unwrap_or_else(|| {
189 0 : tracing::warn!(
190 0 : "Scheduling interval invalid ({}s)",
191 0 : self.scheduling_interval.as_secs_f64()
192 : );
193 : // unwrap(): this constant is small, cannot fail to add to time unless
194 : // we are close to the end of the universe.
195 0 : Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
196 0 : });
197 0 : loop {
198 0 : tokio::select! {
199 : _ = cancel.cancelled() => {
200 : tracing::info!("joining tasks");
201 : // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
202 : // It is the callers responsibility to make sure that the tasks they scheduled
203 : // respect an appropriate cancellation token, to shut down promptly. It is only
204 : // safe to wait on joining these tasks because we can see the cancellation token
205 : // has been set.
206 : while let Some(_r) = self.tasks.join_next().await {}
207 : tracing::info!("terminating on cancellation token.");
208 :
209 : break;
210 : },
211 : _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
212 : tracing::debug!("woke for scheduling interval");
213 : break;},
214 : cmd = command_queue.recv() => {
215 : tracing::debug!("woke for command queue");
216 : let cmd = match cmd {
217 : Some(c) =>c,
218 : None => {
219 : // SecondaryController was destroyed, and this has raced with
220 : // our CancellationToken
221 : tracing::info!("terminating on command queue destruction");
222 : cancel.cancel();
223 : break;
224 : }
225 : };
226 :
227 : let CommandRequest{
228 : response_tx,
229 : payload
230 : } = cmd;
231 : self.handle_command(payload, response_tx);
232 : },
233 0 : _ = async {
234 0 : let completion = self.process_next_completion().await;
235 0 : match completion {
236 0 : Some(c) => {
237 0 : self.generator.on_completion(c);
238 0 : if !cancel.is_cancelled() {
239 0 : self.spawn_pending();
240 0 : }
241 : },
242 : None => {
243 : // Nothing is running, so just wait: expect that this future
244 : // will be dropped when something in the outer select! fires.
245 0 : cancel.cancelled().await;
246 : }
247 : }
248 :
249 0 : } => {}
250 0 : }
251 0 : }
252 : }
253 0 : }
254 :
255 0 : fn do_spawn(&mut self, job: PJ) {
256 0 : let tenant_shard_id = *job.get_tenant_shard_id();
257 0 : let (in_progress, fut) = self.generator.spawn(job);
258 0 :
259 0 : self.tasks.spawn(fut);
260 0 :
261 0 : self.running.insert(tenant_shard_id, in_progress);
262 0 : }
263 :
264 : /// For all pending tenants that are elegible for execution, spawn their task.
265 : ///
266 : /// Caller provides the spawn operation, we track the resulting execution.
267 0 : fn spawn_pending(&mut self) {
268 0 : while !self.pending.is_empty() && self.running.len() < self.concurrency {
269 0 : // unwrap: loop condition includes !is_empty()
270 0 : let pending = self.pending.pop_front().unwrap();
271 0 : self.do_spawn(pending);
272 0 : }
273 0 : }
274 :
275 : /// For administrative commands: skip the pending queue, ignore concurrency limits
276 0 : fn spawn_now(&mut self, job: PJ) -> &RJ {
277 0 : let tenant_shard_id = *job.get_tenant_shard_id();
278 0 : self.do_spawn(job);
279 0 : self.running
280 0 : .get(&tenant_shard_id)
281 0 : .expect("We just inserted this")
282 0 : }
283 :
284 : /// Wait until the next task completes, and handle its completion
285 : ///
286 : /// Cancellation: this method is cancel-safe.
287 0 : async fn process_next_completion(&mut self) -> Option<C> {
288 0 : match self.tasks.join_next().await {
289 0 : Some(r) => {
290 0 : // We use a channel to drive completions, but also
291 0 : // need to drain the JoinSet to avoid completed tasks
292 0 : // accumulating. These calls are 1:1 because every task
293 0 : // we spawn into this joinset submits is result to the channel.
294 0 : let completion = r.expect("Panic in background task");
295 0 :
296 0 : self.running.remove(completion.get_tenant_shard_id());
297 0 : Some(completion)
298 : }
299 : None => {
300 : // Nothing is running, so we have nothing to wait for. We may drop out: the
301 : // main even loop will call us again after the next time it has run something.
302 0 : None
303 : }
304 : }
305 0 : }
306 :
307 : /// Convert the command into a pending job, spawn it, and when the spawned
308 : /// job completes, send the result down `response_tx`.
309 0 : fn handle_command(
310 0 : &mut self,
311 0 : cmd: CMD,
312 0 : response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
313 0 : ) {
314 0 : let job = match self.generator.on_command(cmd) {
315 0 : Ok(j) => j,
316 0 : Err(e) => {
317 0 : response_tx.send(CommandResponse { result: Err(e) }).ok();
318 0 : return;
319 : }
320 : };
321 :
322 0 : let tenant_shard_id = job.get_tenant_shard_id();
323 0 : let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
324 0 : tracing::info!("Command already running, waiting for it");
325 0 : barrier
326 : } else {
327 0 : let running = self.spawn_now(job);
328 0 : running.get_barrier().clone()
329 : };
330 :
331 : // This task does no I/O: it only listens for a barrier's completion and then
332 : // sends to the command response channel. It is therefore safe to spawn this without
333 : // any gates/task_mgr hooks.
334 0 : tokio::task::spawn(async move {
335 0 : barrier.wait().await;
336 :
337 0 : response_tx.send(CommandResponse { result: Ok(()) }).ok();
338 0 : });
339 0 : }
340 :
341 0 : fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
342 0 : self.running.get(tenant_shard_id).map(|r| r.get_barrier())
343 0 : }
344 :
345 : /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
346 : ///
347 : /// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
348 : ///
349 : /// This function resets the pending list: it is assumed that the caller may change their mind about
350 : /// which tenants need work between calls to schedule_iteration.
351 0 : async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
352 : let SchedulingResult {
353 0 : jobs,
354 0 : want_interval,
355 0 : } = self.generator.schedule().await;
356 :
357 : // Adjust interval based on feedback from the job generator
358 0 : if let Some(want_interval) = want_interval {
359 0 : // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
360 0 : self.scheduling_interval = Duration::from_secs(std::cmp::min(
361 0 : std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
362 0 : MAX_SCHEDULING_INTERVAL.as_secs(),
363 0 : ));
364 0 : }
365 :
366 : // The priority order of previously scheduled work may be invalidated by current state: drop
367 : // all pending work (it will be re-scheduled if still needed)
368 0 : self.pending.clear();
369 0 :
370 0 : // While iterating over the potentially-long list of tenants, we will periodically yield
371 0 : // to avoid blocking executor.
372 0 : yielding_loop(1000, cancel, jobs.into_iter(), |job| {
373 0 : // Skip tenants that already have a write in flight
374 0 : if !self.running.contains_key(job.get_tenant_shard_id()) {
375 0 : self.pending.push_back(job);
376 0 : }
377 0 : })
378 0 : .await
379 0 : .ok();
380 0 : }
381 : }
|