use std::collections::HashMap;
use std::marker::PhantomData;
use std::pin::Pin;
use std::time::{Duration, Instant};

use futures::Future;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::completion::Barrier;
use utils::yielding_loop::yielding_loop;

use super::{CommandRequest, CommandResponse, SecondaryTenantError};

/// Scheduling interval is the time between calls to JobGenerator::schedule.
/// When we schedule jobs, the job generator may provide a hint of its preferred
/// interval, which we will respect within these bounds.
const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);

/// Jitter a Duration by an integer percentage. Returned values are uniformly
/// distributed in the range `d * (100 - pct) / 100 .. d * (100 + pct) / 100`,
/// i.e. a 5% jitter is 5% either way, for a ~10% total range. For example, a 60s
/// period with 5% jitter yields a value in 57s..63s.
pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
    if d == Duration::ZERO {
        d
    } else {
        rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
    }
}

/// When a periodic task first starts, it should wait for some time in the range 0..period, so
/// that starting many such tasks at the same time spreads them across the time range.
pub(super) fn period_warmup(period: Duration) -> Duration {
    if period == Duration::ZERO {
        period
    } else {
        rand::thread_rng().gen_range(Duration::ZERO..period)
    }
}

/// Scheduling helper for background work across many tenants.
///
/// Systems that need to run background work across many tenants may use this type
/// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
/// implementation to provide the work to execute. This is a simple scheduler that just
/// polls the generator for outstanding work, replacing its queue of pending work with
/// what the generator yields on each call: the job generator can change its mind about
/// the order of jobs between calls. The job generator is notified when jobs complete,
/// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
/// admin APIs).
///
/// For an example see [`crate::tenant::secondary::heatmap_uploader`]
///
/// G: A JobGenerator that this scheduler will poll to find pending jobs
/// PJ: 'Pending Job': type for job descriptors that are ready to run
/// RJ: 'Running Job': type for jobs that have been spawned
/// C: 'Completion': type that spawned jobs will send when they finish
/// CMD: 'Command': type that the job generator will accept to create jobs on-demand
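///
/// Illustrative call pattern (a sketch only: `MyGenerator`, `background_jobs_can_start`
/// and `cancel` are assumed to be provided by the caller and are not defined in this
/// module):
///
/// ```ignore
/// let mut scheduler = TenantBackgroundJobs::new(MyGenerator::default(), 8);
/// let (_command_tx, command_rx) = tokio::sync::mpsc::channel(16);
/// scheduler.run(command_rx, background_jobs_can_start, cancel).await;
/// ```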
pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
where
    G: JobGenerator<PJ, RJ, C, CMD>,
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
{
    generator: G,

    /// Ready to run. Will progress to `running` once the concurrency limit permits, or
    /// be removed on the next scheduling pass.
    pending: std::collections::VecDeque<PJ>,

    /// Tasks currently running in Self::tasks for these tenants. Check this map
    /// before pushing more work into pending for the same tenant.
    running: HashMap<TenantShardId, RJ>,

    tasks: JoinSet<C>,

    concurrency: usize,

    /// How often we would like `schedule_iteration` to be called.
    pub(super) scheduling_interval: Duration,

    _phantom: PhantomData<(PJ, RJ, C, CMD)>,
}

pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
where
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
{
    /// Called at each scheduling interval. Return a list of jobs to run, most urgent first.
    ///
    /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
    /// Implementations should take care to yield to the executor periodically if running
    /// very long loops.
    ///
    /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
    /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
    /// and re-generated.
    async fn schedule(&mut self) -> SchedulingResult<PJ>;

    /// Called when a pending job is ready to be run.
    ///
    /// The job generator provides a future, and an RJ (Running Job) descriptor that tracks it.
    fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);

    /// Called when a job previously spawned with spawn() transmits its completion
    fn on_completion(&mut self, completion: C);

    /// Called when a command is received. A job will be spawned immediately if the return
    /// value is Ok, ignoring concurrency limits and the pending queue.
    fn on_command(&mut self, cmd: CMD) -> Result<PJ, SecondaryTenantError>;
}

/// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
pub(super) struct SchedulingResult<PJ> {
    pub(super) jobs: Vec<PJ>,
    /// The job generator would like to be called again this soon
    pub(super) want_interval: Option<Duration>,
}

/// See [`TenantBackgroundJobs`].
pub(super) trait PendingJob {
    fn get_tenant_shard_id(&self) -> &TenantShardId;
}

/// See [`TenantBackgroundJobs`].
pub(super) trait Completion: Send + 'static {
    fn get_tenant_shard_id(&self) -> &TenantShardId;
}

/// See [`TenantBackgroundJobs`].
pub(super) trait RunningJob {
    fn get_barrier(&self) -> Barrier;
}

impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
where
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
    G: JobGenerator<PJ, RJ, C, CMD>,
{
    pub(super) fn new(generator: G, concurrency: usize) -> Self {
        Self {
            generator,
            pending: std::collections::VecDeque::new(),
            running: HashMap::new(),
            tasks: JoinSet::new(),
            concurrency,
            scheduling_interval: MAX_SCHEDULING_INTERVAL,
            _phantom: PhantomData,
        }
    }

    pub(super) async fn run(
        &mut self,
        mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
        background_jobs_can_start: Barrier,
        cancel: CancellationToken,
    ) {
        tracing::info!("Waiting for background_jobs_can_start...");
        background_jobs_can_start.wait().await;
        tracing::info!("background_jobs_can_start is ready, proceeding.");

        while !cancel.is_cancelled() {
            // Look for new work: this is relatively expensive because we have to go acquire the lock on
            // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
            // require an upload.
            self.schedule_iteration(&cancel).await;

            if cancel.is_cancelled() {
                return;
            }

            // Schedule some work, if the concurrency limit permits it
            self.spawn_pending();

            // This message is printed every scheduling iteration as proof of liveness when looking at logs
            tracing::info!(
                "Status: {} tasks running, {} pending",
                self.running.len(),
                self.pending.len()
            );

            // Between scheduling iterations, we will:
            // - Drain any complete tasks and spawn pending tasks
            // - Handle incoming administrative commands
            // - Check our cancellation token
            let next_scheduling_iteration = Instant::now()
                .checked_add(self.scheduling_interval)
                .unwrap_or_else(|| {
                    tracing::warn!(
                        "Scheduling interval invalid ({}s)",
                        self.scheduling_interval.as_secs_f64()
                    );
                    // unwrap(): this constant is small, cannot fail to add to time unless
                    // we are close to the end of the universe.
                    Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
                });
            loop {
                tokio::select! {
                    _ = cancel.cancelled() => {
                        tracing::info!("joining tasks");
                        // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
                        // It is the caller's responsibility to make sure that the tasks they scheduled
                        // respect an appropriate cancellation token, to shut down promptly. It is only
                        // safe to wait on joining these tasks because we can see the cancellation token
                        // has been set.
                        while let Some(_r) = self.tasks.join_next().await {}
                        tracing::info!("terminating on cancellation token.");

                        break;
                    },
                    _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
                        tracing::debug!("woke for scheduling interval");
                        break;
                    },
                    cmd = command_queue.recv() => {
                        tracing::debug!("woke for command queue");
                        let cmd = match cmd {
                            Some(c) => c,
                            None => {
                                // SecondaryController was destroyed, and this has raced with
                                // our CancellationToken
                                tracing::info!("terminating on command queue destruction");
                                cancel.cancel();
                                break;
                            }
                        };

                        let CommandRequest {
                            response_tx,
                            payload,
                        } = cmd;
                        self.handle_command(payload, response_tx);
                    },
                    _ = async {
                        let completion = self.process_next_completion().await;
                        match completion {
                            Some(c) => {
                                self.generator.on_completion(c);
                                if !cancel.is_cancelled() {
                                    self.spawn_pending();
                                }
                            },
                            None => {
                                // Nothing is running, so just wait: expect that this future
                                // will be dropped when something in the outer select! fires.
                                cancel.cancelled().await;
                            }
                        }
                    } => {}
                }
            }
        }
    }

    fn do_spawn(&mut self, job: PJ) {
        let tenant_shard_id = *job.get_tenant_shard_id();
        let (in_progress, fut) = self.generator.spawn(job);

        self.tasks.spawn(fut);

        let replaced = self.running.insert(tenant_shard_id, in_progress);
        debug_assert!(replaced.is_none());
        if replaced.is_some() {
            tracing::warn!(%tenant_shard_id, "Unexpectedly spawned a task when one was already running")
        }
    }

    /// For all pending tenants that are eligible for execution, spawn their task.
    ///
    /// The job generator provides the spawn operation; we track the resulting execution.
    fn spawn_pending(&mut self) {
        while !self.pending.is_empty() && self.running.len() < self.concurrency {
            // unwrap: loop condition includes !is_empty()
            let pending = self.pending.pop_front().unwrap();
            if !self.running.contains_key(pending.get_tenant_shard_id()) {
                self.do_spawn(pending);
            }
        }
    }

    /// For administrative commands: skip the pending queue, ignore concurrency limits
    fn spawn_now(&mut self, job: PJ) -> &RJ {
        let tenant_shard_id = *job.get_tenant_shard_id();
        self.do_spawn(job);
        self.running
            .get(&tenant_shard_id)
            .expect("We just inserted this")
    }

    /// Wait until the next task completes, and handle its completion
    ///
    /// Cancellation: this method is cancel-safe.
    async fn process_next_completion(&mut self) -> Option<C> {
        match self.tasks.join_next().await {
            Some(r) => {
                // Completions are delivered as the return values of tasks in this
                // JoinSet: every task we spawn returns its Completion, so draining
                // join_next() yields exactly one completion per spawned task and
                // prevents finished tasks from accumulating.
                let completion = r.expect("Panic in background task");

                self.running.remove(completion.get_tenant_shard_id());
                Some(completion)
            }
            None => {
                // Nothing is running, so we have nothing to wait for. We may drop out: the
                // main event loop will call us again after the next time it has run something.
                None
            }
        }
    }

    /// Convert the command into a pending job, spawn it, and when the spawned
    /// job completes, send the result down `response_tx`.
    fn handle_command(
        &mut self,
        cmd: CMD,
        response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
    ) {
        let job = match self.generator.on_command(cmd) {
            Ok(j) => j,
            Err(e) => {
                response_tx.send(CommandResponse { result: Err(e) }).ok();
                return;
            }
        };

        let tenant_shard_id = job.get_tenant_shard_id();
        let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
            tracing::info!(
                tenant_id=%tenant_shard_id.tenant_id,
                shard_id=%tenant_shard_id.shard_slug(),
                "Command already running, waiting for it"
            );
            barrier
        } else {
            let running = self.spawn_now(job);
            running.get_barrier().clone()
        };

        // This task does no I/O: it only listens for a barrier's completion and then
        // sends to the command response channel. It is therefore safe to spawn this without
        // any gates/task_mgr hooks.
        tokio::task::spawn(async move {
            barrier.wait().await;

            response_tx.send(CommandResponse { result: Ok(()) }).ok();
        });
    }

    fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
        self.running.get(tenant_shard_id).map(|r| r.get_barrier())
    }

    /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
    ///
    /// The job generator's schedule() is expected to inspect tenant-like structures, e.g.
    /// [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`].
    ///
    /// This function resets the pending list: it is assumed that the caller may change their mind about
    /// which tenants need work between calls to schedule_iteration.
    async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
        let SchedulingResult {
            jobs,
            want_interval,
        } = self.generator.schedule().await;

        // Adjust interval based on feedback from the job generator
        if let Some(want_interval) = want_interval {
            // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
            self.scheduling_interval = Duration::from_secs(std::cmp::min(
                std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
                MAX_SCHEDULING_INTERVAL.as_secs(),
            ));
        }

        // The priority order of previously scheduled work may be invalidated by current state: drop
        // all pending work (it will be re-scheduled if still needed)
        self.pending.clear();

        // While iterating over the potentially-long list of tenants, we will periodically yield
        // to avoid blocking the executor.
        yielding_loop(1000, cancel, jobs.into_iter(), |job| {
            // Skip tenants that already have a write in flight
            if !self.running.contains_key(job.get_tenant_shard_id()) {
                self.pending.push_back(job);
            }
        })
        .await
        .ok();
    }
}
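
// Minimal sanity checks for the jitter/warmup helpers above. This is an illustrative
// sketch, not an exhaustive test of the scheduler itself.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    #[test]
    fn period_jitter_stays_within_bounds() {
        // Zero periods are passed through unchanged.
        assert_eq!(period_jitter(Duration::ZERO, 5), Duration::ZERO);

        // A 5% jitter on a 60s period must land in 57s..63s.
        for _ in 0..100 {
            let d = period_jitter(Duration::from_secs(60), 5);
            assert!(d >= Duration::from_secs(57));
            assert!(d < Duration::from_secs(63));
        }
    }

    #[test]
    fn period_warmup_stays_within_period() {
        // Zero periods are passed through unchanged.
        assert_eq!(period_warmup(Duration::ZERO), Duration::ZERO);

        // Warmup delays are uniform in 0..period.
        for _ in 0..100 {
            let d = period_warmup(Duration::from_secs(60));
            assert!(d < Duration::from_secs(60));
        }
    }
}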