// pageserver/src/tenant/secondary/scheduler.rs

use std::collections::HashMap;
use std::marker::PhantomData;
use std::pin::Pin;
use std::time::{Duration, Instant};

use futures::Future;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::completion::Barrier;
use utils::yielding_loop::yielding_loop;

use super::{CommandRequest, CommandResponse, SecondaryTenantError};

/// Scheduling interval is the time between calls to JobGenerator::schedule.
/// When we schedule jobs, the job generator may provide a hint of its preferred
/// interval, which we will respect within these bounds.
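/// For example, a generator hint of 30s is clamped down to 10s, and a hint of
/// 500ms is raised to 1s (see `schedule_iteration`).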
const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);

/// Jitter a Duration by an integer percentage.  Returned values are uniform
/// in the range (100-pct)%..(100+pct)% of `d` (i.e. a 5% jitter is 5% either way: a ~10% range)
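///
/// A sketch of typical use: jittering a 60s period by 5% yields a uniform value
/// in 57s..63s:
/// ```ignore
/// let period = period_jitter(Duration::from_secs(60), 5);
/// ```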
pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
    if d == Duration::ZERO {
        d
    } else {
        rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
    }
}

/// When a periodic task first starts, it should wait for some time in the range 0..period, so
/// that starting many such tasks at the same time spreads them across the time range.
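///
/// A sketch of typical use, delaying a task's first iteration:
/// ```ignore
/// tokio::time::sleep(period_warmup(period)).await;
/// ```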
pub(super) fn period_warmup(period: Duration) -> Duration {
    if period == Duration::ZERO {
        period
    } else {
        rand::thread_rng().gen_range(Duration::ZERO..period)
    }
}

/// Scheduling helper for background work across many tenants.
///
/// Systems that need to run background work across many tenants may use this type
/// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
/// implementation to provide the work to execute.  This is a simple scheduler that just
/// polls the generator for outstanding work, replacing its queue of pending work with
/// what the generator yields on each call: the job generator can change its mind about
/// the order of jobs between calls.  The job generator is notified when jobs complete,
/// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
/// admin APIs).
///
/// For an example see [`crate::tenant::secondary::heatmap_uploader`]
///
/// G: A JobGenerator that this scheduler will poll to find pending jobs
/// PJ: 'Pending Job': type for job descriptors that are ready to run
/// RJ: 'Running Job': type for jobs that have been spawned
/// C: 'Completion' type that spawned jobs will send when they finish
/// CMD: 'Command' type that the job generator will accept to create jobs on-demand
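///
/// A typical driver, as a sketch (the command channel and cancellation token are
/// created by the caller):
/// ```ignore
/// let mut scheduler = TenantBackgroundJobs::new(generator, concurrency);
/// scheduler.run(command_rx, background_jobs_can_start, cancel).await;
/// ```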
pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
where
    G: JobGenerator<PJ, RJ, C, CMD>,
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
{
    generator: G,

    /// Ready to run.  Will progress to `running` once the concurrency limit is satisfied, or
    /// be removed on next scheduling pass.
    pending: std::collections::VecDeque<PJ>,

    /// Tasks currently running in Self::tasks for these tenants.  Check this map
    /// before pushing more work into pending for the same tenant.
    running: HashMap<TenantShardId, RJ>,

    tasks: JoinSet<C>,

    concurrency: usize,

    /// How often we would like `schedule_iteration` to be called.
    pub(super) scheduling_interval: Duration,

    _phantom: PhantomData<(PJ, RJ, C, CMD)>,
}

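/// Implemented by users of [`TenantBackgroundJobs`] to generate jobs and observe
/// their completion.
///
/// A minimal sketch of an implementor, with hypothetical `Upload*` types (the real
/// in-tree example is the heatmap uploader).  It assumes `utils::completion::channel()`
/// to pair a [`Barrier`] with a completion guard:
/// ```ignore
/// struct UploadJob {
///     tenant_shard_id: TenantShardId,
/// }
/// impl PendingJob for UploadJob {
///     fn get_tenant_shard_id(&self) -> &TenantShardId {
///         &self.tenant_shard_id
///     }
/// }
///
/// struct UploadInFlight {
///     barrier: Barrier,
/// }
/// impl RunningJob for UploadInFlight {
///     fn get_barrier(&self) -> Barrier {
///         self.barrier.clone()
///     }
/// }
///
/// struct UploadDone {
///     tenant_shard_id: TenantShardId,
/// }
/// impl Completion for UploadDone {
///     fn get_tenant_shard_id(&self) -> &TenantShardId {
///         &self.tenant_shard_id
///     }
/// }
///
/// impl JobGenerator<UploadJob, UploadInFlight, UploadDone, UploadCommand> for UploadGenerator {
///     async fn schedule(&mut self) -> SchedulingResult<UploadJob> {
///         // Emit one job per tenant that needs work, most urgent first.
///         SchedulingResult { jobs: self.find_work(), want_interval: None }
///     }
///
///     fn spawn(&mut self, job: UploadJob) -> (UploadInFlight, Pin<Box<dyn Future<Output = UploadDone> + Send>>) {
///         let (completion, barrier) = utils::completion::channel();
///         let tenant_shard_id = *job.get_tenant_shard_id();
///         let fut = Box::pin(async move {
///             let _completion = completion; // dropped on exit, releasing the barrier
///             // ... do the actual work for this tenant ...
///             UploadDone { tenant_shard_id }
///         });
///         (UploadInFlight { barrier }, fut)
///     }
///
///     fn on_completion(&mut self, done: UploadDone) {
///         // Update per-tenant bookkeeping, e.g. record when this tenant last ran.
///     }
///
///     fn on_command(&mut self, cmd: UploadCommand) -> Result<UploadJob, SecondaryTenantError> {
///         // Translate an admin request into a job to run immediately.
///         Ok(UploadJob { tenant_shard_id: cmd.tenant_shard_id })
///     }
/// }
/// ```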
pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
where
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
{
    /// Called at each scheduling interval.  Return a list of jobs to run, most urgent first.
    ///
    /// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
    /// Implementations should take care to yield the executor periodically if running
    /// very long loops.
    ///
    /// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
    /// jobs is not drained by the next scheduling interval, pending jobs will be cleared
    /// and re-generated.
    async fn schedule(&mut self) -> SchedulingResult<PJ>;

    /// Called when a pending job is ready to be run.
    ///
    /// The job generator provides a future, and an RJ (Running Job) descriptor that tracks it.
    fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);

    /// Called when a job previously spawned with spawn() transmits its completion
    fn on_completion(&mut self, completion: C);

    /// Called when a command is received.  A job will be spawned immediately if the return
    /// value is Ok, ignoring concurrency limits and the pending queue.
    fn on_command(&mut self, cmd: CMD) -> Result<PJ, SecondaryTenantError>;
}

/// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
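///
/// For instance, a generator that wants to be polled again sooner than the default
/// might return (sketch):
/// ```ignore
/// SchedulingResult { jobs, want_interval: Some(Duration::from_secs(2)) }
/// ```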
pub(super) struct SchedulingResult<PJ> {
    pub(super) jobs: Vec<PJ>,
    /// The job generator would like to be called again this soon
    pub(super) want_interval: Option<Duration>,
}

/// See [`TenantBackgroundJobs`].
pub(super) trait PendingJob {
    fn get_tenant_shard_id(&self) -> &TenantShardId;
}

/// See [`TenantBackgroundJobs`].
pub(super) trait Completion: Send + 'static {
    fn get_tenant_shard_id(&self) -> &TenantShardId;
}

/// See [`TenantBackgroundJobs`].
pub(super) trait RunningJob {
    fn get_barrier(&self) -> Barrier;
}

impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
where
    C: Completion,
    PJ: PendingJob,
    RJ: RunningJob,
    G: JobGenerator<PJ, RJ, C, CMD>,
{
    pub(super) fn new(generator: G, concurrency: usize) -> Self {
        Self {
            generator,
            pending: std::collections::VecDeque::new(),
            running: HashMap::new(),
            tasks: JoinSet::new(),
            concurrency,
            scheduling_interval: MAX_SCHEDULING_INTERVAL,
            _phantom: PhantomData,
        }
    }

    pub(super) async fn run(
        &mut self,
        mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
        background_jobs_can_start: Barrier,
        cancel: CancellationToken,
    ) {
        tracing::info!("Waiting for background_jobs_can_start...");
        background_jobs_can_start.wait().await;
        tracing::info!("background_jobs_can_start is ready, proceeding.");

        while !cancel.is_cancelled() {
            // Look for new work: this is relatively expensive because we have to go acquire the lock on
            // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
            // require an upload.
            self.schedule_iteration(&cancel).await;

            if cancel.is_cancelled() {
                return;
            }

            // Schedule some work, if the concurrency limit permits it
            self.spawn_pending();

            // This message is printed every scheduling iteration as proof of liveness when looking at logs
            tracing::info!(
                "Status: {} tasks running, {} pending",
                self.running.len(),
                self.pending.len()
            );

            // Between scheduling iterations, we will:
            //  - Drain any complete tasks and spawn pending tasks
            //  - Handle incoming administrative commands
            //  - Check our cancellation token
            let next_scheduling_iteration = Instant::now()
                .checked_add(self.scheduling_interval)
                .unwrap_or_else(|| {
                    tracing::warn!(
                        "Scheduling interval invalid ({}s)",
                        self.scheduling_interval.as_secs_f64()
                    );
                    // unwrap(): this constant is small, cannot fail to add to time unless
                    // we are close to the end of the universe.
                    Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
                });
            loop {
                tokio::select! {
                    _ = cancel.cancelled() => {
                        tracing::info!("joining tasks");
                        // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
                        // It is the caller's responsibility to make sure that the tasks they scheduled
                        // respect an appropriate cancellation token, to shut down promptly.  It is only
                        // safe to wait on joining these tasks because we can see the cancellation token
                        // has been set.
                        while let Some(_r) = self.tasks.join_next().await {}
                        tracing::info!("terminating on cancellation token.");

                        break;
                    },
                    _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
                        tracing::debug!("woke for scheduling interval");
                        break;
                    },
                    cmd = command_queue.recv() => {
                        tracing::debug!("woke for command queue");
                        let cmd = match cmd {
                            Some(c) => c,
                            None => {
                                // SecondaryController was destroyed, and this has raced with
                                // our CancellationToken
                                tracing::info!("terminating on command queue destruction");
                                cancel.cancel();
                                break;
                            }
                        };

                        let CommandRequest {
                            response_tx,
                            payload
                        } = cmd;
                        self.handle_command(payload, response_tx);
                    },
                    _ = async {
                        let completion = self.process_next_completion().await;
                        match completion {
                            Some(c) => {
                                self.generator.on_completion(c);
                                if !cancel.is_cancelled() {
                                    self.spawn_pending();
                                }
                            },
                            None => {
                                // Nothing is running, so just wait: expect that this future
                                // will be dropped when something in the outer select! fires.
                                cancel.cancelled().await;
                            }
                        }
                    } => {}
                }
            }
        }
    }

    fn do_spawn(&mut self, job: PJ) {
        let tenant_shard_id = *job.get_tenant_shard_id();
        let (in_progress, fut) = self.generator.spawn(job);

        self.tasks.spawn(fut);

        let replaced = self.running.insert(tenant_shard_id, in_progress);
        debug_assert!(replaced.is_none());
        if replaced.is_some() {
            tracing::warn!(%tenant_shard_id, "Unexpectedly spawned a task when one was already running")
        }
    }

    /// For all pending tenants that are eligible for execution, spawn their task.
    ///
    /// Caller provides the spawn operation, we track the resulting execution.
    fn spawn_pending(&mut self) {
        while !self.pending.is_empty() && self.running.len() < self.concurrency {
            // unwrap: loop condition includes !is_empty()
            let pending = self.pending.pop_front().unwrap();
            if !self.running.contains_key(pending.get_tenant_shard_id()) {
                self.do_spawn(pending);
            }
        }
    }

    /// For administrative commands: skip the pending queue, ignore concurrency limits
    fn spawn_now(&mut self, job: PJ) -> &RJ {
        let tenant_shard_id = *job.get_tenant_shard_id();
        self.do_spawn(job);
        self.running
            .get(&tenant_shard_id)
            .expect("We just inserted this")
    }

    /// Wait until the next task completes, and handle its completion
    ///
    /// Cancellation: this method is cancel-safe.
    async fn process_next_completion(&mut self) -> Option<C> {
        match self.tasks.join_next().await {
            Some(r) => {
                // We use a channel to drive completions, but also
                // need to drain the JoinSet to avoid completed tasks
                // accumulating.  These calls are 1:1 because every task
                // we spawn into this joinset submits its result to the channel.
                let completion = r.expect("Panic in background task");

                self.running.remove(completion.get_tenant_shard_id());
                Some(completion)
            }
            None => {
                // Nothing is running, so we have nothing to wait for.  We may drop out: the
                // main event loop will call us again after the next time it has run something.
                None
            }
        }
    }

    /// Convert the command into a pending job, spawn it, and when the spawned
    /// job completes, send the result down `response_tx`.
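    ///
    /// The sending side pairs with this roughly as follows (sketch):
    /// ```ignore
    /// let (response_tx, response_rx) = tokio::sync::oneshot::channel();
    /// command_tx.send(CommandRequest { payload, response_tx }).await?;
    /// let result = response_rx.await?.result;
    /// ```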
    fn handle_command(
        &mut self,
        cmd: CMD,
        response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
    ) {
        let job = match self.generator.on_command(cmd) {
            Ok(j) => j,
            Err(e) => {
                response_tx.send(CommandResponse { result: Err(e) }).ok();
                return;
            }
        };

        let tenant_shard_id = job.get_tenant_shard_id();
        let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
            tracing::info!(
                tenant_id=%tenant_shard_id.tenant_id,
                shard_id=%tenant_shard_id.shard_slug(),
                "Command already running, waiting for it"
            );
            barrier
        } else {
            let running = self.spawn_now(job);
            running.get_barrier().clone()
        };

        // This task does no I/O: it only listens for a barrier's completion and then
        // sends to the command response channel.  It is therefore safe to spawn this without
        // any gates/task_mgr hooks.
        tokio::task::spawn(async move {
            barrier.wait().await;

            response_tx.send(CommandResponse { result: Ok(()) }).ok();
        });
    }

    fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
        self.running.get(tenant_shard_id).map(|r| r.get_barrier())
    }

    /// Periodic execution phase: inspect all attached tenants and schedule any work they require.
    ///
    /// The job generator typically walks tenant-like structures here, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
    ///
    /// This function resets the pending list: it is assumed that the caller may change their mind about
    /// which tenants need work between calls to schedule_iteration.
    async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
        let SchedulingResult {
            jobs,
            want_interval,
        } = self.generator.schedule().await;

        // Adjust interval based on feedback from the job generator
        if let Some(want_interval) = want_interval {
            // Calculation uses second granularity: this scheduler is not intended for high frequency tasks
            self.scheduling_interval = Duration::from_secs(std::cmp::min(
                std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
                MAX_SCHEDULING_INTERVAL.as_secs(),
            ));
        }

        // The priority order of previously scheduled work may be invalidated by current state: drop
        // all pending work (it will be re-scheduled if still needed)
        self.pending.clear();

        // While iterating over the potentially-long list of tenants, we will periodically yield
        // to avoid blocking the executor.
        yielding_loop(1000, cancel, jobs.into_iter(), |job| {
            // Skip tenants that already have a write in flight
            if !self.running.contains_key(job.get_tenant_shard_id()) {
                self.pending.push_back(job);
            }
        })
        .await
        .ok();
    }
}
        
