LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/secondary - scheduler.rs (source / functions)
Current:  cd44433dd675caa99df17a61b18949c8387e2242.info (2024-01-09 02:06:09)
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info (2024-01-08 15:34:46)

            Coverage    Total    Hit    LBC    UBC    CBC
Lines:       86.9 %       145    126      1     18    126
Functions:   75.0 %        52     39      1     12     39

           TLA  Line data    Source code
       1                 : use async_trait;
       2                 : use futures::Future;
       3                 : use std::{
       4                 :     collections::HashMap,
       5                 :     marker::PhantomData,
       6                 :     pin::Pin,
       7                 :     time::{Duration, Instant},
       8                 : };
       9                 : 
      10                 : use pageserver_api::shard::TenantShardId;
      11                 : use tokio::task::JoinSet;
      12                 : use tokio_util::sync::CancellationToken;
      13                 : use utils::{completion::Barrier, yielding_loop::yielding_loop};
      14                 : 
      15                 : use super::{CommandRequest, CommandResponse};
      16                 : 
      17                 : /// Scheduling interval is the time between calls to JobGenerator::schedule.
      18                 : /// When we schedule jobs, the job generator may provide a hint of its preferred
      19                 : /// interval, which we will respect within the min/max bounds defined below.
      20                 : const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
      21                 : const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
      22                 : 
      23                 : /// Scheduling helper for background work across many tenants.
      24                 : ///
      25                 : /// Systems that need to run background work across many tenants may use this type
      26                 : /// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
      27                 : /// implementation to provide the work to execute.  This is a simple scheduler that just
      28                 : /// polls the generator for outstanding work, replacing its queue of pending work with
      29                 : /// what the generator yields on each call: the job generator can change its mind about
      30                 : /// the order of jobs between calls.  The job generator is notified when jobs complete,
      31                 : /// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
      32                 : /// admin APIs).
      33                 : ///
      34                 : /// For an example, see [`crate::tenant::secondary::heatmap_uploader`].
      35                 : ///
      36                 : /// G: A JobGenerator that this scheduler will poll to find pending jobs
      37                 : /// PJ: 'Pending Job': type for job descriptors that are ready to run
      38                 : /// RJ: 'Running Job': type for jobs that have been spawned
      39                 : /// C : 'Completion' type that spawned jobs will send when they finish
      40                 : /// CMD: 'Command' type that the job generator will accept to create jobs on-demand
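                         : ///
                         : /// A minimal wiring sketch (hypothetical `MyGenerator`/`MyCommand` types; the
                         : /// barrier and cancellation token come from the caller's shutdown plumbing):
                         : ///
                         : /// ```ignore
                         : /// let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel::<CommandRequest<MyCommand>>(16);
                         : /// let mut scheduler = TenantBackgroundJobs::new(MyGenerator::default(), 8);
                         : /// tokio::spawn(async move {
                         : ///     // Runs until `cancel` fires; waits on `background_jobs_can_start` first.
                         : ///     scheduler.run(cmd_rx, background_jobs_can_start, cancel).await;
                         : /// });
                         : /// ```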
      41                 : pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
      42                 : where
      43                 :     G: JobGenerator<PJ, RJ, C, CMD>,
      44                 :     C: Completion,
      45                 :     PJ: PendingJob,
      46                 :     RJ: RunningJob,
      47                 : {
      48                 :     generator: G,
      49                 : 
      50                 : /// Ready to run.  Will progress to `running` once the concurrency limit permits, or
      51                 : /// be removed on the next scheduling pass.
      52                 :     pending: std::collections::VecDeque<PJ>,
      53                 : 
      54                 :     /// Tasks currently running in Self::tasks for these tenants.  Check this map
      55                 :     /// before pushing more work into pending for the same tenant.
      56                 :     running: HashMap<TenantShardId, RJ>,
      57                 : 
      58                 :     tasks: JoinSet<C>,
      59                 : 
      60                 :     concurrency: usize,
      61                 : 
      62                 : /// How often we would like `schedule_iteration` to be called.
      63                 :     pub(super) scheduling_interval: Duration,
      64                 : 
      65                 :     _phantom: PhantomData<(PJ, RJ, C, CMD)>,
      66                 : }
      67                 : 
      68                 : #[async_trait::async_trait]
      69                 : pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
      70                 : where
      71                 :     C: Completion,
      72                 :     PJ: PendingJob,
      73                 :     RJ: RunningJob,
      74                 : {
      75                 :     /// Called at each scheduling interval.  Return a list of jobs to run, most urgent first.
      76                 :     ///
      77                 :     /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
      78                 :     /// Implementations should take care to yield the executor periodically if running
      79                 :     /// very long loops.
      80                 :     ///
      81                 :     /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
      82                 :     /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
      83                 :     /// and re-generated.
      84                 :     async fn schedule(&mut self) -> SchedulingResult<PJ>;
      85                 : 
      86                 :     /// Called when a pending job is ready to be run.
      87                 :     ///
      88                 : /// The job generator provides a future, and an RJ (Running Job) descriptor that tracks it.
      89                 :     fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);
      90                 : 
      91                 :     /// Called when a job previously spawned with spawn() transmits its completion
      92                 :     fn on_completion(&mut self, completion: C);
      93                 : 
      94                 : /// Called when a command is received.  A job will be spawned immediately if the return
      95                 : /// value is Ok (ignoring concurrency limits and the pending queue), unless a job for the same tenant is already running.
      96                 :     fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
      97                 : }
      98                 : 
      99                 : /// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
     100                 : pub(super) struct SchedulingResult<PJ> {
     101                 :     pub(super) jobs: Vec<PJ>,
     102                 :     /// The job generator would like to be called again this soon
     103                 :     pub(super) want_interval: Option<Duration>,
     104                 : }
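                         : 
                         : // For illustration only (job values invented): a generator with two jobs ready
                         : // that would like to be polled again in about five seconds, i.e. sooner than
                         : // MAX_SCHEDULING_INTERVAL, might return:
                         : //
                         : //     SchedulingResult { jobs: vec![job_a, job_b], want_interval: Some(Duration::from_secs(5)) }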
     105                 : 
     106                 : /// See [`TenantBackgroundJobs`].
     107                 : pub(super) trait PendingJob {
     108                 :     fn get_tenant_shard_id(&self) -> &TenantShardId;
     109                 : }
     110                 : 
     111                 : /// See [`TenantBackgroundJobs`].
     112                 : pub(super) trait Completion: Send + 'static {
     113                 :     fn get_tenant_shard_id(&self) -> &TenantShardId;
     114                 : }
     115                 : 
     116                 : /// See [`TenantBackgroundJobs`].
     117                 : pub(super) trait RunningJob {
     118                 :     fn get_barrier(&self) -> Barrier;
     119                 : }
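                         : 
                         : // A sketch of how a caller might satisfy these marker traits (the `Upload*`
                         : // types are invented for illustration; for real implementations see
                         : // [`crate::tenant::secondary::heatmap_uploader`]):
                         : //
                         : //     struct UploadPending { tenant_shard_id: TenantShardId }
                         : //     impl PendingJob for UploadPending {
                         : //         fn get_tenant_shard_id(&self) -> &TenantShardId { &self.tenant_shard_id }
                         : //     }
                         : //
                         : //     struct UploadRunning { barrier: Barrier }
                         : //     impl RunningJob for UploadRunning {
                         : //         fn get_barrier(&self) -> Barrier { self.barrier.clone() }
                         : //     }
                         : //
                         : //     struct UploadComplete { tenant_shard_id: TenantShardId }
                         : //     impl Completion for UploadComplete {
                         : //         fn get_tenant_shard_id(&self) -> &TenantShardId { &self.tenant_shard_id }
                         : //     }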
     120                 : 
     121                 : impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
     122                 : where
     123                 :     C: Completion,
     124                 :     PJ: PendingJob,
     125                 :     RJ: RunningJob,
     126                 :     G: JobGenerator<PJ, RJ, C, CMD>,
     127                 : {
     128 CBC        1114 :     pub(super) fn new(generator: G, concurrency: usize) -> Self {
     129            1114 :         Self {
     130            1114 :             generator,
     131            1114 :             pending: std::collections::VecDeque::new(),
     132            1114 :             running: HashMap::new(),
     133            1114 :             tasks: JoinSet::new(),
     134            1114 :             concurrency,
     135            1114 :             scheduling_interval: MAX_SCHEDULING_INTERVAL,
     136            1114 :             _phantom: PhantomData,
     137            1114 :         }
     138            1114 :     }
     139                 : 
     140            1114 :     pub(super) async fn run(
     141            1114 :         &mut self,
     142            1114 :         mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
     143            1114 :         background_jobs_can_start: Barrier,
     144            1114 :         cancel: CancellationToken,
     145            1114 :     ) {
     146            1114 :         tracing::info!("Waiting for background_jobs_can_start...");
     147            1114 :         background_jobs_can_start.wait().await;
     148            1098 :         tracing::info!("background_jobs_can_start is ready, proceeding.");
     149                 : 
     150            2536 :         while !cancel.is_cancelled() {
     151                 :             // Look for new work: this is relatively expensive because we have to go acquire the lock on
     152                 :             // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
     153                 :             // require work.
     154            2218 :             self.schedule_iteration(&cancel).await;
     155                 : 
     156            2218 :             if cancel.is_cancelled() {
     157 UBC           0 :                 return;
     158 CBC        2218 :             }
     159            2218 : 
     160            2218 :             // Schedule some work, if the concurrency limit permits it
     161            2218 :             self.spawn_pending();
     162            2218 : 
     163            2218 :             // Between scheduling iterations, we will:
     164            2218 :             //  - Drain any completed tasks and spawn pending tasks
     165            2218 :             //  - Handle incoming administrative commands
     166            2218 :             //  - Check our cancellation token
     167            2218 :             let next_scheduling_iteration = Instant::now()
     168            2218 :                 .checked_add(self.scheduling_interval)
     169            2218 :                 .unwrap_or_else(|| {
     170 UBC           0 :                     tracing::warn!(
     171               0 :                         "Scheduling interval invalid ({}s)",
     172               0 :                         self.scheduling_interval.as_secs_f64()
     173               0 :                     );
     174                 :                     // unwrap(): this constant is small, cannot fail to add to time unless
     175                 :                     // we are close to the end of the universe.
     176               0 :                     Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
     177 CBC        2218 :                 });
     178            3148 :             loop {
     179            4608 :                 tokio::select! {
     180                 :                     _ = cancel.cancelled() => {
     181             316 :                         tracing::info!("joining tasks");
     182                 :                         // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
     183                 :                         // It is the caller's responsibility to make sure that the tasks they scheduled
     184                 :                         // respect an appropriate cancellation token, to shut down promptly.  It is only
     185                 :                         // safe to wait on joining these tasks because we can see the cancellation token
     186                 :                         // has been set.
     187                 :                         while let Some(_r) = self.tasks.join_next().await {}
     188             316 :                         tracing::info!("terminating on cancellation token.");
     189                 : 
     190                 :                         break;
     191                 :                     },
     192                 :                     _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
     193 UBC           0 :                         tracing::debug!("woke for scheduling interval");
     194                 :                         break;},
     195 CBC          10 :                     cmd = command_queue.recv() => {
     196 UBC           0 :                         tracing::debug!("woke for command queue");
     197                 :                         let cmd = match cmd {
     198                 :                             Some(c) => c,
     199                 :                             None => {
     200                 :                                 // SecondaryController was destroyed, and this has raced with
     201                 :                                 // our CancellationToken
     202               0 :                                 tracing::info!("terminating on command queue destruction");
     203                 :                                 cancel.cancel();
     204                 :                                 break;
     205                 :                             }
     206                 :                         };
     207                 : 
     208                 :                         let CommandRequest{
     209                 :                             response_tx,
     210                 :                             payload
     211                 :                         } = cmd;
     212                 :                         self.handle_command(payload, response_tx);
     213                 :                     },
     214 CBC        2921 :                     _ = async {
     215            2921 :                         let completion = self.process_next_completion().await;
     216            2918 :                         match completion {
     217              12 :                             Some(c) => {
     218              12 :                                 self.generator.on_completion(c);
     219              12 :                                 if !cancel.is_cancelled() {
     220              12 :                                     self.spawn_pending();
     221              12 :                                 }
     222                 :                             },
     223                 :                             None => {
     224                 :                                 // Nothing is running, so just wait: expect that this future
     225                 :                                 // will be dropped when something in the outer select! fires.
     226            2906 :                                 cancel.cancelled().await;
     227                 :                             }
     228                 :                         }
     229                 : 
     230             920 :                      } => {}
     231            3148 :                 }
     232            3148 :             }
     233                 :         }
     234             318 :     }
     235                 : 
     236              13 :     fn do_spawn(&mut self, job: PJ) {
     237              13 :         let tenant_shard_id = *job.get_tenant_shard_id();
     238              13 :         let (in_progress, fut) = self.generator.spawn(job);
     239              13 : 
     240              13 :         self.tasks.spawn(fut);
     241              13 : 
     242              13 :         self.running.insert(tenant_shard_id, in_progress);
     243              13 :     }
     244                 : 
     245                 :     /// For all pending tenants that are eligible for execution, spawn their task.
     246                 :     ///
     247                 :     /// Caller provides the spawn operation, we track the resulting execution.
     248            2230 :     fn spawn_pending(&mut self) {
     249            2233 :         while !self.pending.is_empty() && self.running.len() < self.concurrency {
     250               3 :             // unwrap: loop condition includes !is_empty()
     251               3 :             let pending = self.pending.pop_front().unwrap();
     252               3 :             self.do_spawn(pending);
     253               3 :         }
     254            2230 :     }
     255                 : 
     256                 :     /// For administrative commands: skip the pending queue, ignore concurrency limits
     257              10 :     fn spawn_now(&mut self, job: PJ) -> &RJ {
     258              10 :         let tenant_shard_id = *job.get_tenant_shard_id();
     259              10 :         self.do_spawn(job);
     260              10 :         self.running
     261              10 :             .get(&tenant_shard_id)
     262              10 :             .expect("We just inserted this")
     263              10 :     }
     264                 : 
     265                 :     /// Wait until the next task completes, and handle its completion
     266                 :     ///
     267                 :     /// Cancellation: this method is cancel-safe.
     268            2921 :     async fn process_next_completion(&mut self) -> Option<C> {
     269            2921 :         match self.tasks.join_next().await {
     270              12 :             Some(r) => {
     271              12 :                 // Drain the JoinSet to collect results and to avoid
     272              12 :                 // completed tasks accumulating.  These calls are 1:1 with
     273              12 :                 // spawns: every task we spawn into this JoinSet yields
     274              12 :                 // exactly one result when it completes.
     275              12 :                 let completion = r.expect("Panic in background task");
     276              12 : 
     277              12 :                 self.running.remove(completion.get_tenant_shard_id());
     278              12 :                 Some(completion)
     279                 :             }
     280                 :             None => {
     281                 :                 // Nothing is running, so we have nothing to wait for.  We may drop out: the
     282                 :                 // main event loop will call us again after the next time it has run something.
     283            2906 :                 None
     284                 :             }
     285                 :         }
     286            2918 :     }
     287                 : 
     288                 :     /// Convert the command into a pending job, spawn it, and when the spawned
     289                 :     /// job completes, send the result down `response_tx`.
     290              10 :     fn handle_command(
     291              10 :         &mut self,
     292              10 :         cmd: CMD,
     293              10 :         response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
     294              10 :     ) {
     295              10 :         let job = match self.generator.on_command(cmd) {
     296              10 :             Ok(j) => j,
     297 UBC           0 :             Err(e) => {
     298               0 :                 response_tx.send(CommandResponse { result: Err(e) }).ok();
     299               0 :                 return;
     300                 :             }
     301                 :         };
     302                 : 
     303 CBC          10 :         let tenant_shard_id = job.get_tenant_shard_id();
     304              10 :         let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
     305 LBC         (1) :             barrier
     306                 :         } else {
     307 CBC          10 :             let running = self.spawn_now(job);
     308              10 :             running.get_barrier().clone()
     309                 :         };
     310                 : 
     311                 :         // This task does no I/O: it only listens for a barrier's completion and then
     312                 :         // sends to the command response channel.  It is therefore safe to spawn this without
     313                 :         // any gates/task_mgr hooks.
     314              10 :         tokio::task::spawn(async move {
     315              10 :             barrier.wait().await;
     316                 : 
     317              10 :             response_tx.send(CommandResponse { result: Ok(()) }).ok();
     318              10 :         });
     319              10 :     }
     320                 : 
     321              10 :     fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
     322              10 :         self.running.get(tenant_shard_id).map(|r| r.get_barrier())
     323              10 :     }
     324                 : 
     325                 :     /// Periodic execution phase: poll the generator and schedule any work it yields.
     326                 :     ///
     327                 :     /// The generator will typically inspect a collection of tenant-like structures, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`].
     328                 :     ///
     329                 :     /// This function resets the pending list: it is assumed that the caller may change their mind about
     330                 :     /// which tenants need work between calls to schedule_iteration.
     331            2218 :     async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
     332                 :         let SchedulingResult {
     333            2218 :             jobs,
     334            2218 :             want_interval,
     335            2218 :         } = self.generator.schedule().await;
     336                 : 
     337                 :         // Adjust interval based on feedback from the job generator
     338            2218 :         if let Some(want_interval) = want_interval {
     339 UBC           0 :             // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
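                         :             // Worked example: a want_interval hint of 30s clamps down to
                         :             // MAX_SCHEDULING_INTERVAL (10s); a hint of 500ms truncates to
                         :             // 0s and then clamps up to MIN_SCHEDULING_INTERVAL (1s).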
     340               0 :             self.scheduling_interval = Duration::from_secs(std::cmp::min(
     341               0 :                 std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
     342               0 :                 MAX_SCHEDULING_INTERVAL.as_secs(),
     343               0 :             ));
     344 CBC        2218 :         }
     345                 : 
     346                 :         // The priority order of previously scheduled work may be invalidated by current state: drop
     347                 :         // all pending work (it will be re-scheduled if still needed)
     348            2218 :         self.pending.clear();
     349            2218 : 
     350            2218 :         // While iterating over the potentially-long list of tenants, we will periodically yield
     351            2218 :         // to avoid blocking the executor.
     352            2218 :         yielding_loop(1000, cancel, jobs.into_iter(), |job| {
     353               5 :             // Skip tenants that already have a job in flight
     354               5 :             if !self.running.contains_key(job.get_tenant_shard_id()) {
     355               3 :                 self.pending.push_back(job);
     356               3 :             }
     357            2218 :         })
     358 UBC           0 :         .await
     359 CBC        2218 :         .ok();
     360            2218 :     }
     361                 : }
        

Generated by: LCOV version 2.1-beta