LCOV - code coverage report
Current view: top level - pageserver/src/tenant/secondary - scheduler.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 86.9 % 145 126
Test Date: 2024-02-07 07:37:29 Functions: 75.0 % 52 39

            Line data    Source code
       1              : use futures::Future;
       2              : use std::{
       3              :     collections::HashMap,
       4              :     marker::PhantomData,
       5              :     pin::Pin,
       6              :     time::{Duration, Instant},
       7              : };
       8              : 
       9              : use pageserver_api::shard::TenantShardId;
      10              : use tokio::task::JoinSet;
      11              : use tokio_util::sync::CancellationToken;
      12              : use utils::{completion::Barrier, yielding_loop::yielding_loop};
      13              : 
      14              : use super::{CommandRequest, CommandResponse};
      15              : 
      16              : /// Scheduling interval is the time between calls to JobGenerator::schedule.
      17              : /// When we schedule jobs, the job generator may provide a hint of its preferred
      18              : /// interval, which we will respect within these bounds.
      19              : const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
      20              : const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
      21              : 
      22              : /// Scheduling helper for background work across many tenants.
      23              : ///
      24              : /// Systems that need to run background work across many tenants may use this type
      25              : /// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
      26              : /// implementation to provide the work to execute.  This is a simple scheduler that just
      27              : /// polls the generator for outstanding work, replacing its queue of pending work with
      28              : /// what the generator yields on each call: the job generator can change its mind about
      29              : /// the order of jobs between calls.  The job generator is notified when jobs complete,
      30              : /// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
      31              : /// admin APIs).
      32              : ///
      33              : /// For an example see [`crate::tenant::secondary::heatmap_uploader`]
      34              : ///
      35              : /// G: A JobGenerator that this scheduler will poll to find pending jobs
      36              : /// PJ: 'Pending Job': type for job descriptors that are ready to run
      37              : /// RJ: 'Running Job' type for jobs that have been spawned
      38              : /// C : 'Completion' type that spawned jobs will send when they finish
      39              : /// CMD: 'Command' type that the job generator will accept to create jobs on-demand
      40              : pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
      41              : where
      42              :     G: JobGenerator<PJ, RJ, C, CMD>,
      43              :     C: Completion,
      44              :     PJ: PendingJob,
      45              :     RJ: RunningJob,
      46              : {
      47              :     generator: G,
      48              : 
      49              :     /// Ready to run.  Will progress to `running` once concurrent limit is satisfied, or
      50              :     /// be removed on next scheduling pass.
      51              :     pending: std::collections::VecDeque<PJ>,
      52              : 
      53              :     /// Tasks currently running in Self::tasks for these tenants.  Check this map
      54              :     /// before pushing more work into pending for the same tenant.
      55              :     running: HashMap<TenantShardId, RJ>,
      56              : 
      57              :     tasks: JoinSet<C>,
      58              : 
      59              :     concurrency: usize,
      60              : 
      61              :     /// How often we would like schedule_iteration to be called.
      62              :     pub(super) scheduling_interval: Duration,
      63              : 
      64              :     _phantom: PhantomData<(PJ, RJ, C, CMD)>,
      65              : }
      66              : 
      67              : pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
      68              : where
      69              :     C: Completion,
      70              :     PJ: PendingJob,
      71              :     RJ: RunningJob,
      72              : {
      73              :     /// Called at each scheduling interval.  Return a list of jobs to run, most urgent first.
      74              :     ///
      75              :     /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
      76              :     /// Implementations should take care to yield the executor periodically if running
      77              :     /// very long loops.
      78              :     ///
      79              :     /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
      80              :     /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
      81              :     /// and re-generated.
      82              :     async fn schedule(&mut self) -> SchedulingResult<PJ>;
      83              : 
      84              :     /// Called when a pending job is ready to be run.
      85              :     ///
      86              :     /// The job generator provides a future, and a RJ (Running Job) descriptor that tracks it.
      87              :     fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);
      88              : 
      89              :     /// Called when a job previously spawned with spawn() transmits its completion
      90              :     fn on_completion(&mut self, completion: C);
      91              : 
      92              :     /// Called when a command is received.  A job will be spawned immediately if the return
      93              :     /// value is Ok, ignoring concurrency limits and the pending queue.
      94              :     fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
      95              : }
      96              : 
      97              : /// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
      98              : pub(super) struct SchedulingResult<PJ> {
      99              :     pub(super) jobs: Vec<PJ>,
     100              :     /// The job generator would like to be called again this soon
     101              :     pub(super) want_interval: Option<Duration>,
     102              : }
     103              : 
     104              : /// See [`TenantBackgroundJobs`].
     105              : pub(super) trait PendingJob {
     106              :     fn get_tenant_shard_id(&self) -> &TenantShardId;
     107              : }
     108              : 
     109              : /// See [`TenantBackgroundJobs`].
     110              : pub(super) trait Completion: Send + 'static {
     111              :     fn get_tenant_shard_id(&self) -> &TenantShardId;
     112              : }
     113              : 
     114              : /// See [`TenantBackgroundJobs`].
     115              : pub(super) trait RunningJob {
     116              :     fn get_barrier(&self) -> Barrier;
     117              : }
     118              : 
     119              : impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
     120              : where
     121              :     C: Completion,
     122              :     PJ: PendingJob,
     123              :     RJ: RunningJob,
     124              :     G: JobGenerator<PJ, RJ, C, CMD>,
     125              : {
     126         1208 :     pub(super) fn new(generator: G, concurrency: usize) -> Self {
     127         1208 :         Self {
     128         1208 :             generator,
     129         1208 :             pending: std::collections::VecDeque::new(),
     130         1208 :             running: HashMap::new(),
     131         1208 :             tasks: JoinSet::new(),
     132         1208 :             concurrency,
     133         1208 :             scheduling_interval: MAX_SCHEDULING_INTERVAL,
     134         1208 :             _phantom: PhantomData,
     135         1208 :         }
     136         1208 :     }
     137              : 
     138         1208 :     pub(super) async fn run(
     139         1208 :         &mut self,
     140         1208 :         mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
     141         1208 :         background_jobs_can_start: Barrier,
     142         1208 :         cancel: CancellationToken,
     143         1208 :     ) {
     144         1208 :         tracing::info!("Waiting for background_jobs_can start...");
     145         1208 :         background_jobs_can_start.wait().await;
     146         1194 :         tracing::info!("background_jobs_can is ready, proceeding.");
     147              : 
     148         2704 :         while !cancel.is_cancelled() {
     149              :             // Look for new work: this is relatively expensive because we have to go acquire the lock on
     150              :             // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
     151              :             // require an upload.
     152         2360 :             self.schedule_iteration(&cancel).await;
     153              : 
     154         2360 :             if cancel.is_cancelled() {
     155            0 :                 return;
     156         2360 :             }
     157         2360 : 
     158         2360 :             // Schedule some work, if concurrency limit permits it
     159         2360 :             self.spawn_pending();
     160         2360 : 
     161         2360 :             // Between scheduling iterations, we will:
     162         2360 :             //  - Drain any complete tasks and spawn pending tasks
     163         2360 :             //  - Handle incoming administrative commands
     164         2360 :             //  - Check our cancellation token
     165         2360 :             let next_scheduling_iteration = Instant::now()
     166         2360 :                 .checked_add(self.scheduling_interval)
     167         2360 :                 .unwrap_or_else(|| {
     168            0 :                     tracing::warn!(
     169            0 :                         "Scheduling interval invalid ({}s)",
     170            0 :                         self.scheduling_interval.as_secs_f64()
     171            0 :                     );
     172              :                     // unwrap(): this constant is small, cannot fail to add to time unless
     173              :                     // we are close to the end of the universe.
     174            0 :                     Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
     175         2360 :                 });
     176         3405 :             loop {
     177         4947 :                 tokio::select! {
     178              :                     _ = cancel.cancelled() => {
     179          344 :                         tracing::info!("joining tasks");
     180              :                         // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
     181              :                         // It is the caller's responsibility to make sure that the tasks they scheduled
     182              :                         // respect an appropriate cancellation token, to shut down promptly.  It is only
     183              :                         // safe to wait on joining these tasks because we can see the cancellation token
     184              :                         // has been set.
     185              :                         while let Some(_r) = self.tasks.join_next().await {}
     186          344 :                         tracing::info!("terminating on cancellation token.");
     187              : 
     188              :                         break;
     189              :                     },
     190              :                     _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
     191            0 :                         tracing::debug!("woke for scheduling interval");
     192              :                         break;},
     193           14 :                     cmd = command_queue.recv() => {
     194            0 :                         tracing::debug!("woke for command queue");
     195              :                         let cmd = match cmd {
     196              :                             Some(c) =>c,
     197              :                             None => {
     198              :                                 // SecondaryController was destroyed, and this has raced with
     199              :                                 // our CancellationToken
     200            0 :                                 tracing::info!("terminating on command queue destruction");
     201              :                                 cancel.cancel();
     202              :                                 break;
     203              :                             }
     204              :                         };
     205              : 
     206              :                         let CommandRequest{
     207              :                             response_tx,
     208              :                             payload
     209              :                         } = cmd;
     210              :                         self.handle_command(payload, response_tx);
     211              :                     },
     212         3138 :                     _ = async {
     213         3138 :                         let completion = self.process_next_completion().await;
     214         3135 :                         match completion {
     215           18 :                             Some(c) => {
     216           18 :                                 self.generator.on_completion(c);
     217           18 :                                 if !cancel.is_cancelled() {
     218           18 :                                     self.spawn_pending();
     219           18 :                                 }
     220              :                             },
     221              :                             None => {
     222              :                                 // Nothing is running, so just wait: expect that this future
     223              :                                 // will be dropped when something in the outer select! fires.
     224         3117 :                                 cancel.cancelled().await;
     225              :                             }
     226              :                         }
     227              : 
     228         1031 :                      } => {}
     229         3405 :                 }
     230         3405 :             }
     231              :         }
     232          344 :     }
     233              : 
     234           18 :     fn do_spawn(&mut self, job: PJ) {
     235           18 :         let tenant_shard_id = *job.get_tenant_shard_id();
     236           18 :         let (in_progress, fut) = self.generator.spawn(job);
     237           18 : 
     238           18 :         self.tasks.spawn(fut);
     239           18 : 
     240           18 :         self.running.insert(tenant_shard_id, in_progress);
     241           18 :     }
     242              : 
     243              :     /// For all pending tenants that are eligible for execution, spawn their task.
     244              :     ///
     245              :     /// Caller provides the spawn operation, we track the resulting execution.
     246         2378 :     fn spawn_pending(&mut self) {
     247         2382 :         while !self.pending.is_empty() && self.running.len() < self.concurrency {
     248            4 :             // unwrap: loop condition includes !is_empty()
     249            4 :             let pending = self.pending.pop_front().unwrap();
     250            4 :             self.do_spawn(pending);
     251            4 :         }
     252         2378 :     }
     253              : 
     254              :     /// For administrative commands: skip the pending queue, ignore concurrency limits
     255           14 :     fn spawn_now(&mut self, job: PJ) -> &RJ {
     256           14 :         let tenant_shard_id = *job.get_tenant_shard_id();
     257           14 :         self.do_spawn(job);
     258           14 :         self.running
     259           14 :             .get(&tenant_shard_id)
     260           14 :             .expect("We just inserted this")
     261           14 :     }
     262              : 
     263              :     /// Wait until the next task completes, and handle its completion
     264              :     ///
     265              :     /// Cancellation: this method is cancel-safe.
     266         3138 :     async fn process_next_completion(&mut self) -> Option<C> {
     267         3138 :         match self.tasks.join_next().await {
     268           18 :             Some(r) => {
     269           18 :                 // We use a channel to drive completions, but also
     270           18 :                 // need to drain the JoinSet to avoid completed tasks
     271           18 :                 // accumulating.  These calls are 1:1 because every task
     272           18 :                 // we spawn into this joinset submits its result to the channel.
     273           18 :                 let completion = r.expect("Panic in background task");
     274           18 : 
     275           18 :                 self.running.remove(completion.get_tenant_shard_id());
     276           18 :                 Some(completion)
     277              :             }
     278              :             None => {
     279              :                 // Nothing is running, so we have nothing to wait for.  We may drop out: the
     280              :                 // main event loop will call us again after the next time it has run something.
     281         3117 :                 None
     282              :             }
     283              :         }
     284         3135 :     }
     285              : 
     286              :     /// Convert the command into a pending job, spawn it, and when the spawned
     287              :     /// job completes, send the result down `response_tx`.
     288           14 :     fn handle_command(
     289           14 :         &mut self,
     290           14 :         cmd: CMD,
     291           14 :         response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
     292           14 :     ) {
     293           14 :         let job = match self.generator.on_command(cmd) {
     294           14 :             Ok(j) => j,
     295            0 :             Err(e) => {
     296            0 :                 response_tx.send(CommandResponse { result: Err(e) }).ok();
     297            0 :                 return;
     298              :             }
     299              :         };
     300              : 
     301           14 :         let tenant_shard_id = job.get_tenant_shard_id();
     302           14 :         let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
     303            0 :             barrier
     304              :         } else {
     305           14 :             let running = self.spawn_now(job);
     306           14 :             running.get_barrier().clone()
     307              :         };
     308              : 
     309              :         // This task does no I/O: it only listens for a barrier's completion and then
     310              :         // sends to the command response channel.  It is therefore safe to spawn this without
     311              :         // any gates/task_mgr hooks.
     312           14 :         tokio::task::spawn(async move {
     313           14 :             barrier.wait().await;
     314              : 
     315           14 :             response_tx.send(CommandResponse { result: Ok(()) }).ok();
     316           14 :         });
     317           14 :     }
     318              : 
     319           14 :     fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
     320           14 :         self.running.get(tenant_shard_id).map(|r| r.get_barrier())
     321           14 :     }
     322              : 
     323              :     /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
     324              :     ///
     325              :     /// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
     326              :     ///
     327              :     /// This function resets the pending list: it is assumed that the caller may change their mind about
     328              :     /// which tenants need work between calls to schedule_iteration.
     329         2360 :     async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
     330              :         let SchedulingResult {
     331         2360 :             jobs,
     332         2360 :             want_interval,
     333         2360 :         } = self.generator.schedule().await;
     334              : 
     335              :         // Adjust interval based on feedback from the job generator
     336         2360 :         if let Some(want_interval) = want_interval {
     337            0 :             // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
     338            0 :             self.scheduling_interval = Duration::from_secs(std::cmp::min(
     339            0 :                 std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
     340            0 :                 MAX_SCHEDULING_INTERVAL.as_secs(),
     341            0 :             ));
     342         2360 :         }
     343              : 
     344              :         // The priority order of previously scheduled work may be invalidated by current state: drop
     345              :         // all pending work (it will be re-scheduled if still needed)
     346         2360 :         self.pending.clear();
     347         2360 : 
     348         2360 :         // While iterating over the potentially-long list of tenants, we will periodically yield
     349         2360 :         // to avoid blocking executor.
     350         2360 :         yielding_loop(1000, cancel, jobs.into_iter(), |job| {
     351            7 :             // Skip tenants that already have a write in flight
     352            7 :             if !self.running.contains_key(job.get_tenant_shard_id()) {
     353            4 :                 self.pending.push_back(job);
     354            4 :             }
     355         2360 :         })
     356            0 :         .await
     357         2360 :         .ok();
     358         2360 :     }
     359              : }
        

Generated by: LCOV version 2.1-beta