LCOV - code coverage report
Current view: top level - storage_controller/src - tenant_shard.rs (source / functions)
Test:      c8f8d331b83562868d9054d9e0e68f866772aeaa.info
Test Date: 2025-07-26 17:20:05

              Coverage   Total    Hit
  Lines:        69.1 %    1635   1130
  Functions:    54.9 %     122     67

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use futures::future::{self, Either};
       6              : use itertools::Itertools;
       7              : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
       8              : use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
       9              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      10              : use serde::{Deserialize, Serialize};
      11              : use tokio::task::JoinHandle;
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{Instrument, instrument};
      14              : use utils::generation::Generation;
      15              : use utils::id::NodeId;
      16              : use utils::seqwait::{SeqWait, SeqWaitError};
      17              : use utils::shard::ShardCount;
      18              : use utils::sync::gate::GateGuard;
      19              : 
      20              : use crate::compute_hook::ComputeHook;
      21              : use crate::metrics::{
      22              :     self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
      23              : };
      24              : use crate::node::Node;
      25              : use crate::persistence::split_state::SplitState;
      26              : use crate::persistence::{Persistence, TenantShardPersistence};
      27              : use crate::reconciler::{
      28              :     ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
      29              :     attached_location_conf, secondary_location_conf,
      30              : };
      31              : use crate::scheduler::{
      32              :     AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
      33              :     RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
      34              : };
      35              : use crate::service::ReconcileResultRequest;
      36              : use crate::timeline_import::TimelineImportState;
      37              : use crate::{Sequence, service};
      38              : 
      39              : /// Serialization helper
      40            0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
      41            0 : where
      42            0 :     S: serde::ser::Serializer,
      43            0 :     T: std::fmt::Display,
      44              : {
      45            0 :     serializer.collect_str(
      46            0 :         &v.lock()
      47            0 :             .unwrap()
      48            0 :             .as_ref()
      49            0 :             .map(|e| format!("{e}"))
      50            0 :             .unwrap_or("".to_string()),
      51              :     )
      52            0 : }
      53              : 
      54              : /// In-memory state for a particular tenant shard.
      55              : ///
      56              : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
      57              : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
      58              : #[derive(Serialize)]
      59              : pub(crate) struct TenantShard {
      60              :     pub(crate) tenant_shard_id: TenantShardId,
      61              : 
      62              :     pub(crate) shard: ShardIdentity,
      63              : 
      64              :     // Runtime only: sequence used to coordinate updates to this object while
      65              :     // background reconcilers may be running.  A reconciler runs to a particular
      66              :     // sequence.
      67              :     pub(crate) sequence: Sequence,
      68              : 
      69              :     // Latest generation number: next time we attach, increment this
      70              :     // and use the incremented number when attaching.
      71              :     //
      72              :     // None represents an incompletely onboarded tenant via the [`Service::location_config`]
      73              :     // API, where this tenant may only run in PlacementPolicy::Secondary.
      74              :     pub(crate) generation: Option<Generation>,
      75              : 
      76              :     // High level description of how the tenant should be set up.  Provided
      77              :     // externally.
      78              :     pub(crate) policy: PlacementPolicy,
      79              : 
      80              :     // Low level description of exactly which pageservers should fulfil
      81              :     // which role.  Generated by `Self::schedule`.
      82              :     pub(crate) intent: IntentState,
      83              : 
      84              :     // Low level description of how the tenant is configured on pageservers:
      85              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      86              :     // with `Self::reconcile`.
      87              :     pub(crate) observed: ObservedState,
      88              : 
      89              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
      90              :     // for all shards in a tenant.
      91              :     pub(crate) config: TenantConfig,
      92              : 
      93              :     /// If a reconcile task is currently in flight, it may be joined here (it is
      94              :     /// only safe to join if either the result has been received or the reconciler's
      95              :     /// cancellation token has been fired)
      96              :     #[serde(skip)]
      97              :     pub(crate) reconciler: Option<ReconcilerHandle>,
      98              : 
      99              :     /// If a tenant is being split, then all shards with that TenantId will have a
     100              :     /// SplitState set; this acts as a guard against other operations such as background
     101              :     /// reconciliation and timeline creation.
     102              :     pub(crate) splitting: SplitState,
     103              : 
     104              :     /// Flag indicating whether the tenant has an in-progress timeline import.
     105              :     /// Used to disallow shard splits while an import is in progress.
     106              :     pub(crate) importing: TimelineImportState,
     107              : 
     108              :     /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
     109              :     /// is set. This flag is cleared when the tenant is popped off the delay queue.
     110              :     pub(crate) delayed_reconcile: bool,
     111              : 
     112              :     /// Optionally wait for reconciliation to complete up to a particular
     113              :     /// sequence number.
     114              :     #[serde(skip)]
     115              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     116              : 
     117              :     /// Indicates the sequence number for which we have encountered an error while reconciling.  If
     118              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
     119              :     /// and callers should stop waiting for `waiter` and propagate the error.
     120              :     #[serde(skip)]
     121              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     122              : 
     123              :     /// The most recent error from a reconcile on this tenant.  This is a nested Arc
     124              :     /// because:
     125              :     ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
     126              :     ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
     127              :     ///    many waiters for one shard, and the underlying error types are not Clone.
     128              :     ///
     129              :     /// TODO: generalize to an array of recent events
     130              :     /// TODO: use an ArcSwap instead of a mutex for faster reads?
     131              :     #[serde(serialize_with = "read_last_error")]
     132              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     133              : 
     134              :     /// Number of consecutive [`crate::service::Service::reconcile_all`] iterations that have
     135              :     /// scheduled a reconciliation for this shard.
     136              :     ///
     137              :     /// If this reaches `MAX_CONSECUTIVE_RECONCILES`, the shard is considered "stuck" and will be
     138              :     /// ignored when deciding whether optimizations can run. This includes both successful and failed
     139              :     /// reconciliations.
     140              :     ///
     141              :     /// Incremented in [`crate::service::Service::process_result`], and reset to 0 when
     142              :     /// [`crate::service::Service::reconcile_all`] determines no reconciliation is needed for this shard.
     143              :     pub(crate) consecutive_reconciles_count: usize,
     144              : 
     145              :     /// If we have a pending compute notification that for some reason we weren't able to send,
     146              :     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
     147              :     /// and trigger a Reconciler run.  This is the mechanism by which compute notifications are included in the scope
     148              :     /// of state that we publish externally in an eventually consistent way.
     149              :     pub(crate) pending_compute_notification: bool,
     150              : 
     151              :     /// To do a graceful migration, set this field to the destination pageserver, and optimization
     152              :     /// functions will consider this node the best location and react appropriately.
     153              :     preferred_node: Option<NodeId>,
     154              : 
     155              :     // Support/debug tool: if something is going wrong or flapping with scheduling, this may
     156              :     // be set to a non-active state to avoid making changes until the issue is fixed.
     157              :     scheduling_policy: ShardSchedulingPolicy,
     158              : }
     159              : 
     160              : #[derive(Clone, Debug, Serialize)]
     161              : pub(crate) struct IntentState {
     162              :     attached: Option<NodeId>,
     163              :     secondary: Vec<NodeId>,
     164              : 
     165              :     // We should attempt to schedule this shard in the provided AZ to
     166              :     // decrease chances of cross-AZ compute.
     167              :     preferred_az_id: Option<AvailabilityZone>,
     168              : }
     169              : 
     170              : impl IntentState {
     171        12860 :     pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
     172        12860 :         Self {
     173        12860 :             attached: None,
     174        12860 :             secondary: vec![],
     175        12860 :             preferred_az_id,
     176        12860 :         }
     177        12860 :     }
     178            0 :     pub(crate) fn single(
     179            0 :         scheduler: &mut Scheduler,
     180            0 :         node_id: Option<NodeId>,
     181            0 :         preferred_az_id: Option<AvailabilityZone>,
     182            0 :     ) -> Self {
     183            0 :         if let Some(node_id) = node_id {
     184            0 :             scheduler.update_node_ref_counts(
     185            0 :                 node_id,
     186            0 :                 preferred_az_id.as_ref(),
     187            0 :                 RefCountUpdate::Attach,
     188            0 :             );
     189            0 :         }
     190            0 :         Self {
     191            0 :             attached: node_id,
     192            0 :             secondary: vec![],
     193            0 :             preferred_az_id,
     194            0 :         }
     195            0 :     }
     196              : 
     197        12859 :     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
     198        12859 :         if self.attached != new_attached {
     199        12859 :             if let Some(old_attached) = self.attached.take() {
     200            0 :                 scheduler.update_node_ref_counts(
     201            0 :                     old_attached,
     202            0 :                     self.preferred_az_id.as_ref(),
     203            0 :                     RefCountUpdate::Detach,
     204            0 :                 );
     205        12859 :             }
     206        12859 :             if let Some(new_attached) = &new_attached {
     207        12859 :                 scheduler.update_node_ref_counts(
     208        12859 :                     *new_attached,
     209        12859 :                     self.preferred_az_id.as_ref(),
     210        12859 :                     RefCountUpdate::Attach,
     211        12859 :                 );
     212        12859 :             }
     213        12859 :             self.attached = new_attached;
     214            0 :         }
     215              : 
     216        12859 :         if let Some(new_attached) = &new_attached {
     217        12859 :             assert!(!self.secondary.contains(new_attached));
     218            0 :         }
     219        12859 :     }
     220              : 
     221              :     /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
     222              :     /// secondary to attached while maintaining the scheduler's reference counts.
     223            7 :     pub(crate) fn promote_attached(
     224            7 :         &mut self,
     225            7 :         scheduler: &mut Scheduler,
     226            7 :         promote_secondary: NodeId,
     227            7 :     ) {
     228              :         // If we call this with a node that isn't in secondary, it would cause incorrect
     229              :         // scheduler reference counting, since we assume the node is already referenced as a secondary.
     230            7 :         debug_assert!(self.secondary.contains(&promote_secondary));
     231              : 
     232           16 :         self.secondary.retain(|n| n != &promote_secondary);
     233              : 
     234            7 :         let demoted = self.attached;
     235            7 :         self.attached = Some(promote_secondary);
     236              : 
     237            7 :         scheduler.update_node_ref_counts(
     238            7 :             promote_secondary,
     239            7 :             self.preferred_az_id.as_ref(),
     240            7 :             RefCountUpdate::PromoteSecondary,
     241              :         );
     242            7 :         if let Some(demoted) = demoted {
     243            0 :             scheduler.update_node_ref_counts(
     244            0 :                 demoted,
     245            0 :                 self.preferred_az_id.as_ref(),
     246            0 :                 RefCountUpdate::DemoteAttached,
     247            0 :             );
     248            7 :         }
     249            7 :     }
     250              : 
     251        12848 :     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
     252              :         // Every assertion here should probably have a corresponding check in
     253              :         // `validate_optimization` unless it is an invariant that should never be violated. Note
     254              :         // that the lock is not held between planning optimizations and applying them, so you have to
     255              :         // assume that any valid state transition of the intent state may have occurred.
     256        12848 :         assert!(!self.secondary.contains(&new_secondary));
     257        12848 :         assert!(self.attached != Some(new_secondary));
     258        12848 :         scheduler.update_node_ref_counts(
     259        12848 :             new_secondary,
     260        12848 :             self.preferred_az_id.as_ref(),
     261        12848 :             RefCountUpdate::AddSecondary,
     262              :         );
     263        12848 :         self.secondary.push(new_secondary);
     264        12848 :     }
     265              : 
     266              :     /// It is legal to call this with a node that is not currently a secondary: that is a no-op
     267            9 :     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
     268           13 :         let index = self.secondary.iter().position(|n| *n == node_id);
     269            9 :         if let Some(index) = index {
     270            9 :             scheduler.update_node_ref_counts(
     271            9 :                 node_id,
     272            9 :                 self.preferred_az_id.as_ref(),
     273            9 :                 RefCountUpdate::RemoveSecondary,
     274            9 :             );
     275            9 :             self.secondary.remove(index);
     276            9 :         }
     277            9 :     }
     278              : 
     279        12859 :     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
     280        12859 :         for secondary in self.secondary.drain(..) {
     281        12840 :             scheduler.update_node_ref_counts(
     282        12840 :                 secondary,
     283        12840 :                 self.preferred_az_id.as_ref(),
     284        12840 :                 RefCountUpdate::RemoveSecondary,
     285        12840 :             );
     286        12840 :         }
     287        12859 :     }
     288              : 
     289              :     /// Remove the last secondary node from the list of secondaries
     290            0 :     pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
     291            0 :         if let Some(node_id) = self.secondary.pop() {
     292            0 :             scheduler.update_node_ref_counts(
     293            0 :                 node_id,
     294            0 :                 self.preferred_az_id.as_ref(),
     295            0 :                 RefCountUpdate::RemoveSecondary,
     296            0 :             );
     297            0 :         }
     298            0 :     }
     299              : 
     300        12859 :     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
     301        12859 :         if let Some(old_attached) = self.attached.take() {
     302        12857 :             scheduler.update_node_ref_counts(
     303        12857 :                 old_attached,
     304        12857 :                 self.preferred_az_id.as_ref(),
     305        12857 :                 RefCountUpdate::Detach,
     306        12857 :             );
     307        12857 :         }
     308              : 
     309        12859 :         self.clear_secondary(scheduler);
     310        12859 :     }
     311              : 
     312        12901 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     313        12901 :         let mut result = Vec::new();
     314        12901 :         if let Some(p) = self.attached {
     315        12896 :             result.push(p)
     316            5 :         }
     317              : 
     318        12901 :         result.extend(self.secondary.iter().copied());
     319              : 
     320        12901 :         result
     321        12901 :     }
     322              : 
     323        12624 :     pub(crate) fn get_attached(&self) -> &Option<NodeId> {
     324        12624 :         &self.attached
     325        12624 :     }
     326              : 
     327        12706 :     pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
     328        12706 :         &self.secondary
     329        12706 :     }
     330              : 
     331              :     /// If the node is in use as the attached location, demote it into
     332              :     /// the list of secondary locations.  This is used when a node goes offline,
     333              :     /// and we want to use a different node for attachment, but not permanently
     334              :     /// forget the location on the offline node.
     335              :     ///
     336              :     /// Returns true if a change was made
     337            8 :     pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
     338            8 :         if self.attached == Some(node_id) {
     339            8 :             self.attached = None;
     340            8 :             self.secondary.push(node_id);
     341            8 :             scheduler.update_node_ref_counts(
     342            8 :                 node_id,
     343            8 :                 self.preferred_az_id.as_ref(),
     344            8 :                 RefCountUpdate::DemoteAttached,
     345              :             );
     346            8 :             true
     347              :         } else {
     348            0 :             false
     349              :         }
     350            8 :     }
     351              : 
     352            2 :     pub(crate) fn set_preferred_az(
     353            2 :         &mut self,
     354            2 :         scheduler: &mut Scheduler,
     355            2 :         preferred_az: Option<AvailabilityZone>,
     356            2 :     ) {
     357            2 :         let new_az = preferred_az.as_ref();
     358            2 :         let old_az = self.preferred_az_id.as_ref();
     359              : 
     360            2 :         if old_az != new_az {
     361            2 :             if let Some(node_id) = self.attached {
     362            2 :                 scheduler.update_node_ref_counts(
     363            2 :                     node_id,
     364            2 :                     new_az,
     365            2 :                     RefCountUpdate::ChangePreferredAzFrom(old_az),
     366            2 :                 );
     367            2 :             }
     368            4 :             for node_id in &self.secondary {
     369            2 :                 scheduler.update_node_ref_counts(
     370            2 :                     *node_id,
     371            2 :                     new_az,
     372            2 :                     RefCountUpdate::ChangePreferredAzFrom(old_az),
     373            2 :                 );
     374            2 :             }
     375            2 :             self.preferred_az_id = preferred_az;
     376            0 :         }
     377            2 :     }
     378              : 
     379        12508 :     pub(crate) fn get_preferred_az(&self) -> Option<&AvailabilityZone> {
     380        12508 :         self.preferred_az_id.as_ref()
     381        12508 :     }
     382              : }
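// A minimal sketch (hypothetical helper, not in the measured file) of one way the methods
// above compose when moving the attachment of a shard onto a target node while keeping the
// Scheduler's reference counts consistent: the target is registered as a secondary first,
// then promoted, which also updates the refcount of any previously attached node.
fn example_swap_attached(intent: &mut IntentState, scheduler: &mut Scheduler, target: NodeId) {
    if intent.get_attached() == &Some(target) {
        // Already attached where we want it: nothing to do.
        return;
    }
    if !intent.get_secondary().contains(&target) {
        // AddSecondary refcount for the target node.
        intent.push_secondary(scheduler, target);
    }
    // PromoteSecondary refcount for the target; DemoteAttached for the old attached node, if any.
    intent.promote_attached(scheduler, target);
}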
     383              : 
     384              : impl Drop for IntentState {
     385        12860 :     fn drop(&mut self) {
     386              :         // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
     387              :         // We do not check this while panicking, to avoid polluting unit test failures or
     388              :         // other assertions with this assertion's output.  It's still wrong to leak these,
     389              :         // but if we already have a panic then we don't need to independently flag this case.
     390        12860 :         if !(std::thread::panicking()) {
     391        12860 :             debug_assert!(self.attached.is_none() && self.secondary.is_empty());
     392            0 :         }
     393        12859 :     }
     394              : }
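// A minimal sketch (not in the measured file) of the rule enforced by the Drop impl above:
// an IntentState must be cleared against the scheduler before being dropped, so that no
// stale refcounts are left behind (and so the debug assertion does not fire).
fn example_discard_intent(mut intent: IntentState, scheduler: &mut Scheduler) {
    // Release the attached and secondary refcounts back to the scheduler...
    intent.clear(scheduler);
    // ...after which dropping the (now empty) IntentState is legal.
    drop(intent);
}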
     395              : 
     396            0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
     397              : pub(crate) struct ObservedState {
     398              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
     399              : }
     400              : 
     401              : /// Our latest knowledge of how this tenant is configured in the outside world.
     402              : ///
     403              : /// Meaning:
     404              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
     405              : ///       node for this shard.
     406              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
     407              : ///       what it is (e.g. we failed partway through configuring it)
     408              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     409              : ///       and that configuration will still be present unless something external interfered.
     410            0 : #[derive(Clone, Serialize, Deserialize, Debug)]
     411              : pub(crate) struct ObservedStateLocation {
     412              :     /// If None, it means we do not know the status of this shard's location on this node, but
     413              :     /// we know that we might have some state on this node.
     414              :     pub(crate) conf: Option<LocationConfig>,
     415              : }
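// A minimal sketch (hypothetical helper, not in the measured file) spelling out how the
// three cases documented above are read back out of an ObservedState for a given node.
fn example_describe_observed(observed: &ObservedState, node_id: NodeId) -> &'static str {
    match observed.locations.get(&node_id) {
        // No entry at all: we are certain nothing is configured on this node for the shard.
        None => "nothing configured",
        // Entry with conf == None: there may be some state, but we don't know what it is.
        Some(ObservedStateLocation { conf: None }) => "unknown state",
        // Entry with conf == Some: the last configuration we successfully applied.
        Some(ObservedStateLocation { conf: Some(_) }) => "known configuration",
    }
}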
     416              : 
     417              : pub(crate) struct ReconcilerWaiter {
     418              :     // For observability purposes, remember the ID of the shard we're
     419              :     // waiting for.
     420              :     pub(crate) tenant_shard_id: TenantShardId,
     421              : 
     422              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     423              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     424              :     error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     425              :     seq: Sequence,
     426              : }
     427              : 
     428              : pub(crate) enum ReconcilerStatus {
     429              :     Done,
     430              :     Failed,
     431              :     InProgress,
     432              : }
     433              : 
     434              : #[derive(thiserror::Error, Debug)]
     435              : pub(crate) enum ReconcileWaitError {
     436              :     #[error("Timeout waiting for shard {0}")]
     437              :     Timeout(TenantShardId),
     438              :     #[error("shutting down")]
     439              :     Shutdown,
     440              :     #[error("Reconcile error on shard {0}: {1}")]
     441              :     Failed(TenantShardId, Arc<ReconcileError>),
     442              : }
     443              : 
     444              : #[derive(Eq, PartialEq, Debug, Clone)]
     445              : pub(crate) struct ReplaceSecondary {
     446              :     old_node_id: NodeId,
     447              :     new_node_id: NodeId,
     448              : }
     449              : 
     450              : #[derive(Eq, PartialEq, Debug, Clone)]
     451              : pub(crate) struct MigrateAttachment {
     452              :     pub(crate) old_attached_node_id: NodeId,
     453              :     pub(crate) new_attached_node_id: NodeId,
     454              : }
     455              : 
     456              : #[derive(Eq, PartialEq, Debug, Clone)]
     457              : pub(crate) enum ScheduleOptimizationAction {
     458              :     // Replace one of our secondary locations with a different node
     459              :     ReplaceSecondary(ReplaceSecondary),
     460              :     // Migrate attachment to an existing secondary location
     461              :     MigrateAttachment(MigrateAttachment),
     462              :     // Create a secondary location, with the intent of later migrating to it
     463              :     CreateSecondary(NodeId),
     464              :     // Remove a secondary location that we previously created to facilitate a migration
     465              :     RemoveSecondary(NodeId),
     466              : }
     467              : 
     468              : #[derive(Eq, PartialEq, Debug, Clone)]
     469              : pub(crate) struct ScheduleOptimization {
     470              :     // What was the reconcile sequence when we generated this optimization?  The optimization
     471              :     // should only be applied if the shard's sequence is still at this value, in case other changes
     472              :     // happened between planning the optimization and applying it.
     473              :     sequence: Sequence,
     474              : 
     475              :     pub(crate) action: ScheduleOptimizationAction,
     476              : }
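// A minimal sketch (not in the measured file) tying the two types above together: per the
// comment on `sequence`, an optimization planned against an older sequence must be
// discarded, and each action maps onto IntentState operations defined earlier.
// `example_apply_optimization` is a hypothetical helper; the real application logic lives
// in the service code and performs additional validation (see `validate_optimization`).
fn example_apply_optimization(
    shard_sequence: Sequence,
    intent: &mut IntentState,
    scheduler: &mut Scheduler,
    optimization: ScheduleOptimization,
) -> bool {
    if shard_sequence.0 != optimization.sequence.0 {
        // Stale: the shard has been reconciled or rescheduled since this plan was made.
        return false;
    }
    match optimization.action {
        ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
            old_node_id,
            new_node_id,
        }) => {
            intent.remove_secondary(scheduler, old_node_id);
            intent.push_secondary(scheduler, new_node_id);
        }
        ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
            new_attached_node_id,
            ..
        }) => {
            intent.promote_attached(scheduler, new_attached_node_id);
        }
        ScheduleOptimizationAction::CreateSecondary(node_id) => {
            intent.push_secondary(scheduler, node_id);
        }
        ScheduleOptimizationAction::RemoveSecondary(node_id) => {
            intent.remove_secondary(scheduler, node_id);
        }
    }
    true
}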
     477              : 
     478              : impl ReconcilerWaiter {
     479            0 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     480            0 :         tokio::select! {
     481            0 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     482            0 :                 result.map_err(|e| match e {
     483            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     484            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     485            0 :                 })?;
     486              :             },
     487            0 :             result = self.error_seq_wait.wait_for(self.seq) => {
     488            0 :                 result.map_err(|e| match e {
     489            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     490            0 :                     SeqWaitError::Timeout => unreachable!()
     491            0 :                 })?;
     492              : 
     493            0 :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
     494            0 :                     self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
     495              :             }
     496              :         }
     497              : 
     498            0 :         Ok(())
     499            0 :     }
     500              : 
     501            0 :     pub(crate) fn get_status(&self) -> ReconcilerStatus {
     502            0 :         if self.seq_wait.would_wait_for(self.seq).is_ok() {
     503            0 :             ReconcilerStatus::Done
     504            0 :         } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
     505            0 :             ReconcilerStatus::Failed
     506              :         } else {
     507            0 :             ReconcilerStatus::InProgress
     508              :         }
     509            0 :     }
     510              : }
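// A minimal usage sketch (not in the measured file) for the waiter above, assuming a
// `waiter` handed out by the surrounding service code: wait_timeout resolves once the
// shard's reconcile sequence catches up, or surfaces the most recent reconcile error.
async fn example_await_reconcile(waiter: ReconcilerWaiter) {
    match waiter.wait_timeout(Duration::from_secs(30)).await {
        Ok(()) => tracing::info!("reconcile of {} complete", waiter.tenant_shard_id),
        Err(ReconcileWaitError::Timeout(shard_id)) => {
            tracing::warn!("still waiting for reconcile of {shard_id} after 30s");
        }
        Err(ReconcileWaitError::Shutdown) => {
            // The controller is shutting down; give up quietly.
        }
        Err(ReconcileWaitError::Failed(shard_id, error)) => {
            tracing::error!("reconcile of {shard_id} failed: {error}");
        }
    }
}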
     511              : 
     512              : /// Once a reconciler task has been spawned, the tenant shard's state carries enough
     513              : /// information to optionally cancel & await it later.
     514              : pub(crate) struct ReconcilerHandle {
     515              :     sequence: Sequence,
     516              :     handle: JoinHandle<()>,
     517              :     cancel: CancellationToken,
     518              : }
     519              : 
     520              : pub(crate) enum ReconcileNeeded {
     521              :     /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
     522              :     /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
     523              :     No,
     524              :     /// shard has a reconciler running, and its intent hasn't changed since that one was
     525              :     /// spawned: wait for the existing reconciler rather than spawning a new one.
     526              :     WaitExisting(ReconcilerWaiter),
     527              :     /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
     528              :     Yes(ReconcileReason),
     529              : }
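// A minimal dispatch sketch (not in the measured file) showing how a caller is expected to
// branch on ReconcileNeeded, per the variant docs above; the actual spawning and waiting
// logic lives in the service code.
fn example_handle_reconcile_needed(needed: ReconcileNeeded) {
    match needed {
        // Nothing to do, or the shard's current state forbids spawning a reconciler.
        ReconcileNeeded::No => {}
        // A reconciler matching the current intent is already running: wait on that one.
        ReconcileNeeded::WaitExisting(_waiter) => { /* hand the waiter back to the caller */ }
        // The shard needs work: spawn a new reconciler (see TenantShard::spawn_reconciler).
        ReconcileNeeded::Yes(reason) => {
            tracing::info!("spawning reconciler: {reason:?}");
        }
    }
}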
     530              : 
     531              : #[derive(Debug)]
     532              : pub(crate) enum ReconcileReason {
     533              :     ActiveNodesDirty,
     534              :     UnknownLocation,
     535              :     PendingComputeNotification,
     536              : }
     537              : 
     538              : /// Pending modification to the observed state of a tenant shard.
     539              : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
     540              : pub(crate) enum ObservedStateDelta {
     541              :     Upsert(Box<(NodeId, ObservedStateLocation)>),
     542              :     Delete(NodeId),
     543              : }
     544              : 
     545              : impl ObservedStateDelta {
     546            0 :     pub(crate) fn node_id(&self) -> &NodeId {
     547            0 :         match self {
     548            0 :             Self::Upsert(up) => &up.0,
     549            0 :             Self::Delete(nid) => nid,
     550              :         }
     551            0 :     }
     552              : }
     553              : 
     554              : /// When a reconcile task completes, it sends this result object
     555              : /// to be applied to the primary TenantShard.
     556              : pub(crate) struct ReconcileResult {
     557              :     pub(crate) sequence: Sequence,
     558              :     /// On errors, `observed` should be treated as an incomplete description
     559              :     /// of state (i.e. any nodes present in the result should override nodes
     560              :     /// present in the parent tenant state, but any unmentioned nodes should
     561              :     /// not be removed from parent tenant state)
     562              :     pub(crate) result: Result<(), ReconcileError>,
     563              : 
     564              :     pub(crate) tenant_shard_id: TenantShardId,
     565              :     pub(crate) generation: Option<Generation>,
     566              :     pub(crate) observed_deltas: Vec<ObservedStateDelta>,
     567              : 
     568              :     /// Set [`TenantShard::pending_compute_notification`] from this flag
     569              :     pub(crate) pending_compute_notification: bool,
     570              : }
     571              : 
     572              : impl ObservedState {
     573            0 :     pub(crate) fn new() -> Self {
     574            0 :         Self {
     575            0 :             locations: HashMap::new(),
     576            0 :         }
     577            0 :     }
     578              : 
     579            0 :     pub(crate) fn is_empty(&self) -> bool {
     580            0 :         self.locations.is_empty()
     581            0 :     }
     582              : }
     583              : 
     584              : impl TenantShard {
     585        12843 :     pub(crate) fn new(
     586        12843 :         tenant_shard_id: TenantShardId,
     587        12843 :         shard: ShardIdentity,
     588        12843 :         policy: PlacementPolicy,
     589        12843 :         preferred_az_id: Option<AvailabilityZone>,
     590        12843 :     ) -> Self {
     591        12843 :         metrics::METRICS_REGISTRY
     592        12843 :             .metrics_group
     593        12843 :             .storage_controller_tenant_shards
     594        12843 :             .inc();
     595              : 
     596        12843 :         Self {
     597        12843 :             tenant_shard_id,
     598        12843 :             policy,
     599        12843 :             intent: IntentState::new(preferred_az_id),
     600        12843 :             generation: Some(Generation::new(0)),
     601        12843 :             shard,
     602        12843 :             observed: ObservedState::default(),
     603        12843 :             config: TenantConfig::default(),
     604        12843 :             reconciler: None,
     605        12843 :             splitting: SplitState::Idle,
     606        12843 :             importing: TimelineImportState::Idle,
     607        12843 :             sequence: Sequence(1),
     608        12843 :             delayed_reconcile: false,
     609        12843 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     610        12843 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     611        12843 :             last_error: Arc::default(),
     612        12843 :             consecutive_reconciles_count: 0,
     613        12843 :             pending_compute_notification: false,
     614        12843 :             scheduling_policy: ShardSchedulingPolicy::default(),
     615        12843 :             preferred_node: None,
     616        12843 :         }
     617        12843 :     }
     618              : 
     619              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     620              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     621              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     622              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     623            1 :     pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
     624              :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     625              :         // generation
     626            1 :         let mut attached_locs = self
     627            1 :             .observed
     628            1 :             .locations
     629            1 :             .iter()
     630            2 :             .filter_map(|(node_id, l)| {
     631            2 :                 if let Some(conf) = &l.conf {
     632            2 :                     if conf.mode == LocationConfigMode::AttachedMulti
     633            1 :                         || conf.mode == LocationConfigMode::AttachedSingle
     634            1 :                         || conf.mode == LocationConfigMode::AttachedStale
     635              :                     {
     636            2 :                         Some((node_id, conf.generation))
     637              :                     } else {
     638            0 :                         None
     639              :                     }
     640              :                 } else {
     641            0 :                     None
     642              :                 }
     643            2 :             })
     644            1 :             .collect::<Vec<_>>();
     645              : 
     646            1 :         attached_locs.sort_by_key(|i| i.1);
     647            1 :         if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
     648            1 :             self.intent.set_attached(scheduler, Some(*node_id));
     649            1 :         }
     650              : 
     651              :         // All remaining observed locations generate secondary intents.  This includes None
     652              :         // observations, as these may well have some local content on disk that is usable (this
     653              :         // is an edge case that might occur if we restarted during a migration or other change)
     654              :         //
     655              :         // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
     656              :         // will take care of promoting one of these secondaries to be attached.
     657            2 :         self.observed.locations.keys().for_each(|node_id| {
     658            2 :             if Some(*node_id) != self.intent.attached {
     659            1 :                 self.intent.push_secondary(scheduler, *node_id);
     660            1 :             }
     661            2 :         });
     662            1 :     }
     663              : 
     664              :     /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
     665              :     /// attached pageserver for a shard.
     666              :     ///
     667              :     /// Returns whether we modified it, and the NodeId selected.
     668        12832 :     fn schedule_attached(
     669        12832 :         &mut self,
     670        12832 :         scheduler: &mut Scheduler,
     671        12832 :         context: &ScheduleContext,
     672        12832 :     ) -> Result<(bool, NodeId), ScheduleError> {
     673              :         // No work to do if we already have an attached tenant
     674        12832 :         if let Some(node_id) = self.intent.attached {
     675            0 :             return Ok((false, node_id));
     676        12832 :         }
     677              : 
     678        12832 :         if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
     679              :             // Promote a secondary
     680            2 :             tracing::debug!("Promoted secondary {} to attached", promote_secondary);
     681            2 :             self.intent.promote_attached(scheduler, promote_secondary);
     682            2 :             Ok((true, promote_secondary))
     683              :         } else {
     684              :             // Pick a fresh node: either we had no secondaries or none were schedulable
     685        12830 :             let node_id = scheduler.schedule_shard::<AttachedShardTag>(
     686        12830 :                 &self.intent.secondary,
     687        12830 :                 &self.intent.preferred_az_id,
     688        12830 :                 context,
     689            0 :             )?;
     690        12830 :             tracing::debug!("Selected {} as attached", node_id);
     691        12830 :             self.intent.set_attached(scheduler, Some(node_id));
     692        12830 :             Ok((true, node_id))
     693              :         }
     694        12832 :     }
     695              : 
     696              :     #[instrument(skip_all, fields(
     697              :         tenant_id=%self.tenant_shard_id.tenant_id,
     698              :         shard_id=%self.tenant_shard_id.shard_slug(),
     699              :         sequence=%self.sequence
     700              :     ))]
     701              :     pub(crate) fn schedule(
     702              :         &mut self,
     703              :         scheduler: &mut Scheduler,
     704              :         context: &mut ScheduleContext,
     705              :     ) -> Result<(), ScheduleError> {
     706              :         let r = self.do_schedule(scheduler, context);
     707              : 
     708              :         context.avoid(&self.intent.all_pageservers());
     709              : 
     710              :         r
     711              :     }
     712              : 
     713        12836 :     pub(crate) fn do_schedule(
     714        12836 :         &mut self,
     715        12836 :         scheduler: &mut Scheduler,
     716        12836 :         context: &ScheduleContext,
     717        12836 :     ) -> Result<(), ScheduleError> {
     718              :         // TODO: before scheduling new nodes, check if any existing content in
     719              :         // self.intent refers to pageservers that are offline, and pick other
     720              :         // pageservers if so.
     721              : 
     722              :         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
     723              :         // change their attach location.
     724              : 
     725        12836 :         match self.scheduling_policy {
     726        12835 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
     727              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     728              :                 // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
     729            1 :                 tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     730            0 :                     "Scheduling is disabled by policy {:?}", self.scheduling_policy);
     731            1 :                 return Ok(());
     732              :             }
     733              :         }
     734              : 
     735              :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     736              :         // more work on the same pageservers we're already using.
     737        12835 :         let mut modified = false;
     738              : 
     739              :         // Add/remove nodes to fulfil policy
     740              :         use PlacementPolicy::*;
     741        12835 :         match self.policy {
     742        12832 :             Attached(secondary_count) => {
     743              :                 // Should have exactly one attached, and at least N secondaries
     744        12832 :                 let (modified_attached, attached_node_id) =
     745        12832 :                     self.schedule_attached(scheduler, context)?;
     746        12832 :                 modified |= modified_attached;
     747              : 
     748        12832 :                 let mut used_pageservers = vec![attached_node_id];
     749        25663 :                 while self.intent.secondary.len() < secondary_count {
     750        12831 :                     let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
     751        12831 :                         &used_pageservers,
     752        12831 :                         &self.intent.preferred_az_id,
     753        12831 :                         context,
     754            0 :                     )?;
     755        12831 :                     self.intent.push_secondary(scheduler, node_id);
     756        12831 :                     used_pageservers.push(node_id);
     757        12831 :                     modified = true;
     758              :                 }
     759              :             }
     760              :             Secondary => {
     761            3 :                 if let Some(node_id) = self.intent.get_attached() {
     762            2 :                     // Populate secondary by demoting the attached node
     763            2 :                     self.intent.demote_attached(scheduler, *node_id);
     764            2 : 
     765            2 :                     modified = true;
     766            2 :                 } else if self.intent.secondary.is_empty() {
     767              :                     // Populate secondary by scheduling a fresh node
     768              :                     //
     769              :                     // We use [`AttachedShardTag`] because when a secondary location is the only one
     770              :                     // a shard has, we expect that its next use will be as an attached location: we want
     771              :                     // the tenant to be ready to warm up and run fast in its preferred AZ.
     772            1 :                     let node_id = scheduler.schedule_shard::<AttachedShardTag>(
     773            1 :                         &[],
     774            1 :                         &self.intent.preferred_az_id,
     775            1 :                         context,
     776            0 :                     )?;
     777            1 :                     self.intent.push_secondary(scheduler, node_id);
     778            1 :                     modified = true;
     779            0 :                 }
     780            4 :                 while self.intent.secondary.len() > 1 {
     781              :                     // If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
     782              :                     // having just demoted our attached location), then we should prefer to keep the location
     783              :                     // in our preferred AZ.  Tenants in Secondary mode want to be in the preferred AZ so that
     784              :                     // they have a warm location to become attached when transitioning back into Attached.
     785              : 
     786            1 :                     let mut candidates = self.intent.get_secondary().clone();
     787              :                     // Sort to get secondaries outside preferred AZ last
     788            1 :                     candidates
     789            2 :                         .sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
     790            1 :                     let secondary_to_remove = candidates.pop().unwrap();
     791            1 :                     self.intent.remove_secondary(scheduler, secondary_to_remove);
     792            1 :                     modified = true;
     793              :                 }
     794              :             }
     795              :             Detached => {
     796              :                 // Never add locations in this mode
     797            0 :                 if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
     798            0 :                     self.intent.clear(scheduler);
     799            0 :                     modified = true;
     800            0 :                 }
     801              :             }
     802              :         }
     803              : 
     804        12835 :         if modified {
     805        12835 :             self.sequence.0 += 1;
     806        12835 :         }
     807              : 
     808        12835 :         Ok(())
     809        12836 :     }
     810              : 
     811              :     /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
     812              :     /// if the swap is not possible, in which case the intent state is left unchanged.
     813              :     ///
     814              :     /// Arguments:
     817              :     /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
     818              :     /// the scheduler to recommend a node
     819            0 :     pub(crate) fn reschedule_to_secondary(
     820            0 :         &mut self,
     821            0 :         promote_to: Option<NodeId>,
     822            0 :         scheduler: &mut Scheduler,
     823            0 :     ) -> Result<(), ScheduleError> {
     824            0 :         let promote_to = match promote_to {
     825            0 :             Some(node) => node,
     826            0 :             None => match self.preferred_secondary(scheduler) {
     827            0 :                 Some(node) => node,
     828              :                 None => {
     829            0 :                     return Err(ScheduleError::ImpossibleConstraint);
     830              :                 }
     831              :             },
     832              :         };
     833              : 
     834            0 :         assert!(self.intent.get_secondary().contains(&promote_to));
     835              : 
     836            0 :         if let Some(node) = self.intent.get_attached() {
     837            0 :             let demoted = self.intent.demote_attached(scheduler, *node);
     838            0 :             if !demoted {
     839            0 :                 return Err(ScheduleError::ImpossibleConstraint);
     840            0 :             }
     841            0 :         }
     842              : 
     843            0 :         self.intent.promote_attached(scheduler, promote_to);
     844              : 
     845              :         // Increment the sequence number to handle the edge case where a
     846              :         // reconciler is already running: without this, callers would wait on
     847              :         // the current reconcile instead of spawning a new one.
     848            0 :         self.sequence = self.sequence.next();
     849              : 
     850            0 :         Ok(())
     851            0 :     }
     852              : 
     853              :     /// Returns None if the current location's score is unavailable, i.e. we cannot draw a conclusion
     854           68 :     fn is_better_location<T: ShardTag>(
     855           68 :         &self,
     856           68 :         scheduler: &mut Scheduler,
     857           68 :         schedule_context: &ScheduleContext,
     858           68 :         current: NodeId,
     859           68 :         candidate: NodeId,
     860           68 :     ) -> Option<bool> {
     861           68 :         let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
     862           68 :             candidate,
     863           68 :             &self.intent.preferred_az_id,
     864           68 :             schedule_context,
     865           68 :         ) else {
     866              :             // The candidate node is unavailable for scheduling or otherwise couldn't get a score
     867            1 :             return None;
     868              :         };
     869              : 
     870              :         // If the candidate is our preferred node, then it is better than the current location, as long
     871              :         // as it is online -- the online check is part of the score calculation we did above, so it's
     872              :         // important that this check comes after that one.
     873           67 :         if let Some(preferred) = self.preferred_node.as_ref() {
     874            3 :             if preferred == &candidate {
     875            1 :                 return Some(true);
     876            2 :             }
     877           64 :         }
     878              : 
     879           66 :         match scheduler.compute_node_score::<T::Score>(
     880           66 :             current,
     881           66 :             &self.intent.preferred_az_id,
     882           66 :             schedule_context,
     883           66 :         ) {
     884           66 :             Some(current_score) => {
     885              :                 // Ignore utilization components when comparing scores: we don't want to migrate
     886              :                 // because of transient load variations, it risks making the system thrash, and
     887              :                 // migrating for utilization requires a separate high level view of the system to
     888              :                 // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
     889              :                 // moving things around in the order that we hit this function.
     890           66 :                 let candidate_score = candidate_score.for_optimization();
     891           66 :                 let current_score = current_score.for_optimization();
     892              : 
     893           66 :                 if candidate_score < current_score {
     894            8 :                     tracing::info!(
     895            0 :                         "Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
     896              :                     );
     897            8 :                     Some(true)
     898              :                 } else {
     899              :                     // The candidate node is no better than our current location, so don't migrate
     900           58 :                     tracing::debug!(
     901            0 :                         "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
     902              :                     );
     903           58 :                     Some(false)
     904              :                 }
     905              :             }
     906              :             None => {
     907              :                 // The current node is unavailable for scheduling, so we can't make any sensible
     908              :                 // decisions about optimisation.  This should be a transient state -- if the node
     909              :                 // is offline then it will get evacuated, if is blocked by a scheduling mode
     910              :                 // then we will respect that mode by doing nothing.
     911            0 :                 tracing::debug!("Current node {current} is unavailable for scheduling");
     912            0 :                 None
     913              :             }
     914              :         }
     915           68 :     }
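                      : 
                      :     // A minimal standalone sketch of the comparison rule above, using a
                      :     // hypothetical `Score` type rather than the real `T::Score`: once the
                      :     // utilization components are stripped, scores compare field by field and a
                      :     // *lower* score means a better location.
                      :     //
                      :     //     #[derive(PartialEq, PartialOrd)]
                      :     //     struct Score { az_mismatch: u8, affinity: u32 }
                      :     //
                      :     //     fn candidate_is_better(candidate: Score, current: Score) -> bool {
                      :     //         // Derived PartialOrd compares fields in declaration order.
                      :     //         candidate < current
                      :     //     }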
     916              : 
     917           48 :     pub(crate) fn find_better_location<T: ShardTag>(
     918           48 :         &self,
     919           48 :         scheduler: &mut Scheduler,
     920           48 :         schedule_context: &ScheduleContext,
     921           48 :         current: NodeId,
     922           48 :         hard_exclude: &[NodeId],
     923           48 :     ) -> Option<NodeId> {
     924              :         // If we have a migration hint, then that is our better location
     925           48 :         if let Some(hint) = self.preferred_node.as_ref() {
     926            1 :             if hint == &current {
     927            0 :                 return None;
     928            1 :             }
     929              : 
     930            1 :             return Some(*hint);
     931           47 :         }
     932              : 
     933              :         // Look for a lower-scoring location to attach to
     934           47 :         let Ok(candidate_node) = scheduler.schedule_shard::<T>(
     935           47 :             hard_exclude,
     936           47 :             &self.intent.preferred_az_id,
     937           47 :             schedule_context,
     938           47 :         ) else {
     939              :             // A scheduling error means we have no possible candidate replacements
     940            0 :             tracing::debug!("No candidate node found");
     941            0 :             return None;
     942              :         };
     943              : 
     944           47 :         if candidate_node == current {
     945              :             // We're already at the best possible location, so don't migrate
     946           24 :             tracing::debug!("Candidate node {candidate_node} is already in use");
     947           24 :             return None;
     948           23 :         }
     949              : 
     950           23 :         self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
     951           23 :             .and_then(|better| if better { Some(candidate_node) } else { None })
     952           48 :     }
     953              : 
     954              :     /// This function is an optimization, used to avoid doing large numbers of scheduling operations
     955              :     /// when looking for optimizations.  This function uses knowledge of how scores work to do some
     956              :     /// fast checks for whether it may to be possible to improve a score.
     957              :     /// fast checks for whether it may be possible to improve a score.
     958              :     /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is.  If we
     959              :     /// return false, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
     960              :     /// work.
     961            3 :     pub(crate) fn maybe_optimizable(
     962            3 :         &self,
     963            3 :         scheduler: &mut Scheduler,
     964            3 :         schedule_context: &ScheduleContext,
     965            3 :     ) -> bool {
     966              :         // Tenant with preferred node: check if it is not already at the preferred node
     967            3 :         if let Some(preferred) = self.preferred_node.as_ref() {
     968            0 :             if Some(preferred) != self.intent.get_attached().as_ref() {
     969            0 :                 return true;
     970            0 :             }
     971            3 :         }
     972              : 
     973              :         // Sharded tenant: check if any locations have a nonzero affinity score
     974            3 :         if self.shard.count >= ShardCount(1) {
     975            3 :             let schedule_context = schedule_context.project_detach(self);
     976            5 :             for node in self.intent.all_pageservers() {
     977            5 :                 if let Some(af) = schedule_context.nodes.get(&node) {
     978            5 :                     if *af > AffinityScore(0) {
     979            3 :                         return true;
     980            2 :                     }
     981            0 :                 }
     982              :             }
     983            0 :         }
     984              : 
     985              :         // Attached tenant: check if the attachment is outside the preferred AZ
     986            0 :         if let PlacementPolicy::Attached(_) = self.policy {
     987            0 :             if let Some(attached) = self.intent.get_attached() {
     988            0 :                 if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
     989            0 :                     return true;
     990            0 :                 }
     991            0 :             }
     992            0 :         }
     993              : 
     994              :         // Tenant with secondary locations: check if any are within the preferred AZ
     995            0 :         for secondary in self.intent.get_secondary() {
     996            0 :             if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
     997            0 :                 return true;
     998            0 :             }
     999              :         }
    1000              : 
    1001              :         // Does the tenant have excess secondaries?
    1002            0 :         if self.intent.get_secondary().len() > self.policy.want_secondaries() {
    1003            0 :             return true;
    1004            0 :         }
    1005              : 
    1006              :         // Fall through: no optimizations possible
    1007            0 :         false
    1008            3 :     }
    1009              : 
    1010              :     /// Optimize attachments: if a shard has a secondary location that is preferable to
    1011              :     /// its primary location based on soft constraints, switch that secondary location
    1012              :     /// to be attached.
    1013              :     ///
    1014              :     /// `schedule_context` should have been populated with all shards in the tenant, including
    1015              :     /// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions)
    1016              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1017              :     pub(crate) fn optimize_attachment(
    1018              :         &self,
    1019              :         scheduler: &mut Scheduler,
    1020              :         schedule_context: &ScheduleContext,
    1021              :     ) -> Option<ScheduleOptimization> {
    1022              :         let attached = (*self.intent.get_attached())?;
    1023              : 
    1024              :         let schedule_context = schedule_context.project_detach(self);
    1025              : 
    1026              :         // If we already have a secondary that scores better than our current location,
    1027              :         // then simply migrate to it.
    1028              :         for secondary in self.intent.get_secondary() {
    1029              :             if let Some(true) = self.is_better_location::<AttachedShardTag>(
    1030              :                 scheduler,
    1031              :                 &schedule_context,
    1032              :                 attached,
    1033              :                 *secondary,
    1034              :             ) {
    1035              :                 return Some(ScheduleOptimization {
    1036              :                     sequence: self.sequence,
    1037              :                     action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1038              :                         old_attached_node_id: attached,
    1039              :                         new_attached_node_id: *secondary,
    1040              :                     }),
    1041              :                 });
    1042              :             }
    1043              :         }
    1044              : 
    1045              :         // Given that none of our current secondaries is a better location than our current
    1046              :         // attached location (checked above), we may trim any secondaries that are not needed
    1047              :         // for the placement policy.
    1048              :         if self.intent.get_secondary().len() > self.policy.want_secondaries() {
    1049              :             // This code path cleans up extra secondaries after migrating, and/or
    1050              :             // trims extra secondaries after a PlacementPolicy::Attached(N) was
    1051              :             // modified to decrease N.
    1052              : 
    1053              :             let secondary_scores = self
    1054              :                 .intent
    1055              :                 .get_secondary()
    1056              :                 .iter()
    1057           12 :                 .map(|node_id| {
    1058           12 :                     (
    1059           12 :                         *node_id,
    1060           12 :                         scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
    1061           12 :                             *node_id,
    1062           12 :                             &self.intent.preferred_az_id,
    1063           12 :                             &schedule_context,
    1064           12 :                         ),
    1065           12 :                     )
    1066           12 :                 })
    1067              :                 .collect::<HashMap<_, _>>();
    1068              : 
    1069           12 :             if secondary_scores.iter().any(|score| score.1.is_none()) {
    1070              :                 // Trivial case: if we only have one secondary, drop that one
    1071              :                 if self.intent.get_secondary().len() == 1 {
    1072              :                     return Some(ScheduleOptimization {
    1073              :                         sequence: self.sequence,
    1074              :                         action: ScheduleOptimizationAction::RemoveSecondary(
    1075              :                             *self.intent.get_secondary().first().unwrap(),
    1076              :                         ),
    1077              :                     });
    1078              :                 }
    1079              : 
    1080              :                 // Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
    1081              :                 // where its score can't be calculated), and drop the others.  This enables us to make progress in
    1082              :                 // most cases, even if some nodes are offline or have scheduling=pause set.
    1083              : 
    1084              :                 debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
    1085              :                 // logic presumes we are in a mode where we want secondaries to be in non-home AZ
    1086            1 :                 if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
    1087            1 :                     let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
    1088            1 :                     let is_available = secondary_scores
    1089            1 :                         .get(n)
    1090            1 :                         .expect("Built from same list of nodes")
    1091            1 :                         .is_some();
    1092            1 :                     is_available && !in_home_az
    1093            1 :                 }) {
    1094              :                     // Great, we found one to retain.  Pick some other to drop.
    1095              :                     if let Some(victim) = self
    1096              :                         .intent
    1097              :                         .get_secondary()
    1098              :                         .iter()
    1099            2 :                         .find(|n| n != &retain_secondary)
    1100              :                     {
    1101              :                         return Some(ScheduleOptimization {
    1102              :                             sequence: self.sequence,
    1103              :                             action: ScheduleOptimizationAction::RemoveSecondary(*victim),
    1104              :                         });
    1105              :                     }
    1106              :                 }
    1107              : 
    1108              :                 // Fall through: we didn't identify one to remove.  This ought to be rare.
    1109              :                 tracing::warn!(
    1110              :                     "Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
    1111              :                     self.intent.get_secondary()
    1112              :                 );
    1113              :             } else {
    1114              :                 let victim = secondary_scores
    1115              :                     .iter()
    1116           10 :                     .max_by_key(|score| score.1.unwrap())
    1117              :                     .unwrap()
    1118              :                     .0;
    1119              :                 return Some(ScheduleOptimization {
    1120              :                     sequence: self.sequence,
    1121              :                     action: ScheduleOptimizationAction::RemoveSecondary(*victim),
    1122              :                 });
    1123              :             }
    1124              :         }
    1125              : 
    1126              :         let replacement = self.find_better_location::<AttachedShardTag>(
    1127              :             scheduler,
    1128              :             &schedule_context,
    1129              :             attached,
    1130              :             &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
    1131              :         );
    1132              : 
    1133              :         // We have found a candidate and confirmed that its score is preferable
    1134              :             // to our current location. See if we already have a secondary at the candidate location: if not,
    1135              :             // then create one.
    1136              :         if let Some(replacement) = replacement {
    1137              :             // If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still
    1138              :             // not in our preferred AZ.  Migration has a cost in resources and an impact on the workload, so we want to avoid doing
    1139              :             // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
    1140              :             // AZ: skip this optimization if it is not in our final, preferred AZ.
    1141              :             //
    1142              :             // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
    1143              :             // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
    1144              :             if self.preferred_node.is_none()
    1145              :                 && self.intent.preferred_az_id.is_some()
    1146              :                 && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
    1147              :             {
    1148              :                 tracing::debug!(
    1149              :                     "Candidate node {replacement} is not in preferred AZ {:?}",
    1150              :                     self.intent.preferred_az_id
    1151              :                 );
    1152              : 
    1153              :                 // This should only happen if our current location is not in the preferred AZ, otherwise
    1154              :                 // [`Self::find_better_location`] should have rejected any other location outside the preferred AZ, because
    1155              :                 // AZ is the highest priority part of NodeAttachmentSchedulingScore.
    1156              :                 debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
    1157              : 
    1158              :                 return None;
    1159              :             }
    1160              : 
    1161              :             if !self.intent.get_secondary().contains(&replacement) {
    1162              :                 Some(ScheduleOptimization {
    1163              :                     sequence: self.sequence,
    1164              :                     action: ScheduleOptimizationAction::CreateSecondary(replacement),
    1165              :                 })
    1166              :             } else {
    1167              :                 // We already have a secondary in the preferred location, let's try migrating to it.  Our caller
    1168              :                 // will check the warmth of the destination before deciding whether to really execute this.
    1169              :                 Some(ScheduleOptimization {
    1170              :                     sequence: self.sequence,
    1171              :                     action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1172              :                         old_attached_node_id: attached,
    1173              :                         new_attached_node_id: replacement,
    1174              :                     }),
    1175              :                 })
    1176              :             }
    1177              :         } else {
    1178              :             // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
    1179              :             // to clean up: no action required.
    1180              :             None
    1181              :         }
    1182              :     }
    1183              : 
    1184              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1185              :     pub(crate) fn optimize_secondary(
    1186              :         &self,
    1187              :         scheduler: &mut Scheduler,
    1188              :         schedule_context: &ScheduleContext,
    1189              :     ) -> Option<ScheduleOptimization> {
    1190              :         if self.intent.get_secondary().len() > self.policy.want_secondaries() {
    1191              :             // We have extra secondaries, perhaps to facilitate a migration of the attached location:
    1192              :             // do nothing, it is up to [`Self::optimize_attachment`] to clean them up.  When that's done,
    1193              :             // and we are called again, we will proceed.
    1194              :             tracing::debug!("Too many secondaries: skipping");
    1195              :             return None;
    1196              :         }
    1197              : 
    1198              :         let schedule_context = schedule_context.project_detach(self);
    1199              : 
    1200              :         for secondary in self.intent.get_secondary() {
    1201              :             // Make sure we don't try to migrate a secondary to our attached location: this case happens
    1202              :             // easily in environments without multiple AZs.
    1203              :             let mut exclude = match self.intent.attached {
    1204              :                 Some(attached) => vec![attached],
    1205              :                 None => vec![],
    1206              :             };
    1207              : 
    1208              :             // Exclude all other secondaries from the scheduling process to avoid replacing
    1209              :             // one existing secondary with another existing secondary.
    1210              :             for another_secondary in self.intent.secondary.iter() {
    1211              :                 if another_secondary != secondary {
    1212              :                     exclude.push(*another_secondary);
    1213              :                 }
    1214              :             }
    1215              : 
    1216              :             let replacement = match &self.policy {
    1217              :                 PlacementPolicy::Attached(_) => {
    1218              :                     // Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
    1219              :                     // to avoid placing them in the preferred AZ.
    1220              :                     self.find_better_location::<SecondaryShardTag>(
    1221              :                         scheduler,
    1222              :                         &schedule_context,
    1223              :                         *secondary,
    1224              :                         &exclude,
    1225              :                     )
    1226              :                 }
    1227              :                 PlacementPolicy::Secondary => {
    1228              :                     // In secondary-only mode, we want our secondary locations in the preferred AZ,
    1229              :                     // so that they're ready to take over as an attached location when we transition
    1230              :                     // into PlacementPolicy::Attached.
    1231              :                     self.find_better_location::<AttachedShardTag>(
    1232              :                         scheduler,
    1233              :                         &schedule_context,
    1234              :                         *secondary,
    1235              :                         &exclude,
    1236              :                     )
    1237              :                 }
    1238              :                 PlacementPolicy::Detached => None,
    1239              :             };
    1240              : 
    1241              :             assert!(replacement != Some(*secondary));
    1242              :             if let Some(replacement) = replacement {
    1243              :                 // We have found a candidate and confirmed that its score is preferable
    1244              :                 // to this secondary's current location: propose replacing this secondary
    1245              :                 // with the candidate.
    1246              :                 return Some(ScheduleOptimization {
    1247              :                     sequence: self.sequence,
    1248              :                     action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1249              :                         old_node_id: *secondary,
    1250              :                         new_node_id: replacement,
    1251              :                     }),
    1252              :                 });
    1253              :             }
    1254              :         }
    1255              : 
    1256              :         None
    1257              :     }
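                      : 
                      :     // A standalone sketch of how the exclusion list above is built, with
                      :     // hypothetical integer node ids: the attached node plus every secondary
                      :     // other than the one being optimized is excluded from scheduling.
                      :     //
                      :     //     let attached = Some(1u64);
                      :     //     let secondaries = vec![2u64, 3u64];
                      :     //     let current = 2u64;
                      :     //     let mut exclude: Vec<u64> = attached.into_iter().collect();
                      :     //     exclude.extend(secondaries.iter().copied().filter(|n| *n != current));
                      :     //     assert_eq!(exclude, vec![1, 3]);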
    1258              : 
    1259              :     /// Start or abort a graceful migration of this shard to another pageserver. This works on top of the
    1260              :     /// other optimisation functions, to bias them to move to the destination node.
    1261            0 :     pub(crate) fn set_preferred_node(&mut self, node: Option<NodeId>) {
    1262            0 :         if let Some(hint) = self.preferred_node.as_ref() {
    1263            0 :             if Some(hint) != node.as_ref() {
    1264              :                 // This is legal but a bit surprising: we expect that administrators wouldn't usually
    1265              :                 // change their mind about where to migrate something.
    1266            0 :                 tracing::warn!(
    1267            0 :                     "Changing migration destination from {hint} to {node:?} (current intent {:?})",
    1268              :                     self.intent
    1269              :                 );
    1270            0 :             }
    1271            0 :         }
    1272              : 
    1273            0 :         self.preferred_node = node;
    1274            0 :     }
    1275              : 
    1276            0 :     pub(crate) fn get_preferred_node(&self) -> Option<NodeId> {
    1277            0 :         self.preferred_node
    1278            0 :     }
    1279              : 
    1280              :     /// Return true if the optimization was really applied: it will not be applied if the optimization's
    1281              :     /// sequence is behind this tenant shard's or if the intent state proposed by the optimization
    1282              :     /// is not compatible with the current intent state. The latter may happen when the background
    1283              :     /// reconcile loop runs concurrently with HTTP-driven optimisations.
    1284           17 :     pub(crate) fn apply_optimization(
    1285           17 :         &mut self,
    1286           17 :         scheduler: &mut Scheduler,
    1287           17 :         optimization: ScheduleOptimization,
    1288           17 :     ) -> bool {
    1289           17 :         if optimization.sequence != self.sequence {
    1290            0 :             return false;
    1291           17 :         }
    1292              : 
    1293           17 :         if !self.validate_optimization(&optimization) {
    1294            0 :             tracing::info!(
    1295            0 :                 "Skipping optimization for {} because it does not match current intent: {:?}",
    1296              :                 self.tenant_shard_id,
    1297              :                 optimization,
    1298              :             );
    1299            0 :             return false;
    1300           17 :         }
    1301              : 
    1302           17 :         metrics::METRICS_REGISTRY
    1303           17 :             .metrics_group
    1304           17 :             .storage_controller_schedule_optimization
    1305           17 :             .inc();
    1306              : 
    1307           17 :         match optimization.action {
    1308              :             ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1309            5 :                 old_attached_node_id,
    1310            5 :                 new_attached_node_id,
    1311              :             }) => {
    1312            5 :                 self.intent.demote_attached(scheduler, old_attached_node_id);
    1313            5 :                 self.intent
    1314            5 :                     .promote_attached(scheduler, new_attached_node_id);
    1315              : 
    1316            5 :                 if let Some(hint) = self.preferred_node.as_ref() {
    1317            1 :                     if hint == &new_attached_node_id {
    1318              :                         // The migration target is not a long term pin: once we are done with the migration, clear it.
    1319            1 :                         tracing::info!("Graceful migration to {hint} complete");
    1320            1 :                         self.preferred_node = None;
    1321            0 :                     }
    1322            4 :                 }
    1323              :             }
    1324              :             ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1325            1 :                 old_node_id,
    1326            1 :                 new_node_id,
    1327            1 :             }) => {
    1328            1 :                 self.intent.remove_secondary(scheduler, old_node_id);
    1329            1 :                 self.intent.push_secondary(scheduler, new_node_id);
    1330            1 :             }
    1331            4 :             ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
    1332            4 :                 self.intent.push_secondary(scheduler, new_node_id);
    1333            4 :             }
    1334            7 :             ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
    1335            7 :                 self.intent.remove_secondary(scheduler, old_secondary);
    1336            7 :             }
    1337              :         }
    1338              : 
    1339           17 :         true
    1340           17 :     }
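                      : 
                      :     // A minimal sketch of the staleness guard above, with made-up sequence
                      :     // numbers: an optimization planned while the shard was at sequence 4 is
                      :     // silently dropped once the shard has advanced to sequence 5.
                      :     //
                      :     //     let (shard_sequence, optimization_sequence) = (5u64, 4u64);
                      :     //     let applied = optimization_sequence == shard_sequence;
                      :     //     assert!(!applied); // the caller must re-plan against the new sequence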
    1341              : 
    1342              :     /// Check that the desired modifications to the intent state are compatible with the current
    1343              :     /// intent state. Note that the lock is not held between planning optimizations and applying
    1344              :     /// them, so any valid state transition of the intent state may have occurred.
    1345           17 :     fn validate_optimization(&self, optimization: &ScheduleOptimization) -> bool {
    1346           17 :         match optimization.action {
    1347              :             ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1348            5 :                 old_attached_node_id,
    1349            5 :                 new_attached_node_id,
    1350              :             }) => {
    1351            5 :                 self.intent.attached == Some(old_attached_node_id)
    1352            5 :                     && self.intent.secondary.contains(&new_attached_node_id)
    1353              :             }
    1354              :             ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1355              :                 old_node_id: _,
    1356            1 :                 new_node_id,
    1357              :             }) => {
    1358              :                 // It's legal to remove a secondary that is not present in the intent state
    1359            1 :                 !self.intent.secondary.contains(&new_node_id)
    1360              :                     // Ensure the secondary hasn't already been promoted to attached by a concurrent
    1361              :                     // optimization/migration.
    1362            1 :                     && self.intent.attached != Some(new_node_id)
    1363              :             }
    1364            4 :             ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
    1365            4 :                 !self.intent.secondary.contains(&new_node_id)
    1366              :             }
    1367              :             ScheduleOptimizationAction::RemoveSecondary(_) => {
    1368              :                 // It's legal to remove a secondary that is not present in the intent state
    1369            7 :                 true
    1370              :             }
    1371              :         }
    1372           17 :     }
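                      : 
                      :     // A minimal sketch of one of the checks above, with hypothetical node ids:
                      :     // a CreateSecondary proposal is rejected if the node is already a secondary
                      :     // in the current intent state.
                      :     //
                      :     //     let secondaries = vec![3u64, 5u64];
                      :     //     let proposed = 5u64;
                      :     //     let valid = !secondaries.contains(&proposed);
                      :     //     assert!(!valid);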
    1373              : 
    1374              :     /// When a shard has several secondary locations, we need to pick one in situations where
    1375              :     /// we promote one of them to an attached location:
    1376              :     ///  - When draining a node for restart
    1377              :     ///  - When responding to a node failure
    1378              :     ///
    1379              :     /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
    1380              :     /// we want to pick the node which is best for use _temporarily_ while the previous attached location
    1381              :     /// is unavailable (e.g. because it's down or deploying).  That means we prefer to use secondary
    1382              :     /// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary
    1383              :     /// secondary in the preferred AZ (which is usually only created for migrations, and if it exists
    1384              :     /// it's probably not warmed up yet).
    1385              :     ///
    1386              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
    1387              :     /// caller needs to pick a node some other way.
    1388        12832 :     pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
    1389        12832 :         let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
    1390              : 
    1391              :         // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
    1392              :         // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
    1393              :         // rather than a short-lived secondary location being used for optimization/migration (which would have
    1394              :         // been scheduled in our preferred AZ).
    1395        12832 :         let mut candidates = candidates
    1396        12832 :             .iter()
    1397        12832 :             .map(|(node_id, node_az)| {
    1398            2 :                 if node_az == &self.intent.preferred_az_id {
    1399            1 :                     (1, *node_id)
    1400              :                 } else {
    1401            1 :                     (0, *node_id)
    1402              :                 }
    1403            2 :             })
    1404        12832 :             .collect::<Vec<_>>();
    1405              : 
    1406        12832 :         candidates.sort();
    1407              : 
    1408        12832 :         candidates.first().map(|i| i.1)
    1409        12832 :     }
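                      : 
                      :     // A standalone sketch of the sort-key trick above, with hypothetical
                      :     // integer node ids: candidates in the preferred AZ are ranked 1 and all
                      :     // others 0, so after sorting, a non-preferred-AZ secondary wins when one
                      :     // exists.
                      :     //
                      :     //     let mut candidates = vec![(1u8, 7u64), (0u8, 9u64), (1u8, 3u64)];
                      :     //     candidates.sort();
                      :     //     assert_eq!(candidates.first().map(|c| c.1), Some(9));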
    1410              : 
    1411              :     /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
    1412              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
    1413              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
    1414              :     ///
    1415              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
    1416              :     /// function should not be used to decide whether to reconcile.
    1417            0 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
    1418              :         // We have an intent to attach for this node
    1419            0 :         let attach_intent = self.intent.attached?;
    1420              :         // We have an observed state for this node
    1421            0 :         let location = self.observed.locations.get(&attach_intent)?;
    1422              :         // Our observed state is not None, i.e. not in flux
    1423            0 :         let location_config = location.conf.as_ref()?;
    1424              : 
    1425              :         // Check if our intent and observed state agree that this node is in an attached state.
    1426            0 :         match location_config.mode {
    1427              :             LocationConfigMode::AttachedMulti
    1428              :             | LocationConfigMode::AttachedSingle
    1429            0 :             | LocationConfigMode::AttachedStale => Some(attach_intent),
    1430            0 :             _ => None,
    1431              :         }
    1432            0 :     }
    1433              : 
    1434            0 :     fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
    1435            0 :         let mut dirty_nodes = HashSet::new();
    1436              : 
    1437            0 :         if let Some(node_id) = self.intent.attached {
    1438              :             // Maybe panic: it is a severe bug if we try to attach while generation is null.
    1439            0 :             let generation = self
    1440            0 :                 .generation
    1441            0 :                 .expect("Attempted to enter attached state without a generation");
    1442              : 
    1443            0 :             let wanted_conf = attached_location_conf(
    1444            0 :                 generation,
    1445            0 :                 &self.shard,
    1446            0 :                 &self.config,
    1447            0 :                 &self.policy,
    1448            0 :                 self.intent.get_secondary().len(),
    1449              :             );
    1450            0 :             match self.observed.locations.get(&node_id) {
    1451            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
    1452            0 :                 Some(_) | None => {
    1453            0 :                     dirty_nodes.insert(node_id);
    1454            0 :                 }
    1455              :             }
    1456            0 :         }
    1457              : 
    1458            0 :         for node_id in &self.intent.secondary {
    1459            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
    1460            0 :             match self.observed.locations.get(node_id) {
    1461            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
    1462            0 :                 Some(_) | None => {
    1463            0 :                     dirty_nodes.insert(*node_id);
    1464            0 :                 }
    1465              :             }
    1466              :         }
    1467              : 
    1468            0 :         for node_id in self.observed.locations.keys() {
    1469            0 :             if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
    1470            0 :                 // We have observed state that isn't part of our intent: need to clean it up.
    1471            0 :                 dirty_nodes.insert(*node_id);
    1472            0 :             }
    1473              :         }
    1474              : 
    1475            0 :         dirty_nodes.retain(|node_id| {
    1476            0 :             nodes
    1477            0 :                 .get(node_id)
    1478            0 :                 .map(|n| n.is_available())
    1479            0 :                 .unwrap_or(false)
    1480            0 :         });
    1481              : 
    1482            0 :         !dirty_nodes.is_empty()
    1483            0 :     }
    1484              : 
    1485              :     #[allow(clippy::too_many_arguments)]
    1486              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1487              :     pub(crate) fn get_reconcile_needed(
    1488              :         &mut self,
    1489              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1490              :     ) -> ReconcileNeeded {
    1491              :         // If there are any ambiguous observed states, and the nodes they refer to are available,
    1492              :         // we should reconcile to clean them up.
    1493              :         let mut dirty_observed = false;
    1494              :         for (node_id, observed_loc) in &self.observed.locations {
    1495              :             let node = pageservers
    1496              :                 .get(node_id)
    1497              :                 .expect("Nodes may not be removed while referenced");
    1498              :             if observed_loc.conf.is_none() && node.is_available() {
    1499              :                 dirty_observed = true;
    1500              :                 break;
    1501              :             }
    1502              :         }
    1503              : 
    1504              :         let active_nodes_dirty = self.dirty(pageservers);
    1505              : 
    1506              :         let reconcile_needed = match (
    1507              :             active_nodes_dirty,
    1508              :             dirty_observed,
    1509              :             self.pending_compute_notification,
    1510              :         ) {
    1511              :             (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
    1512              :             (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
    1513              :             (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
    1514              :             _ => ReconcileNeeded::No,
    1515              :         };
    1516              : 
    1517              :         if matches!(reconcile_needed, ReconcileNeeded::No) {
    1518              :             tracing::debug!("Not dirty, no reconciliation needed.");
    1519              :             return ReconcileNeeded::No;
    1520              :         }
    1521              : 
    1522              :         // If we are currently splitting, then never start a reconciler task: the splitting logic
    1523              :         // requires that shards are not interfered with while it runs. Do this check here rather than
    1524              :         // up top, so that we only log this message if we would otherwise have done a reconciliation.
    1525              :         if !matches!(self.splitting, SplitState::Idle) {
    1526              :             tracing::info!("Refusing to reconcile, splitting in progress");
    1527              :             return ReconcileNeeded::No;
    1528              :         }
    1529              : 
    1530              :         // Reconcile already in flight for the current sequence?
    1531              :         if let Some(handle) = &self.reconciler {
    1532              :             if handle.sequence == self.sequence {
    1533              :                 tracing::info!(
    1534              :                     "Reconciliation already in progress for sequence {:?}",
    1535              :                     self.sequence,
    1536              :                 );
    1537              :                 return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
    1538              :                     tenant_shard_id: self.tenant_shard_id,
    1539              :                     seq_wait: self.waiter.clone(),
    1540              :                     error_seq_wait: self.error_waiter.clone(),
    1541              :                     error: self.last_error.clone(),
    1542              :                     seq: self.sequence,
    1543              :                 });
    1544              :             }
    1545              :         }
    1546              : 
    1547              :         // Pre-checks done: finally check whether we may actually do the work
    1548              :         match self.scheduling_policy {
    1549              :             ShardSchedulingPolicy::Active
    1550              :             | ShardSchedulingPolicy::Essential
    1551              :             | ShardSchedulingPolicy::Pause => {}
    1552              :             ShardSchedulingPolicy::Stop => {
    1553              :                 // We only reach this point if there is work to do and we're going to skip
    1554              :                 // doing it: warn it obvious why this tenant isn't doing what it ought to.
    1555              :                 // doing it: warn so it's obvious why this tenant isn't doing what it ought to.
    1556              :                 return ReconcileNeeded::No;
    1557              :             }
    1558              :         }
    1559              : 
    1560              :         reconcile_needed
    1561              :     }
    1562              : 
    1563              :     /// Ensure the sequence number is set to a value where waiting for this value will make us wait
    1564              :     /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
    1565              :     ///
    1566              :     /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
    1567              :     /// that the waiter will not complete until some future Reconciler is constructed and run.
    1568            0 :     fn ensure_sequence_ahead(&mut self) {
    1569              :         // Find the highest sequence for which a Reconciler has previously run or is currently
    1570              :         // running
    1571            0 :         let max_seen = std::cmp::max(
    1572            0 :             self.reconciler
    1573            0 :                 .as_ref()
    1574            0 :                 .map(|r| r.sequence)
    1575            0 :                 .unwrap_or(Sequence(0)),
    1576            0 :             std::cmp::max(self.waiter.load(), self.error_waiter.load()),
    1577              :         );
    1578              : 
    1579            0 :         if self.sequence <= max_seen {
    1580            0 :             self.sequence = max_seen.next();
    1581            0 :         }
    1582            0 :     }
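                      : 
                      :     // A minimal sketch of the bump rule above, with made-up numbers: if any
                      :     // completed or in-flight reconciler has already seen sequence 7, a waiter
                      :     // built from this shard must target a strictly later sequence.
                      :     //
                      :     //     let (mut sequence, max_seen) = (5u64, 7u64);
                      :     //     if sequence <= max_seen { sequence = max_seen + 1; }
                      :     //     assert_eq!(sequence, 8);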
    1583              : 
    1584              :     /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
    1585              :     ///
    1586              :     /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
    1587              :     /// you would like to wait on the next reconciler that gets spawned in the background.
    1588            0 :     pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
    1589            0 :         self.ensure_sequence_ahead();
    1590              : 
    1591            0 :         ReconcilerWaiter {
    1592            0 :             tenant_shard_id: self.tenant_shard_id,
    1593            0 :             seq_wait: self.waiter.clone(),
    1594            0 :             error_seq_wait: self.error_waiter.clone(),
    1595            0 :             error: self.last_error.clone(),
    1596            0 :             seq: self.sequence,
    1597            0 :         }
    1598            0 :     }
    1599              : 
    1600            0 :     async fn reconcile(
    1601            0 :         sequence: Sequence,
    1602            0 :         mut reconciler: Reconciler,
    1603            0 :         must_notify: bool,
    1604            0 :     ) -> ReconcileResult {
    1605              :         // Attempt to make observed state match intent state
    1606            0 :         let result = reconciler.reconcile().await;
    1607              : 
    1608              :         // If we know we had a pending compute notification from some previous action, send a notification irrespective
    1609              :         // of whether the above reconcile() did any work.  It has to be Ok() though, because otherwise we might be
    1610              :         // sending a notification of a location that isn't really attached.
    1611            0 :         if result.is_ok() && must_notify {
    1612              :             // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
    1613            0 :             reconciler.compute_notify().await.ok();
    1614            0 :         } else if must_notify {
    1615            0 :             // Carry this flag so that the reconciler's result will indicate that it still needs to retry
    1616            0 :             // the compute hook notification eventually.
    1617            0 :             reconciler.compute_notify_failure = true;
    1618            0 :         }
    1619              : 
    1620              :         // Update result counter
    1621            0 :         let outcome_label = match &result {
    1622              :             Ok(_) => {
    1623            0 :                 if reconciler.compute_notify_failure {
    1624            0 :                     ReconcileOutcome::SuccessNoNotify
    1625              :                 } else {
    1626            0 :                     ReconcileOutcome::Success
    1627              :                 }
    1628              :             }
    1629            0 :             Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
    1630            0 :             Err(_) => ReconcileOutcome::Error,
    1631              :         };
    1632              : 
    1633            0 :         metrics::METRICS_REGISTRY
    1634            0 :             .metrics_group
    1635            0 :             .storage_controller_reconcile_complete
    1636            0 :             .inc(ReconcileCompleteLabelGroup {
    1637            0 :                 status: outcome_label,
    1638            0 :             });
    1639              : 
    1640              :         // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
    1641              :         // try and schedule more work in response to our result.
    1642            0 :         ReconcileResult {
    1643            0 :             sequence,
    1644            0 :             result,
    1645            0 :             tenant_shard_id: reconciler.tenant_shard_id,
    1646            0 :             generation: reconciler.generation,
    1647            0 :             observed_deltas: reconciler.observed_deltas(),
    1648            0 :             pending_compute_notification: reconciler.compute_notify_failure,
    1649            0 :         }
    1650            0 :     }
    1651              : 
    1652              :     #[allow(clippy::too_many_arguments)]
    1653              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1654              :     pub(crate) fn spawn_reconciler(
    1655              :         &mut self,
    1656              :         reason: ReconcileReason,
    1657              :         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
    1658              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1659              :         compute_hook: &Arc<ComputeHook>,
    1660              :         reconciler_config: ReconcilerConfig,
    1661              :         service_config: &service::Config,
    1662              :         persistence: &Arc<Persistence>,
    1663              :         units: ReconcileUnits,
    1664              :         gate_guard: GateGuard,
    1665              :         cancel: &CancellationToken,
    1666              :         http_client: reqwest::Client,
    1667              :     ) -> Option<ReconcilerWaiter> {
    1668              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
    1669              :         // doing our sequence's work.
    1670              :         let old_handle = self.reconciler.take();
    1671              : 
    1672              :         // Build list of nodes from which the reconciler should detach
    1673              :         let mut detach = Vec::new();
    1674              :         for node_id in self.observed.locations.keys() {
    1675              :             if self.intent.get_attached() != &Some(*node_id)
    1676              :                 && !self.intent.secondary.contains(node_id)
    1677              :             {
    1678              :                 detach.push(
    1679              :                     pageservers
    1680              :                         .get(node_id)
    1681              :                         .expect("Intent references non-existent pageserver")
    1682              :                         .clone(),
    1683              :                 )
    1684              :             }
    1685              :         }
    1686              : 
    1687              :         // Advance the sequence before spawning a reconciler, so that sequence waiters
    1688              :         // can distinguish between before+after the reconcile completes.
    1689              :         self.ensure_sequence_ahead();
    1690              : 
    1691              :         let reconciler_cancel = cancel.child_token();
    1692              :         let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
    1693              :         let reconciler = Reconciler {
    1694              :             tenant_shard_id: self.tenant_shard_id,
    1695              :             shard: self.shard,
    1696              :             placement_policy: self.policy.clone(),
    1697              :             generation: self.generation,
    1698              :             intent: reconciler_intent,
    1699              :             detach,
    1700              :             reconciler_config,
    1701              :             config: self.config.clone(),
    1702              :             preferred_az: self.intent.preferred_az_id.clone(),
    1703              :             observed: self.observed.clone(),
    1704              :             original_observed: self.observed.clone(),
    1705              :             compute_hook: compute_hook.clone(),
    1706              :             service_config: service_config.clone(),
    1707              :             _gate_guard: gate_guard,
    1708              :             _resource_units: units,
    1709              :             cancel: reconciler_cancel.clone(),
    1710              :             persistence: persistence.clone(),
    1711              :             compute_notify_failure: false,
    1712              :             http_client,
    1713              :         };
    1714              : 
    1715              :         let reconcile_seq = self.sequence;
    1716              :         let long_reconcile_threshold = service_config.long_reconcile_threshold;
    1717              : 
    1718              :         tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
    1719              :         let must_notify = self.pending_compute_notification;
    1720              :         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
    1721              :                                                         tenant_id=%reconciler.tenant_shard_id.tenant_id,
    1722              :                                                         shard_id=%reconciler.tenant_shard_id.shard_slug());
    1723              :         metrics::METRICS_REGISTRY
    1724              :             .metrics_group
    1725              :             .storage_controller_reconcile_spawn
    1726              :             .inc();
    1727              :         let result_tx = result_tx.clone();
    1728              :         let join_handle = tokio::task::spawn(
    1729            0 :             async move {
    1730              :                 // Wait for any previous reconcile task to complete before we start
    1731            0 :                 if let Some(old_handle) = old_handle {
    1732            0 :                     old_handle.cancel.cancel();
    1733            0 :                     if let Err(e) = old_handle.handle.await {
    1734              :                         // We can't do much with this other than log it: the task is done, so
    1735              :                         // we may proceed with our work.
    1736            0 :                         tracing::error!("Unexpected join error waiting for reconcile task: {e}");
    1737            0 :                     }
    1738            0 :                 }
    1739              : 
    1740              :                 // Early check for cancellation before doing any work
    1741              :                 // TODO: wrap all remote API operations in cancellation check
    1742              :                 // as well.
    1743            0 :                 if reconciler.cancel.is_cancelled() {
    1744            0 :                     metrics::METRICS_REGISTRY
    1745            0 :                         .metrics_group
    1746            0 :                         .storage_controller_reconcile_complete
    1747            0 :                         .inc(ReconcileCompleteLabelGroup {
    1748            0 :                             status: ReconcileOutcome::Cancel,
    1749            0 :                         });
    1750            0 :                     return;
    1751            0 :                 }
    1752              : 
    1753            0 :                 let (tenant_id_label, shard_number_label, sequence_label) = {
    1754            0 :                     (
    1755            0 :                         reconciler.tenant_shard_id.tenant_id.to_string(),
    1756            0 :                         reconciler.tenant_shard_id.shard_number.0.to_string(),
    1757            0 :                         reconcile_seq.to_string(),
    1758            0 :                     )
    1759            0 :                 };
    1760              : 
    1761            0 :                 let label_group = ReconcileLongRunningLabelGroup {
    1762            0 :                     tenant_id: &tenant_id_label,
    1763            0 :                     shard_number: &shard_number_label,
    1764            0 :                     sequence: &sequence_label,
    1765            0 :                 };
    1766              : 
    1767            0 :                 let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
    1768            0 :                 let long_reconcile_fut = {
    1769            0 :                     let label_group = label_group.clone();
    1770            0 :                     async move {
    1771            0 :                         tokio::time::sleep(long_reconcile_threshold).await;
    1772              : 
    1773            0 :                         tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
    1774              : 
    1775            0 :                         metrics::METRICS_REGISTRY
    1776            0 :                             .metrics_group
    1777            0 :                             .storage_controller_reconcile_long_running
    1778            0 :                             .inc(label_group);
    1779            0 :                     }
    1780              :                 };
    1781              : 
    1782            0 :                 let reconcile_fut = std::pin::pin!(reconcile_fut);
    1783            0 :                 let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
    1784              : 
    1785            0 :                 let (was_long, result) =
    1786            0 :                     match future::select(reconcile_fut, long_reconcile_fut).await {
    1787            0 :                         Either::Left((reconcile_result, _)) => (false, reconcile_result),
    1788            0 :                         Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
    1789              :                     };
    1790              : 
    1791            0 :                 if was_long {
    1792            0 :                     let id = metrics::METRICS_REGISTRY
    1793            0 :                         .metrics_group
    1794            0 :                         .storage_controller_reconcile_long_running
    1795            0 :                         .with_labels(label_group);
    1796            0 :                     metrics::METRICS_REGISTRY
    1797            0 :                         .metrics_group
    1798            0 :                         .storage_controller_reconcile_long_running
    1799            0 :                         .remove_metric(id);
    1800            0 :                 }
    1801              : 
    1802            0 :                 result_tx
    1803            0 :                     .send(ReconcileResultRequest::ReconcileResult(result))
    1804            0 :                     .ok();
    1805            0 :             }
    1806              :             .instrument(reconciler_span),
    1807              :         );
    1808              : 
    1809              :         self.reconciler = Some(ReconcilerHandle {
    1810              :             sequence: self.sequence,
    1811              :             handle: join_handle,
    1812              :             cancel: reconciler_cancel,
    1813              :         });
    1814              : 
    1815              :         Some(ReconcilerWaiter {
    1816              :             tenant_shard_id: self.tenant_shard_id,
    1817              :             seq_wait: self.waiter.clone(),
    1818              :             error_seq_wait: self.error_waiter.clone(),
    1819              :             error: self.last_error.clone(),
    1820              :             seq: self.sequence,
    1821              :         })
    1822              :     }
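                      :     // Illustrative summary of the handoff above: the ReconcilerHandle keeps the join handle and
                      :     // the child cancellation token, so a later spawn can cancel and await the stale task (and
                      :     // cancel_reconciler() can cancel it), while the returned ReconcilerWaiter only carries the
                      :     // shared SeqWaits, the last-error slot and the target sequence. For example, if the shard was
                      :     // at sequence 5, ensure_sequence_ahead() advances it (say, to 6), and the waiter completes
                      :     // once waiter (or, on failure, error_waiter) has been advanced to 6.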
    1823              : 
    1824            0 :     pub(crate) fn cancel_reconciler(&self) {
    1825            0 :         if let Some(handle) = self.reconciler.as_ref() {
    1826            0 :             handle.cancel.cancel()
    1827            0 :         }
    1828            0 :     }
    1829              : 
    1830              :     /// Get a waiter for any reconciliation in flight, but do not start reconciliation
    1831              :     /// if it is not already running
    1832            0 :     pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
    1833            0 :         if self.reconciler.is_some() {
    1834            0 :             Some(ReconcilerWaiter {
    1835            0 :                 tenant_shard_id: self.tenant_shard_id,
    1836            0 :                 seq_wait: self.waiter.clone(),
    1837            0 :                 error_seq_wait: self.error_waiter.clone(),
    1838            0 :                 error: self.last_error.clone(),
    1839            0 :                 seq: self.sequence,
    1840            0 :             })
    1841              :         } else {
    1842            0 :             None
    1843              :         }
    1844            0 :     }
    1845              : 
    1846              :     /// Called when a ReconcileResult has been emitted and the service is updating
    1847              :     /// our state: if the result is from a sequence >= our ReconcilerHandle's sequence, then drop
    1848              :     /// the handle to indicate there is no longer a reconciliation in progress.
    1849            0 :     pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
    1850            0 :         if let Some(reconcile_handle) = &self.reconciler {
    1851            0 :             if reconcile_handle.sequence <= sequence {
    1852            0 :                 self.reconciler = None;
    1853            0 :             }
    1854            0 :         }
    1855            0 :     }
    1856              : 
    1857              :     /// If we had any state at all referring to this node ID, drop it.  Does not
    1858              :     /// attempt to reschedule.
    1859              :     ///
    1860              :     /// Returns true if we modified this shard's intent state.
    1861            0 :     pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
    1862            0 :         let mut intent_modified = false;
    1863              : 
    1864              :         // Drop if this node was our attached intent
    1865            0 :         if self.intent.attached == Some(node_id) {
    1866            0 :             self.intent.attached = None;
    1867            0 :             intent_modified = true;
    1868            0 :         }
    1869              : 
    1870              :         // Drop from the list of secondaries, and check if we modified it
    1871            0 :         let had_secondaries = self.intent.secondary.len();
    1872            0 :         self.intent.secondary.retain(|n| n != &node_id);
    1873            0 :         intent_modified |= self.intent.secondary.len() != had_secondaries;
    1874              : 
    1875            0 :         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
    1876              : 
    1877            0 :         if self.preferred_node == Some(node_id) {
    1878            0 :             self.preferred_node = None;
    1879            0 :         }
    1880              : 
    1881            0 :         intent_modified
    1882            0 :     }
    1883              : 
    1884            0 :     pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
    1885            0 :         self.scheduling_policy = p;
    1886            0 :     }
    1887              : 
    1888            0 :     pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
    1889            0 :         self.scheduling_policy
    1890            0 :     }
    1891              : 
    1892            0 :     pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
    1893              :         // Ordering: always set last_error before advancing sequence, so that sequence
    1894              :         // waiters are guaranteed to see a Some value when they see an error.
    1895            0 :         *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
    1896            0 :         self.error_waiter.advance(sequence);
    1897            0 :     }
    1898              : 
    1899            0 :     pub(crate) fn from_persistent(
    1900            0 :         tsp: TenantShardPersistence,
    1901            0 :         intent: IntentState,
    1902            0 :     ) -> anyhow::Result<Self> {
    1903            0 :         let tenant_shard_id = tsp.get_tenant_shard_id()?;
    1904            0 :         let shard_identity = tsp.get_shard_identity()?;
    1905              : 
    1906            0 :         metrics::METRICS_REGISTRY
    1907            0 :             .metrics_group
    1908            0 :             .storage_controller_tenant_shards
    1909            0 :             .inc();
    1910              : 
    1911              :         Ok(Self {
    1912            0 :             tenant_shard_id,
    1913            0 :             shard: shard_identity,
    1914            0 :             sequence: Sequence::initial(),
    1915            0 :             generation: tsp.generation.map(|g| Generation::new(g as u32)),
    1916            0 :             policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
    1917            0 :             intent,
    1918            0 :             observed: ObservedState::new(),
    1919            0 :             config: serde_json::from_str(&tsp.config).unwrap(),
    1920            0 :             reconciler: None,
    1921            0 :             splitting: tsp.splitting,
    1922              :             // Filled in during [`Service::startup_reconcile`]
    1923            0 :             importing: TimelineImportState::Idle,
    1924            0 :             waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1925            0 :             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1926            0 :             last_error: Arc::default(),
    1927              :             consecutive_reconciles_count: 0,
    1928              :             pending_compute_notification: false,
    1929              :             delayed_reconcile: false,
    1930            0 :             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
    1931            0 :             preferred_node: None,
    1932              :         })
    1933            0 :     }
    1934              : 
    1935            0 :     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
    1936              :         TenantShardPersistence {
    1937            0 :             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
    1938            0 :             shard_number: self.tenant_shard_id.shard_number.0 as i32,
    1939            0 :             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
    1940            0 :             shard_stripe_size: self.shard.stripe_size.0 as i32,
    1941            0 :             generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
    1942            0 :             generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
    1943            0 :             placement_policy: serde_json::to_string(&self.policy).unwrap(),
    1944            0 :             config: serde_json::to_string(&self.config).unwrap(),
    1945            0 :             splitting: SplitState::default(),
    1946            0 :             scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
    1947            0 :             preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
    1948              :         }
    1949            0 :     }
    1950              : 
    1951        12508 :     pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
    1952        12508 :         self.intent.get_preferred_az()
    1953        12508 :     }
    1954              : 
    1955            2 :     pub(crate) fn set_preferred_az(
    1956            2 :         &mut self,
    1957            2 :         scheduler: &mut Scheduler,
    1958            2 :         preferred_az_id: Option<AvailabilityZone>,
    1959            2 :     ) {
    1960            2 :         self.intent.set_preferred_az(scheduler, preferred_az_id);
    1961            2 :     }
    1962              : 
    1963              :     /// Returns all the nodes to which this tenant shard is attached according to the
    1964              :     /// observed state and their generations. The returned vector is sorted from latest generation
    1965              :     /// to earliest.
    1966            0 :     pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
    1967            0 :         self.observed
    1968            0 :             .locations
    1969            0 :             .iter()
    1970            0 :             .filter_map(|(node_id, observed)| {
    1971              :                 use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
    1972              : 
    1973            0 :                 let conf = observed.conf.as_ref()?;
    1974              : 
    1975            0 :                 match (conf.generation, conf.mode) {
    1976            0 :                     (Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
    1977            0 :                         Some((*node_id, gen_))
    1978              :                     }
    1979            0 :                     _ => None,
    1980              :                 }
    1981            0 :             })
    1982            0 :             .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
    1983            0 :                 lhs_gen.cmp(rhs_gen).reverse()
    1984            0 :             })
    1985            0 :             .map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
    1986            0 :             .collect()
    1987            0 :     }
    1988              : 
    1989              :     /// Update the observed state of the tenant by applying incremental deltas
    1990              :     ///
    1991              :     /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
    1992              :     /// They are then filtered in [`crate::service::Service::process_result`].
    1993            0 :     pub(crate) fn apply_observed_deltas(
    1994            0 :         &mut self,
    1995            0 :         deltas: impl Iterator<Item = ObservedStateDelta>,
    1996            0 :     ) {
    1997            0 :         for delta in deltas {
    1998            0 :             match delta {
    1999            0 :                 ObservedStateDelta::Upsert(ups) => {
    2000            0 :                     let (node_id, loc) = *ups;
    2001              : 
    2002              :                     // If the generation of the observed location in the delta is lagging
    2003              :                     // behind the current one, then we have a race condition and cannot
    2004              :                     // be certain about the true observed state. Set the observed state
    2005              :                     // to None in order to reflect this.
    2006            0 :                     let crnt_gen = self
    2007            0 :                         .observed
    2008            0 :                         .locations
    2009            0 :                         .get(&node_id)
    2010            0 :                         .and_then(|loc| loc.conf.as_ref())
    2011            0 :                         .and_then(|conf| conf.generation);
    2012            0 :                     let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
    2013            0 :                     match (crnt_gen, new_gen) {
    2014            0 :                         (Some(crnt), Some(new)) if crnt_gen > new_gen => {
    2015            0 :                             tracing::warn!(
    2016            0 :                                 "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
    2017              :                                 node_id,
    2018              :                                 loc,
    2019              :                                 crnt,
    2020              :                                 new
    2021              :                             );
    2022              : 
    2023            0 :                             self.observed
    2024            0 :                                 .locations
    2025            0 :                                 .insert(node_id, ObservedStateLocation { conf: None });
    2026              : 
    2027            0 :                             continue;
    2028              :                         }
    2029            0 :                         _ => {}
    2030              :                     }
    2031              : 
    2032            0 :                     if let Some(conf) = &loc.conf {
    2033            0 :                         tracing::info!("Updating observed location {}: {:?}", node_id, conf);
    2034              :                     } else {
    2035            0 :                         tracing::info!("Setting observed location {} to None", node_id,)
    2036              :                     }
    2037              : 
    2038            0 :                     self.observed.locations.insert(node_id, loc);
    2039              :                 }
    2040            0 :                 ObservedStateDelta::Delete(node_id) => {
    2041            0 :                     tracing::info!("Deleting observed location {}", node_id);
    2042            0 :                     self.observed.locations.remove(&node_id);
    2043              :                 }
    2044              :             }
    2045              :         }
    2046            0 :     }
    2047              : 
    2048              :     /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
    2049              :     ///
    2050              :     /// If the shard has no preferred AZ set, any attached location is treated as outside it.
    2051            0 :     pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
    2052            0 :         self.intent
    2053            0 :             .get_attached()
    2054            0 :             .map(|node_id| {
    2055            0 :                 Some(
    2056            0 :                     nodes
    2057            0 :                         .get(&node_id)
    2058            0 :                         .expect("referenced node exists")
    2059            0 :                         .get_availability_zone_id(),
    2060            0 :                 ) != self.intent.preferred_az_id.as_ref()
    2061            0 :             })
    2062            0 :             .unwrap_or(false)
    2063            0 :     }
    2064              : }
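                      : // The storage_controller_tenant_shards gauge incremented in from_persistent() above (and,
                      : // presumably, in TenantShard::new()) is decremented in the Drop impl below, so it tracks the
                      : // number of TenantShard objects currently held in memory.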
    2065              : 
    2066              : impl Drop for TenantShard {
    2067        12843 :     fn drop(&mut self) {
    2068        12843 :         metrics::METRICS_REGISTRY
    2069        12843 :             .metrics_group
    2070        12843 :             .storage_controller_tenant_shards
    2071        12843 :             .dec();
    2072        12843 :     }
    2073              : }
    2074              : 
    2075              : #[cfg(test)]
    2076              : pub(crate) mod tests {
    2077              :     use std::cell::RefCell;
    2078              :     use std::rc::Rc;
    2079              : 
    2080              :     use pageserver_api::controller_api::NodeAvailability;
    2081              :     use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
    2082              :     use rand::SeedableRng;
    2083              :     use rand::rngs::StdRng;
    2084              :     use utils::id::TenantId;
    2085              : 
    2086              :     use super::*;
    2087              :     use crate::scheduler::test_utils::make_test_nodes;
    2088              : 
    2089           12 :     fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
    2090           12 :         let tenant_id = TenantId::generate();
    2091           12 :         let shard_number = ShardNumber(0);
    2092           12 :         let shard_count = ShardCount::new(1);
    2093           12 :         let stripe_size = DEFAULT_STRIPE_SIZE;
    2094              : 
    2095           12 :         let tenant_shard_id = TenantShardId {
    2096           12 :             tenant_id,
    2097           12 :             shard_number,
    2098           12 :             shard_count,
    2099           12 :         };
    2100           12 :         TenantShard::new(
    2101           12 :             tenant_shard_id,
    2102           12 :             ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
    2103           12 :             policy,
    2104           12 :             None,
    2105              :         )
    2106           12 :     }
    2107              : 
    2108         5004 :     pub(crate) fn make_test_tenant(
    2109         5004 :         policy: PlacementPolicy,
    2110         5004 :         shard_count: ShardCount,
    2111         5004 :         preferred_az: Option<AvailabilityZone>,
    2112         5004 :     ) -> Vec<TenantShard> {
    2113         5004 :         make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
    2114         5004 :     }
    2115              : 
    2116         5007 :     pub(crate) fn make_test_tenant_with_id(
    2117         5007 :         tenant_id: TenantId,
    2118         5007 :         policy: PlacementPolicy,
    2119         5007 :         shard_count: ShardCount,
    2120         5007 :         preferred_az: Option<AvailabilityZone>,
    2121         5007 :     ) -> Vec<TenantShard> {
    2122         5007 :         let stripe_size = DEFAULT_STRIPE_SIZE;
    2123         5007 :         (0..shard_count.count())
    2124        12522 :             .map(|i| {
    2125        12522 :                 let shard_number = ShardNumber(i);
    2126              : 
    2127        12522 :                 let tenant_shard_id = TenantShardId {
    2128        12522 :                     tenant_id,
    2129        12522 :                     shard_number,
    2130        12522 :                     shard_count,
    2131        12522 :                 };
    2132        12522 :                 TenantShard::new(
    2133        12522 :                     tenant_shard_id,
    2134        12522 :                     ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
    2135        12522 :                     policy.clone(),
    2136        12522 :                     preferred_az.clone(),
    2137              :                 )
    2138        12522 :             })
    2139         5007 :             .collect()
    2140         5007 :     }
    2141              : 
    2142              :     /// Test the scheduling behaviors used when a tenant configured for HA is subject
    2143              :     /// to nodes being marked offline.
    2144              :     #[test]
    2145            1 :     fn tenant_ha_scheduling() -> anyhow::Result<()> {
    2146              :         // Start with three nodes.  Our tenant will only use two.  The third one is
    2147              :         // expected to remain unused.
    2148            1 :         let mut nodes = make_test_nodes(3, &[]);
    2149              : 
    2150            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2151            1 :         let mut context = ScheduleContext::default();
    2152              : 
    2153            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2154            1 :         tenant_shard
    2155            1 :             .schedule(&mut scheduler, &mut context)
    2156            1 :             .expect("we have enough nodes, scheduling should work");
    2157              : 
    2158              :         // Expect the attached and secondary locations to initially be scheduled on different nodes
    2159            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    2160            1 :         assert!(tenant_shard.intent.attached.is_some());
    2161              : 
    2162            1 :         let attached_node_id = tenant_shard.intent.attached.unwrap();
    2163            1 :         let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
    2164            1 :         assert_ne!(attached_node_id, secondary_node_id);
    2165              : 
    2166            1 :         // Notifying that the attached node is offline should demote it to a secondary
    2167            1 :         let changed = tenant_shard
    2168            1 :             .intent
    2169            1 :             .demote_attached(&mut scheduler, attached_node_id);
    2170            1 :         assert!(changed);
    2171            1 :         assert!(tenant_shard.intent.attached.is_none());
    2172            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 2);
    2173              : 
    2174              :         // Update the scheduler state to indicate the node is offline
    2175            1 :         nodes
    2176            1 :             .get_mut(&attached_node_id)
    2177            1 :             .unwrap()
    2178            1 :             .set_availability(NodeAvailability::Offline);
    2179            1 :         scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
    2180              : 
    2181              :         // Scheduling the shard should promote the still-available secondary node to attached
    2182            1 :         tenant_shard
    2183            1 :             .schedule(&mut scheduler, &mut context)
    2184            1 :             .expect("active nodes are available");
    2185            1 :         assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
    2186              : 
    2187              :         // The original attached node should have been retained as a secondary
    2188            1 :         assert_eq!(
    2189            1 :             *tenant_shard.intent.secondary.iter().last().unwrap(),
    2190              :             attached_node_id
    2191              :         );
    2192              : 
    2193            1 :         tenant_shard.intent.clear(&mut scheduler);
    2194              : 
    2195            1 :         Ok(())
    2196            1 :     }
    2197              : 
    2198              :     #[test]
    2199            1 :     fn intent_from_observed() -> anyhow::Result<()> {
    2200            1 :         let nodes = make_test_nodes(3, &[]);
    2201            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2202              : 
    2203            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2204              : 
    2205            1 :         tenant_shard.observed.locations.insert(
    2206            1 :             NodeId(3),
    2207            1 :             ObservedStateLocation {
    2208            1 :                 conf: Some(LocationConfig {
    2209            1 :                     mode: LocationConfigMode::AttachedMulti,
    2210            1 :                     generation: Some(2),
    2211            1 :                     secondary_conf: None,
    2212            1 :                     shard_number: tenant_shard.shard.number.0,
    2213            1 :                     shard_count: tenant_shard.shard.count.literal(),
    2214            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    2215            1 :                     tenant_conf: TenantConfig::default(),
    2216            1 :                 }),
    2217            1 :             },
    2218              :         );
    2219              : 
    2220            1 :         tenant_shard.observed.locations.insert(
    2221            1 :             NodeId(2),
    2222            1 :             ObservedStateLocation {
    2223            1 :                 conf: Some(LocationConfig {
    2224            1 :                     mode: LocationConfigMode::AttachedStale,
    2225            1 :                     generation: Some(1),
    2226            1 :                     secondary_conf: None,
    2227            1 :                     shard_number: tenant_shard.shard.number.0,
    2228            1 :                     shard_count: tenant_shard.shard.count.literal(),
    2229            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    2230            1 :                     tenant_conf: TenantConfig::default(),
    2231            1 :                 }),
    2232            1 :             },
    2233              :         );
    2234              : 
    2235            1 :         tenant_shard.intent_from_observed(&mut scheduler);
    2236              : 
    2237              :         // The attached location with the highest generation gets used as the attached intent
    2238            1 :         assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    2239              :         // Other locations get used as secondary
    2240            1 :         assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
    2241              : 
    2242            1 :         scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
    2243              : 
    2244            1 :         tenant_shard.intent.clear(&mut scheduler);
    2245            1 :         Ok(())
    2246            1 :     }
    2247              : 
    2248              :     #[test]
    2249            1 :     fn scheduling_mode() -> anyhow::Result<()> {
    2250            1 :         let nodes = make_test_nodes(3, &[]);
    2251            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2252              : 
    2253            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2254              : 
    2255              :         // In pause mode, schedule() shouldn't do anything
    2256            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    2257            1 :         assert!(
    2258            1 :             tenant_shard
    2259            1 :                 .schedule(&mut scheduler, &mut ScheduleContext::default())
    2260            1 :                 .is_ok()
    2261              :         );
    2262            1 :         assert!(tenant_shard.intent.all_pageservers().is_empty());
    2263              : 
    2264              :         // In active mode, schedule() works
    2265            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    2266            1 :         assert!(
    2267            1 :             tenant_shard
    2268            1 :                 .schedule(&mut scheduler, &mut ScheduleContext::default())
    2269            1 :                 .is_ok()
    2270              :         );
    2271            1 :         assert!(!tenant_shard.intent.all_pageservers().is_empty());
    2272              : 
    2273            1 :         tenant_shard.intent.clear(&mut scheduler);
    2274            1 :         Ok(())
    2275            1 :     }
    2276              : 
    2277              :     #[test]
    2278              :     /// Simple case: moving attachment to somewhere better where we already have a secondary
    2279            1 :     fn optimize_attachment_simple() -> anyhow::Result<()> {
    2280            1 :         let nodes = make_test_nodes(
    2281              :             3,
    2282            1 :             &[
    2283            1 :                 AvailabilityZone("az-a".to_string()),
    2284            1 :                 AvailabilityZone("az-b".to_string()),
    2285            1 :                 AvailabilityZone("az-c".to_string()),
    2286            1 :             ],
    2287              :         );
    2288            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2289              : 
    2290            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2291            1 :         shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2292            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2293            1 :         shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2294              : 
    2295              :         // Initially: the two shards are attached on different nodes (2 and 1), and each
    2296              :         // has a secondary location on the other node.
    2297            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    2298            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
    2299            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    2300            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
    2301              : 
    2302            1 :         fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
    2303            1 :             let mut schedule_context = ScheduleContext::default();
    2304            1 :             schedule_context.avoid(&shard_a.intent.all_pageservers());
    2305            1 :             schedule_context.avoid(&shard_b.intent.all_pageservers());
    2306            1 :             schedule_context
    2307            1 :         }
    2308              : 
    2309            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2310            1 :         let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2311            1 :         assert_eq!(
    2312              :             optimization_a,
    2313            1 :             Some(ScheduleOptimization {
    2314            1 :                 sequence: shard_a.sequence,
    2315            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2316            1 :                     old_attached_node_id: NodeId(2),
    2317            1 :                     new_attached_node_id: NodeId(1)
    2318            1 :                 })
    2319            1 :             })
    2320              :         );
    2321            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    2322              : 
    2323              :         // // Either shard should recognize that it has the option to switch to a secondary location where there
    2324              :         // // would be no other shards from the same tenant, and request to do so.
    2325              :         // assert_eq!(
    2326              :         //     optimization_a_prepare,
    2327              :         //     Some(ScheduleOptimization {
    2328              :         //         sequence: shard_a.sequence,
    2329              :         //         action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
    2330              :         //     })
    2331              :         // );
    2332              :         // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
    2333              : 
    2334              :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2335              :         // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2336              :         // assert_eq!(
    2337              :         //     optimization_a_migrate,
    2338              :         //     Some(ScheduleOptimization {
    2339              :         //         sequence: shard_a.sequence,
    2340              :         //         action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2341              :         //             old_attached_node_id: NodeId(1),
    2342              :         //             new_attached_node_id: NodeId(2)
    2343              :         //         })
    2344              :         //     })
    2345              :         // );
    2346              :         // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
    2347              : 
    2348              :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2349              :         // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2350              :         // assert_eq!(
    2351              :         //     optimization_a_cleanup,
    2352              :         //     Some(ScheduleOptimization {
    2353              :         //         sequence: shard_a.sequence,
    2354              :         //         action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
    2355              :         //     })
    2356              :         // );
    2357              :         // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
    2358              : 
    2359              :         // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
    2360              :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2361              :         // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
    2362              : 
    2363            1 :         shard_a.intent.clear(&mut scheduler);
    2364            1 :         shard_b.intent.clear(&mut scheduler);
    2365              : 
    2366            1 :         Ok(())
    2367            1 :     }
    2368              : 
    2369              :     #[test]
    2370              :     /// Complicated case: moving attachment to somewhere better where we do not have a secondary
    2371              :     /// already, creating one as needed.
    2372            1 :     fn optimize_attachment_multistep() -> anyhow::Result<()> {
    2373            1 :         let nodes = make_test_nodes(
    2374              :             3,
    2375            1 :             &[
    2376            1 :                 AvailabilityZone("az-a".to_string()),
    2377            1 :                 AvailabilityZone("az-b".to_string()),
    2378            1 :                 AvailabilityZone("az-c".to_string()),
    2379            1 :             ],
    2380              :         );
    2381            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2382              : 
    2383              :         // Two shards of a tenant that wants to be in AZ A
    2384            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2385            1 :         shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2386            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2387            1 :         shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2388              : 
    2389              :         // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
    2390            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    2391            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    2392            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
    2393            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
    2394              : 
    2395            3 :         fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
    2396            3 :             let mut schedule_context = ScheduleContext::default();
    2397            3 :             schedule_context.avoid(&shard_a.intent.all_pageservers());
    2398            3 :             schedule_context.avoid(&shard_b.intent.all_pageservers());
    2399            3 :             schedule_context
    2400            3 :         }
    2401              : 
    2402            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2403            1 :         let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2404            1 :         assert_eq!(
    2405              :             optimization_a_prepare,
    2406            1 :             Some(ScheduleOptimization {
    2407            1 :                 sequence: shard_a.sequence,
    2408            1 :                 action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
    2409            1 :             })
    2410              :         );
    2411            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
    2412              : 
    2413            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2414            1 :         let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2415            1 :         assert_eq!(
    2416              :             optimization_a_migrate,
    2417            1 :             Some(ScheduleOptimization {
    2418            1 :                 sequence: shard_a.sequence,
    2419            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2420            1 :                     old_attached_node_id: NodeId(2),
    2421            1 :                     new_attached_node_id: NodeId(1)
    2422            1 :                 })
    2423            1 :             })
    2424              :         );
    2425            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
    2426              : 
    2427            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2428            1 :         let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2429            1 :         assert_eq!(
    2430              :             optimization_a_cleanup,
    2431            1 :             Some(ScheduleOptimization {
    2432            1 :                 sequence: shard_a.sequence,
    2433            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
    2434            1 :             })
    2435              :         );
    2436            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
    2437              : 
    2438              :         // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
    2439              :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2440              :         // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
    2441              : 
    2442            1 :         shard_a.intent.clear(&mut scheduler);
    2443            1 :         shard_b.intent.clear(&mut scheduler);
    2444              : 
    2445            1 :         Ok(())
    2446            1 :     }
    2447              : 
    2448              :     #[test]
    2449              :     /// How the optimisation code handles a shard with a preferred node set; this is an example
    2450              :     /// of the multi-step migration, but driven by a different input.
    2451            1 :     fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> {
    2452            1 :         let nodes = make_test_nodes(
    2453              :             4,
    2454            1 :             &[
    2455            1 :                 AvailabilityZone("az-a".to_string()),
    2456            1 :                 AvailabilityZone("az-a".to_string()),
    2457            1 :                 AvailabilityZone("az-b".to_string()),
    2458            1 :                 AvailabilityZone("az-b".to_string()),
    2459            1 :             ],
    2460              :         );
    2461            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2462              : 
    2463              :         // A single shard of a tenant that wants to be in AZ A
    2464            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2465            1 :         shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2466              : 
    2467              :         // Initially attached in a stable location
    2468            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    2469            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    2470              : 
    2471              :         // Set the preferred node to node 2, which scores equally well as the current attached location
    2472            1 :         shard_a.preferred_node = Some(NodeId(2));
    2473              : 
    2474            3 :         fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext {
    2475            3 :             let mut schedule_context = ScheduleContext::default();
    2476            3 :             schedule_context.avoid(&shard_a.intent.all_pageservers());
    2477            3 :             schedule_context
    2478            3 :         }
    2479              : 
    2480            1 :         let schedule_context = make_schedule_context(&shard_a);
    2481            1 :         let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2482            1 :         assert_eq!(
    2483              :             optimization_a_prepare,
    2484            1 :             Some(ScheduleOptimization {
    2485            1 :                 sequence: shard_a.sequence,
    2486            1 :                 action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
    2487            1 :             })
    2488              :         );
    2489            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
    2490              : 
    2491              :         // The first step of the optimisation should not have cleared the preferred node
    2492            1 :         assert_eq!(shard_a.preferred_node, Some(NodeId(2)));
    2493              : 
    2494            1 :         let schedule_context = make_schedule_context(&shard_a);
    2495            1 :         let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2496            1 :         assert_eq!(
    2497              :             optimization_a_migrate,
    2498            1 :             Some(ScheduleOptimization {
    2499            1 :                 sequence: shard_a.sequence,
    2500            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2501            1 :                     old_attached_node_id: NodeId(1),
    2502            1 :                     new_attached_node_id: NodeId(2)
    2503            1 :                 })
    2504            1 :             })
    2505              :         );
    2506            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
    2507              : 
    2508              :         // The cutover step of the optimisation should have cleared the preferred node
    2509            1 :         assert_eq!(shard_a.preferred_node, None);
    2510              : 
    2511            1 :         let schedule_context = make_schedule_context(&shard_a);
    2512            1 :         let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2513            1 :         assert_eq!(
    2514              :             optimization_a_cleanup,
    2515            1 :             Some(ScheduleOptimization {
    2516            1 :                 sequence: shard_a.sequence,
    2517            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
    2518            1 :             })
    2519              :         );
    2520            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
    2521              : 
    2522            1 :         shard_a.intent.clear(&mut scheduler);
    2523              : 
    2524            1 :         Ok(())
    2525            1 :     }
    2526              : 
    2527              :     #[test]
    2528              :     /// Check that multi-step migration works when moving to somewhere that is only better by
    2529              :     /// 1 AffinityScore. This guards against a bug where the intermediate secondary counts toward
    2530              :     /// the affinity score and thereby blocks the rest of the migration from happening.
    2531            1 :     fn optimize_attachment_marginal() -> anyhow::Result<()> {
    2532            1 :         let nodes = make_test_nodes(2, &[]);
    2533            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2534              : 
    2535              :         // Multi-sharded tenant: we craft a situation where affinity
    2536              :         // scores differ only slightly
    2537            1 :         let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
    2538              : 
    2539              :         // 1 attached on node 1
    2540            1 :         shards[0]
    2541            1 :             .intent
    2542            1 :             .set_attached(&mut scheduler, Some(NodeId(1)));
    2543              :         // 3 attached on node 2
    2544            1 :         shards[1]
    2545            1 :             .intent
    2546            1 :             .set_attached(&mut scheduler, Some(NodeId(2)));
    2547            1 :         shards[2]
    2548            1 :             .intent
    2549            1 :             .set_attached(&mut scheduler, Some(NodeId(2)));
    2550            1 :         shards[3]
    2551            1 :             .intent
    2552            1 :             .set_attached(&mut scheduler, Some(NodeId(2)));
    2553              : 
    2554              :         // The optimiser should figure out that one of the shards on node 2 (shards[1] below) needs to:
    2555              :         // - Create a secondary on node 1
    2556              :         // - Migrate its attachment to node 1
    2557              :         // - Remove its old location on node 2
    2558              : 
    2559            4 :         fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
    2560            4 :             let mut schedule_context = ScheduleContext::default();
    2561           20 :             for shard in shards {
    2562           16 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    2563           16 :             }
    2564            4 :             schedule_context
    2565            4 :         }
    2566              : 
    2567            1 :         let schedule_context = make_schedule_context(&shards);
    2568            1 :         let optimization_a_prepare =
    2569            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context);
    2570            1 :         assert_eq!(
    2571              :             optimization_a_prepare,
    2572            1 :             Some(ScheduleOptimization {
    2573            1 :                 sequence: shards[1].sequence,
    2574            1 :                 action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
    2575            1 :             })
    2576              :         );
    2577            1 :         shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
    2578              : 
    2579            1 :         let schedule_context = make_schedule_context(&shards);
    2580            1 :         let optimization_a_migrate =
    2581            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context);
    2582            1 :         assert_eq!(
    2583              :             optimization_a_migrate,
    2584            1 :             Some(ScheduleOptimization {
    2585            1 :                 sequence: shards[1].sequence,
    2586            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2587            1 :                     old_attached_node_id: NodeId(2),
    2588            1 :                     new_attached_node_id: NodeId(1)
    2589            1 :                 })
    2590            1 :             })
    2591              :         );
    2592            1 :         shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
    2593              : 
    2594            1 :         let schedule_context = make_schedule_context(&shards);
    2595            1 :         let optimization_a_cleanup =
    2596            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context);
    2597            1 :         assert_eq!(
    2598              :             optimization_a_cleanup,
    2599            1 :             Some(ScheduleOptimization {
    2600            1 :                 sequence: shards[1].sequence,
    2601            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
    2602            1 :             })
    2603              :         );
    2604            1 :         shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
    2605              : 
    2606              :         // Everything should be stable now
    2607            1 :         let schedule_context = make_schedule_context(&shards);
    2608            1 :         assert_eq!(
    2609            1 :             shards[0].optimize_attachment(&mut scheduler, &schedule_context),
    2610              :             None
    2611              :         );
    2612            1 :         assert_eq!(
    2613            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context),
    2614              :             None
    2615              :         );
    2616            1 :         assert_eq!(
    2617            1 :             shards[2].optimize_attachment(&mut scheduler, &schedule_context),
    2618              :             None
    2619              :         );
    2620            1 :         assert_eq!(
    2621            1 :             shards[3].optimize_attachment(&mut scheduler, &schedule_context),
    2622              :             None
    2623              :         );
    2624              : 
    2625            5 :         for mut shard in shards {
    2626            4 :             shard.intent.clear(&mut scheduler);
    2627            4 :         }
    2628              : 
    2629            1 :         Ok(())
    2630            1 :     }
    2631              : 
    2632              :     #[test]
    2633            1 :     fn optimize_secondary() -> anyhow::Result<()> {
    2634            1 :         let nodes = make_test_nodes(4, &[]);
    2635            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2636              : 
    2637            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2638            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2639              : 
    2640              :         // Initially: the two shards are attached on different nodes (1 and 2), and both
    2641              :         // have their secondary location on the same node (3).
    2642            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    2643            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    2644            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    2645            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    2646              : 
    2647            1 :         let mut schedule_context = ScheduleContext::default();
    2648            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    2649            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    2650              : 
    2651            1 :         let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
    2652              : 
    2653              :         // Since there is a node with no locations at all, optimizing a shard whose secondary sits
    2654              :         // on the doubly-loaded node should move that secondary to the empty node
    2655            1 :         assert_eq!(
    2656              :             optimization_a,
    2657            1 :             Some(ScheduleOptimization {
    2658            1 :                 sequence: shard_a.sequence,
    2659            1 :                 action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    2660            1 :                     old_node_id: NodeId(3),
    2661            1 :                     new_node_id: NodeId(4)
    2662            1 :                 })
    2663            1 :             })
    2664              :         );
    2665              : 
    2666            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    2667            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    2668            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
    2669              : 
    2670            1 :         shard_a.intent.clear(&mut scheduler);
    2671            1 :         shard_b.intent.clear(&mut scheduler);
    2672              : 
    2673            1 :         Ok(())
    2674            1 :     }
    2675              : 
    2676              :     /// Test how the optimisation code behaves with an extra secondary
    2677              :     #[test]
    2678            1 :     fn optimize_removes_secondary() -> anyhow::Result<()> {
    2679            1 :         let az_a_tag = AvailabilityZone("az-a".to_string());
    2680            1 :         let az_b_tag = AvailabilityZone("az-b".to_string());
    2681            1 :         let mut nodes = make_test_nodes(
    2682              :             4,
    2683            1 :             &[
    2684            1 :                 az_a_tag.clone(),
    2685            1 :                 az_b_tag.clone(),
    2686            1 :                 az_a_tag.clone(),
    2687            1 :                 az_b_tag.clone(),
    2688            1 :             ],
    2689              :         );
    2690            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2691              : 
    2692            1 :         let mut schedule_context = ScheduleContext::default();
    2693              : 
    2694            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2695            1 :         shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
    2696            1 :         shard_a
    2697            1 :             .schedule(&mut scheduler, &mut schedule_context)
    2698            1 :             .unwrap();
    2699              : 
    2700              :         // Attached on node 1, secondary on node 2
    2701            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    2702            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
    2703              : 
    2704              :         // Initially optimiser is idle
    2705            1 :         assert_eq!(
    2706            1 :             shard_a.optimize_attachment(&mut scheduler, &schedule_context),
    2707              :             None
    2708              :         );
    2709            1 :         assert_eq!(
    2710            1 :             shard_a.optimize_secondary(&mut scheduler, &schedule_context),
    2711              :             None
    2712              :         );
    2713              : 
    2714              :         // A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
    2715              :         // to our new location
    2716            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    2717            1 :         let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2718            1 :         assert_eq!(
    2719              :             optimization,
    2720            1 :             Some(ScheduleOptimization {
    2721            1 :                 sequence: shard_a.sequence,
    2722            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
    2723            1 :             })
    2724              :         );
    2725            1 :         shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
    2726              : 
    2727              :         // A spare secondary in the non-home AZ, with one of the two on an offline node: the offline one should be removed
    2728            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
    2729            1 :         nodes
    2730            1 :             .get_mut(&NodeId(4))
    2731            1 :             .unwrap()
    2732            1 :             .set_availability(NodeAvailability::Offline);
    2733            1 :         scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
    2734            1 :         let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2735            1 :         assert_eq!(
    2736              :             optimization,
    2737            1 :             Some(ScheduleOptimization {
    2738            1 :                 sequence: shard_a.sequence,
    2739            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
    2740            1 :             })
    2741              :         );
    2742            1 :         shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
    2743              : 
    2744              :         // A spare secondary when the policy says we should have none
    2745            1 :         shard_a.policy = PlacementPolicy::Attached(0);
    2746            1 :         let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2747            1 :         assert_eq!(
    2748              :             optimization,
    2749            1 :             Some(ScheduleOptimization {
    2750            1 :                 sequence: shard_a.sequence,
    2751            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
    2752            1 :             })
    2753              :         );
    2754            1 :         shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
    2755            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    2756            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![]);
    2757              : 
    2758              :         // Check that in secondary mode, we preserve the secondary in the preferred AZ
    2759            1 :         let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
    2760            1 :         shard_a.policy = PlacementPolicy::Secondary;
    2761            1 :         shard_a
    2762            1 :             .schedule(&mut scheduler, &mut schedule_context)
    2763            1 :             .unwrap();
    2764            1 :         assert_eq!(shard_a.intent.get_attached(), &None);
    2765            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
    2766            1 :         assert_eq!(
    2767            1 :             shard_a.optimize_attachment(&mut scheduler, &schedule_context),
    2768              :             None
    2769              :         );
    2770            1 :         assert_eq!(
    2771            1 :             shard_a.optimize_secondary(&mut scheduler, &schedule_context),
    2772              :             None
    2773              :         );
    2774              : 
    2775            1 :         shard_a.intent.clear(&mut scheduler);
    2776              : 
    2777            1 :         Ok(())
    2778            1 :     }
    2779              : 
    2780              :     // Optimize until quiescent: this emulates what Service::optimize_all does when
    2781              :     // called repeatedly in the background.
    2782              :     // Returns the applied optimizations.
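                      :     // Note: each pass rebuilds the schedule context and applies at most one
                      :     // optimization before looping again, roughly mirroring how the background
                      :     // loop applies changes incrementally.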
    2783            3 :     fn optimize_til_idle(
    2784            3 :         scheduler: &mut Scheduler,
    2785            3 :         shards: &mut [TenantShard],
    2786            3 :     ) -> Vec<ScheduleOptimization> {
    2787            3 :         let mut loop_n = 0;
    2788            3 :         let mut optimizations = Vec::default();
    2789              :         loop {
    2790            6 :             let mut schedule_context = ScheduleContext::default();
    2791            6 :             let mut any_changed = false;
    2792              : 
    2793           24 :             for shard in shards.iter() {
    2794           24 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    2795           24 :             }
    2796              : 
    2797           15 :             for shard in shards.iter_mut() {
    2798           15 :                 let optimization = shard.optimize_attachment(scheduler, &schedule_context);
    2799           15 :                 tracing::info!(
    2800            0 :                     "optimize_attachment({})={:?}",
    2801              :                     shard.tenant_shard_id,
    2802              :                     optimization
    2803              :                 );
    2804           15 :                 if let Some(optimization) = optimization {
    2805              :                     // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
    2806            3 :                     assert!(shard.maybe_optimizable(scheduler, &schedule_context));
    2807            3 :                     optimizations.push(optimization.clone());
    2808            3 :                     shard.apply_optimization(scheduler, optimization);
    2809            3 :                     any_changed = true;
    2810            3 :                     break;
    2811           12 :                 }
    2812              : 
    2813           12 :                 let optimization = shard.optimize_secondary(scheduler, &schedule_context);
    2814           12 :                 tracing::info!(
    2815            0 :                     "optimize_secondary({})={:?}",
    2816              :                     shard.tenant_shard_id,
    2817              :                     optimization
    2818              :                 );
    2819           12 :                 if let Some(optimization) = optimization {
    2820              :                     // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
    2821            0 :                     assert!(shard.maybe_optimizable(scheduler, &schedule_context));
    2822              : 
    2823            0 :                     optimizations.push(optimization.clone());
    2824            0 :                     shard.apply_optimization(scheduler, optimization);
    2825            0 :                     any_changed = true;
    2826            0 :                     break;
    2827           12 :                 }
    2828              :             }
    2829              : 
    2830            6 :             if !any_changed {
    2831            3 :                 break;
    2832            3 :             }
    2833              : 
    2834              :             // Assert no infinite loop
    2835            3 :             loop_n += 1;
    2836            3 :             assert!(loop_n < 1000);
    2837              :         }
    2838              : 
    2839            3 :         optimizations
    2840            3 :     }
    2841              : 
    2842              :     /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
    2843              :     /// that it converges.
    2844              :     #[test]
    2845            1 :     fn optimize_add_nodes() -> anyhow::Result<()> {
    2846            1 :         let nodes = make_test_nodes(
    2847              :             9,
    2848            1 :             &[
    2849            1 :                 // Initial 6 nodes
    2850            1 :                 AvailabilityZone("az-a".to_string()),
    2851            1 :                 AvailabilityZone("az-a".to_string()),
    2852            1 :                 AvailabilityZone("az-b".to_string()),
    2853            1 :                 AvailabilityZone("az-b".to_string()),
    2854            1 :                 AvailabilityZone("az-c".to_string()),
    2855            1 :                 AvailabilityZone("az-c".to_string()),
    2856            1 :                 // Three we will add later
    2857            1 :                 AvailabilityZone("az-a".to_string()),
    2858            1 :                 AvailabilityZone("az-b".to_string()),
    2859            1 :                 AvailabilityZone("az-c".to_string()),
    2860            1 :             ],
    2861              :         );
    2862              : 
    2863              :         // Only show the scheduler two nodes in each AZ to start with
    2864            1 :         let mut scheduler = Scheduler::new([].iter());
    2865            7 :         for i in 1..=6 {
    2866            6 :             scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
    2867            6 :         }
    2868              : 
    2869            1 :         let mut shards = make_test_tenant(
    2870            1 :             PlacementPolicy::Attached(1),
    2871            1 :             ShardCount::new(4),
    2872            1 :             Some(AvailabilityZone("az-a".to_string())),
    2873              :         );
    2874            1 :         let mut schedule_context = ScheduleContext::default();
    2875            5 :         for shard in &mut shards {
    2876            4 :             assert!(
    2877            4 :                 shard
    2878            4 :                     .schedule(&mut scheduler, &mut schedule_context)
    2879            4 :                     .is_ok()
    2880              :             );
    2881              :         }
    2882              : 
    2883              :         // Initial: attached locations land in the tenant's home AZ.
    2884            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
    2885            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
    2886            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    2887            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
    2888              : 
    2889              :         // Initial: secondary locations in a remote AZ
    2890            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
    2891            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
    2892            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
    2893            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
    2894            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
    2895            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
    2896            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
    2897            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
    2898              : 
    2899              :         // Add another three nodes: we should see the shards spread out when their optimize
    2900              :         // methods are called
    2901            1 :         scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
    2902            1 :         scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
    2903            1 :         scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
    2904            1 :         optimize_til_idle(&mut scheduler, &mut shards);
    2905              : 
    2906              :         // We expect one attached location to have moved to the new node in the tenant's home AZ
    2907            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
    2908            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
    2909              :         // The original node has one less attached shard
    2910            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
    2911            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
    2912              : 
    2913              :         // One of the original nodes still has two attachments: four attachments cannot be spread evenly across the three home-AZ nodes
    2914            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    2915            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
    2916              : 
    2917              :         // None of our secondaries moved, since we already had enough nodes for those to be
    2918              :         // scheduled perfectly
    2919            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
    2920            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
    2921            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
    2922            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
    2923            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
    2924            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
    2925            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
    2926            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
    2927              : 
    2928            4 :         for shard in shards.iter_mut() {
    2929            4 :             shard.intent.clear(&mut scheduler);
    2930            4 :         }
    2931              : 
    2932            1 :         Ok(())
    2933            1 :     }
    2934              : 
    2935              :     /// Test that initial shard scheduling is optimal. By optimal we mean
    2936              :     /// that the optimizer cannot find a way to improve it.
    2937              :     ///
    2938              :     /// This test exercises the scheduling issue described in
    2939              :     /// https://github.com/neondatabase/neon/issues/8969
    2940              :     #[test]
    2941            1 :     fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
    2942              :         use itertools::Itertools;
    2943              : 
    2944            1 :         let nodes = make_test_nodes(2, &[]);
    2945              : 
    2946            1 :         let mut scheduler = Scheduler::new([].iter());
    2947            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    2948            1 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    2949              : 
    2950            1 :         let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    2951            1 :         let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
    2952              : 
    2953            1 :         let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    2954            1 :         let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
    2955              : 
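                      :         // Interleave the two tenants' shards so they are scheduled alternately,
                      :         // as could happen when tenants are created concurrently.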
    2956            4 :         let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
    2957            4 :         let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
    2958              : 
    2959            1 :         let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
    2960              : 
    2961            9 :         for (shard, context) in schedule_order {
    2962            8 :             let context = &mut *context.borrow_mut();
    2963            8 :             shard.schedule(&mut scheduler, context).unwrap();
    2964            8 :         }
    2965              : 
    2966            1 :         let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
    2967            1 :         assert_eq!(applied_to_a, vec![]);
    2968              : 
    2969            1 :         let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
    2970            1 :         assert_eq!(applied_to_b, vec![]);
    2971              : 
    2972            8 :         for shard in a.iter_mut().chain(b.iter_mut()) {
    2973            8 :             shard.intent.clear(&mut scheduler);
    2974            8 :         }
    2975              : 
    2976            1 :         Ok(())
    2977            1 :     }
    2978              : 
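                      :     /// Randomized scheduling across two AZs: schedule many tenants in shuffled order,
                      :     /// then check that attachments land in the preferred AZ, secondaries land outside
                      :     /// it, and per-node attachment counts stay close to the ideal.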
    2979              :     #[test]
    2980            1 :     fn random_az_shard_scheduling() -> anyhow::Result<()> {
    2981              :         use rand::seq::SliceRandom;
    2982              : 
    2983           51 :         for seed in 0..50 {
    2984           50 :             eprintln!("Running test with seed {seed}");
    2985           50 :             let mut rng = StdRng::seed_from_u64(seed);
    2986              : 
    2987           50 :             let az_a_tag = AvailabilityZone("az-a".to_string());
    2988           50 :             let az_b_tag = AvailabilityZone("az-b".to_string());
    2989           50 :             let azs = [az_a_tag, az_b_tag];
    2990           50 :             let nodes = make_test_nodes(4, &azs);
    2991           50 :             let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
    2992              : 
    2993           50 :             let mut scheduler = Scheduler::new([].iter());
    2994          200 :             for node in nodes.values() {
    2995          200 :                 scheduler.node_upsert(node);
    2996          200 :             }
    2997              : 
    2998           50 :             let mut shards = Vec::default();
    2999           50 :             let mut contexts = Vec::default();
    3000           50 :             let mut az_picker = azs.iter().cycle().cloned();
    3001         5050 :             for i in 0..100 {
    3002         5000 :                 let az = az_picker.next().unwrap();
    3003         5000 :                 let shard_count = i % 4 + 1;
    3004         5000 :                 *shards_per_az.entry(az.clone()).or_default() += shard_count;
    3005              : 
    3006         5000 :                 let tenant_shards = make_test_tenant(
    3007         5000 :                     PlacementPolicy::Attached(1),
    3008         5000 :                     ShardCount::new(shard_count.try_into().unwrap()),
    3009         5000 :                     Some(az),
    3010              :                 );
    3011         5000 :                 let context = Rc::new(RefCell::new(ScheduleContext::default()));
    3012              : 
    3013         5000 :                 contexts.push(context.clone());
    3014         5000 :                 let with_ctx = tenant_shards
    3015         5000 :                     .into_iter()
    3016        12500 :                     .map(|shard| (shard, context.clone()));
    3017        17500 :                 for shard_with_ctx in with_ctx {
    3018        12500 :                     shards.push(shard_with_ctx);
    3019        12500 :                 }
    3020              :             }
    3021              : 
    3022           50 :             shards.shuffle(&mut rng);
    3023              : 
    3024              :             #[derive(Default, Debug)]
    3025              :             struct NodeStats {
    3026              :                 attachments: u32,
    3027              :                 secondaries: u32,
    3028              :             }
    3029              : 
    3030           50 :             let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
    3031           50 :             let mut attachments_in_wrong_az = 0;
    3032           50 :             let mut secondaries_in_wrong_az = 0;
    3033              : 
    3034        12550 :             for (shard, context) in &mut shards {
    3035        12500 :                 let context = &mut *context.borrow_mut();
    3036        12500 :                 shard.schedule(&mut scheduler, context).unwrap();
    3037              : 
    3038        12500 :                 let attached_node = shard.intent.get_attached().unwrap();
    3039        12500 :                 let stats = node_stats.entry(attached_node).or_default();
    3040        12500 :                 stats.attachments += 1;
    3041              : 
    3042        12500 :                 let secondary_node = *shard.intent.get_secondary().first().unwrap();
    3043        12500 :                 let stats = node_stats.entry(secondary_node).or_default();
    3044        12500 :                 stats.secondaries += 1;
    3045              : 
    3046        12500 :                 let attached_node_az = nodes
    3047        12500 :                     .get(&attached_node)
    3048        12500 :                     .unwrap()
    3049        12500 :                     .get_availability_zone_id();
    3050        12500 :                 let secondary_node_az = nodes
    3051        12500 :                     .get(&secondary_node)
    3052        12500 :                     .unwrap()
    3053        12500 :                     .get_availability_zone_id();
    3054        12500 :                 let preferred_az = shard.preferred_az().unwrap();
    3055              : 
    3056        12500 :                 if attached_node_az != preferred_az {
    3057            0 :                     eprintln!(
    3058            0 :                         "{} attachment was scheduled in AZ {} but preferred AZ {}",
    3059            0 :                         shard.tenant_shard_id, attached_node_az, preferred_az
    3060            0 :                     );
    3061            0 :                     attachments_in_wrong_az += 1;
    3062        12500 :                 }
    3063              : 
    3064        12500 :                 if secondary_node_az == preferred_az {
    3065            0 :                     eprintln!(
    3066            0 :                         "{} secondary was scheduled in AZ {} which matches preference",
    3067            0 :                         shard.tenant_shard_id, attached_node_az
    3068            0 :                     );
    3069            0 :                     secondaries_in_wrong_az += 1;
    3070        12500 :                 }
    3071              :             }
    3072              : 
    3073           50 :             let mut violations = Vec::default();
    3074              : 
    3075           50 :             if attachments_in_wrong_az > 0 {
    3076            0 :                 violations.push(format!(
    3077            0 :                     "{attachments_in_wrong_az} attachments scheduled to the incorrect AZ"
    3078            0 :                 ));
    3079           50 :             }
    3080              : 
    3081           50 :             if secondaries_in_wrong_az > 0 {
    3082            0 :                 violations.push(format!(
    3083            0 :                     "{secondaries_in_wrong_az} secondaries scheduled to the incorrect AZ"
    3084            0 :                 ));
    3085           50 :             }
    3086              : 
    3087           50 :             eprintln!(
    3088           50 :                 "attachments_in_wrong_az={attachments_in_wrong_az} secondaries_in_wrong_az={secondaries_in_wrong_az}"
    3089              :             );
    3090              : 
    3091          250 :             for (node_id, stats) in &node_stats {
    3092          200 :                 let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
    3093          200 :                 let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
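                      :                 // Two nodes per AZ, so the ideal is half the AZ's shard count; allow
                      :                 // the observed attachment count to be within one of that ideal (the
                      :                 // half-open range below covers ideal-1, ideal and ideal+1).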
    3094          200 :                 let allowed_attachment_load =
    3095          200 :                     (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
    3096              : 
    3097          200 :                 if !allowed_attachment_load.contains(&stats.attachments) {
    3098            0 :                     violations.push(format!(
    3099            0 :                         "Found {} attachments on node {}, but expected {}",
    3100            0 :                         stats.attachments, node_id, ideal_attachment_load
    3101            0 :                     ));
    3102          200 :                 }
    3103              : 
    3104          200 :                 eprintln!(
    3105          200 :                     "{}: attachments={} secondaries={} ideal_attachment_load={}",
    3106              :                     node_id, stats.attachments, stats.secondaries, ideal_attachment_load
    3107              :                 );
    3108              :             }
    3109              : 
    3110           50 :             assert!(violations.is_empty(), "{violations:?}");
    3111              : 
    3112        12550 :             for (mut shard, _ctx) in shards {
    3113        12500 :                 shard.intent.clear(&mut scheduler);
    3114        12500 :             }
    3115              :         }
    3116            1 :         Ok(())
    3117            1 :     }
    3118              : 
    3119              :     /// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
    3120              :     #[test]
    3121            1 :     fn tenant_secondary_scheduling() -> anyhow::Result<()> {
    3122            1 :         let az_a = AvailabilityZone("az-a".to_string());
    3123            1 :         let nodes = make_test_nodes(
    3124              :             3,
    3125            1 :             &[
    3126            1 :                 az_a.clone(),
    3127            1 :                 AvailabilityZone("az-b".to_string()),
    3128            1 :                 AvailabilityZone("az-c".to_string()),
    3129            1 :             ],
    3130              :         );
    3131              : 
    3132            1 :         let mut scheduler = Scheduler::new(nodes.values());
    3133            1 :         let mut context = ScheduleContext::default();
    3134              : 
    3135            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
    3136            1 :         tenant_shard.intent.preferred_az_id = Some(az_a.clone());
    3137            1 :         tenant_shard
    3138            1 :             .schedule(&mut scheduler, &mut context)
    3139            1 :             .expect("we have enough nodes, scheduling should work");
    3140            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    3141            1 :         assert!(tenant_shard.intent.attached.is_none());
    3142              : 
    3143              :         // Should have scheduled into the preferred AZ
    3144            1 :         assert_eq!(
    3145            1 :             scheduler
    3146            1 :                 .get_node_az(&tenant_shard.intent.secondary[0])
    3147            1 :                 .as_ref(),
    3148            1 :             tenant_shard.preferred_az()
    3149              :         );
    3150              : 
    3151              :         // Optimizer should agree
    3152            1 :         assert_eq!(
    3153            1 :             tenant_shard.optimize_attachment(&mut scheduler, &context),
    3154              :             None
    3155              :         );
    3156            1 :         assert_eq!(
    3157            1 :             tenant_shard.optimize_secondary(&mut scheduler, &context),
    3158              :             None
    3159              :         );
    3160              : 
    3161              :         // Switch to PlacementPolicy::Attached
    3162            1 :         tenant_shard.policy = PlacementPolicy::Attached(1);
    3163            1 :         tenant_shard
    3164            1 :             .schedule(&mut scheduler, &mut context)
    3165            1 :             .expect("we have enough nodes, scheduling should work");
    3166            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    3167            1 :         assert!(tenant_shard.intent.attached.is_some());
    3168              :         // Secondary should now be in non-preferred AZ
    3169            1 :         assert_ne!(
    3170            1 :             scheduler
    3171            1 :                 .get_node_az(&tenant_shard.intent.secondary[0])
    3172            1 :                 .as_ref(),
    3173            1 :             tenant_shard.preferred_az()
    3174              :         );
    3175              :         // Attached should be in preferred AZ
    3176            1 :         assert_eq!(
    3177            1 :             scheduler
    3178            1 :                 .get_node_az(&tenant_shard.intent.attached.unwrap())
    3179            1 :                 .as_ref(),
    3180            1 :             tenant_shard.preferred_az()
    3181              :         );
    3182              : 
    3183              :         // Optimizer should agree
    3184            1 :         assert_eq!(
    3185            1 :             tenant_shard.optimize_attachment(&mut scheduler, &context),
    3186              :             None
    3187              :         );
    3188            1 :         assert_eq!(
    3189            1 :             tenant_shard.optimize_secondary(&mut scheduler, &context),
    3190              :             None
    3191              :         );
    3192              : 
    3193              :         // Switch back to PlacementPolicy::Secondary
    3194            1 :         tenant_shard.policy = PlacementPolicy::Secondary;
    3195            1 :         tenant_shard
    3196            1 :             .schedule(&mut scheduler, &mut context)
    3197            1 :             .expect("we have enough nodes, scheduling should work");
    3198            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    3199            1 :         assert!(tenant_shard.intent.attached.is_none());
    3200              :         // When we picked a location to keep, we should have kept the one in the preferred AZ
    3201            1 :         assert_eq!(
    3202            1 :             scheduler
    3203            1 :                 .get_node_az(&tenant_shard.intent.secondary[0])
    3204            1 :                 .as_ref(),
    3205            1 :             tenant_shard.preferred_az()
    3206              :         );
    3207              : 
    3208              :         // Optimizer should agree
    3209            1 :         assert_eq!(
    3210            1 :             tenant_shard.optimize_attachment(&mut scheduler, &context),
    3211              :             None
    3212              :         );
    3213            1 :         assert_eq!(
    3214            1 :             tenant_shard.optimize_secondary(&mut scheduler, &context),
    3215              :             None
    3216              :         );
    3217              : 
    3218            1 :         tenant_shard.intent.clear(&mut scheduler);
    3219              : 
    3220            1 :         Ok(())
    3221            1 :     }
    3222              : }
        

Generated by: LCOV version 2.1-beta