LCOV - code coverage report
Current view: top level - storage_controller/src - tenant_shard.rs (source / functions)
Test:      fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info
Test Date: 2024-09-24 13:57:57
Coverage:  Lines: 64.4 % (605 of 940 hit)    Functions: 38.5 % (40 of 104 hit)

            Line data    Source code
       1              : use std::{
       2              :     collections::{HashMap, HashSet},
       3              :     sync::Arc,
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use crate::{
       8              :     metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
       9              :     persistence::TenantShardPersistence,
      10              :     reconciler::{ReconcileUnits, ReconcilerConfig},
      11              :     scheduler::{
      12              :         AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
      13              :         SecondaryShardTag,
      14              :     },
      15              :     service::ReconcileResultRequest,
      16              : };
      17              : use pageserver_api::controller_api::{
      18              :     NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
      19              : };
      20              : use pageserver_api::{
      21              :     models::{LocationConfig, LocationConfigMode, TenantConfig},
      22              :     shard::{ShardIdentity, TenantShardId},
      23              : };
      24              : use serde::{Deserialize, Serialize};
      25              : use tokio::task::JoinHandle;
      26              : use tokio_util::sync::CancellationToken;
      27              : use tracing::{instrument, Instrument};
      28              : use utils::{
      29              :     generation::Generation,
      30              :     id::NodeId,
      31              :     seqwait::{SeqWait, SeqWaitError},
      32              :     sync::gate::GateGuard,
      33              : };
      34              : 
      35              : use crate::{
      36              :     compute_hook::ComputeHook,
      37              :     node::Node,
      38              :     persistence::{split_state::SplitState, Persistence},
      39              :     reconciler::{
      40              :         attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
      41              :     },
      42              :     scheduler::{ScheduleError, Scheduler},
      43              :     service, Sequence,
      44              : };
      45              : 
      46              : /// Serialization helper
      47            0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
      48            0 : where
      49            0 :     S: serde::ser::Serializer,
      50            0 :     T: std::fmt::Display,
      51            0 : {
      52            0 :     serializer.collect_str(
      53            0 :         &v.lock()
      54            0 :             .unwrap()
      55            0 :             .as_ref()
      56            0 :             .map(|e| format!("{e}"))
      57            0 :             .unwrap_or("".to_string()),
      58            0 :     )
      59            0 : }
      60              : 
      61              : /// In-memory state for a particular tenant shard.
      62              : ///
       63              : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
      64              : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
      65            0 : #[derive(Serialize)]
      66              : pub(crate) struct TenantShard {
      67              :     pub(crate) tenant_shard_id: TenantShardId,
      68              : 
      69              :     pub(crate) shard: ShardIdentity,
      70              : 
       71              :     // Runtime only: sequence used to coordinate updates to this object while
       72              :     // background reconcilers may be running.  A reconciler runs to a particular
       73              :     // sequence.
      74              :     pub(crate) sequence: Sequence,
      75              : 
      76              :     // Latest generation number: next time we attach, increment this
      77              :     // and use the incremented number when attaching.
      78              :     //
      79              :     // None represents an incompletely onboarded tenant via the [`Service::location_config`]
      80              :     // API, where this tenant may only run in PlacementPolicy::Secondary.
      81              :     pub(crate) generation: Option<Generation>,
      82              : 
      83              :     // High level description of how the tenant should be set up.  Provided
      84              :     // externally.
      85              :     pub(crate) policy: PlacementPolicy,
      86              : 
      87              :     // Low level description of exactly which pageservers should fulfil
      88              :     // which role.  Generated by `Self::schedule`.
      89              :     pub(crate) intent: IntentState,
      90              : 
      91              :     // Low level description of how the tenant is configured on pageservers:
      92              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      93              :     // with `Self::reconcile`.
      94              :     pub(crate) observed: ObservedState,
      95              : 
      96              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
      97              :     // for all shards in a tenant.
      98              :     pub(crate) config: TenantConfig,
      99              : 
     100              :     /// If a reconcile task is currently in flight, it may be joined here (it is
     101              :     /// only safe to join if either the result has been received or the reconciler's
     102              :     /// cancellation token has been fired)
     103              :     #[serde(skip)]
     104              :     pub(crate) reconciler: Option<ReconcilerHandle>,
     105              : 
     106              :     /// If a tenant is being split, then all shards with that TenantId will have a
      107              :     /// SplitState set; this acts as a guard against other operations such as background
      108              :     /// reconciliation and timeline creation.
     109              :     pub(crate) splitting: SplitState,
     110              : 
      111              :     /// If a tenant was enqueued for later reconcile due to hitting the concurrency limit, this flag
     112              :     /// is set. This flag is cleared when the tenant is popped off the delay queue.
     113              :     pub(crate) delayed_reconcile: bool,
     114              : 
     115              :     /// Optionally wait for reconciliation to complete up to a particular
     116              :     /// sequence number.
     117              :     #[serde(skip)]
     118              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     119              : 
      120              :     /// Indicates the sequence number for which we have encountered an error while reconciling.  If
     121              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
     122              :     /// and callers should stop waiting for `waiter` and propagate the error.
     123              :     #[serde(skip)]
     124              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     125              : 
     126              :     /// The most recent error from a reconcile on this tenant.  This is a nested Arc
     127              :     /// because:
     128              :     ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
     129              :     ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
     130              :     ///    many waiters for one shard, and the underlying error types are not Clone.
     131              :     ///
     132              :     /// TODO: generalize to an array of recent events
      133              :     /// TODO: use an ArcSwap instead of a mutex for faster reads?
     134              :     #[serde(serialize_with = "read_last_error")]
     135              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     136              : 
     137              :     /// If we have a pending compute notification that for some reason we weren't able to send,
     138              :     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
     139              :     /// and trigger a Reconciler run.  This is the mechanism by which compute notifications are included in the scope
     140              :     /// of state that we publish externally in an eventually consistent way.
     141              :     pub(crate) pending_compute_notification: bool,
     142              : 
     143              :     // Support/debug tool: if something is going wrong or flapping with scheduling, this may
     144              :     // be set to a non-active state to avoid making changes while the issue is fixed.
     145              :     scheduling_policy: ShardSchedulingPolicy,
     146              : 
      147              :     // We should attempt to schedule this shard in the provided AZ to
      148              :     // reduce the chance of cross-AZ traffic between the compute and the pageserver.
     149              :     preferred_az_id: Option<String>,
     150              : }
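// A minimal sketch of how the intent/observed split above is typically consumed
// (illustrative only; `shard_is_dirty` is a hypothetical helper, not part of this file):
//
//     fn shard_is_dirty(shard: &TenantShard) -> bool {
//         // A shard needs reconciliation when what we want (`intent`) is not reflected
//         // in what we last observed on the pageservers (`observed`); a Reconciler is
//         // then spawned to converge the outside world onto the intent state.
//         shard
//             .intent
//             .all_pageservers()
//             .iter()
//             .any(|node_id| !shard.observed.locations.contains_key(node_id))
//     }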
     151              : 
     152              : #[derive(Default, Clone, Debug, Serialize)]
     153              : pub(crate) struct IntentState {
     154              :     attached: Option<NodeId>,
     155              :     secondary: Vec<NodeId>,
     156              : }
     157              : 
     158              : impl IntentState {
     159           13 :     pub(crate) fn new() -> Self {
     160           13 :         Self {
     161           13 :             attached: None,
     162           13 :             secondary: vec![],
     163           13 :         }
     164           13 :     }
     165            0 :     pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
     166            0 :         if let Some(node_id) = node_id {
     167            0 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
     168            0 :         }
     169            0 :         Self {
     170            0 :             attached: node_id,
     171            0 :             secondary: vec![],
     172            0 :         }
     173            0 :     }
     174              : 
     175           32 :     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
     176           32 :         if self.attached != new_attached {
     177           32 :             if let Some(old_attached) = self.attached.take() {
     178            0 :                 scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
     179           32 :             }
     180           32 :             if let Some(new_attached) = &new_attached {
     181           32 :                 scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
     182           32 :             }
     183           32 :             self.attached = new_attached;
     184            0 :         }
     185           32 :     }
     186              : 
     187              :     /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
     188              :     /// secondary to attached while maintaining the scheduler's reference counts.
     189            5 :     pub(crate) fn promote_attached(
     190            5 :         &mut self,
     191            5 :         scheduler: &mut Scheduler,
     192            5 :         promote_secondary: NodeId,
     193            5 :     ) {
     194            5 :         // If we call this with a node that isn't in secondary, it would cause incorrect
     195            5 :         // scheduler reference counting, since we assume the node is already referenced as a secondary.
     196            5 :         debug_assert!(self.secondary.contains(&promote_secondary));
     197              : 
     198           10 :         self.secondary.retain(|n| n != &promote_secondary);
     199            5 : 
     200            5 :         let demoted = self.attached;
     201            5 :         self.attached = Some(promote_secondary);
     202            5 : 
     203            5 :         scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
     204            5 :         if let Some(demoted) = demoted {
     205            0 :             scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
     206            5 :         }
     207            5 :     }
     208              : 
     209           25 :     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
     210           25 :         debug_assert!(!self.secondary.contains(&new_secondary));
     211           25 :         scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
     212           25 :         self.secondary.push(new_secondary);
     213           25 :     }
     214              : 
     215              :     /// It is legal to call this with a node that is not currently a secondary: that is a no-op
     216            5 :     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
     217            5 :         let index = self.secondary.iter().position(|n| *n == node_id);
     218            5 :         if let Some(index) = index {
     219            5 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
     220            5 :             self.secondary.remove(index);
     221            5 :         }
     222            5 :     }
     223              : 
     224           31 :     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
     225           31 :         for secondary in self.secondary.drain(..) {
     226           20 :             scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
     227           20 :         }
     228           31 :     }
     229              : 
     230              :     /// Remove the last secondary node from the list of secondaries
     231            0 :     pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
     232            0 :         if let Some(node_id) = self.secondary.pop() {
     233            0 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
     234            0 :         }
     235            0 :     }
     236              : 
     237           31 :     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
     238           31 :         if let Some(old_attached) = self.attached.take() {
     239           31 :             scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
     240           31 :         }
     241              : 
     242           31 :         self.clear_secondary(scheduler);
     243           31 :     }
     244              : 
     245          102 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     246          102 :         let mut result = Vec::new();
     247          102 :         if let Some(p) = self.attached {
     248          100 :             result.push(p)
     249            2 :         }
     250              : 
     251          102 :         result.extend(self.secondary.iter().copied());
     252          102 : 
     253          102 :         result
     254          102 :     }
     255              : 
     256           83 :     pub(crate) fn get_attached(&self) -> &Option<NodeId> {
     257           83 :         &self.attached
     258           83 :     }
     259              : 
     260           24 :     pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
     261           24 :         &self.secondary
     262           24 :     }
     263              : 
     264              :     /// If the node is in use as the attached location, demote it into
     265              :     /// the list of secondary locations.  This is used when a node goes offline,
     266              :     /// and we want to use a different node for attachment, but not permanently
     267              :     /// forget the location on the offline node.
     268              :     ///
     269              :     /// Returns true if a change was made
     270            5 :     pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
     271            5 :         if self.attached == Some(node_id) {
     272            5 :             self.attached = None;
     273            5 :             self.secondary.push(node_id);
     274            5 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
     275            5 :             true
     276              :         } else {
     277            0 :             false
     278              :         }
     279            5 :     }
     280              : }
     281              : 
     282              : impl Drop for IntentState {
     283           32 :     fn drop(&mut self) {
     284           32 :         // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
     285           32 :         // We do not check this while panicking, to avoid polluting unit test failures or
     286           32 :         // other assertions with this assertion's output.  It's still wrong to leak these,
     287           32 :         // but if we already have a panic then we don't need to independently flag this case.
     288           32 :         if !(std::thread::panicking()) {
     289           32 :             debug_assert!(self.attached.is_none() && self.secondary.is_empty());
     290            0 :         }
     291           31 :     }
     292              : }
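// Usage sketch for the ref-count-maintaining IntentState API (illustrative; `scheduler`
// is assumed to be a `&mut Scheduler` that already knows about nodes 1 and 2):
//
//     let mut intent = IntentState::new();
//     intent.set_attached(scheduler, Some(NodeId(1))); // node 1 gains an attached ref
//     intent.push_secondary(scheduler, NodeId(2));     // node 2 gains a secondary ref
//     intent.demote_attached(scheduler, NodeId(1));    // node 1: attached -> secondary
//     intent.promote_attached(scheduler, NodeId(2));   // node 2: secondary -> attached
//     intent.clear(scheduler);                         // must happen before drop, or the
//                                                      // Drop impl's debug_assert fires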
     293              : 
     294            0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
     295              : pub(crate) struct ObservedState {
     296              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
     297              : }
     298              : 
     299              : /// Our latest knowledge of how this tenant is configured in the outside world.
     300              : ///
     301              : /// Meaning:
     302              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
     303              : ///       node for this shard.
     304              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
     305              : ///       what it is (e.g. we failed partway through configuring it)
     306              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     307              : ///       and that configuration will still be present unless something external interfered.
     308            0 : #[derive(Clone, Serialize, Deserialize, Debug)]
     309              : pub(crate) struct ObservedStateLocation {
     310              :     /// If None, it means we do not know the status of this shard's location on this node, but
     311              :     /// we know that we might have some state on this node.
     312              :     pub(crate) conf: Option<LocationConfig>,
     313              : }
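// Sketch of interpreting the three cases described above for a single node (illustrative
// only; `observed` and `node_id` are assumed bindings):
//
//     match observed.locations.get(&node_id) {
//         None => { /* certain: nothing is configured for this shard on that node */ }
//         Some(ObservedStateLocation { conf: None }) => {
//             // Unknown: the node may hold some state for this shard, e.g. if a
//             // configuration call failed partway through.
//         }
//         Some(ObservedStateLocation { conf: Some(conf) }) => {
//             // Known: `conf` is what we last successfully configured on that node.
//         }
//     }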
     314              : pub(crate) struct ReconcilerWaiter {
     315              :     // For observability purposes, remember the ID of the shard we're
     316              :     // waiting for.
     317              :     pub(crate) tenant_shard_id: TenantShardId,
     318              : 
     319              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     320              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     321              :     error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     322              :     seq: Sequence,
     323              : }
     324              : 
     325              : pub(crate) enum ReconcilerStatus {
     326              :     Done,
     327              :     Failed,
     328              :     InProgress,
     329              : }
     330              : 
     331            0 : #[derive(thiserror::Error, Debug)]
     332              : pub(crate) enum ReconcileWaitError {
     333              :     #[error("Timeout waiting for shard {0}")]
     334              :     Timeout(TenantShardId),
     335              :     #[error("shutting down")]
     336              :     Shutdown,
     337              :     #[error("Reconcile error on shard {0}: {1}")]
     338              :     Failed(TenantShardId, Arc<ReconcileError>),
     339              : }
     340              : 
     341              : #[derive(Eq, PartialEq, Debug, Clone)]
     342              : pub(crate) struct ReplaceSecondary {
     343              :     old_node_id: NodeId,
     344              :     new_node_id: NodeId,
     345              : }
     346              : 
     347              : #[derive(Eq, PartialEq, Debug, Clone)]
     348              : pub(crate) struct MigrateAttachment {
     349              :     pub(crate) old_attached_node_id: NodeId,
     350              :     pub(crate) new_attached_node_id: NodeId,
     351              : }
     352              : 
     353              : #[derive(Eq, PartialEq, Debug, Clone)]
     354              : pub(crate) enum ScheduleOptimizationAction {
     355              :     // Replace one of our secondary locations with a different node
     356              :     ReplaceSecondary(ReplaceSecondary),
     357              :     // Migrate attachment to an existing secondary location
     358              :     MigrateAttachment(MigrateAttachment),
     359              : }
     360              : 
     361              : #[derive(Eq, PartialEq, Debug, Clone)]
     362              : pub(crate) struct ScheduleOptimization {
     363              :     // What was the reconcile sequence when we generated this optimization?  The optimization
     364              :     // should only be applied if the shard's sequence is still at this value, in case other changes
     365              :     // happened between planning the optimization and applying it.
     366              :     sequence: Sequence,
     367              : 
     368              :     pub(crate) action: ScheduleOptimizationAction,
     369              : }
     370              : 
     371              : impl ReconcilerWaiter {
     372            0 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     373            0 :         tokio::select! {
     374            0 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     375            0 :                 result.map_err(|e| match e {
     376            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     377            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     378            0 :                 })?;
     379              :             },
     380            0 :             result = self.error_seq_wait.wait_for(self.seq) => {
     381            0 :                 result.map_err(|e| match e {
     382            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     383            0 :                     SeqWaitError::Timeout => unreachable!()
     384            0 :                 })?;
     385              : 
     386            0 :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
     387            0 :                     self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
     388              :             }
     389              :         }
     390              : 
     391            0 :         Ok(())
     392            0 :     }
     393              : 
     394            0 :     pub(crate) fn get_status(&self) -> ReconcilerStatus {
     395            0 :         if self.seq_wait.would_wait_for(self.seq).is_ok() {
     396            0 :             ReconcilerStatus::Done
     397            0 :         } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
     398            0 :             ReconcilerStatus::Failed
     399              :         } else {
     400            0 :             ReconcilerStatus::InProgress
     401              :         }
     402            0 :     }
     403              : }
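// Sketch of waiting on a reconcile from the caller's side (illustrative; `waiter` is a
// ReconcilerWaiter handed out by the service layer, and the timeout value is arbitrary):
//
//     match waiter.wait_timeout(Duration::from_secs(30)).await {
//         Ok(()) => { /* the reconcile reached this waiter's sequence */ }
//         Err(ReconcileWaitError::Timeout(tenant_shard_id)) => { /* still in progress */ }
//         Err(ReconcileWaitError::Failed(tenant_shard_id, e)) => { /* reconcile failed */ }
//         Err(ReconcileWaitError::Shutdown) => { /* the controller is shutting down */ }
//     }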
     404              : 
     405              : /// Having spawned a reconciler task, the tenant shard's state will carry enough
     406              : /// information to optionally cancel & await it later.
     407              : pub(crate) struct ReconcilerHandle {
     408              :     sequence: Sequence,
     409              :     handle: JoinHandle<()>,
     410              :     cancel: CancellationToken,
     411              : }
     412              : 
     413              : pub(crate) enum ReconcileNeeded {
     414              :     /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
     415              :     /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
     416              :     No,
     417              :     /// shard has a reconciler running, and its intent hasn't changed since that one was
     418              :     /// spawned: wait for the existing reconciler rather than spawning a new one.
     419              :     WaitExisting(ReconcilerWaiter),
     420              :     /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
     421              :     Yes,
     422              : }
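// Dispatch sketch for this enum (illustrative only; the real call sites live in the
// service layer and pass more context to `get_reconcile_needed` and `spawn_reconciler`):
//
//     match shard.get_reconcile_needed(/* ... */) {
//         ReconcileNeeded::No => { /* nothing to do, or policy/split forbids it */ }
//         ReconcileNeeded::WaitExisting(waiter) => { /* await the in-flight reconcile */ }
//         ReconcileNeeded::Yes => { /* call TenantShard::spawn_reconciler */ }
//     }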
     423              : 
     424              : /// When a reconcile task completes, it sends this result object
     425              : /// to be applied to the primary TenantShard.
     426              : pub(crate) struct ReconcileResult {
     427              :     pub(crate) sequence: Sequence,
      428              :     /// On errors, `observed` should be treated as an incomplete description
     429              :     /// of state (i.e. any nodes present in the result should override nodes
     430              :     /// present in the parent tenant state, but any unmentioned nodes should
     431              :     /// not be removed from parent tenant state)
     432              :     pub(crate) result: Result<(), ReconcileError>,
     433              : 
     434              :     pub(crate) tenant_shard_id: TenantShardId,
     435              :     pub(crate) generation: Option<Generation>,
     436              :     pub(crate) observed: ObservedState,
     437              : 
     438              :     /// Set [`TenantShard::pending_compute_notification`] from this flag
     439              :     pub(crate) pending_compute_notification: bool,
     440              : }
     441              : 
     442              : impl ObservedState {
     443            0 :     pub(crate) fn new() -> Self {
     444            0 :         Self {
     445            0 :             locations: HashMap::new(),
     446            0 :         }
     447            0 :     }
     448              : }
     449              : 
     450              : impl TenantShard {
     451           19 :     pub(crate) fn new(
     452           19 :         tenant_shard_id: TenantShardId,
     453           19 :         shard: ShardIdentity,
     454           19 :         policy: PlacementPolicy,
     455           19 :     ) -> Self {
     456           19 :         Self {
     457           19 :             tenant_shard_id,
     458           19 :             policy,
     459           19 :             intent: IntentState::default(),
     460           19 :             generation: Some(Generation::new(0)),
     461           19 :             shard,
     462           19 :             observed: ObservedState::default(),
     463           19 :             config: TenantConfig::default(),
     464           19 :             reconciler: None,
     465           19 :             splitting: SplitState::Idle,
     466           19 :             sequence: Sequence(1),
     467           19 :             delayed_reconcile: false,
     468           19 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     469           19 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     470           19 :             last_error: Arc::default(),
     471           19 :             pending_compute_notification: false,
     472           19 :             scheduling_policy: ShardSchedulingPolicy::default(),
     473           19 :             preferred_az_id: None,
     474           19 :         }
     475           19 :     }
     476              : 
     477              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     478              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     479              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     480              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     481            1 :     pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
     482            1 :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     483            1 :         // generation
     484            1 :         let mut attached_locs = self
     485            1 :             .observed
     486            1 :             .locations
     487            1 :             .iter()
     488            2 :             .filter_map(|(node_id, l)| {
     489            2 :                 if let Some(conf) = &l.conf {
     490            2 :                     if conf.mode == LocationConfigMode::AttachedMulti
     491            1 :                         || conf.mode == LocationConfigMode::AttachedSingle
     492            1 :                         || conf.mode == LocationConfigMode::AttachedStale
     493              :                     {
     494            2 :                         Some((node_id, conf.generation))
     495              :                     } else {
     496            0 :                         None
     497              :                     }
     498              :                 } else {
     499            0 :                     None
     500              :                 }
     501            2 :             })
     502            1 :             .collect::<Vec<_>>();
     503            1 : 
     504            2 :         attached_locs.sort_by_key(|i| i.1);
     505            1 :         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
     506            1 :             self.intent.set_attached(scheduler, Some(*node_id));
     507            1 :         }
     508              : 
     509              :         // All remaining observed locations generate secondary intents.  This includes None
     510              :         // observations, as these may well have some local content on disk that is usable (this
     511              :         // is an edge case that might occur if we restarted during a migration or other change)
     512              :         //
     513              :         // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
     514              :         // will take care of promoting one of these secondaries to be attached.
     515            2 :         self.observed.locations.keys().for_each(|node_id| {
     516            2 :             if Some(*node_id) != self.intent.attached {
     517            1 :                 self.intent.push_secondary(scheduler, *node_id);
     518            1 :             }
     519            2 :         });
     520            1 :     }
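    // Worked example (illustrative only; node IDs, modes and generations are assumptions):
    // given
    //
    //     observed.locations = {
    //         NodeId(1): conf AttachedStale  at generation 3,
    //         NodeId(2): conf AttachedSingle at generation 5,
    //     }
    //
    // the sort by generation above selects NodeId(2) as the attached intent and NodeId(1)
    // becomes a secondary.  An entry with `conf: None` would also be kept as a secondary,
    // since it may still hold usable local content.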
     521              : 
     522              :     /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
     523              :     /// attached pageserver for a shard.
     524              :     ///
      525              :     /// Returns whether we modified the intent, and the NodeId selected.
     526           15 :     fn schedule_attached(
     527           15 :         &mut self,
     528           15 :         scheduler: &mut Scheduler,
     529           15 :         context: &ScheduleContext,
     530           15 :     ) -> Result<(bool, NodeId), ScheduleError> {
     531              :         // No work to do if we already have an attached tenant
     532           15 :         if let Some(node_id) = self.intent.attached {
     533            0 :             return Ok((false, node_id));
     534           15 :         }
     535              : 
     536           15 :         if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
     537              :             // Promote a secondary
     538            1 :             tracing::debug!("Promoted secondary {} to attached", promote_secondary);
     539            1 :             self.intent.promote_attached(scheduler, promote_secondary);
     540            1 :             Ok((true, promote_secondary))
     541              :         } else {
     542              :             // Pick a fresh node: either we had no secondaries or none were schedulable
     543           14 :             let node_id =
     544           14 :                 scheduler.schedule_shard::<AttachedShardTag>(&self.intent.secondary, context)?;
     545           14 :             tracing::debug!("Selected {} as attached", node_id);
     546           14 :             self.intent.set_attached(scheduler, Some(node_id));
     547           14 :             Ok((true, node_id))
     548              :         }
     549           15 :     }
     550              : 
     551           16 :     pub(crate) fn schedule(
     552           16 :         &mut self,
     553           16 :         scheduler: &mut Scheduler,
     554           16 :         context: &mut ScheduleContext,
     555           16 :     ) -> Result<(), ScheduleError> {
     556           16 :         let r = self.do_schedule(scheduler, context);
     557           16 : 
     558           16 :         context.avoid(&self.intent.all_pageservers());
     559           16 :         if let Some(attached) = self.intent.get_attached() {
     560           15 :             context.push_attached(*attached);
     561           15 :         }
     562              : 
     563           16 :         r
     564           16 :     }
     565              : 
     566           16 :     pub(crate) fn do_schedule(
     567           16 :         &mut self,
     568           16 :         scheduler: &mut Scheduler,
     569           16 :         context: &ScheduleContext,
     570           16 :     ) -> Result<(), ScheduleError> {
     571           16 :         // TODO: before scheduling new nodes, check if any existing content in
     572           16 :         // self.intent refers to pageservers that are offline, and pick other
     573           16 :         // pageservers if so.
     574           16 : 
     575           16 :         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
     576           16 :         // change their attach location.
     577           16 : 
     578           16 :         match self.scheduling_policy {
     579           15 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
     580              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     581              :                 // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
     582            1 :                 tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     583            0 :                     "Scheduling is disabled by policy {:?}", self.scheduling_policy);
     584            1 :                 return Ok(());
     585              :             }
     586              :         }
     587              : 
     588              :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     589              :         // more work on the same pageservers we're already using.
     590           15 :         let mut modified = false;
     591              : 
     592              :         // Add/remove nodes to fulfil policy
     593              :         use PlacementPolicy::*;
     594           15 :         match self.policy {
     595           15 :             Attached(secondary_count) => {
     596           15 :                 let retain_secondaries = if self.intent.attached.is_none()
     597           15 :                     && scheduler.node_preferred(&self.intent.secondary).is_some()
     598              :                 {
      599              :                     // If we have no attached, and one of the secondaries is eligible to be promoted, retain
      600              :                     // one more secondary than we usually would, as one of them will become attached further down this function.
     601            1 :                     secondary_count + 1
     602              :                 } else {
     603           14 :                     secondary_count
     604              :                 };
     605              : 
     606           15 :                 while self.intent.secondary.len() > retain_secondaries {
     607            0 :                     // We have no particular preference for one secondary location over another: just
     608            0 :                     // arbitrarily drop from the end
     609            0 :                     self.intent.pop_secondary(scheduler);
     610            0 :                     modified = true;
     611            0 :                 }
     612              : 
     613              :                 // Should have exactly one attached, and N secondaries
     614           15 :                 let (modified_attached, attached_node_id) =
     615           15 :                     self.schedule_attached(scheduler, context)?;
     616           15 :                 modified |= modified_attached;
     617           15 : 
     618           15 :                 let mut used_pageservers = vec![attached_node_id];
     619           29 :                 while self.intent.secondary.len() < secondary_count {
     620           14 :                     let node_id = scheduler
     621           14 :                         .schedule_shard::<SecondaryShardTag>(&used_pageservers, context)?;
     622           14 :                     self.intent.push_secondary(scheduler, node_id);
     623           14 :                     used_pageservers.push(node_id);
     624           14 :                     modified = true;
     625              :                 }
     626              :             }
     627              :             Secondary => {
     628            0 :                 if let Some(node_id) = self.intent.get_attached() {
     629            0 :                     // Populate secondary by demoting the attached node
     630            0 :                     self.intent.demote_attached(scheduler, *node_id);
     631            0 :                     modified = true;
     632            0 :                 } else if self.intent.secondary.is_empty() {
     633              :                     // Populate secondary by scheduling a fresh node
     634            0 :                     let node_id = scheduler.schedule_shard::<SecondaryShardTag>(&[], context)?;
     635            0 :                     self.intent.push_secondary(scheduler, node_id);
     636            0 :                     modified = true;
     637            0 :                 }
     638            0 :                 while self.intent.secondary.len() > 1 {
     639            0 :                     // We have no particular preference for one secondary location over another: just
     640            0 :                     // arbitrarily drop from the end
     641            0 :                     self.intent.pop_secondary(scheduler);
     642            0 :                     modified = true;
     643            0 :                 }
     644              :             }
     645              :             Detached => {
     646              :                 // Never add locations in this mode
     647            0 :                 if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
     648            0 :                     self.intent.clear(scheduler);
     649            0 :                     modified = true;
     650            0 :                 }
     651              :             }
     652              :         }
     653              : 
     654           15 :         if modified {
     655           15 :             self.sequence.0 += 1;
     656           15 :         }
     657              : 
     658           15 :         Ok(())
     659           16 :     }
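    // Scheduling sketch (illustrative; node IDs and the secondary count are assumptions):
    // with `PlacementPolicy::Attached(1)` and an empty intent, a successful call ends up
    // with one attached location plus one secondary on a different node, e.g.:
    //
    //     let mut context = ScheduleContext::default();
    //     shard.schedule(&mut scheduler, &mut context)?;
    //     assert!(shard.intent.get_attached().is_some());
    //     assert_eq!(shard.intent.get_secondary().len(), 1);
    //
    // Calling `schedule` (rather than `do_schedule` directly) also records the chosen
    // nodes in the ScheduleContext, so sibling shards of the same tenant are spread
    // across different pageservers.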
     660              : 
     661              :     /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
      662              :     /// if the swap is not possible, in which case the intent state is left unchanged.
     663              :     ///
     664              :     /// Arguments:
      665              :     /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
      666              :     /// the scheduler to recommend a node.
      667              :     /// The currently attached location, if any, is not an argument: it is read from the intent
      668              :     /// state and demoted to a secondary as part of the swap.
     669            0 :     pub(crate) fn reschedule_to_secondary(
     670            0 :         &mut self,
     671            0 :         promote_to: Option<NodeId>,
     672            0 :         scheduler: &mut Scheduler,
     673            0 :     ) -> Result<(), ScheduleError> {
     674            0 :         let promote_to = match promote_to {
     675            0 :             Some(node) => node,
     676            0 :             None => match scheduler.node_preferred(self.intent.get_secondary()) {
     677            0 :                 Some(node) => node,
     678              :                 None => {
     679            0 :                     return Err(ScheduleError::ImpossibleConstraint);
     680              :                 }
     681              :             },
     682              :         };
     683              : 
     684            0 :         assert!(self.intent.get_secondary().contains(&promote_to));
     685              : 
     686            0 :         if let Some(node) = self.intent.get_attached() {
     687            0 :             let demoted = self.intent.demote_attached(scheduler, *node);
     688            0 :             if !demoted {
     689            0 :                 return Err(ScheduleError::ImpossibleConstraint);
     690            0 :             }
     691            0 :         }
     692              : 
     693            0 :         self.intent.promote_attached(scheduler, promote_to);
     694            0 : 
     695            0 :         // Increment the sequence number for the edge case where a
     696            0 :         // reconciler is already running to avoid waiting on the
     697            0 :         // current reconcile instead of spawning a new one.
     698            0 :         self.sequence = self.sequence.next();
     699            0 : 
     700            0 :         Ok(())
     701            0 :     }
     702              : 
     703              :     /// Optimize attachments: if a shard has a secondary location that is preferable to
     704              :     /// its primary location based on soft constraints, switch that secondary location
     705              :     /// to be attached.
     706           23 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     707              :     pub(crate) fn optimize_attachment(
     708              :         &self,
     709              :         nodes: &HashMap<NodeId, Node>,
     710              :         schedule_context: &ScheduleContext,
     711              :     ) -> Option<ScheduleOptimization> {
     712              :         let attached = (*self.intent.get_attached())?;
     713              :         if self.intent.secondary.is_empty() {
     714              :             // We can only do useful work if we have both attached and secondary locations: this
     715              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     716              :             return None;
     717              :         }
     718              : 
     719              :         let current_affinity_score = schedule_context.get_node_affinity(attached);
     720              :         let current_attachment_count = schedule_context.get_node_attachments(attached);
     721              : 
     722              :         // Generate score for each node, dropping any un-schedulable nodes.
     723              :         let all_pageservers = self.intent.all_pageservers();
     724              :         let mut scores = all_pageservers
     725              :             .iter()
     726           46 :             .flat_map(|node_id| {
     727           46 :                 let node = nodes.get(node_id);
     728           46 :                 if node.is_none() {
     729            0 :                     None
     730           46 :                 } else if matches!(
     731           46 :                     node.unwrap().get_scheduling(),
     732              :                     NodeSchedulingPolicy::Filling
     733              :                 ) {
     734              :                     // If the node is currently filling, don't count it as a candidate to avoid,
     735              :                     // racing with the background fill.
     736            0 :                     None
     737           46 :                 } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
     738            0 :                     None
     739              :                 } else {
     740           46 :                     let affinity_score = schedule_context.get_node_affinity(*node_id);
     741           46 :                     let attachment_count = schedule_context.get_node_attachments(*node_id);
     742           46 :                     Some((*node_id, affinity_score, attachment_count))
     743              :                 }
     744           46 :             })
     745              :             .collect::<Vec<_>>();
     746              : 
     747              :         // Sort precedence:
     748              :         //  1st - prefer nodes with the lowest total affinity score
     749              :         //  2nd - prefer nodes with the lowest number of attachments in this context
     750              :         //  3rd - if all else is equal, sort by node ID for determinism in tests.
     751           46 :         scores.sort_by_key(|i| (i.1, i.2, i.0));
     752              : 
     753              :         if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
     754              :             scores.first()
     755              :         {
     756              :             if attached != *preferred_node {
     757              :                 // The best alternative must be more than 1 better than us, otherwise we could end
     758              :                 // up flapping back next time we're called (e.g. there's no point migrating from
      759              :                 // a location with score 1 to a score zero, because after the move the situation
      760              :                 // would be the same, just reversed).
     761              :                 if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
     762              :                     || current_attachment_count > *preferred_attachment_count + 1
     763              :                 {
     764              :                     tracing::info!(
     765              :                         "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
     766              :                         self.intent.get_secondary()
     767              :                     );
     768              :                     return Some(ScheduleOptimization {
     769              :                         sequence: self.sequence,
     770              :                         action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
     771              :                             old_attached_node_id: attached,
     772              :                             new_attached_node_id: *preferred_node,
     773              :                         }),
     774              :                     });
     775              :                 }
     776              :             } else {
     777              :                 tracing::debug!(
     778              :                     "Node {} is already preferred (score {:?})",
     779              :                     preferred_node,
     780              :                     preferred_affinity_score
     781              :                 );
     782              :             }
     783              :         }
     784              : 
     785              :         // Fall-through: we didn't find an optimization
     786              :         None
     787              :     }
     788              : 
     789           20 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     790              :     pub(crate) fn optimize_secondary(
     791              :         &self,
     792              :         scheduler: &mut Scheduler,
     793              :         schedule_context: &ScheduleContext,
     794              :     ) -> Option<ScheduleOptimization> {
     795              :         if self.intent.secondary.is_empty() {
     796              :             // We can only do useful work if we have both attached and secondary locations: this
     797              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     798              :             return None;
     799              :         }
     800              : 
     801              :         for secondary in self.intent.get_secondary() {
     802              :             let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
      803              :                 // We're already on a node unaffected by any affinity constraints,
     804              :                 // so we won't change it.
     805              :                 continue;
     806              :             };
     807              : 
     808              :             // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
     809              :             // This implicitly limits the choice to nodes that are available, and prefers nodes
     810              :             // with lower utilization.
     811              :             let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
     812              :                 &self.intent.all_pageservers(),
     813              :                 schedule_context,
     814              :             ) else {
     815              :                 // A scheduling error means we have no possible candidate replacements
     816              :                 continue;
     817              :             };
     818              : 
     819              :             let candidate_affinity_score = schedule_context
     820              :                 .nodes
     821              :                 .get(&candidate_node)
     822              :                 .unwrap_or(&AffinityScore::FREE);
     823              : 
     824              :             // The best alternative must be more than 1 better than us, otherwise we could end
     825              :             // up flapping back next time we're called.
     826              :             if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
     827              :                 // If some other node is available and has a lower score than this node, then
     828              :                 // that other node is a good place to migrate to.
     829              :                 tracing::info!(
     830              :                     "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
     831              :                     self.intent.get_secondary()
     832              :                 );
     833              :                 return Some(ScheduleOptimization {
     834              :                     sequence: self.sequence,
     835              :                     action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
     836              :                         old_node_id: *secondary,
     837              :                         new_node_id: candidate_node,
     838              :                     }),
     839              :                 });
     840              :             }
     841              :         }
     842              : 
     843              :         None
     844              :     }
     845              : 
      846              :     /// Return true if the optimization was actually applied: it will not be applied if the optimization's
      847              :     /// sequence is behind this tenant shard's current sequence.
     848            9 :     pub(crate) fn apply_optimization(
     849            9 :         &mut self,
     850            9 :         scheduler: &mut Scheduler,
     851            9 :         optimization: ScheduleOptimization,
     852            9 :     ) -> bool {
     853            9 :         if optimization.sequence != self.sequence {
     854            0 :             return false;
     855            9 :         }
     856            9 : 
     857            9 :         metrics::METRICS_REGISTRY
     858            9 :             .metrics_group
     859            9 :             .storage_controller_schedule_optimization
     860            9 :             .inc();
     861            9 : 
     862            9 :         match optimization.action {
     863              :             ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
     864            4 :                 old_attached_node_id,
     865            4 :                 new_attached_node_id,
     866            4 :             }) => {
     867            4 :                 self.intent.demote_attached(scheduler, old_attached_node_id);
     868            4 :                 self.intent
     869            4 :                     .promote_attached(scheduler, new_attached_node_id);
     870            4 :             }
     871              :             ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
     872            5 :                 old_node_id,
     873            5 :                 new_node_id,
     874            5 :             }) => {
     875            5 :                 self.intent.remove_secondary(scheduler, old_node_id);
     876            5 :                 self.intent.push_secondary(scheduler, new_node_id);
     877            5 :             }
     878              :         }
     879              : 
     880            9 :         true
     881            9 :     }
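
Because of the sequence guard at the top of apply_optimization, an optimization computed against an older view of the shard is silently dropped and the caller sees `false`. A test-style sketch of that behaviour, written against the helpers in the test module at the bottom of this file (a hypothetical test, not part of the suite):

    #[test]
    fn stale_optimization_is_ignored() -> anyhow::Result<()> {
        let nodes = make_test_nodes(3);
        let mut scheduler = Scheduler::new(nodes.values());

        let mut shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
        shard.intent.set_attached(&mut scheduler, Some(NodeId(1)));
        shard.intent.push_secondary(&mut scheduler, NodeId(2));

        // Build an optimization whose sequence no longer matches the shard's.
        let stale = ScheduleOptimization {
            sequence: shard.sequence.next(),
            action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                old_attached_node_id: NodeId(1),
                new_attached_node_id: NodeId(2),
            }),
        };

        // apply_optimization() refuses to act and reports that nothing changed.
        assert!(!shard.apply_optimization(&mut scheduler, stale));
        assert_eq!(shard.intent.get_attached(), &Some(NodeId(1)));

        shard.intent.clear(&mut scheduler);
        Ok(())
    }
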
     882              : 
     883              :     /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
     884              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
     885              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
     886              :     ///
     887              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
     888              :     /// function should not be used to decide whether to reconcile.
     889            0 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
     890            0 :         if let Some(attach_intent) = self.intent.attached {
     891            0 :             match self.observed.locations.get(&attach_intent) {
     892            0 :                 Some(loc) => match &loc.conf {
     893            0 :                     Some(conf) => match conf.mode {
     894              :                         LocationConfigMode::AttachedMulti
     895              :                         | LocationConfigMode::AttachedSingle
     896              :                         | LocationConfigMode::AttachedStale => {
     897              :                             // Our intent and observed state agree that this node is in an attached state.
     898            0 :                             Some(attach_intent)
     899              :                         }
     900              :                         // Our observed config is not an attached state
     901            0 :                         _ => None,
     902              :                     },
     903              :                     // Our observed state is None, i.e. in flux
     904            0 :                     None => None,
     905              :                 },
     906              :                 // We have no observed state for this node
     907            0 :                 None => None,
     908              :             }
     909              :         } else {
     910              :             // Our intent is not to attach
     911            0 :             None
     912              :         }
     913            0 :     }
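
The nested matches above encode four conditions: intent names a node, we have an observed location for that node, the location carries a config (i.e. it is not in flux), and the config's mode is one of the attached modes. A compact sketch of the same shape using `?`, with local stand-in types rather than the crate's:

    use std::collections::HashMap;

    // Local stand-ins mirroring the shapes used above; illustration only.
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct NodeId(u64);
    enum Mode { AttachedSingle, AttachedMulti, AttachedStale, Secondary }
    struct Conf { mode: Mode }
    struct Location { conf: Option<Conf> }

    fn stably_attached(
        intent_attached: Option<NodeId>,
        observed: &HashMap<NodeId, Location>,
    ) -> Option<NodeId> {
        let node = intent_attached?; // we intend to attach somewhere
        let conf = observed.get(&node)?.conf.as_ref()?; // observed state is known, not in flux
        matches!(
            conf.mode,
            Mode::AttachedSingle | Mode::AttachedMulti | Mode::AttachedStale
        )
        .then_some(node)
    }
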
     914              : 
     915            0 :     fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
     916            0 :         let mut dirty_nodes = HashSet::new();
     917              : 
     918            0 :         if let Some(node_id) = self.intent.attached {
     919              :             // Maybe panic: it is a severe bug if we try to attach while generation is null.
     920            0 :             let generation = self
     921            0 :                 .generation
     922            0 :                 .expect("Attempted to enter attached state without a generation");
     923            0 : 
     924            0 :             let wanted_conf =
     925            0 :                 attached_location_conf(generation, &self.shard, &self.config, &self.policy);
     926            0 :             match self.observed.locations.get(&node_id) {
     927            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     928            0 :                 Some(_) | None => {
     929            0 :                     dirty_nodes.insert(node_id);
     930            0 :                 }
     931              :             }
     932            0 :         }
     933              : 
     934            0 :         for node_id in &self.intent.secondary {
     935            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     936            0 :             match self.observed.locations.get(node_id) {
     937            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     938            0 :                 Some(_) | None => {
     939            0 :                     dirty_nodes.insert(*node_id);
     940            0 :                 }
     941              :             }
     942              :         }
     943              : 
     944            0 :         for node_id in self.observed.locations.keys() {
     945            0 :             if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
     946            0 :                 // We have observed state that isn't part of our intent: need to clean it up.
     947            0 :                 dirty_nodes.insert(*node_id);
     948            0 :             }
     949              :         }
     950              : 
     951            0 :         dirty_nodes.retain(|node_id| {
     952            0 :             nodes
     953            0 :                 .get(node_id)
     954            0 :                 .map(|n| n.is_available())
     955            0 :                 .unwrap_or(false)
     956            0 :         });
     957            0 : 
     958            0 :         !dirty_nodes.is_empty()
     959            0 :     }
     960              : 
     961              :     #[allow(clippy::too_many_arguments)]
     962            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     963              :     pub(crate) fn get_reconcile_needed(
     964              :         &mut self,
     965              :         pageservers: &Arc<HashMap<NodeId, Node>>,
     966              :     ) -> ReconcileNeeded {
     967              :         // If there are any ambiguous observed states, and the nodes they refer to are available,
     968              :         // we should reconcile to clean them up.
     969              :         let mut dirty_observed = false;
     970              :         for (node_id, observed_loc) in &self.observed.locations {
     971              :             let node = pageservers
     972              :                 .get(node_id)
     973              :                 .expect("Nodes may not be removed while referenced");
     974              :             if observed_loc.conf.is_none() && node.is_available() {
     975              :                 dirty_observed = true;
     976              :                 break;
     977              :             }
     978              :         }
     979              : 
     980              :         let active_nodes_dirty = self.dirty(pageservers);
     981              : 
     982              :         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
     983              :         // wake up a reconciler to send it.
     984              :         let do_reconcile =
     985              :             active_nodes_dirty || dirty_observed || self.pending_compute_notification;
     986              : 
     987              :         if !do_reconcile {
     988              :             tracing::debug!("Not dirty, no reconciliation needed.");
     989              :             return ReconcileNeeded::No;
     990              :         }
     991              : 
     992              :         // If we are currently splitting, then never start a reconciler task: the splitting logic
     993              :         // requires that shards are not interfered with while it runs. Do this check here rather than
     994              :         // up top, so that we only log this message if we would otherwise have done a reconciliation.
     995              :         if !matches!(self.splitting, SplitState::Idle) {
     996              :             tracing::info!("Refusing to reconcile, splitting in progress");
     997              :             return ReconcileNeeded::No;
     998              :         }
     999              : 
    1000              :         // Reconcile already in flight for the current sequence?
    1001              :         if let Some(handle) = &self.reconciler {
    1002              :             if handle.sequence == self.sequence {
    1003              :                 tracing::info!(
    1004              :                     "Reconciliation already in progress for sequence {:?}",
    1005              :                     self.sequence,
    1006              :                 );
    1007              :                 return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
    1008              :                     tenant_shard_id: self.tenant_shard_id,
    1009              :                     seq_wait: self.waiter.clone(),
    1010              :                     error_seq_wait: self.error_waiter.clone(),
    1011              :                     error: self.last_error.clone(),
    1012              :                     seq: self.sequence,
    1013              :                 });
    1014              :             }
    1015              :         }
    1016              : 
    1017              :         // Pre-checks done: finally check whether we may actually do the work
    1018              :         match self.scheduling_policy {
    1019              :             ShardSchedulingPolicy::Active
    1020              :             | ShardSchedulingPolicy::Essential
    1021              :             | ShardSchedulingPolicy::Pause => {}
    1022              :             ShardSchedulingPolicy::Stop => {
    1023              :                 // We only reach this point if there is work to do and we're going to skip
    1024              :                 // doing it: warn so it is obvious why this tenant isn't doing what it ought to.
    1025              :                 tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
    1026              :                 return ReconcileNeeded::No;
    1027              :             }
    1028              :         }
    1029              : 
    1030              :         ReconcileNeeded::Yes
    1031              :     }
    1032              : 
    1033              :     /// Ensure the sequence number is set to a value where waiting for this value will make us wait
    1034              :     /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
    1035              :     ///
    1036              :     /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
    1037              :     /// that the waiter will not complete until some future Reconciler is constructed and run.
    1038            0 :     fn ensure_sequence_ahead(&mut self) {
    1039            0 :         // Find the highest sequence for which a Reconciler has previously run or is currently
    1040            0 :         // running
    1041            0 :         let max_seen = std::cmp::max(
    1042            0 :             self.reconciler
    1043            0 :                 .as_ref()
    1044            0 :                 .map(|r| r.sequence)
    1045            0 :                 .unwrap_or(Sequence(0)),
    1046            0 :             std::cmp::max(self.waiter.load(), self.error_waiter.load()),
    1047            0 :         );
    1048            0 : 
    1049            0 :         if self.sequence <= max_seen {
    1050            0 :             self.sequence = max_seen.next();
    1051            0 :         }
    1052            0 :     }
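
In other words: find the highest sequence any reconciler has seen or completed, and bump our own sequence just past it unless we are already ahead. A self-contained sketch with a local stand-in for the crate's Sequence newtype:

    // Local stand-in for Sequence; illustration only.
    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
    struct Seq(u64);

    impl Seq {
        fn next(self) -> Seq {
            Seq(self.0 + 1)
        }
    }

    fn ensure_ahead(current: Seq, max_seen: Seq) -> Seq {
        if current <= max_seen {
            max_seen.next()
        } else {
            current
        }
    }

    fn main() {
        assert_eq!(ensure_ahead(Seq(3), Seq(5)), Seq(6)); // behind: jump past max_seen
        assert_eq!(ensure_ahead(Seq(5), Seq(5)), Seq(6)); // equal counts as not ahead
        assert_eq!(ensure_ahead(Seq(7), Seq(5)), Seq(7)); // already ahead: unchanged
    }
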
    1053              : 
    1054              :     /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
    1055              :     ///
    1056              :     /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
    1057              :     /// you would like to wait on the next reconciler that gets spawned in the background.
    1058            0 :     pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
    1059            0 :         self.ensure_sequence_ahead();
    1060            0 : 
    1061            0 :         ReconcilerWaiter {
    1062            0 :             tenant_shard_id: self.tenant_shard_id,
    1063            0 :             seq_wait: self.waiter.clone(),
    1064            0 :             error_seq_wait: self.error_waiter.clone(),
    1065            0 :             error: self.last_error.clone(),
    1066            0 :             seq: self.sequence,
    1067            0 :         }
    1068            0 :     }
    1069              : 
    1070              :     #[allow(clippy::too_many_arguments)]
    1071            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1072              :     pub(crate) fn spawn_reconciler(
    1073              :         &mut self,
    1074              :         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
    1075              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1076              :         compute_hook: &Arc<ComputeHook>,
    1077              :         reconciler_config: ReconcilerConfig,
    1078              :         service_config: &service::Config,
    1079              :         persistence: &Arc<Persistence>,
    1080              :         units: ReconcileUnits,
    1081              :         gate_guard: GateGuard,
    1082              :         cancel: &CancellationToken,
    1083              :     ) -> Option<ReconcilerWaiter> {
    1084              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
    1085              :         // doing our sequence's work.
    1086              :         let old_handle = self.reconciler.take();
    1087              : 
    1088              :         // Build list of nodes from which the reconciler should detach
    1089              :         let mut detach = Vec::new();
    1090              :         for node_id in self.observed.locations.keys() {
    1091              :             if self.intent.get_attached() != &Some(*node_id)
    1092              :                 && !self.intent.secondary.contains(node_id)
    1093              :             {
    1094              :                 detach.push(
    1095              :                     pageservers
    1096              :                         .get(node_id)
    1097              :                         .expect("Intent references non-existent pageserver")
    1098              :                         .clone(),
    1099              :                 )
    1100              :             }
    1101              :         }
    1102              : 
    1103              :         // Advance the sequence before spawning a reconciler, so that sequence waiters
    1104              :         // can distinguish between before+after the reconcile completes.
    1105              :         self.ensure_sequence_ahead();
    1106              : 
    1107              :         let reconciler_cancel = cancel.child_token();
    1108              :         let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
    1109              :         let mut reconciler = Reconciler {
    1110              :             tenant_shard_id: self.tenant_shard_id,
    1111              :             shard: self.shard,
    1112              :             placement_policy: self.policy.clone(),
    1113              :             generation: self.generation,
    1114              :             intent: reconciler_intent,
    1115              :             detach,
    1116              :             reconciler_config,
    1117              :             config: self.config.clone(),
    1118              :             observed: self.observed.clone(),
    1119              :             compute_hook: compute_hook.clone(),
    1120              :             service_config: service_config.clone(),
    1121              :             _gate_guard: gate_guard,
    1122              :             _resource_units: units,
    1123              :             cancel: reconciler_cancel.clone(),
    1124              :             persistence: persistence.clone(),
    1125              :             compute_notify_failure: false,
    1126              :         };
    1127              : 
    1128              :         let reconcile_seq = self.sequence;
    1129              : 
    1130              :         tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
    1131              :         let must_notify = self.pending_compute_notification;
    1132              :         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
    1133              :                                                         tenant_id=%reconciler.tenant_shard_id.tenant_id,
    1134              :                                                         shard_id=%reconciler.tenant_shard_id.shard_slug());
    1135              :         metrics::METRICS_REGISTRY
    1136              :             .metrics_group
    1137              :             .storage_controller_reconcile_spawn
    1138              :             .inc();
    1139              :         let result_tx = result_tx.clone();
    1140              :         let join_handle = tokio::task::spawn(
    1141            0 :             async move {
    1142              :                 // Wait for any previous reconcile task to complete before we start
    1143            0 :                 if let Some(old_handle) = old_handle {
    1144            0 :                     old_handle.cancel.cancel();
    1145            0 :                     if let Err(e) = old_handle.handle.await {
    1146              :                         // We can't do much with this other than log it: the task is done, so
    1147              :                         // we may proceed with our work.
    1148            0 :                         tracing::error!("Unexpected join error waiting for reconcile task: {e}");
    1149            0 :                     }
    1150            0 :                 }
    1151              : 
    1152              :                 // Early check for cancellation before doing any work
    1153              :                 // TODO: wrap all remote API operations in cancellation check
    1154              :                 // as well.
    1155            0 :                 if reconciler.cancel.is_cancelled() {
    1156            0 :                     metrics::METRICS_REGISTRY
    1157            0 :                         .metrics_group
    1158            0 :                         .storage_controller_reconcile_complete
    1159            0 :                         .inc(ReconcileCompleteLabelGroup {
    1160            0 :                             status: ReconcileOutcome::Cancel,
    1161            0 :                         });
    1162            0 :                     return;
    1163            0 :                 }
    1164              : 
    1165              :                 // Attempt to make observed state match intent state
    1166            0 :                 let result = reconciler.reconcile().await;
    1167              : 
    1168              :                 // If we know we had a pending compute notification from some previous action, send a notification irrespective
    1169              :                 // of whether the above reconcile() did any work
    1170            0 :                 if result.is_ok() && must_notify {
    1171              :                     // If this fails we will record the need to retry in [`ReconcileResult::pending_compute_notification`]
    1172            0 :                     reconciler.compute_notify().await.ok();
    1173            0 :                 }
    1174              : 
    1175              :                 // Update result counter
    1176            0 :                 let outcome_label = match &result {
    1177            0 :                     Ok(_) => ReconcileOutcome::Success,
    1178            0 :                     Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
    1179            0 :                     Err(_) => ReconcileOutcome::Error,
    1180              :                 };
    1181              : 
    1182            0 :                 metrics::METRICS_REGISTRY
    1183            0 :                     .metrics_group
    1184            0 :                     .storage_controller_reconcile_complete
    1185            0 :                     .inc(ReconcileCompleteLabelGroup {
    1186            0 :                         status: outcome_label,
    1187            0 :                     });
    1188            0 : 
    1189            0 :                 // Constructing the result implicitly drops the Reconciler, freeing any ReconcileUnits before the Service might
    1190            0 :                 // try to schedule more work in response to our result.
    1191            0 :                 let result = ReconcileResult {
    1192            0 :                     sequence: reconcile_seq,
    1193            0 :                     result,
    1194            0 :                     tenant_shard_id: reconciler.tenant_shard_id,
    1195            0 :                     generation: reconciler.generation,
    1196            0 :                     observed: reconciler.observed,
    1197            0 :                     pending_compute_notification: reconciler.compute_notify_failure,
    1198            0 :                 };
    1199            0 : 
    1200            0 :                 result_tx
    1201            0 :                     .send(ReconcileResultRequest::ReconcileResult(result))
    1202            0 :                     .ok();
    1203            0 :             }
    1204              :             .instrument(reconciler_span),
    1205              :         );
    1206              : 
    1207              :         self.reconciler = Some(ReconcilerHandle {
    1208              :             sequence: self.sequence,
    1209              :             handle: join_handle,
    1210              :             cancel: reconciler_cancel,
    1211              :         });
    1212              : 
    1213              :         Some(ReconcilerWaiter {
    1214              :             tenant_shard_id: self.tenant_shard_id,
    1215              :             seq_wait: self.waiter.clone(),
    1216              :             error_seq_wait: self.error_waiter.clone(),
    1217              :             error: self.last_error.clone(),
    1218              :             seq: self.sequence,
    1219              :         })
    1220              :     }
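
spawn_reconciler follows a replace-the-running-task pattern: take the previous handle, have the new task cancel and await it, then install a fresh handle carrying its own child cancellation token. A stripped-down sketch of that pattern, assuming tokio and tokio_util are available; the types here are hypothetical stand-ins, not the crate's Reconciler or ReconcilerHandle:

    use tokio_util::sync::CancellationToken;

    // Hypothetical handle pairing a task with its cancellation token.
    struct TaskHandle {
        join: tokio::task::JoinHandle<()>,
        cancel: CancellationToken,
    }

    fn replace_task(old: Option<TaskHandle>, parent_cancel: &CancellationToken) -> TaskHandle {
        let cancel = parent_cancel.child_token();
        let task_cancel = cancel.clone();
        let join = tokio::spawn(async move {
            // Cancel and await the previous task from inside the new one, so the
            // caller never blocks on it.
            if let Some(old) = old {
                old.cancel.cancel();
                if let Err(e) = old.join.await {
                    eprintln!("previous task panicked or was aborted: {e}");
                }
            }
            // Early exit if we were cancelled while waiting.
            if task_cancel.is_cancelled() {
                return;
            }
            // ... the actual work (reconcile, notify, report result) would go here ...
        });
        TaskHandle { join, cancel }
    }
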
    1221              : 
    1222              :     /// Get a waiter for any reconciliation in flight, but do not start reconciliation
    1223              :     /// if it is not already running
    1224            0 :     pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
    1225            0 :         if self.reconciler.is_some() {
    1226            0 :             Some(ReconcilerWaiter {
    1227            0 :                 tenant_shard_id: self.tenant_shard_id,
    1228            0 :                 seq_wait: self.waiter.clone(),
    1229            0 :                 error_seq_wait: self.error_waiter.clone(),
    1230            0 :                 error: self.last_error.clone(),
    1231            0 :                 seq: self.sequence,
    1232            0 :             })
    1233              :         } else {
    1234            0 :             None
    1235              :         }
    1236            0 :     }
    1237              : 
    1238              :     /// Called when a ReconcileResult has been emitted and the service is updating
    1239              :     /// our state: if the result is from a sequence >= our ReconcilerHandle's sequence, then drop
    1240              :     /// the handle to indicate there is no longer a reconciliation in progress.
    1241            0 :     pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
    1242            0 :         if let Some(reconcile_handle) = &self.reconciler {
    1243            0 :             if reconcile_handle.sequence <= sequence {
    1244            0 :                 self.reconciler = None;
    1245            0 :             }
    1246            0 :         }
    1247            0 :     }
    1248              : 
    1249              :     /// If we had any state at all referring to this node ID, drop it.  Does not
    1250              :     /// attempt to reschedule.
    1251              :     ///
    1252              :     /// Returns true if we modified the node's intent state.
    1253            0 :     pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
    1254            0 :         let mut intent_modified = false;
    1255            0 : 
    1256            0 :         // Drop if this node was our attached intent
    1257            0 :         if self.intent.attached == Some(node_id) {
    1258            0 :             self.intent.attached = None;
    1259            0 :             intent_modified = true;
    1260            0 :         }
    1261              : 
    1262              :         // Drop from the list of secondaries, and check if we modified it
    1263            0 :         let had_secondaries = self.intent.secondary.len();
    1264            0 :         self.intent.secondary.retain(|n| n != &node_id);
    1265            0 :         intent_modified |= self.intent.secondary.len() != had_secondaries;
    1266            0 : 
    1267            0 :         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
    1268              : 
    1269            0 :         intent_modified
    1270            0 :     }
    1271              : 
    1272            0 :     pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
    1273            0 :         self.scheduling_policy = p;
    1274            0 :     }
    1275              : 
    1276            0 :     pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
    1277            0 :         &self.scheduling_policy
    1278            0 :     }
    1279              : 
    1280            0 :     pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
    1281            0 :         // Ordering: always set last_error before advancing sequence, so that sequence
    1282            0 :         // waiters are guaranteed to see a Some value when they see an error.
    1283            0 :         *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
    1284            0 :         self.error_waiter.advance(sequence);
    1285            0 :     }
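
The ordering comment above is the invariant this function exists to uphold: publish the error first, then advance the error sequence, so a waiter woken by the new sequence is guaranteed to observe Some(error). A self-contained sketch of the invariant using local stand-ins (a Mutex plus an atomic in place of the crate's SeqWait):

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Mutex,
    };

    // Local stand-in for the shard's error slot; illustration only.
    struct ErrorSlot {
        last_error: Mutex<Option<String>>,
        error_seq: AtomicU64,
    }

    impl ErrorSlot {
        fn set_last_error(&self, seq: u64, err: String) {
            // 1. Publish the error ...
            *self.last_error.lock().unwrap() = Some(err);
            // 2. ... then advance the sequence. Release pairs with the Acquire
            //    load below, so observing `seq` implies observing the error.
            self.error_seq.store(seq, Ordering::Release);
        }

        fn error_if_reached(&self, seq: u64) -> Option<String> {
            if self.error_seq.load(Ordering::Acquire) >= seq {
                self.last_error.lock().unwrap().clone()
            } else {
                None
            }
        }
    }
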
    1286              : 
    1287            0 :     pub(crate) fn from_persistent(
    1288            0 :         tsp: TenantShardPersistence,
    1289            0 :         intent: IntentState,
    1290            0 :     ) -> anyhow::Result<Self> {
    1291            0 :         let tenant_shard_id = tsp.get_tenant_shard_id()?;
    1292            0 :         let shard_identity = tsp.get_shard_identity()?;
    1293              : 
    1294            0 :         Ok(Self {
    1295            0 :             tenant_shard_id,
    1296            0 :             shard: shard_identity,
    1297            0 :             sequence: Sequence::initial(),
    1298            0 :             generation: tsp.generation.map(|g| Generation::new(g as u32)),
    1299            0 :             policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
    1300            0 :             intent,
    1301            0 :             observed: ObservedState::new(),
    1302            0 :             config: serde_json::from_str(&tsp.config).unwrap(),
    1303            0 :             reconciler: None,
    1304            0 :             splitting: tsp.splitting,
    1305            0 :             waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1306            0 :             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1307            0 :             last_error: Arc::default(),
    1308            0 :             pending_compute_notification: false,
    1309            0 :             delayed_reconcile: false,
    1310            0 :             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
    1311            0 :             preferred_az_id: tsp.preferred_az_id,
    1312            0 :         })
    1313            0 :     }
    1314              : 
    1315            0 :     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
    1316            0 :         TenantShardPersistence {
    1317            0 :             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
    1318            0 :             shard_number: self.tenant_shard_id.shard_number.0 as i32,
    1319            0 :             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
    1320            0 :             shard_stripe_size: self.shard.stripe_size.0 as i32,
    1321            0 :             generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
    1322            0 :             generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
    1323            0 :             placement_policy: serde_json::to_string(&self.policy).unwrap(),
    1324            0 :             config: serde_json::to_string(&self.config).unwrap(),
    1325            0 :             splitting: SplitState::default(),
    1326            0 :             scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
    1327            0 :             preferred_az_id: self.preferred_az_id.clone(),
    1328            0 :         }
    1329            0 :     }
    1330              : 
    1331            0 :     pub(crate) fn preferred_az(&self) -> Option<&str> {
    1332            0 :         self.preferred_az_id.as_deref()
    1333            0 :     }
    1334              : 
    1335            0 :     pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
    1336            0 :         self.preferred_az_id = Some(preferred_az_id);
    1337            0 :     }
    1338              : }
    1339              : 
    1340              : #[cfg(test)]
    1341              : pub(crate) mod tests {
    1342              :     use std::{cell::RefCell, rc::Rc};
    1343              : 
    1344              :     use pageserver_api::{
    1345              :         controller_api::NodeAvailability,
    1346              :         shard::{ShardCount, ShardNumber},
    1347              :     };
    1348              :     use utils::id::TenantId;
    1349              : 
    1350              :     use crate::scheduler::test_utils::make_test_nodes;
    1351              : 
    1352              :     use super::*;
    1353              : 
    1354            7 :     fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
    1355            7 :         let tenant_id = TenantId::generate();
    1356            7 :         let shard_number = ShardNumber(0);
    1357            7 :         let shard_count = ShardCount::new(1);
    1358            7 : 
    1359            7 :         let tenant_shard_id = TenantShardId {
    1360            7 :             tenant_id,
    1361            7 :             shard_number,
    1362            7 :             shard_count,
    1363            7 :         };
    1364            7 :         TenantShard::new(
    1365            7 :             tenant_shard_id,
    1366            7 :             ShardIdentity::new(
    1367            7 :                 shard_number,
    1368            7 :                 shard_count,
    1369            7 :                 pageserver_api::shard::ShardStripeSize(32768),
    1370            7 :             )
    1371            7 :             .unwrap(),
    1372            7 :             policy,
    1373            7 :         )
    1374            7 :     }
    1375              : 
    1376            3 :     fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
    1377            3 :         let tenant_id = TenantId::generate();
    1378            3 : 
    1379            3 :         (0..shard_count.count())
    1380           12 :             .map(|i| {
    1381           12 :                 let shard_number = ShardNumber(i);
    1382           12 : 
    1383           12 :                 let tenant_shard_id = TenantShardId {
    1384           12 :                     tenant_id,
    1385           12 :                     shard_number,
    1386           12 :                     shard_count,
    1387           12 :                 };
    1388           12 :                 TenantShard::new(
    1389           12 :                     tenant_shard_id,
    1390           12 :                     ShardIdentity::new(
    1391           12 :                         shard_number,
    1392           12 :                         shard_count,
    1393           12 :                         pageserver_api::shard::ShardStripeSize(32768),
    1394           12 :                     )
    1395           12 :                     .unwrap(),
    1396           12 :                     policy.clone(),
    1397           12 :                 )
    1398           12 :             })
    1399            3 :             .collect()
    1400            3 :     }
    1401              : 
    1402              :     /// Test the scheduling behaviors used when a tenant configured for HA is subject
    1403              :     /// to nodes being marked offline.
    1404              :     #[test]
    1405            1 :     fn tenant_ha_scheduling() -> anyhow::Result<()> {
    1406            1 :         // Start with three nodes.  Our tenant will only use two.  The third one is
    1407            1 :         // expected to remain unused.
    1408            1 :         let mut nodes = make_test_nodes(3);
    1409            1 : 
    1410            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1411            1 :         let mut context = ScheduleContext::default();
    1412            1 : 
    1413            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1414            1 :         tenant_shard
    1415            1 :             .schedule(&mut scheduler, &mut context)
    1416            1 :             .expect("we have enough nodes, scheduling should work");
    1417            1 : 
    1418            1 :         // Expect the attached and secondary locations to initially be scheduled onto different nodes
    1419            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    1420            1 :         assert!(tenant_shard.intent.attached.is_some());
    1421              : 
    1422            1 :         let attached_node_id = tenant_shard.intent.attached.unwrap();
    1423            1 :         let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
    1424            1 :         assert_ne!(attached_node_id, secondary_node_id);
    1425              : 
    1426            1 :         // Notifying that the attached node is offline should demote it to a secondary
    1427            1 :         let changed = tenant_shard
    1428            1 :             .intent
    1429            1 :             .demote_attached(&mut scheduler, attached_node_id);
    1430            1 :         assert!(changed);
    1431            1 :         assert!(tenant_shard.intent.attached.is_none());
    1432            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 2);
    1433              : 
    1434              :         // Update the scheduler state to indicate the node is offline
    1435            1 :         nodes
    1436            1 :             .get_mut(&attached_node_id)
    1437            1 :             .unwrap()
    1438            1 :             .set_availability(NodeAvailability::Offline);
    1439            1 :         scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
    1440            1 : 
    1441            1 :         // Scheduling the shard should promote the still-available secondary node to attached
    1442            1 :         tenant_shard
    1443            1 :             .schedule(&mut scheduler, &mut context)
    1444            1 :             .expect("active nodes are available");
    1445            1 :         assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
    1446              : 
    1447              :         // The original attached node should have been retained as a secondary
    1448            1 :         assert_eq!(
    1449            1 :             *tenant_shard.intent.secondary.iter().last().unwrap(),
    1450            1 :             attached_node_id
    1451            1 :         );
    1452              : 
    1453            1 :         tenant_shard.intent.clear(&mut scheduler);
    1454            1 : 
    1455            1 :         Ok(())
    1456            1 :     }
    1457              : 
    1458              :     #[test]
    1459            1 :     fn intent_from_observed() -> anyhow::Result<()> {
    1460            1 :         let nodes = make_test_nodes(3);
    1461            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1462            1 : 
    1463            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1464            1 : 
    1465            1 :         tenant_shard.observed.locations.insert(
    1466            1 :             NodeId(3),
    1467            1 :             ObservedStateLocation {
    1468            1 :                 conf: Some(LocationConfig {
    1469            1 :                     mode: LocationConfigMode::AttachedMulti,
    1470            1 :                     generation: Some(2),
    1471            1 :                     secondary_conf: None,
    1472            1 :                     shard_number: tenant_shard.shard.number.0,
    1473            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1474            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1475            1 :                     tenant_conf: TenantConfig::default(),
    1476            1 :                 }),
    1477            1 :             },
    1478            1 :         );
    1479            1 : 
    1480            1 :         tenant_shard.observed.locations.insert(
    1481            1 :             NodeId(2),
    1482            1 :             ObservedStateLocation {
    1483            1 :                 conf: Some(LocationConfig {
    1484            1 :                     mode: LocationConfigMode::AttachedStale,
    1485            1 :                     generation: Some(1),
    1486            1 :                     secondary_conf: None,
    1487            1 :                     shard_number: tenant_shard.shard.number.0,
    1488            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1489            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1490            1 :                     tenant_conf: TenantConfig::default(),
    1491            1 :                 }),
    1492            1 :             },
    1493            1 :         );
    1494            1 : 
    1495            1 :         tenant_shard.intent_from_observed(&mut scheduler);
    1496            1 : 
    1497            1 :         // The attached location with the highest generation gets used as attached
    1498            1 :         assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    1499              :         // Other locations get used as secondary
    1500            1 :         assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
    1501              : 
    1502            1 :         scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
    1503              : 
    1504            1 :         tenant_shard.intent.clear(&mut scheduler);
    1505            1 :         Ok(())
    1506            1 :     }
    1507              : 
    1508              :     #[test]
    1509            1 :     fn scheduling_mode() -> anyhow::Result<()> {
    1510            1 :         let nodes = make_test_nodes(3);
    1511            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1512            1 : 
    1513            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1514            1 : 
    1515            1 :         // In pause mode, schedule() shouldn't do anything
    1516            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    1517            1 :         assert!(tenant_shard
    1518            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1519            1 :             .is_ok());
    1520            1 :         assert!(tenant_shard.intent.all_pageservers().is_empty());
    1521              : 
    1522              :         // In active mode, schedule() works
    1523            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    1524            1 :         assert!(tenant_shard
    1525            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1526            1 :             .is_ok());
    1527            1 :         assert!(!tenant_shard.intent.all_pageservers().is_empty());
    1528              : 
    1529            1 :         tenant_shard.intent.clear(&mut scheduler);
    1530            1 :         Ok(())
    1531            1 :     }
    1532              : 
    1533              :     #[test]
    1534            1 :     fn optimize_attachment() -> anyhow::Result<()> {
    1535            1 :         let nodes = make_test_nodes(3);
    1536            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1537            1 : 
    1538            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1539            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1540            1 : 
    1541            1 :         // Initially: both shards are attached on node 1, and they have secondary locations
    1542            1 :         // on different nodes.
    1543            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1544            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
    1545            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1546            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1547            1 : 
    1548            1 :         let mut schedule_context = ScheduleContext::default();
    1549            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1550            1 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1551            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1552            1 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1553            1 : 
    1554            1 :         let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
    1555            1 : 
    1556            1 :         // Either shard should recognize that it has the option to switch to a secondary location where there
    1557            1 :         // would be no other shards from the same tenant, and request to do so.
    1558            1 :         assert_eq!(
    1559            1 :             optimization_a,
    1560            1 :             Some(ScheduleOptimization {
    1561            1 :                 sequence: shard_a.sequence,
    1562            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1563            1 :                     old_attached_node_id: NodeId(1),
    1564            1 :                     new_attached_node_id: NodeId(2)
    1565            1 :                 })
    1566            1 :             })
    1567            1 :         );
    1568              : 
    1569              :         // Note that optimizing two shards in the same tenant against the same ScheduleContext is
    1570              :         // mutually exclusive (applying one optimization invalidates the stats the other was computed
    1571              :         // from) -- it is the responsibility of [`Service::optimize_all`] to avoid trying
    1572              :         // to do optimizations for multiple shards in the same tenant at the same time.  Generating
    1573              :         // both optimizations here is just done for test purposes.
    1574            1 :         let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
    1575            1 :         assert_eq!(
    1576            1 :             optimization_b,
    1577            1 :             Some(ScheduleOptimization {
    1578            1 :                 sequence: shard_b.sequence,
    1579            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1580            1 :                     old_attached_node_id: NodeId(1),
    1581            1 :                     new_attached_node_id: NodeId(3)
    1582            1 :                 })
    1583            1 :             })
    1584            1 :         );
    1585              : 
    1586              :         // Applying these optimizations should result in the end state proposed
    1587            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1588            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
    1589            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
    1590            1 :         shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
    1591            1 :         assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
    1592            1 :         assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
    1593              : 
    1594            1 :         shard_a.intent.clear(&mut scheduler);
    1595            1 :         shard_b.intent.clear(&mut scheduler);
    1596            1 : 
    1597            1 :         Ok(())
    1598            1 :     }
    1599              : 
    1600              :     #[test]
    1601            1 :     fn optimize_secondary() -> anyhow::Result<()> {
    1602            1 :         let nodes = make_test_nodes(4);
    1603            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1604            1 : 
    1605            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1606            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1607            1 : 
    1608            1 :         // Initially: the shards are attached on different nodes, and both have their
    1609            1 :         // secondary location on the same node.
    1610            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1611            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    1612            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    1613            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1614            1 : 
    1615            1 :         let mut schedule_context = ScheduleContext::default();
    1616            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1617            1 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1618            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1619            1 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1620            1 : 
    1621            1 :         let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
    1622            1 : 
    1623            1 :         // Since there is a node with no locations available, the node with two locations for the
    1624            1 :         // same tenant should generate an optimization to move one away
    1625            1 :         assert_eq!(
    1626            1 :             optimization_a,
    1627            1 :             Some(ScheduleOptimization {
    1628            1 :                 sequence: shard_a.sequence,
    1629            1 :                 action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1630            1 :                     old_node_id: NodeId(3),
    1631            1 :                     new_node_id: NodeId(4)
    1632            1 :                 })
    1633            1 :             })
    1634            1 :         );
    1635              : 
    1636            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1637            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    1638            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
    1639              : 
    1640            1 :         shard_a.intent.clear(&mut scheduler);
    1641            1 :         shard_b.intent.clear(&mut scheduler);
    1642            1 : 
    1643            1 :         Ok(())
    1644            1 :     }
    1645              : 
    1646              :     // Optimize til quiescent: this emulates what Service::optimize_all does, when
    1647              :     // called repeatedly in the background.
    1648              :     // Returns the applied optimizations
    1649            3 :     fn optimize_til_idle(
    1650            3 :         nodes: &HashMap<NodeId, Node>,
    1651            3 :         scheduler: &mut Scheduler,
    1652            3 :         shards: &mut [TenantShard],
    1653            3 :     ) -> Vec<ScheduleOptimization> {
    1654            3 :         let mut loop_n = 0;
    1655            3 :         let mut optimizations = Vec::default();
    1656              :         loop {
    1657            9 :             let mut schedule_context = ScheduleContext::default();
    1658            9 :             let mut any_changed = false;
    1659              : 
    1660           36 :             for shard in shards.iter() {
    1661           36 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    1662           36 :                 if let Some(attached) = shard.intent.get_attached() {
    1663           36 :                     schedule_context.push_attached(*attached);
    1664           36 :                 }
    1665              :             }
    1666              : 
    1667           21 :             for shard in shards.iter_mut() {
    1668           21 :                 let optimization = shard.optimize_attachment(nodes, &schedule_context);
    1669           21 :                 if let Some(optimization) = optimization {
    1670            2 :                     optimizations.push(optimization.clone());
    1671            2 :                     shard.apply_optimization(scheduler, optimization);
    1672            2 :                     any_changed = true;
    1673            2 :                     break;
    1674           19 :                 }
    1675           19 : 
    1676           19 :                 let optimization = shard.optimize_secondary(scheduler, &schedule_context);
    1677           19 :                 if let Some(optimization) = optimization {
    1678            4 :                     optimizations.push(optimization.clone());
    1679            4 :                     shard.apply_optimization(scheduler, optimization);
    1680            4 :                     any_changed = true;
    1681            4 :                     break;
    1682           15 :                 }
    1683              :             }
    1684              : 
    1685            9 :             if !any_changed {
    1686            3 :                 break;
    1687            6 :             }
    1688            6 : 
    1689            6 :             // Assert no infinite loop
    1690            6 :             loop_n += 1;
    1691            6 :             assert!(loop_n < 1000);
    1692              :         }
    1693              : 
    1694            3 :         optimizations
    1695            3 :     }
    1696              : 
    1697              :     /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
    1698              :     /// that it converges.
    1699              :     #[test]
    1700            1 :     fn optimize_add_nodes() -> anyhow::Result<()> {
    1701            1 :         let nodes = make_test_nodes(4);
    1702            1 : 
    1703            1 :         // Only show the scheduler a couple of nodes
    1704            1 :         let mut scheduler = Scheduler::new([].iter());
    1705            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1706            1 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    1707            1 : 
    1708            1 :         let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
    1709            1 :         let mut schedule_context = ScheduleContext::default();
    1710            5 :         for shard in &mut shards {
    1711            4 :             assert!(shard
    1712            4 :                 .schedule(&mut scheduler, &mut schedule_context)
    1713            4 :                 .is_ok());
    1714              :         }
    1715              : 
    1716              :         // We should see equal number of locations on the two nodes.
    1717            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
    1718            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
    1719              : 
    1720            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
    1721            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
    1722              : 
    1723              :         // Add another two nodes: we should see the shards spread out when their optimize
    1724              :         // methods are called
    1725            1 :         scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
    1726            1 :         scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
    1727            1 :         optimize_til_idle(&nodes, &mut scheduler, &mut shards);
    1728            1 : 
    1729            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
    1730            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
    1731              : 
    1732            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    1733            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
    1734              : 
    1735            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
    1736            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
    1737              : 
    1738            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
    1739            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
    1740              : 
    1741            4 :         for shard in shards.iter_mut() {
    1742            4 :             shard.intent.clear(&mut scheduler);
    1743            4 :         }
    1744              : 
    1745            1 :         Ok(())
    1746            1 :     }
    1747              : 
    1748              :     /// Test that initial shard scheduling is optimal. By optimal we mean
    1749              :     /// that the optimizer cannot find a way to improve it.
    1750              :     ///
    1751              :     /// This test is an example of the scheduling issue described in
    1752              :     /// https://github.com/neondatabase/neon/issues/8969
    1753              :     #[test]
    1754            1 :     fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
    1755              :         use itertools::Itertools;
    1756              : 
    1757            1 :         let nodes = make_test_nodes(2);
    1758            1 : 
    1759            1 :         let mut scheduler = Scheduler::new([].iter());
    1760            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1761            1 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    1762            1 : 
    1763            1 :         let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
    1764            1 :         let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
    1765            1 : 
    1766            1 :         let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
    1767            1 :         let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
    1768            1 : 
    1769            4 :         let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
    1770            4 :         let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
    1771            1 : 
    1772            1 :         let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
    1773              : 
    1774            9 :         for (shard, context) in schedule_order {
    1775            8 :             let context = &mut *context.borrow_mut();
    1776            8 :             shard.schedule(&mut scheduler, context).unwrap();
    1777            8 :         }
    1778              : 
    1779            1 :         let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
    1780            1 :         assert_eq!(applied_to_a, vec![]);
    1781              : 
    1782            1 :         let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
    1783            1 :         assert_eq!(applied_to_b, vec![]);
    1784              : 
    1785            8 :         for shard in a.iter_mut().chain(b.iter_mut()) {
    1786            8 :             shard.intent.clear(&mut scheduler);
    1787            8 :         }
    1788              : 
    1789            1 :         Ok(())
    1790            1 :     }
    1791              : }
        

Generated by: LCOV version 2.1-beta