LCOV - code coverage report
Current view: top level - pageserver/src/tenant/secondary - scheduler.rs (source / functions) Coverage Total Hit
Test: 465a86b0c1fda0069b3e0f6c1c126e6b635a1f72.info Lines: 0.0 % 156 0
Test Date: 2024-06-25 15:47:26 Functions: 0.0 % 36 0

            Line data    Source code
       1              : use futures::Future;
       2              : use rand::Rng;
       3              : use std::{
       4              :     collections::HashMap,
       5              :     marker::PhantomData,
       6              :     pin::Pin,
       7              :     time::{Duration, Instant},
       8              : };
       9              : 
      10              : use pageserver_api::shard::TenantShardId;
      11              : use tokio::task::JoinSet;
      12              : use tokio_util::sync::CancellationToken;
      13              : use utils::{completion::Barrier, yielding_loop::yielding_loop};
      14              : 
      15              : use super::{CommandRequest, CommandResponse};
      16              : 
      17              : /// Scheduling interval is the time between calls to JobGenerator::schedule.
      18              : /// When we schedule jobs, the job generator may provide a hint of its preferred
      19              : /// interval, which we will respect within these bounds.
      20              : const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
      21              : const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
      22              : 
      23              : /// Jitter a Duration by an integer percentage.  Returned values are uniform
      24              : /// in the range 100-pct..100+pct (i.e. a 5% jitter is 5% either way: a ~10% range)
      25            0 : pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
      26            0 :     if d == Duration::ZERO {
      27            0 :         d
      28              :     } else {
      29            0 :         rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
      30              :     }
      31            0 : }
      32              : 
      33              : /// When a periodic task first starts, it should wait for some time in the range 0..period, so
      34              : /// that starting many such tasks at the same time spreads them across the time range.
      35            0 : pub(super) fn period_warmup(period: Duration) -> Duration {
      36            0 :     if period == Duration::ZERO {
      37            0 :         period
      38              :     } else {
      39            0 :         rand::thread_rng().gen_range(Duration::ZERO..period)
      40              :     }
      41            0 : }
      42              : 
      43              : /// Scheduling helper for background work across many tenants.
      44              : ///
      45              : /// Systems that need to run background work across many tenants may use this type
      46              : /// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
      47              : /// implementation to provide the work to execute.  This is a simple scheduler that just
      48              : /// polls the generator for outstanding work, replacing its queue of pending work with
      49              : /// what the generator yields on each call: the job generator can change its mind about
      50              : /// the order of jobs between calls.  The job generator is notified when jobs complete,
      51              : /// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
      52              : /// admin APIs).
      53              : ///
      54              : /// For an example see [`crate::tenant::secondary::heatmap_uploader`]
      55              : ///
      56              : /// G: A JobGenerator that this scheduler will poll to find pending jobs
      57              : /// PJ: 'Pending Job': type for job descriptors that are ready to run
      58              : /// RJ: 'Running Job': type for jobs that have been spawned
      59              : /// C : 'Completion' type that spawned jobs will send when they finish
      60              : /// CMD: 'Command' type that the job generator will accept to create jobs on-demand
      61              : pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
      62              : where
      63              :     G: JobGenerator<PJ, RJ, C, CMD>,
      64              :     C: Completion,
      65              :     PJ: PendingJob,
      66              :     RJ: RunningJob,
      67              : {
      68              :     generator: G,
      69              : 
      70              :     /// Ready to run.  Will progress to `running` once concurrent limit is satisfied, or
      71              :     /// be removed on next scheduling pass.
      72              :     pending: std::collections::VecDeque<PJ>,
      73              : 
      74              :     /// Tasks currently running in Self::tasks for these tenants.  Check this map
      75              :     /// before pushing more work into pending for the same tenant.
      76              :     running: HashMap<TenantShardId, RJ>,
      77              : 
      78              :     tasks: JoinSet<C>,
      79              : 
      80              :     concurrency: usize,
      81              : 
      82              :     /// How often we would like schedule_iteration to be called.
      83              :     pub(super) scheduling_interval: Duration,
      84              : 
      85              :     _phantom: PhantomData<(PJ, RJ, C, CMD)>,
      86              : }
      87              : 
      88              : pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
      89              : where
      90              :     C: Completion,
      91              :     PJ: PendingJob,
      92              :     RJ: RunningJob,
      93              : {
      94              :     /// Called at each scheduling interval.  Return a list of jobs to run, most urgent first.
      95              :     ///
      96              :     /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
      97              :     /// Implementations should take care to yield the executor periodically if running
      98              :     /// very long loops.
      99              :     ///
     100              :     /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
     101              :     /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
     102              :     /// and re-generated.
     103              :     async fn schedule(&mut self) -> SchedulingResult<PJ>;
     104              : 
     105              :     /// Called when a pending job is ready to be run.
     106              :     ///
     107              :     /// The job generation provides a future, and a RJ (Running Job) descriptor that tracks it.
     108              :     fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);
     109              : 
     110              :     /// Called when a job previously spawned with spawn() transmits its completion
     111              :     fn on_completion(&mut self, completion: C);
     112              : 
     113              :     /// Called when a command is received.  A job will be spawned immediately if the return
     114              :     /// value is Ok, ignoring concurrency limits and the pending queue.
     115              :     fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
     116              : }
     117              : 
     118              : /// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
     119              : pub(super) struct SchedulingResult<PJ> {
     120              :     pub(super) jobs: Vec<PJ>,
     121              :     /// The job generator would like to be called again this soon
     122              :     pub(super) want_interval: Option<Duration>,
     123              : }
     124              : 
     125              : /// See [`TenantBackgroundJobs`].
     126              : pub(super) trait PendingJob {
     127              :     fn get_tenant_shard_id(&self) -> &TenantShardId;
     128              : }
     129              : 
     130              : /// See [`TenantBackgroundJobs`].
     131              : pub(super) trait Completion: Send + 'static {
     132              :     fn get_tenant_shard_id(&self) -> &TenantShardId;
     133              : }
     134              : 
     135              : /// See [`TenantBackgroundJobs`].
     136              : pub(super) trait RunningJob {
     137              :     fn get_barrier(&self) -> Barrier;
     138              : }
     139              : 
     140              : impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
     141              : where
     142              :     C: Completion,
     143              :     PJ: PendingJob,
     144              :     RJ: RunningJob,
     145              :     G: JobGenerator<PJ, RJ, C, CMD>,
     146              : {
     147            0 :     pub(super) fn new(generator: G, concurrency: usize) -> Self {
     148            0 :         Self {
     149            0 :             generator,
     150            0 :             pending: std::collections::VecDeque::new(),
     151            0 :             running: HashMap::new(),
     152            0 :             tasks: JoinSet::new(),
     153            0 :             concurrency,
     154            0 :             scheduling_interval: MAX_SCHEDULING_INTERVAL,
     155            0 :             _phantom: PhantomData,
     156            0 :         }
     157            0 :     }
     158              : 
     159            0 :     pub(super) async fn run(
     160            0 :         &mut self,
     161            0 :         mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
     162            0 :         background_jobs_can_start: Barrier,
     163            0 :         cancel: CancellationToken,
     164            0 :     ) {
     165            0 :         tracing::info!("Waiting for background_jobs_can start...");
     166            0 :         background_jobs_can_start.wait().await;
     167            0 :         tracing::info!("background_jobs_can is ready, proceeding.");
     168              : 
     169            0 :         while !cancel.is_cancelled() {
     170              :             // Look for new work: this is relatively expensive because we have to go acquire the lock on
     171              :             // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
     172              :             // require an upload.
     173            0 :             self.schedule_iteration(&cancel).await;
     174              : 
     175            0 :             if cancel.is_cancelled() {
     176            0 :                 return;
     177            0 :             }
     178            0 : 
     179            0 :             // Schedule some work, if concurrency limit permits it
     180            0 :             self.spawn_pending();
     181            0 : 
     182            0 :             // This message is printed every scheduling iteration as proof of liveness when looking at logs
     183            0 :             tracing::info!(
     184            0 :                 "Status: {} tasks running, {} pending",
     185            0 :                 self.running.len(),
     186            0 :                 self.pending.len()
     187              :             );
     188              : 
     189              :             // Between scheduling iterations, we will:
     190              :             //  - Drain any complete tasks and spawn pending tasks
     191              :             //  - Handle incoming administrative commands
     192              :             //  - Check our cancellation token
     193            0 :             let next_scheduling_iteration = Instant::now()
     194            0 :                 .checked_add(self.scheduling_interval)
     195            0 :                 .unwrap_or_else(|| {
     196            0 :                     tracing::warn!(
     197            0 :                         "Scheduling interval invalid ({}s)",
     198            0 :                         self.scheduling_interval.as_secs_f64()
     199              :                     );
     200              :                     // unwrap(): this constant is small, cannot fail to add to time unless
     201              :                     // we are close to the end of the universe.
     202            0 :                     Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
     203            0 :                 });
     204            0 :             loop {
     205            0 :                 tokio::select! {
     206              :                     _ = cancel.cancelled() => {
     207              :                         tracing::info!("joining tasks");
     208              :                         // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
     209              :                         // It is the caller's responsibility to make sure that the tasks they scheduled
     210              :                         // respect an appropriate cancellation token, to shut down promptly.  It is only
     211              :                         // safe to wait on joining these tasks because we can see the cancellation token
     212              :                         // has been set.
     213              :                         while let Some(_r) = self.tasks.join_next().await {}
     214              :                         tracing::info!("terminating on cancellation token.");
     215              : 
     216              :                         break;
     217              :                     },
     218              :                     _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
     219              :                         tracing::debug!("woke for scheduling interval");
     220              :                         break;},
     221              :                     cmd = command_queue.recv() => {
     222              :                         tracing::debug!("woke for command queue");
     223              :                         let cmd = match cmd {
     224              :                             Some(c) =>c,
     225              :                             None => {
     226              :                                 // SecondaryController was destroyed, and this has raced with
     227              :                                 // our CancellationToken
     228              :                                 tracing::info!("terminating on command queue destruction");
     229              :                                 cancel.cancel();
     230              :                                 break;
     231              :                             }
     232              :                         };
     233              : 
     234              :                         let CommandRequest{
     235              :                             response_tx,
     236              :                             payload
     237              :                         } = cmd;
     238              :                         self.handle_command(payload, response_tx);
     239              :                     },
     240            0 :                     _ = async {
     241            0 :                         let completion = self.process_next_completion().await;
     242            0 :                         match completion {
     243            0 :                             Some(c) => {
     244            0 :                                 self.generator.on_completion(c);
     245            0 :                                 if !cancel.is_cancelled() {
     246            0 :                                     self.spawn_pending();
     247            0 :                                 }
     248              :                             },
     249              :                             None => {
     250              :                                 // Nothing is running, so just wait: expect that this future
     251              :                                 // will be dropped when something in the outer select! fires.
     252            0 :                                 cancel.cancelled().await;
     253              :                             }
     254              :                         }
     255              : 
     256            0 :                      } => {}
     257            0 :                 }
     258            0 :             }
     259              :         }
     260            0 :     }
     261              : 
     262            0 :     fn do_spawn(&mut self, job: PJ) {
     263            0 :         let tenant_shard_id = *job.get_tenant_shard_id();
     264            0 :         let (in_progress, fut) = self.generator.spawn(job);
     265            0 : 
     266            0 :         self.tasks.spawn(fut);
     267            0 : 
     268            0 :         let replaced = self.running.insert(tenant_shard_id, in_progress);
     269            0 :         debug_assert!(replaced.is_none());
     270            0 :         if replaced.is_some() {
     271            0 :             tracing::warn!(%tenant_shard_id, "Unexpectedly spawned a task when one was already running")
     272            0 :         }
     273            0 :     }
     274              : 
     275              :     /// For all pending tenants that are eligible for execution, spawn their task.
     276              :     ///
     277              :     /// Caller provides the spawn operation, we track the resulting execution.
     278            0 :     fn spawn_pending(&mut self) {
     279            0 :         while !self.pending.is_empty() && self.running.len() < self.concurrency {
     280              :             // unwrap: loop condition includes !is_empty()
     281            0 :             let pending = self.pending.pop_front().unwrap();
     282            0 :             if !self.running.contains_key(pending.get_tenant_shard_id()) {
     283            0 :                 self.do_spawn(pending);
     284            0 :             }
     285              :         }
     286            0 :     }
     287              : 
     288              :     /// For administrative commands: skip the pending queue, ignore concurrency limits
     289            0 :     fn spawn_now(&mut self, job: PJ) -> &RJ {
     290            0 :         let tenant_shard_id = *job.get_tenant_shard_id();
     291            0 :         self.do_spawn(job);
     292            0 :         self.running
     293            0 :             .get(&tenant_shard_id)
     294            0 :             .expect("We just inserted this")
     295            0 :     }
     296              : 
     297              :     /// Wait until the next task completes, and handle its completion
     298              :     ///
     299              :     /// Cancellation: this method is cancel-safe.
     300            0 :     async fn process_next_completion(&mut self) -> Option<C> {
     301            0 :         match self.tasks.join_next().await {
     302            0 :             Some(r) => {
     303            0 :                 // We use a channel to drive completions, but also
     304            0 :                 // need to drain the JoinSet to avoid completed tasks
     305            0 :                 // accumulating.  These calls are 1:1 because every task
     306            0 :                 // we spawn into this joinset submits its result to the channel.
     307            0 :                 let completion = r.expect("Panic in background task");
     308            0 : 
     309            0 :                 self.running.remove(completion.get_tenant_shard_id());
     310            0 :                 Some(completion)
     311              :             }
     312              :             None => {
     313              :                 // Nothing is running, so we have nothing to wait for.  We may drop out: the
     314              :                 // main event loop will call us again after the next time it has run something.
     315            0 :                 None
     316              :             }
     317              :         }
     318            0 :     }
     319              : 
     320              :     /// Convert the command into a pending job, spawn it, and when the spawned
     321              :     /// job completes, send the result down `response_tx`.
     322            0 :     fn handle_command(
     323            0 :         &mut self,
     324            0 :         cmd: CMD,
     325            0 :         response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
     326            0 :     ) {
     327            0 :         let job = match self.generator.on_command(cmd) {
     328            0 :             Ok(j) => j,
     329            0 :             Err(e) => {
     330            0 :                 response_tx.send(CommandResponse { result: Err(e) }).ok();
     331            0 :                 return;
     332              :             }
     333              :         };
     334              : 
     335            0 :         let tenant_shard_id = job.get_tenant_shard_id();
     336            0 :         let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
     337            0 :             tracing::info!(
     338              :                 tenant_id=%tenant_shard_id.tenant_id,
     339            0 :                 shard_id=%tenant_shard_id.shard_slug(),
     340            0 :                 "Command already running, waiting for it"
     341              :             );
     342            0 :             barrier
     343              :         } else {
     344            0 :             let running = self.spawn_now(job);
     345            0 :             running.get_barrier().clone()
     346              :         };
     347              : 
     348              :         // This task does no I/O: it only listens for a barrier's completion and then
     349              :         // sends to the command response channel.  It is therefore safe to spawn this without
     350              :         // any gates/task_mgr hooks.
     351            0 :         tokio::task::spawn(async move {
     352            0 :             barrier.wait().await;
     353              : 
     354            0 :             response_tx.send(CommandResponse { result: Ok(()) }).ok();
     355            0 :         });
     356            0 :     }
     357              : 
     358            0 :     fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
     359            0 :         self.running.get(tenant_shard_id).map(|r| r.get_barrier())
     360            0 :     }
     361              : 
     362              :     /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
     363              :     ///
     364              :     /// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
     365              :     ///
     366              :     /// This function resets the pending list: it is assumed that the caller may change their mind about
     367              :     /// which tenants need work between calls to schedule_iteration.
     368            0 :     async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
     369              :         let SchedulingResult {
     370            0 :             jobs,
     371            0 :             want_interval,
     372            0 :         } = self.generator.schedule().await;
     373              : 
     374              :         // Adjust interval based on feedback from the job generator
     375            0 :         if let Some(want_interval) = want_interval {
     376            0 :             // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
     377            0 :             self.scheduling_interval = Duration::from_secs(std::cmp::min(
     378            0 :                 std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
     379            0 :                 MAX_SCHEDULING_INTERVAL.as_secs(),
     380            0 :             ));
     381            0 :         }
     382              : 
     383              :         // The priority order of previously scheduled work may be invalidated by current state: drop
     384              :         // all pending work (it will be re-scheduled if still needed)
     385            0 :         self.pending.clear();
     386            0 : 
     387            0 :         // While iterating over the potentially-long list of tenants, we will periodically yield
     388            0 :         // to avoid blocking executor.
     389            0 :         yielding_loop(1000, cancel, jobs.into_iter(), |job| {
     390            0 :             // Skip tenants that already have a write in flight
     391            0 :             if !self.running.contains_key(job.get_tenant_shard_id()) {
     392            0 :                 self.pending.push_back(job);
     393            0 :             }
     394            0 :         })
     395            0 :         .await
     396            0 :         .ok();
     397            0 :     }
     398              : }
        

Generated by: LCOV version 2.1-beta