LCOV - code coverage report
Current view: top level - storage_controller/src - tenant_shard.rs (source / functions)
Test:      b837401fb09d2d9818b70e630fdb67e9799b7b0d.info        Lines:     67.3 % (542 of 805 hit)
Test Date: 2024-04-18 15:32:49                                  Functions: 47.5 % (38 of 80 hit)

            Line data    Source code
       1              : use std::{
       2              :     collections::{HashMap, HashSet},
       3              :     sync::Arc,
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use crate::{
       8              :     metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
       9              :     persistence::TenantShardPersistence,
      10              :     scheduler::{AffinityScore, MaySchedule, ScheduleContext},
      11              : };
      12              : use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
      13              : use pageserver_api::{
      14              :     models::{LocationConfig, LocationConfigMode, TenantConfig},
      15              :     shard::{ShardIdentity, TenantShardId},
      16              : };
      17              : use serde::Serialize;
      18              : use tokio::task::JoinHandle;
      19              : use tokio_util::sync::CancellationToken;
      20              : use tracing::{instrument, Instrument};
      21              : use utils::{
      22              :     generation::Generation,
      23              :     id::NodeId,
      24              :     seqwait::{SeqWait, SeqWaitError},
      25              :     sync::gate::Gate,
      26              : };
      27              : 
      28              : use crate::{
      29              :     compute_hook::ComputeHook,
      30              :     node::Node,
      31              :     persistence::{split_state::SplitState, Persistence},
      32              :     reconciler::{
      33              :         attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
      34              :     },
      35              :     scheduler::{ScheduleError, Scheduler},
      36              :     service, Sequence,
      37              : };
      38              : 
      39              : /// Serialization helper
      40            0 : fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
      41            0 : where
      42            0 :     S: serde::ser::Serializer,
      43            0 :     T: Clone + std::fmt::Display,
      44            0 : {
      45            0 :     serializer.collect_str(&v.lock().unwrap())
      46            0 : }
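
Aside, not part of the measured source: a minimal sketch of how a serde `serialize_with` helper
like `read_mutex_content` is attached to a field (the same pattern `TenantShard::last_error` uses
below); the `Demo` struct and its `status` field are hypothetical names.

    #[derive(serde::Serialize)]
    struct Demo {
        // serde hands the helper a `&Mutex<String>`; it locks the mutex and
        // serializes the Display form of the contents.
        #[serde(serialize_with = "read_mutex_content")]
        status: std::sync::Mutex<String>,
    }
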
      47              : 
      48              : /// In-memory state for a particular tenant shard.
      49              : ///
      50              : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
      51              : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
      52            0 : #[derive(Serialize)]
      53              : pub(crate) struct TenantShard {
      54              :     pub(crate) tenant_shard_id: TenantShardId,
      55              : 
      56              :     pub(crate) shard: ShardIdentity,
      57              : 
      58              :     // Runtime only: sequence used to coordinate updates to this object while
      59              :     // background reconcilers may be running.  A reconciler runs to a particular
      60              :     // sequence.
      61              :     pub(crate) sequence: Sequence,
      62              : 
      63              :     // Latest generation number: next time we attach, increment this
      64              :     // and use the incremented number when attaching.
      65              :     //
      66              :     // None represents an incompletely onboarded tenant via the [`Service::location_config`]
      67              :     // API, where this tenant may only run in PlacementPolicy::Secondary.
      68              :     pub(crate) generation: Option<Generation>,
      69              : 
      70              :     // High level description of how the tenant should be set up.  Provided
      71              :     // externally.
      72              :     pub(crate) policy: PlacementPolicy,
      73              : 
      74              :     // Low level description of exactly which pageservers should fulfil
      75              :     // which role.  Generated by `Self::schedule`.
      76              :     pub(crate) intent: IntentState,
      77              : 
      78              :     // Low level description of how the tenant is configured on pageservers:
      79              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      80              :     // with `Self::reconcile`.
      81              :     pub(crate) observed: ObservedState,
      82              : 
      83              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
      84              :     // for all shards in a tenant.
      85              :     pub(crate) config: TenantConfig,
      86              : 
      87              :     /// If a reconcile task is currently in flight, it may be joined here (it is
      88              :     /// only safe to join if either the result has been received or the reconciler's
      89              :     /// cancellation token has been fired)
      90              :     #[serde(skip)]
      91              :     pub(crate) reconciler: Option<ReconcilerHandle>,
      92              : 
      93              :     /// If a tenant is being split, then all shards with that TenantId will have a
      94              :     /// SplitState set; this acts as a guard against other operations such as background
      95              :     /// reconciliation and timeline creation.
      96              :     pub(crate) splitting: SplitState,
      97              : 
      98              :     /// Optionally wait for reconciliation to complete up to a particular
      99              :     /// sequence number.
     100              :     #[serde(skip)]
     101              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     102              : 
     103              :     /// Indicates the sequence number for which we have encountered an error reconciling.  If
     104              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
     105              :     /// and callers should stop waiting for `waiter` and propagate the error.
     106              :     #[serde(skip)]
     107              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     108              : 
     109              :     /// The most recent error from a reconcile on this tenant
     110              :     /// TODO: generalize to an array of recent events
     111              :     /// TODO: use an ArcSwap instead of a mutex for faster reads?
     112              :     #[serde(serialize_with = "read_mutex_content")]
     113              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
     114              : 
     115              :     /// If we have a pending compute notification that for some reason we weren't able to send,
     116              :     /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
     117              :     /// sending it.  This is the mechanism by which compute notifications are included in the scope
     118              :     /// of state that we publish externally in an eventually consistent way.
     119              :     pub(crate) pending_compute_notification: bool,
     120              : 
     121              :     // Support/debug tool: if something is going wrong or flapping with scheduling, this may
     122              :     // be set to a non-active state to avoid making changes while the issue is fixed.
     123              :     scheduling_policy: ShardSchedulingPolicy,
     124              : }
     125              : 
     126              : #[derive(Default, Clone, Debug, Serialize)]
     127              : pub(crate) struct IntentState {
     128              :     attached: Option<NodeId>,
     129              :     secondary: Vec<NodeId>,
     130              : }
     131              : 
     132              : impl IntentState {
     133            4 :     pub(crate) fn new() -> Self {
     134            4 :         Self {
     135            4 :             attached: None,
     136            4 :             secondary: vec![],
     137            4 :         }
     138            4 :     }
     139            0 :     pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
     140            0 :         if let Some(node_id) = node_id {
     141            0 :             scheduler.node_inc_ref(node_id);
     142            0 :         }
     143            0 :         Self {
     144            0 :             attached: node_id,
     145            0 :             secondary: vec![],
     146            0 :         }
     147            0 :     }
     148              : 
     149           26 :     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
     150           26 :         if self.attached != new_attached {
     151           26 :             if let Some(old_attached) = self.attached.take() {
     152            0 :                 scheduler.node_dec_ref(old_attached);
     153           26 :             }
     154           26 :             if let Some(new_attached) = &new_attached {
     155           26 :                 scheduler.node_inc_ref(*new_attached);
     156           26 :             }
     157           26 :             self.attached = new_attached;
     158            0 :         }
     159           26 :     }
     160              : 
     161              :     /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
     162              :     /// secondary to attached while maintaining the scheduler's reference counts.
     163           14 :     pub(crate) fn promote_attached(
     164           14 :         &mut self,
     165           14 :         _scheduler: &mut Scheduler,
     166           14 :         promote_secondary: NodeId,
     167           14 :     ) {
     168           14 :         // If we call this with a node that isn't in secondary, it would cause incorrect
     169           14 :         // scheduler reference counting, since we assume the node is already referenced as a secondary.
     170           14 :         debug_assert!(self.secondary.contains(&promote_secondary));
     171              : 
     172              :         // TODO: when scheduler starts tracking attached + secondary counts separately, we will
     173              :         // need to call into it here.
     174           28 :         self.secondary.retain(|n| n != &promote_secondary);
     175           14 :         self.attached = Some(promote_secondary);
     176           14 :     }
     177              : 
     178           34 :     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
     179           34 :         debug_assert!(!self.secondary.contains(&new_secondary));
     180           34 :         scheduler.node_inc_ref(new_secondary);
     181           34 :         self.secondary.push(new_secondary);
     182           34 :     }
     183              : 
     184              :     /// It is legal to call this with a node that is not currently a secondary: that is a no-op
     185           10 :     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
     186           10 :         let index = self.secondary.iter().position(|n| *n == node_id);
     187           10 :         if let Some(index) = index {
     188           10 :             scheduler.node_dec_ref(node_id);
     189           10 :             self.secondary.remove(index);
     190           10 :         }
     191           10 :     }
     192              : 
     193           24 :     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
     194           24 :         for secondary in self.secondary.drain(..) {
     195           24 :             scheduler.node_dec_ref(secondary);
     196           24 :         }
     197           24 :     }
     198              : 
     199              :     /// Remove the last secondary node from the list of secondaries
     200            0 :     pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
     201            0 :         if let Some(node_id) = self.secondary.pop() {
     202            0 :             scheduler.node_dec_ref(node_id);
     203            0 :         }
     204            0 :     }
     205              : 
     206           24 :     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
     207           24 :         if let Some(old_attached) = self.attached.take() {
     208           24 :             scheduler.node_dec_ref(old_attached);
     209           24 :         }
     210              : 
     211           24 :         self.clear_secondary(scheduler);
     212           24 :     }
     213              : 
     214          172 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     215          172 :         let mut result = Vec::new();
     216          172 :         if let Some(p) = self.attached {
     217          168 :             result.push(p)
     218            4 :         }
     219              : 
     220          172 :         result.extend(self.secondary.iter().copied());
     221          172 : 
     222          172 :         result
     223          172 :     }
     224              : 
     225          144 :     pub(crate) fn get_attached(&self) -> &Option<NodeId> {
     226          144 :         &self.attached
     227          144 :     }
     228              : 
     229           38 :     pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
     230           38 :         &self.secondary
     231           38 :     }
     232              : 
     233              :     /// If the node is in use as the attached location, demote it into
     234              :     /// the list of secondary locations.  This is used when a node goes offline,
     235              :     /// and we want to use a different node for attachment, but not permanently
     236              :     /// forget the location on the offline node.
     237              :     ///
     238              :     /// Returns true if a change was made
     239           14 :     pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
     240           14 :         if self.attached == Some(node_id) {
     241              :             // TODO: when scheduler starts tracking attached + secondary counts separately, we will
     242              :             // need to call into it here.
     243           14 :             self.attached = None;
     244           14 :             self.secondary.push(node_id);
     245           14 :             true
     246              :         } else {
     247            0 :             false
     248              :         }
     249           14 :     }
     250              : }
     251              : 
     252              : impl Drop for IntentState {
     253           26 :     fn drop(&mut self) {
     254           26 :         // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
     255           26 :         // We do not check this while panicking, to avoid polluting unit test failures or
     256           26 :         // other assertions with this assertion's output.  It's still wrong to leak these,
     257           26 :         // but if we already have a panic then we don't need to independently flag this case.
     258           26 :         if !(std::thread::panicking()) {
     259           26 :             debug_assert!(self.attached.is_none() && self.secondary.is_empty());
     260            0 :         }
     261           24 :     }
     262              : }
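
Aside (a sketch under assumptions, not part of the measured source): the reference-counting
contract that the `Drop` assertion above enforces. Every node placed into an `IntentState` holds a
reference in the `Scheduler`, so the state must be emptied through the scheduler before it is
dropped; `node_a`/`node_b` are hypothetical node IDs.

    fn intent_lifecycle_sketch(scheduler: &mut Scheduler, node_a: NodeId, node_b: NodeId) {
        let mut intent = IntentState::new();
        intent.set_attached(scheduler, Some(node_a)); // takes a scheduler ref for node_a
        intent.push_secondary(scheduler, node_b);     // takes a scheduler ref for node_b
        intent.demote_attached(node_a);               // node_a moves to the secondary list
        intent.promote_attached(scheduler, node_b);   // node_b becomes the attached location
        intent.clear(scheduler);                      // releases all refs; Drop's assert stays quiet
    }
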
     263              : 
     264              : #[derive(Default, Clone, Serialize)]
     265              : pub(crate) struct ObservedState {
     266              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
     267              : }
     268              : 
     269              : /// Our latest knowledge of how this tenant is configured in the outside world.
     270              : ///
     271              : /// Meaning:
     272              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
     273              : ///       node for this shard.
     274              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
     275              : ///       what it is (e.g. we failed partway through configuring it)
     276              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     277              : ///       and that configuration will still be present unless something external interfered.
     278              : #[derive(Clone, Serialize)]
     279              : pub(crate) struct ObservedStateLocation {
     280              :     /// If None, it means we do not know the status of this shard's location on this node, but
     281              :     /// we know that we might have some state on this node.
     282              :     pub(crate) conf: Option<LocationConfig>,
     283              : }
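
Aside (hypothetical helper, not part of the measured source): how the three cases described above
map onto the data structure.

    fn describe_observed(observed: &ObservedState, node_id: NodeId) -> &'static str {
        match observed.locations.get(&node_id) {
            // No entry: we are certain nothing is configured for this shard on the node.
            None => "nothing configured",
            // Entry with conf == None: state unknown, e.g. a configuration attempt failed partway.
            Some(ObservedStateLocation { conf: None }) => "unknown, possibly partial state",
            // Entry with conf == Some: the last configuration we know we applied successfully.
            Some(ObservedStateLocation { conf: Some(_) }) => "last successfully applied config",
        }
    }
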
     284              : pub(crate) struct ReconcilerWaiter {
     285              :     // For observability purposes, remember the ID of the shard we're
     286              :     // waiting for.
     287              :     pub(crate) tenant_shard_id: TenantShardId,
     288              : 
     289              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     290              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     291              :     error: std::sync::Arc<std::sync::Mutex<String>>,
     292              :     seq: Sequence,
     293              : }
     294              : 
     295            0 : #[derive(thiserror::Error, Debug)]
     296              : pub enum ReconcileWaitError {
     297              :     #[error("Timeout waiting for shard {0}")]
     298              :     Timeout(TenantShardId),
     299              :     #[error("shutting down")]
     300              :     Shutdown,
     301              :     #[error("Reconcile error on shard {0}: {1}")]
     302              :     Failed(TenantShardId, String),
     303              : }
     304              : 
     305              : #[derive(Eq, PartialEq, Debug)]
     306              : pub(crate) struct ReplaceSecondary {
     307              :     old_node_id: NodeId,
     308              :     new_node_id: NodeId,
     309              : }
     310              : 
     311              : #[derive(Eq, PartialEq, Debug)]
     312              : pub(crate) struct MigrateAttachment {
     313              :     old_attached_node_id: NodeId,
     314              :     new_attached_node_id: NodeId,
     315              : }
     316              : 
     317              : #[derive(Eq, PartialEq, Debug)]
     318              : pub(crate) enum ScheduleOptimization {
     319              :     // Replace one of our secondary locations with a different node
     320              :     ReplaceSecondary(ReplaceSecondary),
     321              :     // Migrate attachment to an existing secondary location
     322              :     MigrateAttachment(MigrateAttachment),
     323              : }
     324              : 
     325              : impl ReconcilerWaiter {
     326            0 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     327            0 :         tokio::select! {
     328            0 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     329            0 :                 result.map_err(|e| match e {
     330            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     331            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     332            0 :                 })?;
     333              :             },
     334            0 :             result = self.error_seq_wait.wait_for(self.seq) => {
     335            0 :                 result.map_err(|e| match e {
     336            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     337            0 :                     SeqWaitError::Timeout => unreachable!()
     338            0 :                 })?;
     339              : 
     340              :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
     341              :             }
     342              :         }
     343              : 
     344            0 :         Ok(())
     345            0 :     }
     346              : }
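
Aside (a sketch, not from this file; the 30-second budget is an arbitrary assumption): how a caller
might consume a `ReconcilerWaiter`, distinguishing a timed-out wait from a failed reconcile.

    async fn await_reconcile(waiter: ReconcilerWaiter) -> Result<(), ReconcileWaitError> {
        match waiter.wait_timeout(Duration::from_secs(30)).await {
            Ok(()) => Ok(()),
            Err(ReconcileWaitError::Timeout(shard)) => {
                // Only our wait timed out; the reconcile task itself may still complete later.
                tracing::warn!("Still waiting for reconcile of shard {shard}");
                Err(ReconcileWaitError::Timeout(shard))
            }
            // Shutdown or a reconcile error: propagate to the caller.
            Err(other) => Err(other),
        }
    }
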
     347              : 
     348              : /// Having spawned a reconciler task, the tenant shard's state will carry enough
     349              : /// information to optionally cancel & await it later.
     350              : pub(crate) struct ReconcilerHandle {
     351              :     sequence: Sequence,
     352              :     handle: JoinHandle<()>,
     353              :     cancel: CancellationToken,
     354              : }
     355              : 
     356              : /// When a reconcile task completes, it sends this result object
     357              : /// to be applied to the primary TenantShard.
     358              : pub(crate) struct ReconcileResult {
     359              :     pub(crate) sequence: Sequence,
     360              :     /// On errors, `observed` should be treated as an incomplete description
     361              :     /// of state (i.e. any nodes present in the result should override nodes
     362              :     /// present in the parent tenant state, but any unmentioned nodes should
     363              :     /// not be removed from parent tenant state)
     364              :     pub(crate) result: Result<(), ReconcileError>,
     365              : 
     366              :     pub(crate) tenant_shard_id: TenantShardId,
     367              :     pub(crate) generation: Option<Generation>,
     368              :     pub(crate) observed: ObservedState,
     369              : 
     370              :     /// Set [`TenantShard::pending_compute_notification`] from this flag
     371              :     pub(crate) pending_compute_notification: bool,
     372              : }
     373              : 
     374              : impl ObservedState {
     375            0 :     pub(crate) fn new() -> Self {
     376            0 :         Self {
     377            0 :             locations: HashMap::new(),
     378            0 :         }
     379            0 :     }
     380              : }
     381              : 
     382              : impl TenantShard {
     383           22 :     pub(crate) fn new(
     384           22 :         tenant_shard_id: TenantShardId,
     385           22 :         shard: ShardIdentity,
     386           22 :         policy: PlacementPolicy,
     387           22 :     ) -> Self {
     388           22 :         Self {
     389           22 :             tenant_shard_id,
     390           22 :             policy,
     391           22 :             intent: IntentState::default(),
     392           22 :             generation: Some(Generation::new(0)),
     393           22 :             shard,
     394           22 :             observed: ObservedState::default(),
     395           22 :             config: TenantConfig::default(),
     396           22 :             reconciler: None,
     397           22 :             splitting: SplitState::Idle,
     398           22 :             sequence: Sequence(1),
     399           22 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     400           22 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     401           22 :             last_error: Arc::default(),
     402           22 :             pending_compute_notification: false,
     403           22 :             scheduling_policy: ShardSchedulingPolicy::default(),
     404           22 :         }
     405           22 :     }
     406              : 
     407              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     408              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     409              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     410              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     411            2 :     pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
     412            2 :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     413            2 :         // generation
     414            2 :         let mut attached_locs = self
     415            2 :             .observed
     416            2 :             .locations
     417            2 :             .iter()
     418            4 :             .filter_map(|(node_id, l)| {
     419            4 :                 if let Some(conf) = &l.conf {
     420            4 :                     if conf.mode == LocationConfigMode::AttachedMulti
     421            2 :                         || conf.mode == LocationConfigMode::AttachedSingle
     422            2 :                         || conf.mode == LocationConfigMode::AttachedStale
     423              :                     {
     424            4 :                         Some((node_id, conf.generation))
     425              :                     } else {
     426            0 :                         None
     427              :                     }
     428              :                 } else {
     429            0 :                     None
     430              :                 }
     431            4 :             })
     432            2 :             .collect::<Vec<_>>();
     433            2 : 
     434            4 :         attached_locs.sort_by_key(|i| i.1);
     435            2 :         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
     436            2 :             self.intent.set_attached(scheduler, Some(*node_id));
     437            2 :         }
     438              : 
     439              :         // All remaining observed locations generate secondary intents.  This includes None
     440              :         // observations, as these may well have some local content on disk that is usable (this
     441              :         // is an edge case that might occur if we restarted during a migration or other change)
     442              :         //
     443              :         // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
     444              :         // will take care of promoting one of these secondaries to be attached.
     445            4 :         self.observed.locations.keys().for_each(|node_id| {
     446            4 :             if Some(*node_id) != self.intent.attached {
     447            2 :                 self.intent.push_secondary(scheduler, *node_id);
     448            2 :             }
     449            4 :         });
     450            2 :     }
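
Aside (assumed caller shape, not taken from this file): the startup order the doc comment above
describes, i.e. adopt observed locations first, then schedule to satisfy the placement policy.

    fn adopt_observed_sketch(
        shard: &mut TenantShard,
        scheduler: &mut Scheduler,
        context: &mut ScheduleContext,
    ) -> Result<(), ScheduleError> {
        // First adopt whatever locations the pageservers already have...
        shard.intent_from_observed(scheduler);
        // ...then let scheduling bring the intent into line with the placement policy.
        shard.schedule(scheduler, context)
    }
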
     451              : 
     452              :     /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
     453              :     /// attached pageserver for a shard.
     454              :     ///
     455              :     /// Returns whether we modified it, and the NodeId selected.
     456           14 :     fn schedule_attached(
     457           14 :         &mut self,
     458           14 :         scheduler: &mut Scheduler,
     459           14 :         context: &ScheduleContext,
     460           14 :     ) -> Result<(bool, NodeId), ScheduleError> {
     461              :         // No work to do if we already have an attached tenant
     462           14 :         if let Some(node_id) = self.intent.attached {
     463            0 :             return Ok((false, node_id));
     464           14 :         }
     465              : 
     466           14 :         if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
     467              :             // Promote a secondary
     468            2 :             tracing::debug!("Promoted secondary {} to attached", promote_secondary);
     469            2 :             self.intent.promote_attached(scheduler, promote_secondary);
     470            2 :             Ok((true, promote_secondary))
     471              :         } else {
     472              :             // Pick a fresh node: either we had no secondaries or none were schedulable
     473           12 :             let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
     474           12 :             tracing::debug!("Selected {} as attached", node_id);
     475           12 :             self.intent.set_attached(scheduler, Some(node_id));
     476           12 :             Ok((true, node_id))
     477              :         }
     478           14 :     }
     479              : 
     480           16 :     pub(crate) fn schedule(
     481           16 :         &mut self,
     482           16 :         scheduler: &mut Scheduler,
     483           16 :         context: &mut ScheduleContext,
     484           16 :     ) -> Result<(), ScheduleError> {
     485           16 :         let r = self.do_schedule(scheduler, context);
     486           16 : 
     487           16 :         context.avoid(&self.intent.all_pageservers());
     488           16 :         if let Some(attached) = self.intent.get_attached() {
     489           14 :             context.push_attached(*attached);
     490           14 :         }
     491              : 
     492           16 :         r
     493           16 :     }
     494              : 
     495           16 :     pub(crate) fn do_schedule(
     496           16 :         &mut self,
     497           16 :         scheduler: &mut Scheduler,
     498           16 :         context: &ScheduleContext,
     499           16 :     ) -> Result<(), ScheduleError> {
     500           16 :         // TODO: before scheduling new nodes, check if any existing content in
     501           16 :         // self.intent refers to pageservers that are offline, and pick other
     502           16 :         // pageservers if so.
     503           16 : 
     504           16 :         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
     505           16 :         // change their attach location.
     506           16 : 
     507           16 :         match self.scheduling_policy {
     508           14 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
     509              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     510              :                 // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
     511            2 :                 tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     512            0 :                     "Scheduling is disabled by policy {:?}", self.scheduling_policy);
     513            2 :                 return Ok(());
     514              :             }
     515              :         }
     516              : 
     517              :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     518              :         // more work on the same pageservers we're already using.
     519           14 :         let mut modified = false;
     520           14 : 
     521           14 :         // Add/remove nodes to fulfil policy
     522           14 :         use PlacementPolicy::*;
     523           14 :         match self.policy {
     524           14 :             Attached(secondary_count) => {
     525           14 :                 let retain_secondaries = if self.intent.attached.is_none()
     526           14 :                     && scheduler.node_preferred(&self.intent.secondary).is_some()
     527              :                 {
     528              :                     // If we have no attached, and one of the secondaries is eligible to be promoted, retain
     529              :                     // one more secondary than we usually would, as one of them will become attached further down this function.
     530            2 :                     secondary_count + 1
     531              :                 } else {
     532           12 :                     secondary_count
     533              :                 };
     534              : 
     535           14 :                 while self.intent.secondary.len() > retain_secondaries {
     536            0 :                     // We have no particular preference for one secondary location over another: just
     537            0 :                     // arbitrarily drop from the end
     538            0 :                     self.intent.pop_secondary(scheduler);
     539            0 :                     modified = true;
     540            0 :                 }
     541              : 
     542              :                 // Should have exactly one attached, and N secondaries
     543           14 :                 let (modified_attached, attached_node_id) =
     544           14 :                     self.schedule_attached(scheduler, context)?;
     545           14 :                 modified |= modified_attached;
     546           14 : 
     547           14 :                 let mut used_pageservers = vec![attached_node_id];
     548           26 :                 while self.intent.secondary.len() < secondary_count {
     549           12 :                     let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
     550           12 :                     self.intent.push_secondary(scheduler, node_id);
     551           12 :                     used_pageservers.push(node_id);
     552           12 :                     modified = true;
     553              :                 }
     554              :             }
     555              :             Secondary => {
     556            0 :                 if let Some(node_id) = self.intent.get_attached() {
     557            0 :                     // Populate secondary by demoting the attached node
     558            0 :                     self.intent.demote_attached(*node_id);
     559            0 :                     modified = true;
     560            0 :                 } else if self.intent.secondary.is_empty() {
     561            0 :                     // Populate secondary by scheduling a fresh node
     562            0 :                     let node_id = scheduler.schedule_shard(&[], context)?;
     563            0 :                     self.intent.push_secondary(scheduler, node_id);
     564            0 :                     modified = true;
     565            0 :                 }
     566            0 :                 while self.intent.secondary.len() > 1 {
     567            0 :                     // We have no particular preference for one secondary location over another: just
     568            0 :                     // arbitrarily drop from the end
     569            0 :                     self.intent.pop_secondary(scheduler);
     570            0 :                     modified = true;
     571            0 :                 }
     572              :             }
     573              :             Detached => {
     574              :                 // Never add locations in this mode
     575            0 :                 if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
     576            0 :                     self.intent.clear(scheduler);
     577            0 :                     modified = true;
     578            0 :                 }
     579              :             }
     580              :         }
     581              : 
     582           14 :         if modified {
     583           14 :             self.sequence.0 += 1;
     584           14 :         }
     585              : 
     586           14 :         Ok(())
     587           16 :     }
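
Aside (an assumption about the caller's shape, not taken from this file): `schedule` is intended to
be run for all shards of a tenant against one shared `ScheduleContext`, so that the
`avoid`/`push_attached` bookkeeping above steers later shards away from already-used nodes.

    fn schedule_tenant_sketch(
        shards: &mut [TenantShard],
        scheduler: &mut Scheduler,
        context: &mut ScheduleContext,
    ) -> Result<(), ScheduleError> {
        for shard in shards.iter_mut() {
            // Each call records the shard's chosen pageservers in `context`.
            shard.schedule(scheduler, context)?;
        }
        Ok(())
    }
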
     588              : 
     589              :     /// Optimize attachments: if a shard has a secondary location that is preferable to
     590              :     /// its primary location based on soft constraints, switch that secondary location
     591              :     /// to be attached.
     592           80 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     593              :     pub(crate) fn optimize_attachment(
     594              :         &self,
     595              :         nodes: &HashMap<NodeId, Node>,
     596              :         schedule_context: &ScheduleContext,
     597              :     ) -> Option<ScheduleOptimization> {
     598              :         let attached = (*self.intent.get_attached())?;
     599              :         if self.intent.secondary.is_empty() {
     600              :             // We can only do useful work if we have both attached and secondary locations: this
     601              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     602              :             return None;
     603              :         }
     604              : 
     605              :         let current_affinity_score = schedule_context.get_node_affinity(attached);
     606              :         let current_attachment_count = schedule_context.get_node_attachments(attached);
     607              : 
     608              :         // Generate score for each node, dropping any un-schedulable nodes.
     609              :         let all_pageservers = self.intent.all_pageservers();
     610              :         let mut scores = all_pageservers
     611              :             .iter()
     612           80 :             .flat_map(|node_id| {
     613           80 :                 if matches!(
     614           80 :                     nodes
     615           80 :                         .get(node_id)
     616           80 :                         .map(|n| n.may_schedule())
     617           80 :                         .unwrap_or(MaySchedule::No),
     618              :                     MaySchedule::No
     619              :                 ) {
     620            0 :                     None
     621              :                 } else {
     622           80 :                     let affinity_score = schedule_context.get_node_affinity(*node_id);
     623           80 :                     let attachment_count = schedule_context.get_node_attachments(*node_id);
     624           80 :                     Some((*node_id, affinity_score, attachment_count))
     625              :                 }
     626           80 :             })
     627              :             .collect::<Vec<_>>();
     628              : 
     629              :         // Sort precedence:
     630              :         //  1st - prefer nodes with the lowest total affinity score
     631              :         //  2nd - prefer nodes with the lowest number of attachments in this context
     632              :         //  3rd - if all else is equal, sort by node ID for determinism in tests.
     633           80 :         scores.sort_by_key(|i| (i.1, i.2, i.0));
     634              : 
     635              :         if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
     636              :             scores.first()
     637              :         {
     638              :             if attached != *preferred_node {
     639              :                 // The best alternative must be more than 1 better than us, otherwise we could end
     640              :                 // up flapping back next time we're called (e.g. there's no point migrating from
     641              :                 // a location with score 1 to a location with score zero, because after migrating
     642              :                 // the situation would be the same, but in reverse).
     643              :                 if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
     644              :                     || current_attachment_count > *preferred_attachment_count + 1
     645              :                 {
     646            0 :                     tracing::info!(
     647            0 :                         "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
     648            0 :                         self.intent.get_secondary()
     649            0 :                     );
     650              :                     return Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
     651              :                         old_attached_node_id: attached,
     652              :                         new_attached_node_id: *preferred_node,
     653              :                     }));
     654              :                 }
     655              :             } else {
     656            0 :                 tracing::debug!(
     657            0 :                     "Node {} is already preferred (score {:?})",
     658            0 :                     preferred_node,
     659            0 :                     preferred_affinity_score
     660            0 :                 );
     661              :             }
     662              :         }
     663              : 
     664              :         // Fall-through: we didn't find an optimization
     665              :         None
     666              :     }
     667              : 
     668           60 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     669              :     pub(crate) fn optimize_secondary(
     670              :         &self,
     671              :         scheduler: &Scheduler,
     672              :         schedule_context: &ScheduleContext,
     673              :     ) -> Option<ScheduleOptimization> {
     674              :         if self.intent.secondary.is_empty() {
     675              :             // We can only do useful work if we have both attached and secondary locations: this
     676              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     677              :             return None;
     678              :         }
     679              : 
     680              :         for secondary in self.intent.get_secondary() {
     681              :             let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
     682              :                 // We're already on a node unaffected by any affinity constraints,
     683              :                 // so we won't change it.
     684              :                 continue;
     685              :             };
     686              : 
     687              :             // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
     688              :             // This implicitly limits the choice to nodes that are available, and prefers nodes
     689              :             // with lower utilization.
     690              :             let Ok(candidate_node) =
     691              :                 scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
     692              :             else {
     693              :                 // A scheduling error means we have no possible candidate replacements
     694              :                 continue;
     695              :             };
     696              : 
     697              :             let candidate_affinity_score = schedule_context
     698              :                 .nodes
     699              :                 .get(&candidate_node)
     700              :                 .unwrap_or(&AffinityScore::FREE);
     701              : 
     702              :             // The best alternative must be more than 1 better than us, otherwise we could end
     703              :             // up flapping back next time we're called.
     704              :             if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
     705              :                 // If some other node is available and has a lower score than this node, then
     706              :                 // that other node is a good place to migrate to.
     707            0 :                 tracing::info!(
     708            0 :                     "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
     709            0 :                     self.intent.get_secondary()
     710            0 :                 );
     711              :                 return Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
     712              :                     old_node_id: *secondary,
     713              :                     new_node_id: candidate_node,
     714              :                 }));
     715              :             }
     716              :         }
     717              : 
     718              :         None
     719              :     }
     720              : 
     721           22 :     pub(crate) fn apply_optimization(
     722           22 :         &mut self,
     723           22 :         scheduler: &mut Scheduler,
     724           22 :         optimization: ScheduleOptimization,
     725           22 :     ) {
     726           22 :         metrics::METRICS_REGISTRY
     727           22 :             .metrics_group
     728           22 :             .storage_controller_schedule_optimization
     729           22 :             .inc();
     730           22 : 
     731           22 :         match optimization {
     732              :             ScheduleOptimization::MigrateAttachment(MigrateAttachment {
     733           12 :                 old_attached_node_id,
     734           12 :                 new_attached_node_id,
     735           12 :             }) => {
     736           12 :                 self.intent.demote_attached(old_attached_node_id);
     737           12 :                 self.intent
     738           12 :                     .promote_attached(scheduler, new_attached_node_id);
     739           12 :             }
     740              :             ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
     741           10 :                 old_node_id,
     742           10 :                 new_node_id,
     743           10 :             }) => {
     744           10 :                 self.intent.remove_secondary(scheduler, old_node_id);
     745           10 :                 self.intent.push_secondary(scheduler, new_node_id);
     746           10 :             }
     747              :         }
     748           22 :     }
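
Aside (hypothetical caller, not part of the measured source): the optimize-then-apply flow these
methods are built for.

    fn maybe_rebalance_sketch(
        shard: &mut TenantShard,
        nodes: &HashMap<NodeId, Node>,
        scheduler: &mut Scheduler,
        schedule_context: &ScheduleContext,
    ) {
        // Prefer improving the attachment; otherwise see whether a secondary can move.
        let optimization = shard
            .optimize_attachment(nodes, schedule_context)
            .or_else(|| shard.optimize_secondary(scheduler, schedule_context));
        if let Some(optimization) = optimization {
            // Updates the intent (and scheduler refcounts); a later reconcile acts on it.
            shard.apply_optimization(scheduler, optimization);
        }
    }
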
     749              : 
     750              :     /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
     751              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
     752              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
     753              :     ///
     754              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
     755              :     /// function should not be used to decide whether to reconcile.
     756            0 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
     757            0 :         if let Some(attach_intent) = self.intent.attached {
     758            0 :             match self.observed.locations.get(&attach_intent) {
     759            0 :                 Some(loc) => match &loc.conf {
     760            0 :                     Some(conf) => match conf.mode {
     761              :                         LocationConfigMode::AttachedMulti
     762              :                         | LocationConfigMode::AttachedSingle
     763              :                         | LocationConfigMode::AttachedStale => {
     764              :                             // Our intent and observed state agree that this node is in an attached state.
     765            0 :                             Some(attach_intent)
     766              :                         }
     767              :                         // Our observed config is not an attached state
     768            0 :                         _ => None,
     769              :                     },
     770              :                     // Our observed state is None, i.e. in flux
     771            0 :                     None => None,
     772              :                 },
     773              :                 // We have no observed state for this node
     774            0 :                 None => None,
     775              :             }
     776              :         } else {
     777              :             // Our intent is not to attach
     778            0 :             None
     779              :         }
     780            0 :     }
     781              : 
     782            0 :     fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
     783            0 :         let mut dirty_nodes = HashSet::new();
     784              : 
     785            0 :         if let Some(node_id) = self.intent.attached {
     786              :             // Maybe panic: it is a severe bug if we try to attach while generation is null.
     787            0 :             let generation = self
     788            0 :                 .generation
     789            0 :                 .expect("Attempted to enter attached state without a generation");
     790            0 : 
     791            0 :             let wanted_conf = attached_location_conf(
     792            0 :                 generation,
     793            0 :                 &self.shard,
     794            0 :                 &self.config,
     795            0 :                 !self.intent.secondary.is_empty(),
     796            0 :             );
     797            0 :             match self.observed.locations.get(&node_id) {
     798            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     799            0 :                 Some(_) | None => {
     800            0 :                     dirty_nodes.insert(node_id);
     801            0 :                 }
     802              :             }
     803            0 :         }
     804              : 
     805            0 :         for node_id in &self.intent.secondary {
     806            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     807            0 :             match self.observed.locations.get(node_id) {
     808            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     809            0 :                 Some(_) | None => {
     810            0 :                     dirty_nodes.insert(*node_id);
     811            0 :                 }
     812              :             }
     813              :         }
     814              : 
     815            0 :         for node_id in self.observed.locations.keys() {
     816            0 :             if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
     817            0 :                 // We have observed state that isn't part of our intent: need to clean it up.
     818            0 :                 dirty_nodes.insert(*node_id);
     819            0 :             }
     820              :         }
     821              : 
     822            0 :         dirty_nodes.retain(|node_id| {
     823            0 :             nodes
     824            0 :                 .get(node_id)
     825            0 :                 .map(|n| n.is_available())
     826            0 :                 .unwrap_or(false)
     827            0 :         });
     828            0 : 
     829            0 :         !dirty_nodes.is_empty()
     830            0 :     }
     831              : 
     832              :     #[allow(clippy::too_many_arguments)]
     833            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     834              :     pub(crate) fn maybe_reconcile(
     835              :         &mut self,
     836              :         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
     837              :         pageservers: &Arc<HashMap<NodeId, Node>>,
     838              :         compute_hook: &Arc<ComputeHook>,
     839              :         service_config: &service::Config,
     840              :         persistence: &Arc<Persistence>,
     841              :         gate: &Gate,
     842              :         cancel: &CancellationToken,
     843              :     ) -> Option<ReconcilerWaiter> {
     844              :         // If there are any ambiguous observed states, and the nodes they refer to are available,
     845              :         // we should reconcile to clean them up.
     846              :         let mut dirty_observed = false;
     847              :         for (node_id, observed_loc) in &self.observed.locations {
     848              :             let node = pageservers
     849              :                 .get(node_id)
     850              :                 .expect("Nodes may not be removed while referenced");
     851              :             if observed_loc.conf.is_none() && node.is_available() {
     852              :                 dirty_observed = true;
     853              :                 break;
     854              :             }
     855              :         }
     856              : 
     857              :         let active_nodes_dirty = self.dirty(pageservers);
     858              : 
     859              :         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
     860              :         // wake up a reconciler to send it.
     861              :         let do_reconcile =
     862              :             active_nodes_dirty || dirty_observed || self.pending_compute_notification;
     863              : 
     864              :         if !do_reconcile {
     865            0 :             tracing::info!("Not dirty, no reconciliation needed.");
     866              :             return None;
     867              :         }
     868              : 
     869              :         // If we are currently splitting, then never start a reconciler task: the splitting logic
     870              :         // requires that shards are not interfered with while it runs. Do this check here rather than
     871              :         // up top, so that we only log this message if we would otherwise have done a reconciliation.
     872              :         if !matches!(self.splitting, SplitState::Idle) {
     873            0 :             tracing::info!("Refusing to reconcile, splitting in progress");
     874              :             return None;
     875              :         }
     876              : 
     877              :         // Reconcile already in flight for the current sequence?
     878              :         if let Some(handle) = &self.reconciler {
     879              :             if handle.sequence == self.sequence {
     880            0 :                 tracing::info!(
     881            0 :                     "Reconciliation already in progress for sequence {:?}",
     882            0 :                     self.sequence,
     883            0 :                 );
     884              :                 return Some(ReconcilerWaiter {
     885              :                     tenant_shard_id: self.tenant_shard_id,
     886              :                     seq_wait: self.waiter.clone(),
     887              :                     error_seq_wait: self.error_waiter.clone(),
     888              :                     error: self.last_error.clone(),
     889              :                     seq: self.sequence,
     890              :                 });
     891              :             }
     892              :         }
     893              : 
     894              :         // Pre-checks done: finally check whether we may actually do the work
     895              :         match self.scheduling_policy {
     896              :             ShardSchedulingPolicy::Active
     897              :             | ShardSchedulingPolicy::Essential
     898              :             | ShardSchedulingPolicy::Pause => {}
     899              :             ShardSchedulingPolicy::Stop => {
     900              :                 // We only reach this point if there is work to do and we're going to skip
      901              :                 // doing it: warn so it is obvious why this tenant isn't doing what it ought to.
     902            0 :                 tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
     903              :                 return None;
     904              :             }
     905              :         }
     906              : 
     907              :         // Build list of nodes from which the reconciler should detach
     908              :         let mut detach = Vec::new();
     909              :         for node_id in self.observed.locations.keys() {
     910              :             if self.intent.get_attached() != &Some(*node_id)
     911              :                 && !self.intent.secondary.contains(node_id)
     912              :             {
     913              :                 detach.push(
     914              :                     pageservers
     915              :                         .get(node_id)
      916              :                         .expect("Observed state references non-existent pageserver")
     917              :                         .clone(),
     918              :                 )
     919              :             }
     920              :         }
     921              : 
     922              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
     923              :         // doing our sequence's work.
     924              :         let old_handle = self.reconciler.take();
     925              : 
     926              :         let Ok(gate_guard) = gate.enter() else {
     927              :             // Shutting down, don't start a reconciler
     928              :             return None;
     929              :         };
     930              : 
     931              :         // Advance the sequence before spawning a reconciler, so that sequence waiters
      932              :         // can distinguish between the state before and after the reconcile completes.
     933              :         self.sequence = self.sequence.next();
     934              : 
     935              :         let reconciler_cancel = cancel.child_token();
     936              :         let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
     937              :         let mut reconciler = Reconciler {
     938              :             tenant_shard_id: self.tenant_shard_id,
     939              :             shard: self.shard,
     940              :             generation: self.generation,
     941              :             intent: reconciler_intent,
     942              :             detach,
     943              :             config: self.config.clone(),
     944              :             observed: self.observed.clone(),
     945              :             compute_hook: compute_hook.clone(),
     946              :             service_config: service_config.clone(),
     947              :             _gate_guard: gate_guard,
     948              :             cancel: reconciler_cancel.clone(),
     949              :             persistence: persistence.clone(),
     950              :             compute_notify_failure: false,
     951              :         };
     952              : 
     953              :         let reconcile_seq = self.sequence;
     954              : 
     955            0 :         tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
     956              :         let must_notify = self.pending_compute_notification;
     957              :         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
     958              :                                                         tenant_id=%reconciler.tenant_shard_id.tenant_id,
     959              :                                                         shard_id=%reconciler.tenant_shard_id.shard_slug());
     960              :         metrics::METRICS_REGISTRY
     961              :             .metrics_group
     962              :             .storage_controller_reconcile_spawn
     963              :             .inc();
     964              :         let result_tx = result_tx.clone();
     965              :         let join_handle = tokio::task::spawn(
     966            0 :             async move {
     967              :                 // Wait for any previous reconcile task to complete before we start
     968            0 :                 if let Some(old_handle) = old_handle {
     969            0 :                     old_handle.cancel.cancel();
     970            0 :                     if let Err(e) = old_handle.handle.await {
     971              :                         // We can't do much with this other than log it: the task is done, so
     972              :                         // we may proceed with our work.
     973            0 :                         tracing::error!("Unexpected join error waiting for reconcile task: {e}");
     974            0 :                     }
     975            0 :                 }
     976              : 
     977              :                 // Early check for cancellation before doing any work
     978              :                 // TODO: wrap all remote API operations in cancellation check
     979              :                 // as well.
     980            0 :                 if reconciler.cancel.is_cancelled() {
     981            0 :                     metrics::METRICS_REGISTRY
     982            0 :                         .metrics_group
     983            0 :                         .storage_controller_reconcile_complete
     984            0 :                         .inc(ReconcileCompleteLabelGroup {
     985            0 :                             status: ReconcileOutcome::Cancel,
     986            0 :                         });
     987            0 :                     return;
     988            0 :                 }
     989              : 
     990              :                 // Attempt to make observed state match intent state
     991            0 :                 let result = reconciler.reconcile().await;
     992              : 
     993              :                 // If we know we had a pending compute notification from some previous action, send a notification irrespective
     994              :                 // of whether the above reconcile() did any work
     995            0 :                 if result.is_ok() && must_notify {
      996              :                     // If this fails, we will signal the need to retry via [`ReconcileResult::pending_compute_notification`]
     997            0 :                     reconciler.compute_notify().await.ok();
     998            0 :                 }
     999              : 
    1000              :                 // Update result counter
    1001            0 :                 let outcome_label = match &result {
    1002            0 :                     Ok(_) => ReconcileOutcome::Success,
    1003            0 :                     Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
    1004            0 :                     Err(_) => ReconcileOutcome::Error,
    1005              :                 };
    1006              : 
    1007            0 :                 metrics::METRICS_REGISTRY
    1008            0 :                     .metrics_group
    1009            0 :                     .storage_controller_reconcile_complete
    1010            0 :                     .inc(ReconcileCompleteLabelGroup {
    1011            0 :                         status: outcome_label,
    1012            0 :                     });
    1013            0 : 
    1014            0 :                 result_tx
    1015            0 :                     .send(ReconcileResult {
    1016            0 :                         sequence: reconcile_seq,
    1017            0 :                         result,
    1018            0 :                         tenant_shard_id: reconciler.tenant_shard_id,
    1019            0 :                         generation: reconciler.generation,
    1020            0 :                         observed: reconciler.observed,
    1021            0 :                         pending_compute_notification: reconciler.compute_notify_failure,
    1022            0 :                     })
    1023            0 :                     .ok();
    1024            0 :             }
    1025              :             .instrument(reconciler_span),
    1026              :         );
    1027              : 
    1028              :         self.reconciler = Some(ReconcilerHandle {
    1029              :             sequence: self.sequence,
    1030              :             handle: join_handle,
    1031              :             cancel: reconciler_cancel,
    1032              :         });
    1033              : 
    1034              :         Some(ReconcilerWaiter {
    1035              :             tenant_shard_id: self.tenant_shard_id,
    1036              :             seq_wait: self.waiter.clone(),
    1037              :             error_seq_wait: self.error_waiter.clone(),
    1038              :             error: self.last_error.clone(),
    1039              :             seq: self.sequence,
    1040              :         })
    1041              :     }
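                       : 
                       :     // A minimal usage sketch, for illustration only and not taken from this file: a
                       :     // caller is assumed to invoke maybe_reconcile() while holding the shard map, then
                       :     // await the returned waiter outside that lock.  The `wait_timeout` method used
                       :     // below is an assumed waiter API, not necessarily the real one.
                       :     //
                       :     //     if let Some(waiter) = shard.maybe_reconcile(
                       :     //         &result_tx,
                       :     //         &pageservers,
                       :     //         &compute_hook,
                       :     //         &service_config,
                       :     //         &persistence,
                       :     //         &gate,
                       :     //         &cancel,
                       :     //     ) {
                       :     //         // Wait for the sequence we just advanced to either complete or record an error.
                       :     //         waiter.wait_timeout(Duration::from_secs(30)).await?;
                       :     //     }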
    1042              : 
    1043              :     /// Get a waiter for any reconciliation in flight, but do not start reconciliation
    1044              :     /// if it is not already running
    1045            0 :     pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
    1046            0 :         if self.reconciler.is_some() {
    1047            0 :             Some(ReconcilerWaiter {
    1048            0 :                 tenant_shard_id: self.tenant_shard_id,
    1049            0 :                 seq_wait: self.waiter.clone(),
    1050            0 :                 error_seq_wait: self.error_waiter.clone(),
    1051            0 :                 error: self.last_error.clone(),
    1052            0 :                 seq: self.sequence,
    1053            0 :             })
    1054              :         } else {
    1055            0 :             None
    1056              :         }
    1057            0 :     }
    1058              : 
    1059              :     /// Called when a ReconcileResult has been emitted and the service is updating
     1060              :     /// our state: if the result is from a sequence >= our ReconcilerHandle's sequence, then drop
    1061              :     /// the handle to indicate there is no longer a reconciliation in progress.
    1062            0 :     pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
    1063            0 :         if let Some(reconcile_handle) = &self.reconciler {
    1064            0 :             if reconcile_handle.sequence <= sequence {
    1065            0 :                 self.reconciler = None;
    1066            0 :             }
    1067            0 :         }
    1068            0 :     }
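                       : 
                       :     // Worked example of the comparison above: if the in-flight handle was spawned at
                       :     // sequence 5, a result for sequence 5 (or any later sequence) clears the handle,
                       :     // while a stale result for sequence 4 leaves the newer reconciler's handle in place.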
    1069              : 
    1070              :     // If we had any state at all referring to this node ID, drop it.  Does not
    1071              :     // attempt to reschedule.
    1072            0 :     pub(crate) fn deref_node(&mut self, node_id: NodeId) {
    1073            0 :         if self.intent.attached == Some(node_id) {
    1074            0 :             self.intent.attached = None;
    1075            0 :         }
    1076              : 
    1077            0 :         self.intent.secondary.retain(|n| n != &node_id);
    1078            0 : 
    1079            0 :         self.observed.locations.remove(&node_id);
    1080            0 : 
    1081            0 :         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
    1082            0 :     }
    1083              : 
    1084            0 :     pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
    1085            0 :         self.scheduling_policy = p;
    1086            0 :     }
    1087              : 
    1088            0 :     pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
    1089            0 :         &self.scheduling_policy
    1090            0 :     }
    1091              : 
    1092            0 :     pub(crate) fn from_persistent(
    1093            0 :         tsp: TenantShardPersistence,
    1094            0 :         intent: IntentState,
    1095            0 :     ) -> anyhow::Result<Self> {
    1096            0 :         let tenant_shard_id = tsp.get_tenant_shard_id()?;
    1097            0 :         let shard_identity = tsp.get_shard_identity()?;
    1098              : 
    1099            0 :         Ok(Self {
    1100            0 :             tenant_shard_id,
    1101            0 :             shard: shard_identity,
    1102            0 :             sequence: Sequence::initial(),
    1103            0 :             generation: tsp.generation.map(|g| Generation::new(g as u32)),
    1104            0 :             policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
    1105            0 :             intent,
    1106            0 :             observed: ObservedState::new(),
    1107            0 :             config: serde_json::from_str(&tsp.config).unwrap(),
    1108            0 :             reconciler: None,
    1109            0 :             splitting: tsp.splitting,
    1110            0 :             waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1111            0 :             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1112            0 :             last_error: Arc::default(),
    1113            0 :             pending_compute_notification: false,
    1114            0 :             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
    1115            0 :         })
    1116            0 :     }
    1117              : 
    1118            0 :     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
    1119            0 :         TenantShardPersistence {
    1120            0 :             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
    1121            0 :             shard_number: self.tenant_shard_id.shard_number.0 as i32,
    1122            0 :             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
    1123            0 :             shard_stripe_size: self.shard.stripe_size.0 as i32,
    1124            0 :             generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
    1125            0 :             generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
    1126            0 :             placement_policy: serde_json::to_string(&self.policy).unwrap(),
    1127            0 :             config: serde_json::to_string(&self.config).unwrap(),
    1128            0 :             splitting: SplitState::default(),
    1129            0 :             scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
    1130            0 :         }
    1131            0 :     }
    1132              : }
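                       : 
                       : // Rough persistence round-trip sketch, illustrative only: `IntentState::default()` is an
                       : // assumption here (from_persistent just takes whatever IntentState the caller rebuilt).
                       : // to_persistent() stores the policy, config and scheduling policy as JSON strings and
                       : // always records SplitState::default(), while from_persistent() parses them back and
                       : // starts from a fresh Sequence and an empty ObservedState.
                       : //
                       : //     let tsp = shard.to_persistent();
                       : //     let restored = TenantShard::from_persistent(tsp, IntentState::default())?;
                       : //     assert_eq!(restored.tenant_shard_id, shard.tenant_shard_id);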
    1133              : 
    1134              : #[cfg(test)]
    1135              : pub(crate) mod tests {
    1136              :     use pageserver_api::{
    1137              :         controller_api::NodeAvailability,
    1138              :         shard::{ShardCount, ShardNumber},
    1139              :     };
    1140              :     use utils::id::TenantId;
    1141              : 
    1142              :     use crate::scheduler::test_utils::make_test_nodes;
    1143              : 
    1144              :     use super::*;
    1145              : 
    1146           14 :     fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
    1147           14 :         let tenant_id = TenantId::generate();
    1148           14 :         let shard_number = ShardNumber(0);
    1149           14 :         let shard_count = ShardCount::new(1);
    1150           14 : 
    1151           14 :         let tenant_shard_id = TenantShardId {
    1152           14 :             tenant_id,
    1153           14 :             shard_number,
    1154           14 :             shard_count,
    1155           14 :         };
    1156           14 :         TenantShard::new(
    1157           14 :             tenant_shard_id,
    1158           14 :             ShardIdentity::new(
    1159           14 :                 shard_number,
    1160           14 :                 shard_count,
    1161           14 :                 pageserver_api::shard::ShardStripeSize(32768),
    1162           14 :             )
    1163           14 :             .unwrap(),
    1164           14 :             policy,
    1165           14 :         )
    1166           14 :     }
    1167              : 
    1168            2 :     fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
    1169            2 :         let tenant_id = TenantId::generate();
    1170            2 : 
    1171            2 :         (0..shard_count.count())
    1172            8 :             .map(|i| {
    1173            8 :                 let shard_number = ShardNumber(i);
    1174            8 : 
    1175            8 :                 let tenant_shard_id = TenantShardId {
    1176            8 :                     tenant_id,
    1177            8 :                     shard_number,
    1178            8 :                     shard_count,
    1179            8 :                 };
    1180            8 :                 TenantShard::new(
    1181            8 :                     tenant_shard_id,
    1182            8 :                     ShardIdentity::new(
    1183            8 :                         shard_number,
    1184            8 :                         shard_count,
    1185            8 :                         pageserver_api::shard::ShardStripeSize(32768),
    1186            8 :                     )
    1187            8 :                     .unwrap(),
    1188            8 :                     policy.clone(),
    1189            8 :                 )
    1190            8 :             })
    1191            2 :             .collect()
    1192            2 :     }
    1193              : 
    1194              :     /// Test the scheduling behaviors used when a tenant configured for HA is subject
    1195              :     /// to nodes being marked offline.
    1196              :     #[test]
    1197            2 :     fn tenant_ha_scheduling() -> anyhow::Result<()> {
    1198            2 :         // Start with three nodes.  Our tenant will only use two.  The third one is
    1199            2 :         // expected to remain unused.
    1200            2 :         let mut nodes = make_test_nodes(3);
    1201            2 : 
    1202            2 :         let mut scheduler = Scheduler::new(nodes.values());
    1203            2 :         let mut context = ScheduleContext::default();
    1204            2 : 
    1205            2 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1206            2 :         tenant_shard
    1207            2 :             .schedule(&mut scheduler, &mut context)
    1208            2 :             .expect("we have enough nodes, scheduling should work");
    1209            2 : 
     1210            2 :         // Expect the attached and secondary locations to initially be scheduled onto different nodes
    1211            2 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    1212            2 :         assert!(tenant_shard.intent.attached.is_some());
    1213              : 
    1214            2 :         let attached_node_id = tenant_shard.intent.attached.unwrap();
    1215            2 :         let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
    1216            2 :         assert_ne!(attached_node_id, secondary_node_id);
    1217              : 
     1218              :         // Notifying that the attached node is offline should demote it to a secondary
    1219            2 :         let changed = tenant_shard.intent.demote_attached(attached_node_id);
    1220            2 :         assert!(changed);
    1221            2 :         assert!(tenant_shard.intent.attached.is_none());
    1222            2 :         assert_eq!(tenant_shard.intent.secondary.len(), 2);
    1223              : 
    1224              :         // Update the scheduler state to indicate the node is offline
    1225            2 :         nodes
    1226            2 :             .get_mut(&attached_node_id)
    1227            2 :             .unwrap()
    1228            2 :             .set_availability(NodeAvailability::Offline);
    1229            2 :         scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
    1230            2 : 
     1231            2 :         // Re-scheduling the shard should promote the still-available secondary node to attached
    1232            2 :         tenant_shard
    1233            2 :             .schedule(&mut scheduler, &mut context)
    1234            2 :             .expect("active nodes are available");
    1235            2 :         assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
    1236              : 
    1237              :         // The original attached node should have been retained as a secondary
    1238            2 :         assert_eq!(
    1239            2 :             *tenant_shard.intent.secondary.iter().last().unwrap(),
    1240            2 :             attached_node_id
    1241            2 :         );
    1242              : 
    1243            2 :         tenant_shard.intent.clear(&mut scheduler);
    1244            2 : 
    1245            2 :         Ok(())
    1246            2 :     }
    1247              : 
    1248              :     #[test]
    1249            2 :     fn intent_from_observed() -> anyhow::Result<()> {
    1250            2 :         let nodes = make_test_nodes(3);
    1251            2 :         let mut scheduler = Scheduler::new(nodes.values());
    1252            2 : 
    1253            2 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1254            2 : 
    1255            2 :         tenant_shard.observed.locations.insert(
    1256            2 :             NodeId(3),
    1257            2 :             ObservedStateLocation {
    1258            2 :                 conf: Some(LocationConfig {
    1259            2 :                     mode: LocationConfigMode::AttachedMulti,
    1260            2 :                     generation: Some(2),
    1261            2 :                     secondary_conf: None,
    1262            2 :                     shard_number: tenant_shard.shard.number.0,
    1263            2 :                     shard_count: tenant_shard.shard.count.literal(),
    1264            2 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1265            2 :                     tenant_conf: TenantConfig::default(),
    1266            2 :                 }),
    1267            2 :             },
    1268            2 :         );
    1269            2 : 
    1270            2 :         tenant_shard.observed.locations.insert(
    1271            2 :             NodeId(2),
    1272            2 :             ObservedStateLocation {
    1273            2 :                 conf: Some(LocationConfig {
    1274            2 :                     mode: LocationConfigMode::AttachedStale,
    1275            2 :                     generation: Some(1),
    1276            2 :                     secondary_conf: None,
    1277            2 :                     shard_number: tenant_shard.shard.number.0,
    1278            2 :                     shard_count: tenant_shard.shard.count.literal(),
    1279            2 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1280            2 :                     tenant_conf: TenantConfig::default(),
    1281            2 :                 }),
    1282            2 :             },
    1283            2 :         );
    1284            2 : 
    1285            2 :         tenant_shard.intent_from_observed(&mut scheduler);
    1286            2 : 
     1287            2 :         // The attached location with the highest generation gets used as attached
    1288            2 :         assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    1289              :         // Other locations get used as secondary
    1290            2 :         assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
    1291              : 
    1292            2 :         scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
    1293              : 
    1294            2 :         tenant_shard.intent.clear(&mut scheduler);
    1295            2 :         Ok(())
    1296            2 :     }
    1297              : 
    1298              :     #[test]
    1299            2 :     fn scheduling_mode() -> anyhow::Result<()> {
    1300            2 :         let nodes = make_test_nodes(3);
    1301            2 :         let mut scheduler = Scheduler::new(nodes.values());
    1302            2 : 
    1303            2 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1304            2 : 
    1305            2 :         // In pause mode, schedule() shouldn't do anything
    1306            2 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    1307            2 :         assert!(tenant_shard
    1308            2 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1309            2 :             .is_ok());
    1310            2 :         assert!(tenant_shard.intent.all_pageservers().is_empty());
    1311              : 
    1312              :         // In active mode, schedule() works
    1313            2 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    1314            2 :         assert!(tenant_shard
    1315            2 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1316            2 :             .is_ok());
    1317            2 :         assert!(!tenant_shard.intent.all_pageservers().is_empty());
    1318              : 
    1319            2 :         tenant_shard.intent.clear(&mut scheduler);
    1320            2 :         Ok(())
    1321            2 :     }
    1322              : 
    1323              :     #[test]
    1324            2 :     fn optimize_attachment() -> anyhow::Result<()> {
    1325            2 :         let nodes = make_test_nodes(3);
    1326            2 :         let mut scheduler = Scheduler::new(nodes.values());
    1327            2 : 
    1328            2 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1329            2 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1330            2 : 
     1331            2 :         // Initially: both shards are attached to node 1, and each has a secondary location
     1332            2 :         // on a different node.
    1333            2 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1334            2 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
    1335            2 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1336            2 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1337            2 : 
    1338            2 :         let mut schedule_context = ScheduleContext::default();
    1339            2 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1340            2 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1341            2 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1342            2 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1343            2 : 
    1344            2 :         let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
    1345            2 : 
    1346            2 :         // Either shard should recognize that it has the option to switch to a secondary location where there
    1347            2 :         // would be no other shards from the same tenant, and request to do so.
    1348            2 :         assert_eq!(
    1349            2 :             optimization_a,
    1350            2 :             Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
    1351            2 :                 old_attached_node_id: NodeId(1),
    1352            2 :                 new_attached_node_id: NodeId(2)
    1353            2 :             }))
    1354            2 :         );
    1355              : 
     1356              :         // Note that optimizing two shards in the same tenant with the same ScheduleContext is
     1357              :         // mutually exclusive (the optimization of one invalidates the stats): it is the
     1358              :         // responsibility of [`Service::optimize_all`] to avoid trying to do optimizations for
     1359              :         // multiple shards in the same tenant at the same time.  Generating both optimizations
     1360              :         // here is done purely for test purposes.
    1361            2 :         let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
    1362            2 :         assert_eq!(
    1363            2 :             optimization_b,
    1364            2 :             Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
    1365            2 :                 old_attached_node_id: NodeId(1),
    1366            2 :                 new_attached_node_id: NodeId(3)
    1367            2 :             }))
    1368            2 :         );
    1369              : 
    1370              :         // Applying these optimizations should result in the end state proposed
    1371            2 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1372            2 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
    1373            2 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
    1374            2 :         shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
    1375            2 :         assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
    1376            2 :         assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
    1377              : 
    1378            2 :         shard_a.intent.clear(&mut scheduler);
    1379            2 :         shard_b.intent.clear(&mut scheduler);
    1380            2 : 
    1381            2 :         Ok(())
    1382            2 :     }
    1383              : 
    1384              :     #[test]
    1385            2 :     fn optimize_secondary() -> anyhow::Result<()> {
    1386            2 :         let nodes = make_test_nodes(4);
    1387            2 :         let mut scheduler = Scheduler::new(nodes.values());
    1388            2 : 
    1389            2 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1390            2 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1391            2 : 
     1392            2 :         // Initially: the two shards are attached to different nodes, but both have their
     1393            2 :         // secondary location on the same node.
    1394            2 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1395            2 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    1396            2 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    1397            2 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1398            2 : 
    1399            2 :         let mut schedule_context = ScheduleContext::default();
    1400            2 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1401            2 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1402            2 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1403            2 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1404            2 : 
    1405            2 :         let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
    1406            2 : 
     1407            2 :         // Since there is a node with no locations, the shard whose secondary sits on the node
     1408            2 :         // holding two locations for this tenant should generate an optimization to move it away
    1409            2 :         assert_eq!(
    1410            2 :             optimization_a,
    1411            2 :             Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
    1412            2 :                 old_node_id: NodeId(3),
    1413            2 :                 new_node_id: NodeId(4)
    1414            2 :             }))
    1415            2 :         );
    1416              : 
    1417            2 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1418            2 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    1419            2 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
    1420              : 
    1421            2 :         shard_a.intent.clear(&mut scheduler);
    1422            2 :         shard_b.intent.clear(&mut scheduler);
    1423            2 : 
    1424            2 :         Ok(())
    1425            2 :     }
    1426              : 
     1427              :     // Optimize until quiescent: this emulates what Service::optimize_all does when
    1428              :     // called repeatedly in the background.
    1429            2 :     fn optimize_til_idle(
    1430            2 :         nodes: &HashMap<NodeId, Node>,
    1431            2 :         scheduler: &mut Scheduler,
    1432            2 :         shards: &mut [TenantShard],
    1433            2 :     ) {
    1434            2 :         let mut loop_n = 0;
    1435              :         loop {
    1436           18 :             let mut schedule_context = ScheduleContext::default();
    1437           18 :             let mut any_changed = false;
    1438              : 
    1439           72 :             for shard in shards.iter() {
    1440           72 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    1441           72 :                 if let Some(attached) = shard.intent.get_attached() {
    1442           72 :                     schedule_context.push_attached(*attached);
    1443           72 :                 }
    1444              :             }
    1445              : 
    1446           36 :             for shard in shards.iter_mut() {
    1447           36 :                 let optimization = shard.optimize_attachment(nodes, &schedule_context);
    1448           36 :                 if let Some(optimization) = optimization {
    1449            8 :                     shard.apply_optimization(scheduler, optimization);
    1450            8 :                     any_changed = true;
    1451            8 :                     break;
    1452           28 :                 }
    1453           28 : 
    1454           28 :                 let optimization = shard.optimize_secondary(scheduler, &schedule_context);
    1455           28 :                 if let Some(optimization) = optimization {
    1456            8 :                     shard.apply_optimization(scheduler, optimization);
    1457            8 :                     any_changed = true;
    1458            8 :                     break;
    1459           20 :                 }
    1460              :             }
    1461              : 
    1462           18 :             if !any_changed {
    1463            2 :                 break;
    1464           16 :             }
    1465           16 : 
    1466           16 :             // Assert no infinite loop
    1467           16 :             loop_n += 1;
    1468           16 :             assert!(loop_n < 1000);
    1469              :         }
    1470            2 :     }
    1471              : 
    1472              :     /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
    1473              :     /// that it converges.
    1474              :     #[test]
    1475            2 :     fn optimize_add_nodes() -> anyhow::Result<()> {
    1476            2 :         let nodes = make_test_nodes(4);
    1477            2 : 
    1478            2 :         // Only show the scheduler a couple of nodes
    1479            2 :         let mut scheduler = Scheduler::new([].iter());
    1480            2 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1481            2 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    1482            2 : 
    1483            2 :         let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
    1484            2 :         let mut schedule_context = ScheduleContext::default();
    1485           10 :         for shard in &mut shards {
    1486            8 :             assert!(shard
    1487            8 :                 .schedule(&mut scheduler, &mut schedule_context)
    1488            8 :                 .is_ok());
    1489              :         }
    1490              : 
     1491              :         // We should see an equal number of locations on the two nodes.
    1492            2 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
    1493            2 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
    1494              : 
    1495              :         // Add another two nodes: we should see the shards spread out when their optimize
    1496              :         // methods are called
    1497            2 :         scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
    1498            2 :         scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
    1499            2 :         optimize_til_idle(&nodes, &mut scheduler, &mut shards);
    1500            2 : 
    1501            2 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
    1502            2 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    1503            2 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
    1504            2 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
    1505              : 
    1506            8 :         for shard in shards.iter_mut() {
    1507            8 :             shard.intent.clear(&mut scheduler);
    1508            8 :         }
    1509              : 
    1510            2 :         Ok(())
    1511            2 :     }
    1512              : }
        

Generated by: LCOV version 2.1-beta