LCOV - code coverage report
Current view: top level - storage_controller/src - tenant_shard.rs (source / functions)
Test:      727bdccc1d7d53837da843959afb612f56da4e79.info
Test Date: 2025-01-30 15:18:43

                 Coverage    Total    Hit
Lines:           66.3 %      1598     1060
Functions:       44.3 %      131      58

            Line data    Source code
       1              : use std::{
       2              :     collections::{HashMap, HashSet},
       3              :     sync::Arc,
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use crate::{
       8              :     metrics::{
       9              :         self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
      10              :     },
      11              :     persistence::TenantShardPersistence,
      12              :     reconciler::{ReconcileUnits, ReconcilerConfig},
      13              :     scheduler::{
      14              :         AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
      15              :         RefCountUpdate, ScheduleContext, SecondaryShardTag, ShardTag,
      16              :     },
      17              :     service::ReconcileResultRequest,
      18              : };
      19              : use futures::future::{self, Either};
      20              : use itertools::Itertools;
      21              : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
      22              : use pageserver_api::{
      23              :     models::{LocationConfig, LocationConfigMode, TenantConfig},
      24              :     shard::{ShardIdentity, TenantShardId},
      25              : };
      26              : use serde::{Deserialize, Serialize};
      27              : use tokio::task::JoinHandle;
      28              : use tokio_util::sync::CancellationToken;
      29              : use tracing::{instrument, Instrument};
      30              : use utils::{
      31              :     generation::Generation,
      32              :     id::NodeId,
      33              :     seqwait::{SeqWait, SeqWaitError},
      34              :     shard::ShardCount,
      35              :     sync::gate::GateGuard,
      36              : };
      37              : 
      38              : use crate::{
      39              :     compute_hook::ComputeHook,
      40              :     node::Node,
      41              :     persistence::{split_state::SplitState, Persistence},
      42              :     reconciler::{
      43              :         attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
      44              :     },
      45              :     scheduler::{ScheduleError, Scheduler},
      46              :     service, Sequence,
      47              : };
      48              : 
      49              : /// Serialization helper
      50            0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
      51            0 : where
      52            0 :     S: serde::ser::Serializer,
      53            0 :     T: std::fmt::Display,
      54            0 : {
      55            0 :     serializer.collect_str(
      56            0 :         &v.lock()
      57            0 :             .unwrap()
      58            0 :             .as_ref()
      59            0 :             .map(|e| format!("{e}"))
      60            0 :             .unwrap_or("".to_string()),
      61            0 :     )
      62            0 : }
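
// Editor's illustration (not part of tenant_shard.rs or the coverage figures above): a
// minimal sketch of how a `serialize_with` helper shaped like `read_last_error` is wired
// up and what it emits.  `LastErrorHolder`, `demo_last_error_serialization`, and the use
// of `serde_json` are assumptions for this example; the real use is the `last_error`
// field of `TenantShard` below.
#[derive(Serialize)]
struct LastErrorHolder {
    // The mutex-guarded error is rendered as a plain string ("" when there is no error).
    #[serde(serialize_with = "read_last_error")]
    last_error: std::sync::Mutex<Option<String>>,
}

fn demo_last_error_serialization() {
    let holder = LastErrorHolder {
        last_error: std::sync::Mutex::new(Some("node 1 unavailable".to_string())),
    };
    // Prints: {"last_error":"node 1 unavailable"}
    println!("{}", serde_json::to_string(&holder).unwrap());
}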
      63              : 
      64              : /// In-memory state for a particular tenant shard.
      65              : ///
      66              : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
      67              : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
      68            0 : #[derive(Serialize)]
      69              : pub(crate) struct TenantShard {
      70              :     pub(crate) tenant_shard_id: TenantShardId,
      71              : 
      72              :     pub(crate) shard: ShardIdentity,
      73              : 
      74              :     // Runtime only: sequence used to coordinate updates to this object while background
      75              :     // reconcilers may be running.  A reconciler runs to a particular
      76              :     // sequence.
      77              :     pub(crate) sequence: Sequence,
      78              : 
      79              :     // Latest generation number: next time we attach, increment this
      80              :     // and use the incremented number when attaching.
      81              :     //
      82              :     // None represents an incompletely onboarded tenant via the [`Service::location_config`]
      83              :     // API, where this tenant may only run in PlacementPolicy::Secondary.
      84              :     pub(crate) generation: Option<Generation>,
      85              : 
      86              :     // High level description of how the tenant should be set up.  Provided
      87              :     // externally.
      88              :     pub(crate) policy: PlacementPolicy,
      89              : 
      90              :     // Low level description of exactly which pageservers should fulfil
      91              :     // which role.  Generated by `Self::schedule`.
      92              :     pub(crate) intent: IntentState,
      93              : 
      94              :     // Low level description of how the tenant is configured on pageservers:
      95              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      96              :     // with `Self::reconcile`.
      97              :     pub(crate) observed: ObservedState,
      98              : 
      99              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
     100              :     // for all shards in a tenant.
     101              :     pub(crate) config: TenantConfig,
     102              : 
     103              :     /// If a reconcile task is currently in flight, it may be joined here (it is
     104              :     /// only safe to join if either the result has been received or the reconciler's
     105              :     /// cancellation token has been fired)
     106              :     #[serde(skip)]
     107              :     pub(crate) reconciler: Option<ReconcilerHandle>,
     108              : 
     109              :     /// If a tenant is being split, then all shards with that TenantId will have a
     110              :     /// SplitState set; this acts as a guard against other operations such as background
     111              :     /// reconciliation and timeline creation.
     112              :     pub(crate) splitting: SplitState,
     113              : 
     114              :     /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
     115              :     /// is set. This flag is cleared when the tenant is popped off the delay queue.
     116              :     pub(crate) delayed_reconcile: bool,
     117              : 
     118              :     /// Optionally wait for reconciliation to complete up to a particular
     119              :     /// sequence number.
     120              :     #[serde(skip)]
     121              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     122              : 
     123              :     /// Indicates sequence number for which we have encountered an error reconciling.  If
     124              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
     125              :     /// and callers should stop waiting for `waiter` and propagate the error.
     126              :     #[serde(skip)]
     127              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     128              : 
     129              :     /// The most recent error from a reconcile on this tenant.  This is a nested Arc
     130              :     /// because:
     131              :     ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
     132              :     ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
     133              :     ///    many waiters for one shard, and the underlying error types are not Clone.
     134              :     ///
     135              :     /// TODO: generalize to an array of recent events
     136              :     /// TODO: use an ArcSwap instead of a mutex for faster reads?
     137              :     #[serde(serialize_with = "read_last_error")]
     138              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     139              : 
     140              :     /// If we have a pending compute notification that for some reason we weren't able to send,
     141              :     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
     142              :     /// and trigger a Reconciler run.  This is the mechanism by which compute notifications are included in the scope
     143              :     /// of state that we publish externally in an eventually consistent way.
     144              :     pub(crate) pending_compute_notification: bool,
     145              : 
     146              :     // Support/debug tool: if something is going wrong or flapping with scheduling, this may
     147              :     // be set to a non-active state to avoid making changes while the issue is fixed.
     148              :     scheduling_policy: ShardSchedulingPolicy,
     149              : }
     150              : 
     151              : #[derive(Clone, Debug, Serialize)]
     152              : pub(crate) struct IntentState {
     153              :     attached: Option<NodeId>,
     154              :     secondary: Vec<NodeId>,
     155              : 
     156              :     // We should attempt to schedule this shard in the provided AZ to
     157              :     // decrease chances of cross-AZ compute.
     158              :     preferred_az_id: Option<AvailabilityZone>,
     159              : }
     160              : 
     161              : impl IntentState {
     162        12856 :     pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
     163        12856 :         Self {
     164        12856 :             attached: None,
     165        12856 :             secondary: vec![],
     166        12856 :             preferred_az_id,
     167        12856 :         }
     168        12856 :     }
     169            0 :     pub(crate) fn single(
     170            0 :         scheduler: &mut Scheduler,
     171            0 :         node_id: Option<NodeId>,
     172            0 :         preferred_az_id: Option<AvailabilityZone>,
     173            0 :     ) -> Self {
     174            0 :         if let Some(node_id) = node_id {
     175            0 :             scheduler.update_node_ref_counts(
     176            0 :                 node_id,
     177            0 :                 preferred_az_id.as_ref(),
     178            0 :                 RefCountUpdate::Attach,
     179            0 :             );
     180            0 :         }
     181            0 :         Self {
     182            0 :             attached: node_id,
     183            0 :             secondary: vec![],
     184            0 :             preferred_az_id,
     185            0 :         }
     186            0 :     }
     187              : 
     188        12856 :     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
     189        12856 :         if self.attached != new_attached {
     190        12856 :             if let Some(old_attached) = self.attached.take() {
     191            0 :                 scheduler.update_node_ref_counts(
     192            0 :                     old_attached,
     193            0 :                     self.preferred_az_id.as_ref(),
     194            0 :                     RefCountUpdate::Detach,
     195            0 :                 );
     196        12856 :             }
     197        12856 :             if let Some(new_attached) = &new_attached {
     198        12856 :                 scheduler.update_node_ref_counts(
     199        12856 :                     *new_attached,
     200        12856 :                     self.preferred_az_id.as_ref(),
     201        12856 :                     RefCountUpdate::Attach,
     202        12856 :                 );
     203        12856 :             }
     204        12856 :             self.attached = new_attached;
     205            0 :         }
     206              : 
     207        12856 :         if let Some(new_attached) = &new_attached {
     208        12856 :             assert!(!self.secondary.contains(new_attached));
     209            0 :         }
     210        12856 :     }
     211              : 
     212              :     /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
     213              :     /// secondary to attached while maintaining the scheduler's reference counts.
     214            5 :     pub(crate) fn promote_attached(
     215            5 :         &mut self,
     216            5 :         scheduler: &mut Scheduler,
     217            5 :         promote_secondary: NodeId,
     218            5 :     ) {
     219            5 :         // If we call this with a node that isn't in secondary, it would cause incorrect
     220            5 :         // scheduler reference counting, since we assume the node is already referenced as a secondary.
     221            5 :         debug_assert!(self.secondary.contains(&promote_secondary));
     222              : 
     223           12 :         self.secondary.retain(|n| n != &promote_secondary);
     224            5 : 
     225            5 :         let demoted = self.attached;
     226            5 :         self.attached = Some(promote_secondary);
     227            5 : 
     228            5 :         scheduler.update_node_ref_counts(
     229            5 :             promote_secondary,
     230            5 :             self.preferred_az_id.as_ref(),
     231            5 :             RefCountUpdate::PromoteSecondary,
     232            5 :         );
     233            5 :         if let Some(demoted) = demoted {
     234            0 :             scheduler.update_node_ref_counts(
     235            0 :                 demoted,
     236            0 :                 self.preferred_az_id.as_ref(),
     237            0 :                 RefCountUpdate::DemoteAttached,
     238            0 :             );
     239            5 :         }
     240            5 :     }
     241              : 
     242        12840 :     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
     243        12840 :         assert!(!self.secondary.contains(&new_secondary));
     244        12840 :         assert!(self.attached != Some(new_secondary));
     245        12840 :         scheduler.update_node_ref_counts(
     246        12840 :             new_secondary,
     247        12840 :             self.preferred_az_id.as_ref(),
     248        12840 :             RefCountUpdate::AddSecondary,
     249        12840 :         );
     250        12840 :         self.secondary.push(new_secondary);
     251        12840 :     }
     252              : 
     253              :     /// It is legal to call this with a node that is not currently a secondary: that is a no-op
     254            4 :     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
     255            5 :         let index = self.secondary.iter().position(|n| *n == node_id);
     256            4 :         if let Some(index) = index {
     257            4 :             scheduler.update_node_ref_counts(
     258            4 :                 node_id,
     259            4 :                 self.preferred_az_id.as_ref(),
     260            4 :                 RefCountUpdate::RemoveSecondary,
     261            4 :             );
     262            4 :             self.secondary.remove(index);
     263            4 :         }
     264            4 :     }
     265              : 
     266        12855 :     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
     267        12855 :         for secondary in self.secondary.drain(..) {
     268        12836 :             scheduler.update_node_ref_counts(
     269        12836 :                 secondary,
     270        12836 :                 self.preferred_az_id.as_ref(),
     271        12836 :                 RefCountUpdate::RemoveSecondary,
     272        12836 :             );
     273        12836 :         }
     274        12855 :     }
     275              : 
     276              :     /// Remove the last secondary node from the list of secondaries
     277            0 :     pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
     278            0 :         if let Some(node_id) = self.secondary.pop() {
     279            0 :             scheduler.update_node_ref_counts(
     280            0 :                 node_id,
     281            0 :                 self.preferred_az_id.as_ref(),
     282            0 :                 RefCountUpdate::RemoveSecondary,
     283            0 :             );
     284            0 :         }
     285            0 :     }
     286              : 
     287        12855 :     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
     288        12855 :         if let Some(old_attached) = self.attached.take() {
     289        12855 :             scheduler.update_node_ref_counts(
     290        12855 :                 old_attached,
     291        12855 :                 self.preferred_az_id.as_ref(),
     292        12855 :                 RefCountUpdate::Detach,
     293        12855 :             );
     294        12855 :         }
     295              : 
     296        12855 :         self.clear_secondary(scheduler);
     297        12855 :     }
     298              : 
     299        12892 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     300        12892 :         let mut result = Vec::new();
     301        12892 :         if let Some(p) = self.attached {
     302        12890 :             result.push(p)
     303            2 :         }
     304              : 
     305        12892 :         result.extend(self.secondary.iter().copied());
     306        12892 : 
     307        12892 :         result
     308        12892 :     }
     309              : 
     310        12594 :     pub(crate) fn get_attached(&self) -> &Option<NodeId> {
     311        12594 :         &self.attached
     312        12594 :     }
     313              : 
     314        12656 :     pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
     315        12656 :         &self.secondary
     316        12656 :     }
     317              : 
     318              :     /// If the node is in use as the attached location, demote it into
     319              :     /// the list of secondary locations.  This is used when a node goes offline,
     320              :     /// and we want to use a different node for attachment, but not permanently
     321              :     /// forget the location on the offline node.
     322              :     ///
     323              :     /// Returns true if a change was made
     324            5 :     pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
     325            5 :         if self.attached == Some(node_id) {
     326            5 :             self.attached = None;
     327            5 :             self.secondary.push(node_id);
     328            5 :             scheduler.update_node_ref_counts(
     329            5 :                 node_id,
     330            5 :                 self.preferred_az_id.as_ref(),
     331            5 :                 RefCountUpdate::DemoteAttached,
     332            5 :             );
     333            5 :             true
     334              :         } else {
     335            0 :             false
     336              :         }
     337            5 :     }
     338              : }
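
// Editor's illustration (not part of tenant_shard.rs or the coverage figures above): a
// sketch of the call order that keeps the scheduler's reference counts consistent across
// the attach / demote / promote lifecycle.  It assumes a `Scheduler` that has already been
// populated with nodes; the node IDs are placeholders.
fn intent_lifecycle_sketch(scheduler: &mut Scheduler) {
    let node_a = NodeId(1);
    let node_b = NodeId(2);

    let mut intent = IntentState::new(None);
    intent.set_attached(scheduler, Some(node_a)); // refcount: attached on node_a
    intent.push_secondary(scheduler, node_b); // refcount: secondary on node_b

    // Swap roles: node_a becomes a secondary, then node_b is promoted to attached.
    intent.demote_attached(scheduler, node_a);
    intent.promote_attached(scheduler, node_b);

    // Release all refcounts before dropping, otherwise the Drop impl's debug_assert fires.
    intent.clear(scheduler);
}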
     339              : 
     340              : impl Drop for IntentState {
     341        12856 :     fn drop(&mut self) {
     342        12856 :         // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
     343        12856 :         // We do not check this while panicking, to avoid polluting unit test failures or
     344        12856 :         // other assertions with this assertion's output.  It's still wrong to leak these,
     345        12856 :         // but if we already have a panic then we don't need to independently flag this case.
     346        12856 :         if !(std::thread::panicking()) {
     347        12856 :             debug_assert!(self.attached.is_none() && self.secondary.is_empty());
     348            0 :         }
     349        12855 :     }
     350              : }
     351              : 
     352            0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
     353              : pub(crate) struct ObservedState {
     354              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
     355              : }
     356              : 
     357              : /// Our latest knowledge of how this tenant is configured in the outside world.
     358              : ///
     359              : /// Meaning:
     360              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
     361              : ///       node for this shard.
     362              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
     363              : ///       what it is (e.g. we failed partway through configuring it)
     364              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     365              : ///       and that configuration will still be present unless something external interfered.
     366            0 : #[derive(Clone, Serialize, Deserialize, Debug)]
     367              : pub(crate) struct ObservedStateLocation {
     368              :     /// If None, it means we do not know the status of this shard's location on this node, but
     369              :     /// we know that we might have some state on this node.
     370              :     pub(crate) conf: Option<LocationConfig>,
     371              : }
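
// Editor's illustration (not part of tenant_shard.rs or the coverage figures above): the
// three-way meaning documented above, written out as a small hypothetical helper.
fn describe_observed(observed: &ObservedState, node: NodeId) -> &'static str {
    match observed.locations.get(&node) {
        // No entry: we are certain nothing is configured for this shard on the node.
        None => "nothing configured",
        // Entry with conf == None: state unknown, e.g. a configuration call failed partway.
        Some(ObservedStateLocation { conf: None }) => "unknown; may hold leftover state",
        // Entry with conf == Some: the configuration we last successfully applied.
        Some(ObservedStateLocation { conf: Some(_) }) => "known configuration",
    }
}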
     372              : 
     373              : pub(crate) struct ReconcilerWaiter {
     374              :     // For observability purposes, remember the ID of the shard we're
     375              :     // waiting for.
     376              :     pub(crate) tenant_shard_id: TenantShardId,
     377              : 
     378              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     379              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     380              :     error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     381              :     seq: Sequence,
     382              : }
     383              : 
     384              : pub(crate) enum ReconcilerStatus {
     385              :     Done,
     386              :     Failed,
     387              :     InProgress,
     388              : }
     389              : 
     390              : #[derive(thiserror::Error, Debug)]
     391              : pub(crate) enum ReconcileWaitError {
     392              :     #[error("Timeout waiting for shard {0}")]
     393              :     Timeout(TenantShardId),
     394              :     #[error("shutting down")]
     395              :     Shutdown,
     396              :     #[error("Reconcile error on shard {0}: {1}")]
     397              :     Failed(TenantShardId, Arc<ReconcileError>),
     398              : }
     399              : 
     400              : #[derive(Eq, PartialEq, Debug, Clone)]
     401              : pub(crate) struct ReplaceSecondary {
     402              :     old_node_id: NodeId,
     403              :     new_node_id: NodeId,
     404              : }
     405              : 
     406              : #[derive(Eq, PartialEq, Debug, Clone)]
     407              : pub(crate) struct MigrateAttachment {
     408              :     pub(crate) old_attached_node_id: NodeId,
     409              :     pub(crate) new_attached_node_id: NodeId,
     410              : }
     411              : 
     412              : #[derive(Eq, PartialEq, Debug, Clone)]
     413              : pub(crate) enum ScheduleOptimizationAction {
     414              :     // Replace one of our secondary locations with a different node
     415              :     ReplaceSecondary(ReplaceSecondary),
     416              :     // Migrate attachment to an existing secondary location
     417              :     MigrateAttachment(MigrateAttachment),
     418              :     // Create a secondary location, with the intent of later migrating to it
     419              :     CreateSecondary(NodeId),
     420              :     // Remove a secondary location that we previously created to facilitate a migration
     421              :     RemoveSecondary(NodeId),
     422              : }
     423              : 
     424              : #[derive(Eq, PartialEq, Debug, Clone)]
     425              : pub(crate) struct ScheduleOptimization {
     426              :     // What was the reconcile sequence when we generated this optimization?  The optimization
     427              :     // should only be applied if the shard's sequence is still at this value, in case other changes
     428              :     // happened between planning the optimization and applying it.
     429              :     sequence: Sequence,
     430              : 
     431              :     pub(crate) action: ScheduleOptimizationAction,
     432              : }
     433              : 
     434              : impl ReconcilerWaiter {
     435            0 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     436            0 :         tokio::select! {
     437            0 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     438            0 :                 result.map_err(|e| match e {
     439            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     440            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     441            0 :                 })?;
     442              :             },
     443            0 :             result = self.error_seq_wait.wait_for(self.seq) => {
     444            0 :                 result.map_err(|e| match e {
     445            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     446            0 :                     SeqWaitError::Timeout => unreachable!()
     447            0 :                 })?;
     448              : 
     449            0 :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
     450            0 :                     self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
     451              :             }
     452              :         }
     453              : 
     454            0 :         Ok(())
     455            0 :     }
     456              : 
     457            0 :     pub(crate) fn get_status(&self) -> ReconcilerStatus {
     458            0 :         if self.seq_wait.would_wait_for(self.seq).is_ok() {
     459            0 :             ReconcilerStatus::Done
     460            0 :         } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
     461            0 :             ReconcilerStatus::Failed
     462              :         } else {
     463            0 :             ReconcilerStatus::InProgress
     464              :         }
     465            0 :     }
     466              : }
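
// Editor's illustration (not part of tenant_shard.rs or the coverage figures above): a
// sketch of how a caller might consume a `ReconcilerWaiter`.  The 30-second timeout and
// the surrounding function are assumptions; real callers live outside this section.
async fn await_reconcile_sketch(waiter: ReconcilerWaiter) {
    match waiter.wait_timeout(Duration::from_secs(30)).await {
        Ok(()) => tracing::info!("reconcile of {} complete", waiter.tenant_shard_id),
        Err(ReconcileWaitError::Timeout(id)) => tracing::warn!("timed out waiting for shard {id}"),
        Err(ReconcileWaitError::Shutdown) => tracing::info!("shutting down"),
        Err(ReconcileWaitError::Failed(id, err)) => tracing::warn!("reconcile of shard {id} failed: {err}"),
    }
}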
     467              : 
     468              : /// Having spawned a reconciler task, the tenant shard's state will carry enough
     469              : /// information to optionally cancel & await it later.
     470              : pub(crate) struct ReconcilerHandle {
     471              :     sequence: Sequence,
     472              :     handle: JoinHandle<()>,
     473              :     cancel: CancellationToken,
     474              : }
     475              : 
     476              : pub(crate) enum ReconcileNeeded {
     477              :     /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
     478              :     /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
     479              :     No,
     480              :     /// shard has a reconciler running, and its intent hasn't changed since that one was
     481              :     /// spawned: wait for the existing reconciler rather than spawning a new one.
     482              :     WaitExisting(ReconcilerWaiter),
     483              :     /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
     484              :     Yes,
     485              : }
     486              : 
     487              : /// Pending modification to the observed state of a tenant shard.
     488              : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
     489              : pub(crate) enum ObservedStateDelta {
     490              :     Upsert(Box<(NodeId, ObservedStateLocation)>),
     491              :     Delete(NodeId),
     492              : }
     493              : 
     494              : impl ObservedStateDelta {
     495            0 :     pub(crate) fn node_id(&self) -> &NodeId {
     496            0 :         match self {
     497            0 :             Self::Upsert(up) => &up.0,
     498            0 :             Self::Delete(nid) => nid,
     499              :         }
     500            0 :     }
     501              : }
     502              : 
     503              : /// When a reconcile task completes, it sends this result object
     504              : /// to be applied to the primary TenantShard.
     505              : pub(crate) struct ReconcileResult {
     506              :     pub(crate) sequence: Sequence,
     507              :     /// On errors, `observed` should be treated as an incomplete description
     508              :     /// of state (i.e. any nodes present in the result should override nodes
     509              :     /// present in the parent tenant state, but any unmentioned nodes should
     510              :     /// not be removed from parent tenant state)
     511              :     pub(crate) result: Result<(), ReconcileError>,
     512              : 
     513              :     pub(crate) tenant_shard_id: TenantShardId,
     514              :     pub(crate) generation: Option<Generation>,
     515              :     pub(crate) observed_deltas: Vec<ObservedStateDelta>,
     516              : 
     517              :     /// Set [`TenantShard::pending_compute_notification`] from this flag
     518              :     pub(crate) pending_compute_notification: bool,
     519              : }
     520              : 
     521              : impl ObservedState {
     522            0 :     pub(crate) fn new() -> Self {
     523            0 :         Self {
     524            0 :             locations: HashMap::new(),
     525            0 :         }
     526            0 :     }
     527              : 
     528            0 :     pub(crate) fn is_empty(&self) -> bool {
     529            0 :         self.locations.is_empty()
     530            0 :     }
     531              : }
     532              : 
     533              : impl TenantShard {
     534        12839 :     pub(crate) fn new(
     535        12839 :         tenant_shard_id: TenantShardId,
     536        12839 :         shard: ShardIdentity,
     537        12839 :         policy: PlacementPolicy,
     538        12839 :         preferred_az_id: Option<AvailabilityZone>,
     539        12839 :     ) -> Self {
     540        12839 :         metrics::METRICS_REGISTRY
     541        12839 :             .metrics_group
     542        12839 :             .storage_controller_tenant_shards
     543        12839 :             .inc();
     544        12839 : 
     545        12839 :         Self {
     546        12839 :             tenant_shard_id,
     547        12839 :             policy,
     548        12839 :             intent: IntentState::new(preferred_az_id),
     549        12839 :             generation: Some(Generation::new(0)),
     550        12839 :             shard,
     551        12839 :             observed: ObservedState::default(),
     552        12839 :             config: TenantConfig::default(),
     553        12839 :             reconciler: None,
     554        12839 :             splitting: SplitState::Idle,
     555        12839 :             sequence: Sequence(1),
     556        12839 :             delayed_reconcile: false,
     557        12839 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     558        12839 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     559        12839 :             last_error: Arc::default(),
     560        12839 :             pending_compute_notification: false,
     561        12839 :             scheduling_policy: ShardSchedulingPolicy::default(),
     562        12839 :         }
     563        12839 :     }
     564              : 
     565              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     566              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     567              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     568              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     569            1 :     pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
     570            1 :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     571            1 :         // generation
     572            1 :         let mut attached_locs = self
     573            1 :             .observed
     574            1 :             .locations
     575            1 :             .iter()
     576            2 :             .filter_map(|(node_id, l)| {
     577            2 :                 if let Some(conf) = &l.conf {
     578            2 :                     if conf.mode == LocationConfigMode::AttachedMulti
     579            1 :                         || conf.mode == LocationConfigMode::AttachedSingle
     580            1 :                         || conf.mode == LocationConfigMode::AttachedStale
     581              :                     {
     582            2 :                         Some((node_id, conf.generation))
     583              :                     } else {
     584            0 :                         None
     585              :                     }
     586              :                 } else {
     587            0 :                     None
     588              :                 }
     589            2 :             })
     590            1 :             .collect::<Vec<_>>();
     591            1 : 
     592            2 :         attached_locs.sort_by_key(|i| i.1);
     593            1 :         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
     594            1 :             self.intent.set_attached(scheduler, Some(*node_id));
     595            1 :         }
     596              : 
     597              :         // All remaining observed locations generate secondary intents.  This includes None
     598              :         // observations, as these may well have some local content on disk that is usable (this
     599              :         // is an edge case that might occur if we restarted during a migration or other change)
     600              :         //
     601              :         // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
     602              :         // will take care of promoting one of these secondaries to be attached.
     603            2 :         self.observed.locations.keys().for_each(|node_id| {
     604            2 :             if Some(*node_id) != self.intent.attached {
     605            1 :                 self.intent.push_secondary(scheduler, *node_id);
     606            1 :             }
     607            2 :         });
     608            1 :     }
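    // Worked example (editorial, not part of the source): if the observed state holds
    // { node 1: AttachedSingle @ generation 5, node 2: AttachedStale @ generation 4,
    //   node 3: conf == None }, then this function picks node 1 (the attached-mode location
    // with the highest generation) as the attached intent and pushes nodes 2 and 3 as
    // secondaries; the follow-up call to Self::schedule then brings that intent back into
    // compliance with the placement policy.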
     609              : 
     610              :     /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
     611              :     /// attached pageserver for a shard.
     612              :     ///
     613              :     /// Returns whether the intent was modified, and the NodeId selected.
     614        12829 :     fn schedule_attached(
     615        12829 :         &mut self,
     616        12829 :         scheduler: &mut Scheduler,
     617        12829 :         context: &ScheduleContext,
     618        12829 :     ) -> Result<(bool, NodeId), ScheduleError> {
     619              :         // No work to do if we already have an attached tenant
     620        12829 :         if let Some(node_id) = self.intent.attached {
     621            0 :             return Ok((false, node_id));
     622        12829 :         }
     623              : 
     624        12829 :         if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
     625              :             // Promote a secondary
     626            1 :             tracing::debug!("Promoted secondary {} to attached", promote_secondary);
     627            1 :             self.intent.promote_attached(scheduler, promote_secondary);
     628            1 :             Ok((true, promote_secondary))
     629              :         } else {
     630              :             // Pick a fresh node: either we had no secondaries or none were schedulable
     631        12828 :             let node_id = scheduler.schedule_shard::<AttachedShardTag>(
     632        12828 :                 &self.intent.secondary,
     633        12828 :                 &self.intent.preferred_az_id,
     634        12828 :                 context,
     635        12828 :             )?;
     636        12828 :             tracing::debug!("Selected {} as attached", node_id);
     637        12828 :             self.intent.set_attached(scheduler, Some(node_id));
     638        12828 :             Ok((true, node_id))
     639              :         }
     640        12829 :     }
     641              : 
     642              :     #[instrument(skip_all, fields(
     643              :         tenant_id=%self.tenant_shard_id.tenant_id,
     644              :         shard_id=%self.tenant_shard_id.shard_slug(),
     645              :         sequence=%self.sequence
     646              :     ))]
     647              :     pub(crate) fn schedule(
     648              :         &mut self,
     649              :         scheduler: &mut Scheduler,
     650              :         context: &mut ScheduleContext,
     651              :     ) -> Result<(), ScheduleError> {
     652              :         let r = self.do_schedule(scheduler, context);
     653              : 
     654              :         context.avoid(&self.intent.all_pageservers());
     655              : 
     656              :         r
     657              :     }
     658              : 
     659        12830 :     pub(crate) fn do_schedule(
     660        12830 :         &mut self,
     661        12830 :         scheduler: &mut Scheduler,
     662        12830 :         context: &ScheduleContext,
     663        12830 :     ) -> Result<(), ScheduleError> {
     664        12830 :         // TODO: before scheduling new nodes, check if any existing content in
     665        12830 :         // self.intent refers to pageservers that are offline, and pick other
     666        12830 :         // pageservers if so.
     667        12830 : 
     668        12830 :         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
     669        12830 :         // change their attach location.
     670        12830 : 
     671        12830 :         match self.scheduling_policy {
     672        12829 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
     673              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     674              :                 // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
     675            1 :                 tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     676            0 :                     "Scheduling is disabled by policy {:?}", self.scheduling_policy);
     677            1 :                 return Ok(());
     678              :             }
     679              :         }
     680              : 
     681              :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     682              :         // more work on the same pageservers we're already using.
     683        12829 :         let mut modified = false;
     684              : 
     685              :         // Add/remove nodes to fulfil policy
     686              :         use PlacementPolicy::*;
     687        12829 :         match self.policy {
     688        12829 :             Attached(secondary_count) => {
     689              :                 // Should have exactly one attached, and at least N secondaries
     690        12829 :                 let (modified_attached, attached_node_id) =
     691        12829 :                     self.schedule_attached(scheduler, context)?;
     692        12829 :                 modified |= modified_attached;
     693        12829 : 
     694        12829 :                 let mut used_pageservers = vec![attached_node_id];
     695        25657 :                 while self.intent.secondary.len() < secondary_count {
     696        12828 :                     let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
     697        12828 :                         &used_pageservers,
     698        12828 :                         &self.intent.preferred_az_id,
     699        12828 :                         context,
     700        12828 :                     )?;
     701        12828 :                     self.intent.push_secondary(scheduler, node_id);
     702        12828 :                     used_pageservers.push(node_id);
     703        12828 :                     modified = true;
     704              :                 }
     705              :             }
     706              :             Secondary => {
     707            0 :                 if let Some(node_id) = self.intent.get_attached() {
     708            0 :                     // Populate secondary by demoting the attached node
     709            0 :                     self.intent.demote_attached(scheduler, *node_id);
     710            0 :                     modified = true;
     711            0 :                 } else if self.intent.secondary.is_empty() {
     712              :                     // Populate secondary by scheduling a fresh node
     713            0 :                     let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
     714            0 :                         &[],
     715            0 :                         &self.intent.preferred_az_id,
     716            0 :                         context,
     717            0 :                     )?;
     718            0 :                     self.intent.push_secondary(scheduler, node_id);
     719            0 :                     modified = true;
     720            0 :                 }
     721            0 :                 while self.intent.secondary.len() > 1 {
     722            0 :                     // We have no particular preference for one secondary location over another: just
     723            0 :                     // arbitrarily drop from the end
     724            0 :                     self.intent.pop_secondary(scheduler);
     725            0 :                     modified = true;
     726            0 :                 }
     727              :             }
     728              :             Detached => {
     729              :                 // Never add locations in this mode
     730            0 :                 if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
     731            0 :                     self.intent.clear(scheduler);
     732            0 :                     modified = true;
     733            0 :                 }
     734              :             }
     735              :         }
     736              : 
     737        12829 :         if modified {
     738        12829 :             self.sequence.0 += 1;
     739        12829 :         }
     740              : 
     741        12829 :         Ok(())
     742        12830 :     }
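    // Worked example (editorial, not part of the source): with PlacementPolicy::Attached(1)
    // and an empty intent, do_schedule first picks an attached node via schedule_attached
    // (preferring to promote an existing secondary), then schedules one secondary while
    // excluding the attached node, and finally increments self.sequence because the intent
    // changed.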
     743              : 
     744              :     /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
     745              :     /// if the swap is not possible and leaves the intent state in its original state.
     746              :     ///
     747              :     /// Arguments:
     748              :     /// `attached_to`: the currently attached location matching the intent state (may be None if the
     749              :     /// shard is not attached)
     750              :     /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
     751              :     /// the scheduler to recommend a node
     752            0 :     pub(crate) fn reschedule_to_secondary(
     753            0 :         &mut self,
     754            0 :         promote_to: Option<NodeId>,
     755            0 :         scheduler: &mut Scheduler,
     756            0 :     ) -> Result<(), ScheduleError> {
     757            0 :         let promote_to = match promote_to {
     758            0 :             Some(node) => node,
     759            0 :             None => match self.preferred_secondary(scheduler) {
     760            0 :                 Some(node) => node,
     761              :                 None => {
     762            0 :                     return Err(ScheduleError::ImpossibleConstraint);
     763              :                 }
     764              :             },
     765              :         };
     766              : 
     767            0 :         assert!(self.intent.get_secondary().contains(&promote_to));
     768              : 
     769            0 :         if let Some(node) = self.intent.get_attached() {
     770            0 :             let demoted = self.intent.demote_attached(scheduler, *node);
     771            0 :             if !demoted {
     772            0 :                 return Err(ScheduleError::ImpossibleConstraint);
     773            0 :             }
     774            0 :         }
     775              : 
     776            0 :         self.intent.promote_attached(scheduler, promote_to);
     777            0 : 
     778            0 :         // Increment the sequence number for the edge case where a
     779            0 :         // reconciler is already running to avoid waiting on the
     780            0 :         // current reconcile instead of spawning a new one.
     781            0 :         self.sequence = self.sequence.next();
     782            0 : 
     783            0 :         Ok(())
     784            0 :     }
     785              : 
     786              :     /// Returns None if the current location's score is unavailable, i.e. we cannot draw a conclusion
     787           53 :     fn is_better_location<T: ShardTag>(
     788           53 :         &self,
     789           53 :         scheduler: &mut Scheduler,
     790           53 :         schedule_context: &ScheduleContext,
     791           53 :         current: NodeId,
     792           53 :         candidate: NodeId,
     793           53 :     ) -> Option<bool> {
     794           53 :         let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
     795           53 :             candidate,
     796           53 :             &self.intent.preferred_az_id,
     797           53 :             schedule_context,
     798           53 :         ) else {
     799              :             // The candidate node is unavailable for scheduling or otherwise couldn't get a score
     800            0 :             return None;
     801              :         };
     802              : 
     803           53 :         match scheduler.compute_node_score::<T::Score>(
     804           53 :             current,
     805           53 :             &self.intent.preferred_az_id,
     806           53 :             schedule_context,
     807           53 :         ) {
     808           53 :             Some(current_score) => {
     809           53 :                 // Ignore utilization components when comparing scores: we don't want to migrate
     810           53 :                 // because of transient load variations, it risks making the system thrash, and
     811           53 :                 // migrating for utilization requires a separate high level view of the system to
     812           53 :                 // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
     813           53 :                 // moving things around in the order that we hit this function.
     814           53 :                 let candidate_score = candidate_score.for_optimization();
     815           53 :                 let current_score = current_score.for_optimization();
     816           53 : 
     817           53 :                 if candidate_score < current_score {
     818            8 :                     tracing::info!("Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})");
     819            8 :                     Some(true)
     820              :                 } else {
     821              :                     // The candidate node is no better than our current location, so don't migrate
     822           45 :                     tracing::debug!(
     823            0 :                         "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
     824              :                     );
     825           45 :                     Some(false)
     826              :                 }
     827              :             }
     828              :             None => {
     829              :                 // The current node is unavailable for scheduling, so we can't make any sensible
     830              :                 // decisions about optimisation.  This should be a transient state -- if the node
     831              :                 // is offline then it will get evacuated, if is blocked by a scheduling mode
     832              :                 // then we will respect that mode by doing nothing.
     833            0 :                 tracing::debug!("Current node {current} is unavailable for scheduling");
     834            0 :                 None
     835              :             }
     836              :         }
     837           53 :     }
     838              : 
     839           40 :     fn find_better_location<T: ShardTag>(
     840           40 :         &self,
     841           40 :         scheduler: &mut Scheduler,
     842           40 :         schedule_context: &ScheduleContext,
     843           40 :         current: NodeId,
     844           40 :         hard_exclude: &[NodeId],
     845           40 :     ) -> Option<NodeId> {
     846              :         // Look for a lower-scoring location to attach to
     847           40 :         let Ok(candidate_node) = scheduler.schedule_shard::<T>(
     848           40 :             hard_exclude,
     849           40 :             &self.intent.preferred_az_id,
     850           40 :             schedule_context,
     851           40 :         ) else {
     852              :             // A scheduling error means we have no possible candidate replacements
     853            0 :             tracing::debug!("No candidate node found");
     854            0 :             return None;
     855              :         };
     856              : 
     857           40 :         if candidate_node == current {
     858              :             // We're already at the best possible location, so don't migrate
     859           20 :             tracing::debug!("Candidate node {candidate_node} is already in use");
     860           20 :             return None;
     861           20 :         }
     862           20 : 
     863           20 :         self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
     864           20 :             .and_then(|better| if better { Some(candidate_node) } else { None })
     865           40 :     }
     866              : 
     867              :     /// This function is an optimization, used to avoid doing large numbers of scheduling operations
     868              :     /// when looking for optimizations.  This function uses knowledge of how scores work to do some
     869              :     /// fast checks for whether it may be possible to improve a score.
     870              :     ///
     871              :     /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is.  If we
     872              :     /// return false, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
     873              :     /// work.
     874            3 :     pub(crate) fn maybe_optimizable(
     875            3 :         &self,
     876            3 :         scheduler: &mut Scheduler,
     877            3 :         schedule_context: &ScheduleContext,
     878            3 :     ) -> bool {
     879            3 :         // Sharded tenant: check if any locations have a nonzero affinity score
     880            3 :         if self.shard.count >= ShardCount(1) {
     881            3 :             let schedule_context = schedule_context.project_detach(self);
     882            5 :             for node in self.intent.all_pageservers() {
     883            5 :                 if let Some(af) = schedule_context.nodes.get(&node) {
     884            5 :                     if *af > AffinityScore(0) {
     885            3 :                         return true;
     886            2 :                     }
     887            0 :                 }
     888              :             }
     889            0 :         }
     890              : 
     891              :         // Attached tenant: check if the attachment is outside the preferred AZ
     892            0 :         if let PlacementPolicy::Attached(_) = self.policy {
     893            0 :             if let Some(attached) = self.intent.get_attached() {
     894            0 :                 if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
     895            0 :                     return true;
     896            0 :                 }
     897            0 :             }
     898            0 :         }
     899              : 
     900              :         // Tenant with secondary locations: check if any are within the preferred AZ
     901            0 :         for secondary in self.intent.get_secondary() {
     902            0 :             if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
     903            0 :                 return true;
     904            0 :             }
     905              :         }
     906              : 
     907              :         // Does the tenant have excess secondaries?
     908            0 :         if self.intent.get_secondary().len() > self.policy.want_secondaries() {
     909            0 :             return true;
     910            0 :         }
     911            0 : 
     912            0 :         // Fall through: no optimizations possible
     913            0 :         false
     914            3 :     }
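                      : 
                      :     // A minimal usage sketch (hypothetical caller; `shard`, `scheduler` and `schedule_context`
                      :     // are assumed local bindings, not part of this file): the cheap `maybe_optimizable` check
                      :     // gates the more expensive optimize_* calls, so most shards can be skipped without running
                      :     // full scheduler scoring.
                      :     //
                      :     //     if shard.maybe_optimizable(scheduler, &schedule_context) {
                      :     //         if let Some(opt) = shard.optimize_attachment(scheduler, &schedule_context) {
                      :     //             shard.apply_optimization(scheduler, opt);
                      :     //         }
                      :     //     }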
     915              : 
     916              :     /// Optimize attachments: if a shard has a secondary location that is preferable to
     917              :     /// its primary location based on soft constraints, switch that secondary location
     918              :     /// to be attached.
     919              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     920              :     pub(crate) fn optimize_attachment(
     921              :         &self,
     922              :         scheduler: &mut Scheduler,
     923              :         schedule_context: &ScheduleContext,
     924              :     ) -> Option<ScheduleOptimization> {
     925              :         let attached = (*self.intent.get_attached())?;
     926              : 
     927              :         let schedule_context = schedule_context.project_detach(self);
     928              : 
     929              :         // If we already have a secondary that is higher-scoring than our current location,
     930              :         // then simply migrate to it.
     931              :         for secondary in self.intent.get_secondary() {
     932              :             if let Some(true) = self.is_better_location::<AttachedShardTag>(
     933              :                 scheduler,
     934              :                 &schedule_context,
     935              :                 attached,
     936              :                 *secondary,
     937              :             ) {
     938              :                 return Some(ScheduleOptimization {
     939              :                     sequence: self.sequence,
     940              :                     action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
     941              :                         old_attached_node_id: attached,
     942              :                         new_attached_node_id: *secondary,
     943              :                     }),
     944              :                 });
     945              :             }
     946              :         }
     947              : 
     948              :         // Given that none of our current secondaries is a better location than our current
     949              :         // attached location (checked above), we may trim any secondaries that are not needed
     950              :         // for the placement policy.
     951              :         if self.intent.get_secondary().len() > self.policy.want_secondaries() {
     952              :             // This code path cleans up extra secondaries after migrating, and/or
     953              :             // trims extra secondaries after a PlacementPolicy::Attached(N) was
     954              :             // modified to decrease N.
     955              : 
     956              :             let secondary_scores = self
     957              :                 .intent
     958              :                 .get_secondary()
     959              :                 .iter()
     960            5 :                 .map(|node_id| {
     961            5 :                     (
     962            5 :                         *node_id,
     963            5 :                         scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
     964            5 :                             *node_id,
     965            5 :                             &self.intent.preferred_az_id,
     966            5 :                             &schedule_context,
     967            5 :                         ),
     968            5 :                     )
     969            5 :                 })
     970              :                 .collect::<Vec<_>>();
     971              : 
     972            5 :             if secondary_scores.iter().any(|score| score.1.is_none()) {
     973              :                 // Don't have full list of scores, so can't make a good decision about which to drop unless
     974              :                 // there is an obvious one in the wrong AZ
     975              :                 for secondary in self.intent.get_secondary() {
     976              :                     if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
     977              :                         return Some(ScheduleOptimization {
     978              :                             sequence: self.sequence,
     979              :                             action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
     980              :                         });
     981              :                     }
     982              :                 }
     983              : 
     984              :                 // Fall through: we didn't identify one to remove.  This ought to be rare.
     985              :                 tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
     986              :                 self.intent.get_secondary()
     987              :             );
     988              :             } else {
     989              :                 let victim = secondary_scores
     990              :                     .iter()
     991            5 :                     .max_by_key(|score| score.1.unwrap())
     992              :                     .unwrap()
     993              :                     .0;
     994              :                 return Some(ScheduleOptimization {
     995              :                     sequence: self.sequence,
     996              :                     action: ScheduleOptimizationAction::RemoveSecondary(victim),
     997              :                 });
     998              :             }
     999              :         }
    1000              : 
    1001              :         let replacement = self.find_better_location::<AttachedShardTag>(
    1002              :             scheduler,
    1003              :             &schedule_context,
    1004              :             attached,
    1005              :             &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
    1006              :         );
    1007              : 
    1008              :         // We have found a candidate and confirmed that its score is preferable
    1009              :         // to our current location. See if we have a secondary location in the preferred location already: if not,
    1010              :         // then create one.
    1011              :         if let Some(replacement) = replacement {
    1012              :             // If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still
    1013              :             // not in our preferred AZ.  Migration has a cost in resources and an impact on the workload, so we want to avoid doing
    1014              :             // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
    1015              :             // AZ: skip this optimization if it is not in our final, preferred AZ.
    1016              :             //
    1017              :             // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
    1018              :             // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
    1019              :             if self.intent.preferred_az_id.is_some()
    1020              :                 && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
    1021              :             {
    1022              :                 tracing::debug!(
    1023              :                     "Candidate node {replacement} is not in preferred AZ {:?}",
    1024              :                     self.intent.preferred_az_id
    1025              :                 );
    1026              : 
    1027              :                 // This should only happen if our current location is not in the preferred AZ, otherwise
    1028              :                 // [`Self::find_better_location`] should have rejected any other location outside the preferred AZ, because
    1029              :                 // AZ is the highest priority part of NodeAttachmentSchedulingScore.
    1030              :                 debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
    1031              : 
    1032              :                 return None;
    1033              :             }
    1034              : 
    1035              :             if !self.intent.get_secondary().contains(&replacement) {
    1036              :                 Some(ScheduleOptimization {
    1037              :                     sequence: self.sequence,
    1038              :                     action: ScheduleOptimizationAction::CreateSecondary(replacement),
    1039              :                 })
    1040              :             } else {
    1041              :                 // We already have a secondary in the preferred location, let's try migrating to it.  Our caller
    1042              :                 // will check the warmth of the destination before deciding whether to really execute this.
    1043              :                 Some(ScheduleOptimization {
    1044              :                     sequence: self.sequence,
    1045              :                     action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1046              :                         old_attached_node_id: attached,
    1047              :                         new_attached_node_id: replacement,
    1048              :                     }),
    1049              :                 })
    1050              :             }
    1051              :         } else {
    1052              :             // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
    1053              :             // to clean up: no action required.
    1054              :             None
    1055              :         }
    1056              :     }
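                      : 
                      :     // Simplified sketch of how a migration typically plays out across repeated calls
                      :     // (derived from the branches above; the exact driver loop lives in the caller):
                      :     //
                      :     //     pass 1: CreateSecondary(target)   // stage a secondary at the better location
                      :     //     pass 2: MigrateAttachment { .. }  // cut over once the caller judges it warm enough
                      :     //     pass 3: RemoveSecondary(old)      // trim the now-excess secondary
                      :     //
                      :     // Each step is returned as a `ScheduleOptimization` and applied via
                      :     // [`Self::apply_optimization`].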
    1057              : 
    1058              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1059              :     pub(crate) fn optimize_secondary(
    1060              :         &self,
    1061              :         scheduler: &mut Scheduler,
    1062              :         schedule_context: &ScheduleContext,
    1063              :     ) -> Option<ScheduleOptimization> {
    1064              :         if self.intent.get_secondary().len() > self.policy.want_secondaries() {
    1065              :             // We have extra secondaries, perhaps to facilitate a migration of the attached location:
    1066              :             // do nothing, it is up to [`Self::optimize_attachment`] to clean them up.  When that's done,
    1067              :             // and we are called again, we will proceed.
    1068              :             tracing::debug!("Too many secondaries: skipping");
    1069              :             return None;
    1070              :         }
    1071              : 
    1072              :         let schedule_context = schedule_context.project_detach(self);
    1073              : 
    1074              :         for secondary in self.intent.get_secondary() {
    1075              :             // Make sure we don't try to migrate a secondary to our attached location: this case happens
    1076              :             // easily in environments without multiple AZs.
    1077              :             let exclude = match self.intent.attached {
    1078              :                 Some(attached) => vec![attached],
    1079              :                 None => vec![],
    1080              :             };
    1081              : 
    1082              :             let replacement = self.find_better_location::<SecondaryShardTag>(
    1083              :                 scheduler,
    1084              :                 &schedule_context,
    1085              :                 *secondary,
    1086              :                 &exclude,
    1087              :             );
    1088              :             assert!(replacement != Some(*secondary));
    1089              :             if let Some(replacement) = replacement {
    1090              :                 // We have found a candidate and confirmed that its score is preferable
    1091              :                 // to our current location: replace this secondary with a new secondary at
    1092              :                 // the better location.
    1093              :                 return Some(ScheduleOptimization {
    1094              :                     sequence: self.sequence,
    1095              :                     action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1096              :                         old_node_id: *secondary,
    1097              :                         new_node_id: replacement,
    1098              :                     }),
    1099              :                 });
    1100              :             }
    1101              :         }
    1102              : 
    1103              :         None
    1104              :     }
    1105              : 
    1106              :     /// Return true if the optimization was really applied: it will not be applied if the optimization's
    1107              :     /// sequence is behind this tenant shard's
    1108           11 :     pub(crate) fn apply_optimization(
    1109           11 :         &mut self,
    1110           11 :         scheduler: &mut Scheduler,
    1111           11 :         optimization: ScheduleOptimization,
    1112           11 :     ) -> bool {
    1113           11 :         if optimization.sequence != self.sequence {
    1114            0 :             return false;
    1115           11 :         }
    1116           11 : 
    1117           11 :         metrics::METRICS_REGISTRY
    1118           11 :             .metrics_group
    1119           11 :             .storage_controller_schedule_optimization
    1120           11 :             .inc();
    1121           11 : 
    1122           11 :         match optimization.action {
    1123              :             ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1124            4 :                 old_attached_node_id,
    1125            4 :                 new_attached_node_id,
    1126            4 :             }) => {
    1127            4 :                 self.intent.demote_attached(scheduler, old_attached_node_id);
    1128            4 :                 self.intent
    1129            4 :                     .promote_attached(scheduler, new_attached_node_id);
    1130            4 :             }
    1131              :             ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1132            1 :                 old_node_id,
    1133            1 :                 new_node_id,
    1134            1 :             }) => {
    1135            1 :                 self.intent.remove_secondary(scheduler, old_node_id);
    1136            1 :                 self.intent.push_secondary(scheduler, new_node_id);
    1137            1 :             }
    1138            3 :             ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
    1139            3 :                 self.intent.push_secondary(scheduler, new_node_id);
    1140            3 :             }
    1141            3 :             ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
    1142            3 :                 self.intent.remove_secondary(scheduler, old_secondary);
    1143            3 :             }
    1144              :         }
    1145              : 
    1146           11 :         true
    1147           11 :     }
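                      : 
                      :     // Sketch (hypothetical caller): the sequence check above means an optimization computed
                      :     // earlier is silently dropped if the shard has changed in the meantime.
                      :     //
                      :     //     let planned = shard.optimize_attachment(scheduler, &schedule_context);
                      :     //     // ... unrelated changes may advance `shard.sequence` here ...
                      :     //     if let Some(opt) = planned {
                      :     //         let applied = shard.apply_optimization(scheduler, opt);
                      :     //         // applied == false: the optimization was stale and nothing was modified
                      :     //     }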
    1148              : 
    1149              :     /// When a shard has several secondary locations, we need to pick one in situations where
    1150              :     /// we promote one of them to an attached location:
    1151              :     ///  - When draining a node for restart
    1152              :     ///  - When responding to a node failure
    1153              :     ///
    1154              :     /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
    1155              :     /// we want to pick the node which is best for use _temporarily_ while the previous attached location
    1156              :     /// is unavailable (e.g. because it's down or deploying).  That means we prefer to use secondary
    1157              :     /// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary
    1158              :     /// secondary in the preferred AZ (such secondaries are usually only created for migrations, and
    1159              :     /// if they exist they're probably not warmed up yet).
    1160              :     ///
    1161              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
    1162              :     /// caller needs to pick a node some other way.
    1163        12829 :     pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
    1164        12829 :         let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
    1165        12829 : 
    1166        12829 :         // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
    1167        12829 :         // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
    1168        12829 :         // rather than a short-lived secondary location being used for optimization/migration (which would have
    1169        12829 :         // been scheduled in our preferred AZ).
    1170        12829 :         let mut candidates = candidates
    1171        12829 :             .iter()
    1172        12829 :             .map(|(node_id, node_az)| {
    1173            1 :                 if node_az == &self.intent.preferred_az_id {
    1174            0 :                     (1, *node_id)
    1175              :                 } else {
    1176            1 :                     (0, *node_id)
    1177              :                 }
    1178        12829 :             })
    1179        12829 :             .collect::<Vec<_>>();
    1180        12829 : 
    1181        12829 :         candidates.sort();
    1182        12829 : 
    1183        12829 :         candidates.first().map(|i| i.1)
    1184        12829 :     }
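                      : 
                      :     // Worked example (hypothetical node IDs): with usable secondaries on node 1 (in the
                      :     // preferred AZ) and node 2 (elsewhere), the keys become [(1, NodeId(1)), (0, NodeId(2))];
                      :     // sorting puts (0, NodeId(2)) first, so Some(NodeId(2)) is returned, i.e. the longer-lived
                      :     // secondary outside the preferred AZ.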
    1185              : 
    1186              :     /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
    1187              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
    1188              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
    1189              :     ///
    1190              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
    1191              :     /// function should not be used to decide whether to reconcile.
    1192            0 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
    1193            0 :         if let Some(attach_intent) = self.intent.attached {
    1194            0 :             match self.observed.locations.get(&attach_intent) {
    1195            0 :                 Some(loc) => match &loc.conf {
    1196            0 :                     Some(conf) => match conf.mode {
    1197              :                         LocationConfigMode::AttachedMulti
    1198              :                         | LocationConfigMode::AttachedSingle
    1199              :                         | LocationConfigMode::AttachedStale => {
    1200              :                             // Our intent and observed state agree that this node is in an attached state.
    1201            0 :                             Some(attach_intent)
    1202              :                         }
    1203              :                         // Our observed config is not an attached state
    1204            0 :                         _ => None,
    1205              :                     },
    1206              :                     // Our observed state is None, i.e. in flux
    1207            0 :                     None => None,
    1208              :                 },
    1209              :                 // We have no observed state for this node
    1210            0 :                 None => None,
    1211              :             }
    1212              :         } else {
    1213              :             // Our intent is not to attach
    1214            0 :             None
    1215              :         }
    1216            0 :     }
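                      : 
                      :     // Usage sketch (hypothetical caller): gate compute notifications on agreement between
                      :     // intent and observed state rather than on intent alone.
                      :     //
                      :     //     if let Some(node_id) = shard.stably_attached() {
                      :     //         // safe to advertise `node_id` to computes: it is both intended and observed attached
                      :     //     }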
    1217              : 
    1218            0 :     fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
    1219            0 :         let mut dirty_nodes = HashSet::new();
    1220              : 
    1221            0 :         if let Some(node_id) = self.intent.attached {
    1222              :             // Maybe panic: it is a severe bug if we try to attach while generation is null.
    1223            0 :             let generation = self
    1224            0 :                 .generation
    1225            0 :                 .expect("Attempted to enter attached state without a generation");
    1226            0 : 
    1227            0 :             let wanted_conf =
    1228            0 :                 attached_location_conf(generation, &self.shard, &self.config, &self.policy);
    1229            0 :             match self.observed.locations.get(&node_id) {
    1230            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
    1231            0 :                 Some(_) | None => {
    1232            0 :                     dirty_nodes.insert(node_id);
    1233            0 :                 }
    1234              :             }
    1235            0 :         }
    1236              : 
    1237            0 :         for node_id in &self.intent.secondary {
    1238            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
    1239            0 :             match self.observed.locations.get(node_id) {
    1240            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
    1241            0 :                 Some(_) | None => {
    1242            0 :                     dirty_nodes.insert(*node_id);
    1243            0 :                 }
    1244              :             }
    1245              :         }
    1246              : 
    1247            0 :         for node_id in self.observed.locations.keys() {
    1248            0 :             if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
    1249            0 :                 // We have observed state that isn't part of our intent: need to clean it up.
    1250            0 :                 dirty_nodes.insert(*node_id);
    1251            0 :             }
    1252              :         }
    1253              : 
    1254            0 :         dirty_nodes.retain(|node_id| {
    1255            0 :             nodes
    1256            0 :                 .get(node_id)
    1257            0 :                 .map(|n| n.is_available())
    1258            0 :                 .unwrap_or(false)
    1259            0 :         });
    1260            0 : 
    1261            0 :         !dirty_nodes.is_empty()
    1262            0 :     }
    1263              : 
    1264              :     #[allow(clippy::too_many_arguments)]
    1265              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1266              :     pub(crate) fn get_reconcile_needed(
    1267              :         &mut self,
    1268              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1269              :     ) -> ReconcileNeeded {
    1270              :         // If there are any ambiguous observed states, and the nodes they refer to are available,
    1271              :         // we should reconcile to clean them up.
    1272              :         let mut dirty_observed = false;
    1273              :         for (node_id, observed_loc) in &self.observed.locations {
    1274              :             let node = pageservers
    1275              :                 .get(node_id)
    1276              :                 .expect("Nodes may not be removed while referenced");
    1277              :             if observed_loc.conf.is_none() && node.is_available() {
    1278              :                 dirty_observed = true;
    1279              :                 break;
    1280              :             }
    1281              :         }
    1282              : 
    1283              :         let active_nodes_dirty = self.dirty(pageservers);
    1284              : 
    1285              :         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
    1286              :         // wake up a reconciler to send it.
    1287              :         let do_reconcile =
    1288              :             active_nodes_dirty || dirty_observed || self.pending_compute_notification;
    1289              : 
    1290              :         if !do_reconcile {
    1291              :             tracing::debug!("Not dirty, no reconciliation needed.");
    1292              :             return ReconcileNeeded::No;
    1293              :         }
    1294              : 
    1295              :         // If we are currently splitting, then never start a reconciler task: the splitting logic
    1296              :         // requires that shards are not interfered with while it runs. Do this check here rather than
    1297              :         // up top, so that we only log this message if we would otherwise have done a reconciliation.
    1298              :         if !matches!(self.splitting, SplitState::Idle) {
    1299              :             tracing::info!("Refusing to reconcile, splitting in progress");
    1300              :             return ReconcileNeeded::No;
    1301              :         }
    1302              : 
    1303              :         // Reconcile already in flight for the current sequence?
    1304              :         if let Some(handle) = &self.reconciler {
    1305              :             if handle.sequence == self.sequence {
    1306              :                 tracing::info!(
    1307              :                     "Reconciliation already in progress for sequence {:?}",
    1308              :                     self.sequence,
    1309              :                 );
    1310              :                 return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
    1311              :                     tenant_shard_id: self.tenant_shard_id,
    1312              :                     seq_wait: self.waiter.clone(),
    1313              :                     error_seq_wait: self.error_waiter.clone(),
    1314              :                     error: self.last_error.clone(),
    1315              :                     seq: self.sequence,
    1316              :                 });
    1317              :             }
    1318              :         }
    1319              : 
    1320              :         // Pre-checks done: finally check whether we may actually do the work
    1321              :         match self.scheduling_policy {
    1322              :             ShardSchedulingPolicy::Active
    1323              :             | ShardSchedulingPolicy::Essential
    1324              :             | ShardSchedulingPolicy::Pause => {}
    1325              :             ShardSchedulingPolicy::Stop => {
    1326              :                 // We only reach this point if there is work to do and we're going to skip
    1327              :                 // doing it: warn so it's obvious why this tenant isn't doing what it ought to.
    1328              :                 tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
    1329              :                 return ReconcileNeeded::No;
    1330              :             }
    1331              :         }
    1332              : 
    1333              :         ReconcileNeeded::Yes
    1334              :     }
    1335              : 
    1336              :     /// Ensure the sequence number is set to a value where waiting for this value will make us wait
    1337              :     /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
    1338              :     ///
    1339              :     /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
    1340              :     /// that the waiter will not complete until some future Reconciler is constructed and run.
    1341            0 :     fn ensure_sequence_ahead(&mut self) {
    1342            0 :         // Find the highest sequence for which a Reconciler has previously run or is currently
    1343            0 :         // running
    1344            0 :         let max_seen = std::cmp::max(
    1345            0 :             self.reconciler
    1346            0 :                 .as_ref()
    1347            0 :                 .map(|r| r.sequence)
    1348            0 :                 .unwrap_or(Sequence(0)),
    1349            0 :             std::cmp::max(self.waiter.load(), self.error_waiter.load()),
    1350            0 :         );
    1351            0 : 
    1352            0 :         if self.sequence <= max_seen {
    1353            0 :             self.sequence = max_seen.next();
    1354            0 :         }
    1355            0 :     }
    1356              : 
    1357              :     /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
    1358              :     ///
    1359              :     /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
    1360              :     /// you would like to wait on the next reconciler that gets spawned in the background.
    1361            0 :     pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
    1362            0 :         self.ensure_sequence_ahead();
    1363            0 : 
    1364            0 :         ReconcilerWaiter {
    1365            0 :             tenant_shard_id: self.tenant_shard_id,
    1366            0 :             seq_wait: self.waiter.clone(),
    1367            0 :             error_seq_wait: self.error_waiter.clone(),
    1368            0 :             error: self.last_error.clone(),
    1369            0 :             seq: self.sequence,
    1370            0 :         }
    1371            0 :     }
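                      : 
                      :     // Usage sketch (hypothetical caller): when there is no capacity to spawn a reconciler right
                      :     // now, hand back a waiter tied to a future sequence instead.
                      :     //
                      :     //     let waiter = shard.future_reconcile_waiter();
                      :     //     // awaiting `waiter` completes only once some later Reconciler for this shard has run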
    1372              : 
    1373            0 :     async fn reconcile(
    1374            0 :         sequence: Sequence,
    1375            0 :         mut reconciler: Reconciler,
    1376            0 :         must_notify: bool,
    1377            0 :     ) -> ReconcileResult {
    1378              :         // Attempt to make observed state match intent state
    1379            0 :         let result = reconciler.reconcile().await;
    1380              : 
    1381              :         // If we know we had a pending compute notification from some previous action, send a notification irrespective
    1382              :         // of whether the above reconcile() did any work.  It has to be Ok() though, because otherwise we might be
    1383              :         // sending a notification of a location that isn't really attached.
    1384            0 :         if result.is_ok() && must_notify {
    1385              :             // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
    1386            0 :             reconciler.compute_notify().await.ok();
    1387            0 :         } else if must_notify {
    1388            0 :             // Carry this flag so that the reconciler's result will indicate that it still needs to retry
    1389            0 :             // the compute hook notification eventually.
    1390            0 :             reconciler.compute_notify_failure = true;
    1391            0 :         }
    1392              : 
    1393              :         // Update result counter
    1394            0 :         let outcome_label = match &result {
    1395            0 :             Ok(_) => ReconcileOutcome::Success,
    1396            0 :             Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
    1397            0 :             Err(_) => ReconcileOutcome::Error,
    1398              :         };
    1399              : 
    1400            0 :         metrics::METRICS_REGISTRY
    1401            0 :             .metrics_group
    1402            0 :             .storage_controller_reconcile_complete
    1403            0 :             .inc(ReconcileCompleteLabelGroup {
    1404            0 :                 status: outcome_label,
    1405            0 :             });
    1406            0 : 
    1407            0 :         // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
    1408            0 :         // try and schedule more work in response to our result.
    1409            0 :         ReconcileResult {
    1410            0 :             sequence,
    1411            0 :             result,
    1412            0 :             tenant_shard_id: reconciler.tenant_shard_id,
    1413            0 :             generation: reconciler.generation,
    1414            0 :             observed_deltas: reconciler.observed_deltas(),
    1415            0 :             pending_compute_notification: reconciler.compute_notify_failure,
    1416            0 :         }
    1417            0 :     }
    1418              : 
    1419              :     #[allow(clippy::too_many_arguments)]
    1420              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1421              :     pub(crate) fn spawn_reconciler(
    1422              :         &mut self,
    1423              :         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
    1424              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1425              :         compute_hook: &Arc<ComputeHook>,
    1426              :         reconciler_config: ReconcilerConfig,
    1427              :         service_config: &service::Config,
    1428              :         persistence: &Arc<Persistence>,
    1429              :         units: ReconcileUnits,
    1430              :         gate_guard: GateGuard,
    1431              :         cancel: &CancellationToken,
    1432              :     ) -> Option<ReconcilerWaiter> {
    1433              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
    1434              :         // doing our sequence's work.
    1435              :         let old_handle = self.reconciler.take();
    1436              : 
    1437              :         // Build list of nodes from which the reconciler should detach
    1438              :         let mut detach = Vec::new();
    1439              :         for node_id in self.observed.locations.keys() {
    1440              :             if self.intent.get_attached() != &Some(*node_id)
    1441              :                 && !self.intent.secondary.contains(node_id)
    1442              :             {
    1443              :                 detach.push(
    1444              :                     pageservers
    1445              :                         .get(node_id)
    1446              :                         .expect("Intent references non-existent pageserver")
    1447              :                         .clone(),
    1448              :                 )
    1449              :             }
    1450              :         }
    1451              : 
    1452              :         // Advance the sequence before spawning a reconciler, so that sequence waiters
    1453              :         // can distinguish between before+after the reconcile completes.
    1454              :         self.ensure_sequence_ahead();
    1455              : 
    1456              :         let reconciler_cancel = cancel.child_token();
    1457              :         let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
    1458              :         let reconciler = Reconciler {
    1459              :             tenant_shard_id: self.tenant_shard_id,
    1460              :             shard: self.shard,
    1461              :             placement_policy: self.policy.clone(),
    1462              :             generation: self.generation,
    1463              :             intent: reconciler_intent,
    1464              :             detach,
    1465              :             reconciler_config,
    1466              :             config: self.config.clone(),
    1467              :             preferred_az: self.intent.preferred_az_id.clone(),
    1468              :             observed: self.observed.clone(),
    1469              :             original_observed: self.observed.clone(),
    1470              :             compute_hook: compute_hook.clone(),
    1471              :             service_config: service_config.clone(),
    1472              :             _gate_guard: gate_guard,
    1473              :             _resource_units: units,
    1474              :             cancel: reconciler_cancel.clone(),
    1475              :             persistence: persistence.clone(),
    1476              :             compute_notify_failure: false,
    1477              :         };
    1478              : 
    1479              :         let reconcile_seq = self.sequence;
    1480              :         let long_reconcile_threshold = service_config.long_reconcile_threshold;
    1481              : 
    1482              :         tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
    1483              :         let must_notify = self.pending_compute_notification;
    1484              :         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
    1485              :                                                         tenant_id=%reconciler.tenant_shard_id.tenant_id,
    1486              :                                                         shard_id=%reconciler.tenant_shard_id.shard_slug());
    1487              :         metrics::METRICS_REGISTRY
    1488              :             .metrics_group
    1489              :             .storage_controller_reconcile_spawn
    1490              :             .inc();
    1491              :         let result_tx = result_tx.clone();
    1492              :         let join_handle = tokio::task::spawn(
    1493            0 :             async move {
    1494              :                 // Wait for any previous reconcile task to complete before we start
    1495            0 :                 if let Some(old_handle) = old_handle {
    1496            0 :                     old_handle.cancel.cancel();
    1497            0 :                     if let Err(e) = old_handle.handle.await {
    1498              :                         // We can't do much with this other than log it: the task is done, so
    1499              :                         // we may proceed with our work.
    1500            0 :                         tracing::error!("Unexpected join error waiting for reconcile task: {e}");
    1501            0 :                     }
    1502            0 :                 }
    1503              : 
    1504              :                 // Early check for cancellation before doing any work
    1505              :                 // TODO: wrap all remote API operations in cancellation check
    1506              :                 // as well.
    1507            0 :                 if reconciler.cancel.is_cancelled() {
    1508            0 :                     metrics::METRICS_REGISTRY
    1509            0 :                         .metrics_group
    1510            0 :                         .storage_controller_reconcile_complete
    1511            0 :                         .inc(ReconcileCompleteLabelGroup {
    1512            0 :                             status: ReconcileOutcome::Cancel,
    1513            0 :                         });
    1514            0 :                     return;
    1515            0 :                 }
    1516            0 : 
    1517            0 :                 let (tenant_id_label, shard_number_label, sequence_label) = {
    1518            0 :                     (
    1519            0 :                         reconciler.tenant_shard_id.tenant_id.to_string(),
    1520            0 :                         reconciler.tenant_shard_id.shard_number.0.to_string(),
    1521            0 :                         reconcile_seq.to_string(),
    1522            0 :                     )
    1523            0 :                 };
    1524            0 : 
    1525            0 :                 let label_group = ReconcileLongRunningLabelGroup {
    1526            0 :                     tenant_id: &tenant_id_label,
    1527            0 :                     shard_number: &shard_number_label,
    1528            0 :                     sequence: &sequence_label,
    1529            0 :                 };
    1530            0 : 
    1531            0 :                 let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
    1532            0 :                 let long_reconcile_fut = {
    1533            0 :                     let label_group = label_group.clone();
    1534            0 :                     async move {
    1535            0 :                         tokio::time::sleep(long_reconcile_threshold).await;
    1536              : 
    1537            0 :                         tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
    1538              : 
    1539            0 :                         metrics::METRICS_REGISTRY
    1540            0 :                             .metrics_group
    1541            0 :                             .storage_controller_reconcile_long_running
    1542            0 :                             .inc(label_group);
    1543            0 :                     }
    1544              :                 };
    1545              : 
    1546            0 :                 let reconcile_fut = std::pin::pin!(reconcile_fut);
    1547            0 :                 let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
    1548              : 
    1549            0 :                 let (was_long, result) =
    1550            0 :                     match future::select(reconcile_fut, long_reconcile_fut).await {
    1551            0 :                         Either::Left((reconcile_result, _)) => (false, reconcile_result),
    1552            0 :                         Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
    1553              :                     };
    1554              : 
    1555            0 :                 if was_long {
    1556            0 :                     let id = metrics::METRICS_REGISTRY
    1557            0 :                         .metrics_group
    1558            0 :                         .storage_controller_reconcile_long_running
    1559            0 :                         .with_labels(label_group);
    1560            0 :                     metrics::METRICS_REGISTRY
    1561            0 :                         .metrics_group
    1562            0 :                         .storage_controller_reconcile_long_running
    1563            0 :                         .remove_metric(id);
    1564            0 :                 }
    1565              : 
    1566            0 :                 result_tx
    1567            0 :                     .send(ReconcileResultRequest::ReconcileResult(result))
    1568            0 :                     .ok();
    1569            0 :             }
    1570              :             .instrument(reconciler_span),
    1571              :         );
    1572              : 
    1573              :         self.reconciler = Some(ReconcilerHandle {
    1574              :             sequence: self.sequence,
    1575              :             handle: join_handle,
    1576              :             cancel: reconciler_cancel,
    1577              :         });
    1578              : 
    1579              :         Some(ReconcilerWaiter {
    1580              :             tenant_shard_id: self.tenant_shard_id,
    1581              :             seq_wait: self.waiter.clone(),
    1582              :             error_seq_wait: self.error_waiter.clone(),
    1583              :             error: self.last_error.clone(),
    1584              :             seq: self.sequence,
    1585              :         })
    1586              :     }
    1587              : 
    1588            0 :     pub(crate) fn cancel_reconciler(&self) {
    1589            0 :         if let Some(handle) = self.reconciler.as_ref() {
    1590            0 :             handle.cancel.cancel()
    1591            0 :         }
    1592            0 :     }
    1593              : 
    1594              :     /// Get a waiter for any reconciliation in flight, but do not start reconciliation
    1595              :     /// if it is not already running
    1596            0 :     pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
    1597            0 :         if self.reconciler.is_some() {
    1598            0 :             Some(ReconcilerWaiter {
    1599            0 :                 tenant_shard_id: self.tenant_shard_id,
    1600            0 :                 seq_wait: self.waiter.clone(),
    1601            0 :                 error_seq_wait: self.error_waiter.clone(),
    1602            0 :                 error: self.last_error.clone(),
    1603            0 :                 seq: self.sequence,
    1604            0 :             })
    1605              :         } else {
    1606            0 :             None
    1607              :         }
    1608            0 :     }
    1609              : 
    1610              :     /// Called when a ReconcileResult has been emitted and the service is updating
    1611              :     /// our state: if the result is from a sequence >= our ReconcilerHandle's, then drop
    1612              :     /// the handle to indicate there is no longer a reconciliation in progress.
    1613            0 :     pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
    1614            0 :         if let Some(reconcile_handle) = &self.reconciler {
    1615            0 :             if reconcile_handle.sequence <= sequence {
    1616            0 :                 self.reconciler = None;
    1617            0 :             }
    1618            0 :         }
    1619            0 :     }
    1620              : 
    1621              :     /// If we had any state at all referring to this node ID, drop it.  Does not
    1622              :     /// attempt to reschedule.
    1623              :     ///
    1624              :     /// Returns true if we modified the node's intent state.
    1625            0 :     pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
    1626            0 :         let mut intent_modified = false;
    1627            0 : 
    1628            0 :         // Drop if this node was our attached intent
    1629            0 :         if self.intent.attached == Some(node_id) {
    1630            0 :             self.intent.attached = None;
    1631            0 :             intent_modified = true;
    1632            0 :         }
    1633              : 
    1634              :         // Drop from the list of secondaries, and check if we modified it
    1635            0 :         let had_secondaries = self.intent.secondary.len();
    1636            0 :         self.intent.secondary.retain(|n| n != &node_id);
    1637            0 :         intent_modified |= self.intent.secondary.len() != had_secondaries;
    1638            0 : 
    1639            0 :         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
    1640              : 
    1641            0 :         intent_modified
    1642            0 :     }
    1643              : 
    1644            0 :     pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
    1645            0 :         self.scheduling_policy = p;
    1646            0 :     }
    1647              : 
    1648            0 :     pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
    1649            0 :         &self.scheduling_policy
    1650            0 :     }
    1651              : 
    1652            0 :     pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
    1653            0 :         // Ordering: always set last_error before advancing sequence, so that sequence
    1654            0 :         // waiters are guaranteed to see a Some value when they see an error.
    1655            0 :         *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
    1656            0 :         self.error_waiter.advance(sequence);
    1657            0 :     }
    1658              : 
    1659            0 :     pub(crate) fn from_persistent(
    1660            0 :         tsp: TenantShardPersistence,
    1661            0 :         intent: IntentState,
    1662            0 :     ) -> anyhow::Result<Self> {
    1663            0 :         let tenant_shard_id = tsp.get_tenant_shard_id()?;
    1664            0 :         let shard_identity = tsp.get_shard_identity()?;
    1665              : 
    1666            0 :         metrics::METRICS_REGISTRY
    1667            0 :             .metrics_group
    1668            0 :             .storage_controller_tenant_shards
    1669            0 :             .inc();
    1670            0 : 
    1671            0 :         Ok(Self {
    1672            0 :             tenant_shard_id,
    1673            0 :             shard: shard_identity,
    1674            0 :             sequence: Sequence::initial(),
    1675            0 :             generation: tsp.generation.map(|g| Generation::new(g as u32)),
    1676            0 :             policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
    1677            0 :             intent,
    1678            0 :             observed: ObservedState::new(),
    1679            0 :             config: serde_json::from_str(&tsp.config).unwrap(),
    1680            0 :             reconciler: None,
    1681            0 :             splitting: tsp.splitting,
    1682            0 :             waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1683            0 :             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1684            0 :             last_error: Arc::default(),
    1685            0 :             pending_compute_notification: false,
    1686            0 :             delayed_reconcile: false,
    1687            0 :             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
    1688            0 :         })
    1689            0 :     }
    1690              : 
    1691            0 :     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
    1692            0 :         TenantShardPersistence {
    1693            0 :             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
    1694            0 :             shard_number: self.tenant_shard_id.shard_number.0 as i32,
    1695            0 :             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
    1696            0 :             shard_stripe_size: self.shard.stripe_size.0 as i32,
    1697            0 :             generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
    1698            0 :             generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
    1699            0 :             placement_policy: serde_json::to_string(&self.policy).unwrap(),
    1700            0 :             config: serde_json::to_string(&self.config).unwrap(),
    1701            0 :             splitting: SplitState::default(),
    1702            0 :             scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
    1703            0 :             preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
    1704            0 :         }
    1705            0 :     }
    1706              : 
    1707        12502 :     pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
    1708        12502 :         self.intent.preferred_az_id.as_ref()
    1709        12502 :     }
    1710              : 
    1711            0 :     pub(crate) fn set_preferred_az(&mut self, preferred_az_id: Option<AvailabilityZone>) {
    1712            0 :         self.intent.preferred_az_id = preferred_az_id;
    1713            0 :     }
    1714              : 
    1715              :     /// Returns all the nodes to which this tenant shard is attached according to the
    1716              :     /// observed state and the generations. Return vector is sorted from latest generation
    1717              :     /// to earliest.
    1718            0 :     pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
    1719            0 :         self.observed
    1720            0 :             .locations
    1721            0 :             .iter()
    1722            0 :             .filter_map(|(node_id, observed)| {
    1723              :                 use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
    1724              : 
    1725            0 :                 let conf = observed.conf.as_ref()?;
    1726              : 
    1727            0 :                 match (conf.generation, conf.mode) {
    1728            0 :                     (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
    1729            0 :                         Some((*node_id, gen))
    1730              :                     }
    1731            0 :                     _ => None,
    1732              :                 }
    1733            0 :             })
    1734            0 :             .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
    1735            0 :                 lhs_gen.cmp(rhs_gen).reverse()
    1736            0 :             })
    1737            0 :             .map(|(node_id, gen)| (node_id, Generation::new(gen)))
    1738            0 :             .collect()
    1739            0 :     }
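                      : 
                      :     // Sketch: because the vector is sorted newest generation first, the most recently attached
                      :     // location (if any) is the first element.
                      :     //
                      :     //     let newest: Option<(NodeId, Generation)> = shard.attached_locations().into_iter().next();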
    1740              : 
    1741              :     /// Update the observed state of the tenant by applying incremental deltas
    1742              :     ///
    1743              :     /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
    1744              :     /// They are then filtered in [`crate::service::Service::process_result`].
    1745            0 :     pub(crate) fn apply_observed_deltas(
    1746            0 :         &mut self,
    1747            0 :         deltas: impl Iterator<Item = ObservedStateDelta>,
    1748            0 :     ) {
    1749            0 :         for delta in deltas {
    1750            0 :             match delta {
    1751            0 :                 ObservedStateDelta::Upsert(ups) => {
    1752            0 :                     let (node_id, loc) = *ups;
    1753            0 : 
    1754            0 :                     // If the generation of the observed location in the delta is lagging
    1755            0 :                     // behind the current one, then we have a race condition and cannot
    1756            0 :                     // be certain about the true observed state. Set the observed state
    1757            0 :                     // to None in order to reflect this.
    1758            0 :                     let crnt_gen = self
    1759            0 :                         .observed
    1760            0 :                         .locations
    1761            0 :                         .get(&node_id)
    1762            0 :                         .and_then(|loc| loc.conf.as_ref())
    1763            0 :                         .and_then(|conf| conf.generation);
    1764            0 :                     let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
    1765            0 :                     match (crnt_gen, new_gen) {
    1766            0 :                         (Some(crnt), Some(new)) if crnt_gen > new_gen => {
    1767            0 :                             tracing::warn!(
    1768            0 :                                 "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
    1769              :                                 node_id, loc, crnt, new
    1770              :                             );
    1771              : 
    1772            0 :                             self.observed
    1773            0 :                                 .locations
    1774            0 :                                 .insert(node_id, ObservedStateLocation { conf: None });
    1775            0 : 
    1776            0 :                             continue;
    1777              :                         }
    1778            0 :                         _ => {}
    1779              :                     }
    1780              : 
    1781            0 :                     if let Some(conf) = &loc.conf {
    1782            0 :                         tracing::info!("Updating observed location {}: {:?}", node_id, conf);
    1783              :                     } else {
    1784            0 :                         tracing::info!("Setting observed location {} to None", node_id,)
    1785              :                     }
    1786              : 
    1787            0 :                     self.observed.locations.insert(node_id, loc);
    1788              :                 }
    1789            0 :                 ObservedStateDelta::Delete(node_id) => {
    1790            0 :                     tracing::info!("Deleting observed location {}", node_id);
    1791            0 :                     self.observed.locations.remove(&node_id);
    1792              :                 }
    1793              :             }
    1794              :         }
    1795            0 :     }
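    // Illustrative sketch (not part of the measured source): deltas normally come from a
    // reconciler, but applying a single upsert by hand is just `apply_observed_deltas` over a
    // one-element iterator. The helper name is hypothetical, and the `Box::new` assumes the
    // `Upsert` variant boxes its `(NodeId, ObservedStateLocation)` payload, as the
    // destructuring above suggests.
    #[allow(dead_code)]
    fn apply_single_observed_upsert(&mut self, node_id: NodeId, location: ObservedStateLocation) {
        let delta = ObservedStateDelta::Upsert(Box::new((node_id, location)));
        self.apply_observed_deltas(std::iter::once(delta));
    }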
    1796              : 
    1797              :     /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
    1798              :     ///
    1799              :     /// If the shard does not have a preferred AZ, returns false.
    1800            0 :     pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
    1801            0 :         match (self.intent.get_attached(), self.intent.preferred_az_id.as_ref()) {
    1802            0 :             (Some(node_id), Some(preferred_az)) => {
    1803            0 :                 nodes
    1804            0 :                     .get(node_id)
    1805            0 :                     .expect("referenced node exists")
    1806            0 :                     .get_availability_zone_id()
    1807            0 :                     != preferred_az
    1808            0 :             }
    1809              :             // Not attached, or no preferred AZ: nothing can be outside the preferred AZ.
    1810            0 :             _ => false,
    1811            0 :         }
    1812            0 :     }
    1813              : }
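// Illustrative sketch (not part of the measured source): how a background task might use
// `is_attached_outside_preferred_az` to collect the shards that should be migrated back to
// their home AZ. The function name is hypothetical.
#[allow(dead_code)]
fn shards_attached_outside_preferred_az<'a>(
    shards: impl Iterator<Item = &'a TenantShard>,
    nodes: &HashMap<NodeId, Node>,
) -> Vec<TenantShardId> {
    shards
        .filter(|shard| shard.is_attached_outside_preferred_az(nodes))
        .map(|shard| shard.tenant_shard_id)
        .collect()
}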
    1814              : 
    1815              : impl Drop for TenantShard {
    1816        12839 :     fn drop(&mut self) {
    1817        12839 :         metrics::METRICS_REGISTRY
    1818        12839 :             .metrics_group
    1819        12839 :             .storage_controller_tenant_shards
    1820        12839 :             .dec();
    1821        12839 :     }
    1822              : }
    1823              : 
    1824              : #[cfg(test)]
    1825              : pub(crate) mod tests {
    1826              :     use std::{cell::RefCell, rc::Rc};
    1827              : 
    1828              :     use pageserver_api::{
    1829              :         controller_api::NodeAvailability,
    1830              :         shard::{ShardCount, ShardNumber},
    1831              :     };
    1832              :     use rand::{rngs::StdRng, SeedableRng};
    1833              :     use utils::id::TenantId;
    1834              : 
    1835              :     use crate::scheduler::test_utils::make_test_nodes;
    1836              : 
    1837              :     use super::*;
    1838              : 
    1839            9 :     fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
    1840            9 :         let tenant_id = TenantId::generate();
    1841            9 :         let shard_number = ShardNumber(0);
    1842            9 :         let shard_count = ShardCount::new(1);
    1843            9 : 
    1844            9 :         let tenant_shard_id = TenantShardId {
    1845            9 :             tenant_id,
    1846            9 :             shard_number,
    1847            9 :             shard_count,
    1848            9 :         };
    1849            9 :         TenantShard::new(
    1850            9 :             tenant_shard_id,
    1851            9 :             ShardIdentity::new(
    1852            9 :                 shard_number,
    1853            9 :                 shard_count,
    1854            9 :                 pageserver_api::shard::ShardStripeSize(32768),
    1855            9 :             )
    1856            9 :             .unwrap(),
    1857            9 :             policy,
    1858            9 :             None,
    1859            9 :         )
    1860            9 :     }
    1861              : 
    1862         5004 :     pub(crate) fn make_test_tenant(
    1863         5004 :         policy: PlacementPolicy,
    1864         5004 :         shard_count: ShardCount,
    1865         5004 :         preferred_az: Option<AvailabilityZone>,
    1866         5004 :     ) -> Vec<TenantShard> {
    1867         5004 :         make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
    1868         5004 :     }
    1869              : 
    1870         5007 :     pub(crate) fn make_test_tenant_with_id(
    1871         5007 :         tenant_id: TenantId,
    1872         5007 :         policy: PlacementPolicy,
    1873         5007 :         shard_count: ShardCount,
    1874         5007 :         preferred_az: Option<AvailabilityZone>,
    1875         5007 :     ) -> Vec<TenantShard> {
    1876         5007 :         (0..shard_count.count())
    1877        12522 :             .map(|i| {
    1878        12522 :                 let shard_number = ShardNumber(i);
    1879        12522 : 
    1880        12522 :                 let tenant_shard_id = TenantShardId {
    1881        12522 :                     tenant_id,
    1882        12522 :                     shard_number,
    1883        12522 :                     shard_count,
    1884        12522 :                 };
    1885        12522 :                 TenantShard::new(
    1886        12522 :                     tenant_shard_id,
    1887        12522 :                     ShardIdentity::new(
    1888        12522 :                         shard_number,
    1889        12522 :                         shard_count,
    1890        12522 :                         pageserver_api::shard::ShardStripeSize(32768),
    1891        12522 :                     )
    1892        12522 :                     .unwrap(),
    1893        12522 :                     policy.clone(),
    1894        12522 :                     preferred_az.clone(),
    1895        12522 :                 )
    1896        12522 :             })
    1897         5007 :             .collect()
    1898         5007 :     }
    1899              : 
    1900              :     /// Test the scheduling behaviors used when a tenant configured for HA is subject
    1901              :     /// to nodes being marked offline.
    1902              :     #[test]
    1903            1 :     fn tenant_ha_scheduling() -> anyhow::Result<()> {
    1904            1 :         // Start with three nodes.  Our tenant will only use two.  The third one is
    1905            1 :         // expected to remain unused.
    1906            1 :         let mut nodes = make_test_nodes(3, &[]);
    1907            1 : 
    1908            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1909            1 :         let mut context = ScheduleContext::default();
    1910            1 : 
    1911            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1912            1 :         tenant_shard
    1913            1 :             .schedule(&mut scheduler, &mut context)
    1914            1 :             .expect("we have enough nodes, scheduling should work");
    1915            1 : 
    1916            1 :         // Expect the attached and secondary locations to initially be scheduled onto different nodes
    1917            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    1918            1 :         assert!(tenant_shard.intent.attached.is_some());
    1919              : 
    1920            1 :         let attached_node_id = tenant_shard.intent.attached.unwrap();
    1921            1 :         let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
    1922            1 :         assert_ne!(attached_node_id, secondary_node_id);
    1923              : 
    1924              :         // Notifying that the attached node is offline should demote it to a secondary
    1925            1 :         let changed = tenant_shard
    1926            1 :             .intent
    1927            1 :             .demote_attached(&mut scheduler, attached_node_id);
    1928            1 :         assert!(changed);
    1929            1 :         assert!(tenant_shard.intent.attached.is_none());
    1930            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 2);
    1931              : 
    1932              :         // Update the scheduler state to indicate the node is offline
    1933            1 :         nodes
    1934            1 :             .get_mut(&attached_node_id)
    1935            1 :             .unwrap()
    1936            1 :             .set_availability(NodeAvailability::Offline);
    1937            1 :         scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
    1938            1 : 
    1939            1 :         // Scheduling the shard again should promote the still-available secondary node to attached
    1940            1 :         tenant_shard
    1941            1 :             .schedule(&mut scheduler, &mut context)
    1942            1 :             .expect("active nodes are available");
    1943            1 :         assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
    1944              : 
    1945              :         // The original attached node should have been retained as a secondary
    1946            1 :         assert_eq!(
    1947            1 :             *tenant_shard.intent.secondary.iter().last().unwrap(),
    1948            1 :             attached_node_id
    1949            1 :         );
    1950              : 
    1951            1 :         tenant_shard.intent.clear(&mut scheduler);
    1952            1 : 
    1953            1 :         Ok(())
    1954            1 :     }
    1955              : 
    1956              :     #[test]
    1957            1 :     fn intent_from_observed() -> anyhow::Result<()> {
    1958            1 :         let nodes = make_test_nodes(3, &[]);
    1959            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1960            1 : 
    1961            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1962            1 : 
    1963            1 :         tenant_shard.observed.locations.insert(
    1964            1 :             NodeId(3),
    1965            1 :             ObservedStateLocation {
    1966            1 :                 conf: Some(LocationConfig {
    1967            1 :                     mode: LocationConfigMode::AttachedMulti,
    1968            1 :                     generation: Some(2),
    1969            1 :                     secondary_conf: None,
    1970            1 :                     shard_number: tenant_shard.shard.number.0,
    1971            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1972            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1973            1 :                     tenant_conf: TenantConfig::default(),
    1974            1 :                 }),
    1975            1 :             },
    1976            1 :         );
    1977            1 : 
    1978            1 :         tenant_shard.observed.locations.insert(
    1979            1 :             NodeId(2),
    1980            1 :             ObservedStateLocation {
    1981            1 :                 conf: Some(LocationConfig {
    1982            1 :                     mode: LocationConfigMode::AttachedStale,
    1983            1 :                     generation: Some(1),
    1984            1 :                     secondary_conf: None,
    1985            1 :                     shard_number: tenant_shard.shard.number.0,
    1986            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1987            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1988            1 :                     tenant_conf: TenantConfig::default(),
    1989            1 :                 }),
    1990            1 :             },
    1991            1 :         );
    1992            1 : 
    1993            1 :         tenant_shard.intent_from_observed(&mut scheduler);
    1994            1 : 
    1995            1 :         // The attached location with the highest generation is used as the attached intent
    1996            1 :         assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    1997              :         // Other locations get used as secondary
    1998            1 :         assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
    1999              : 
    2000            1 :         scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
    2001              : 
    2002            1 :         tenant_shard.intent.clear(&mut scheduler);
    2003            1 :         Ok(())
    2004            1 :     }
    2005              : 
    2006              :     #[test]
    2007            1 :     fn scheduling_mode() -> anyhow::Result<()> {
    2008            1 :         let nodes = make_test_nodes(3, &[]);
    2009            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2010            1 : 
    2011            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2012            1 : 
    2013            1 :         // In pause mode, schedule() shouldn't do anything
    2014            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    2015            1 :         assert!(tenant_shard
    2016            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    2017            1 :             .is_ok());
    2018            1 :         assert!(tenant_shard.intent.all_pageservers().is_empty());
    2019              : 
    2020              :         // In active mode, schedule() works
    2021            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    2022            1 :         assert!(tenant_shard
    2023            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    2024            1 :             .is_ok());
    2025            1 :         assert!(!tenant_shard.intent.all_pageservers().is_empty());
    2026              : 
    2027            1 :         tenant_shard.intent.clear(&mut scheduler);
    2028            1 :         Ok(())
    2029            1 :     }
    2030              : 
    2031              :     #[test]
    2032              :     /// Simple case: moving attachment to somewhere better where we already have a secondary
    2033            1 :     fn optimize_attachment_simple() -> anyhow::Result<()> {
    2034            1 :         let nodes = make_test_nodes(
    2035            1 :             3,
    2036            1 :             &[
    2037            1 :                 AvailabilityZone("az-a".to_string()),
    2038            1 :                 AvailabilityZone("az-b".to_string()),
    2039            1 :                 AvailabilityZone("az-c".to_string()),
    2040            1 :             ],
    2041            1 :         );
    2042            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2043            1 : 
    2044            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2045            1 :         shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2046            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2047            1 :         shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2048            1 : 
    2049            1 :         // Initially: shard A is attached on node 2 and shard B on node 1, and each has
    2050            1 :         // a secondary location on the other node.
    2051            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    2052            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
    2053            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    2054            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
    2055              : 
    2056            1 :         fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
    2057            1 :             let mut schedule_context = ScheduleContext::default();
    2058            1 :             schedule_context.avoid(&shard_a.intent.all_pageservers());
    2059            1 :             schedule_context.avoid(&shard_b.intent.all_pageservers());
    2060            1 :             schedule_context
    2061            1 :         }
    2062              : 
    2063            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2064            1 :         let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2065            1 :         assert_eq!(
    2066            1 :             optimization_a,
    2067            1 :             Some(ScheduleOptimization {
    2068            1 :                 sequence: shard_a.sequence,
    2069            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2070            1 :                     old_attached_node_id: NodeId(2),
    2071            1 :                     new_attached_node_id: NodeId(1)
    2072            1 :                 })
    2073            1 :             })
    2074            1 :         );
    2075            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    2076            1 : 
    2077            1 :         // // Either shard should recognize that it has the option to switch to a secondary location where there
    2078            1 :         // // would be no other shards from the same tenant, and request to do so.
    2079            1 :         // assert_eq!(
    2080            1 :         //     optimization_a_prepare,
    2081            1 :         //     Some(ScheduleOptimization {
    2082            1 :         //         sequence: shard_a.sequence,
    2083            1 :         //         action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
    2084            1 :         //     })
    2085            1 :         // );
    2086            1 :         // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
    2087            1 : 
    2088            1 :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2089            1 :         // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2090            1 :         // assert_eq!(
    2091            1 :         //     optimization_a_migrate,
    2092            1 :         //     Some(ScheduleOptimization {
    2093            1 :         //         sequence: shard_a.sequence,
    2094            1 :         //         action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2095            1 :         //             old_attached_node_id: NodeId(1),
    2096            1 :         //             new_attached_node_id: NodeId(2)
    2097            1 :         //         })
    2098            1 :         //     })
    2099            1 :         // );
    2100            1 :         // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
    2101            1 : 
    2102            1 :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2103            1 :         // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2104            1 :         // assert_eq!(
    2105            1 :         //     optimization_a_cleanup,
    2106            1 :         //     Some(ScheduleOptimization {
    2107            1 :         //         sequence: shard_a.sequence,
    2108            1 :         //         action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
    2109            1 :         //     })
    2110            1 :         // );
    2111            1 :         // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
    2112            1 : 
    2113            1 :         // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
    2114            1 :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2115            1 :         // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
    2116            1 : 
    2117            1 :         shard_a.intent.clear(&mut scheduler);
    2118            1 :         shard_b.intent.clear(&mut scheduler);
    2119            1 : 
    2120            1 :         Ok(())
    2121            1 :     }
    2122              : 
    2123              :     #[test]
    2124              :     /// Complicated case: moving attachment to somewhere better where we do not have a secondary
    2125              :     /// already, creating one as needed.
    2126            1 :     fn optimize_attachment_multistep() -> anyhow::Result<()> {
    2127            1 :         let nodes = make_test_nodes(
    2128            1 :             3,
    2129            1 :             &[
    2130            1 :                 AvailabilityZone("az-a".to_string()),
    2131            1 :                 AvailabilityZone("az-b".to_string()),
    2132            1 :                 AvailabilityZone("az-c".to_string()),
    2133            1 :             ],
    2134            1 :         );
    2135            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2136            1 : 
    2137            1 :         // Two tenant shards (from separate tenants) that both want to be in AZ A
    2138            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2139            1 :         shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2140            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2141            1 :         shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
    2142            1 : 
    2143            1 :         // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
    2144            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    2145            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    2146            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
    2147            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
    2148              : 
    2149            3 :         fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
    2150            3 :             let mut schedule_context = ScheduleContext::default();
    2151            3 :             schedule_context.avoid(&shard_a.intent.all_pageservers());
    2152            3 :             schedule_context.avoid(&shard_b.intent.all_pageservers());
    2153            3 :             schedule_context
    2154            3 :         }
    2155              : 
    2156            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2157            1 :         let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2158            1 :         assert_eq!(
    2159            1 :             optimization_a_prepare,
    2160            1 :             Some(ScheduleOptimization {
    2161            1 :                 sequence: shard_a.sequence,
    2162            1 :                 action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
    2163            1 :             })
    2164            1 :         );
    2165            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
    2166            1 : 
    2167            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2168            1 :         let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2169            1 :         assert_eq!(
    2170            1 :             optimization_a_migrate,
    2171            1 :             Some(ScheduleOptimization {
    2172            1 :                 sequence: shard_a.sequence,
    2173            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2174            1 :                     old_attached_node_id: NodeId(2),
    2175            1 :                     new_attached_node_id: NodeId(1)
    2176            1 :                 })
    2177            1 :             })
    2178            1 :         );
    2179            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
    2180            1 : 
    2181            1 :         let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2182            1 :         let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
    2183            1 :         assert_eq!(
    2184            1 :             optimization_a_cleanup,
    2185            1 :             Some(ScheduleOptimization {
    2186            1 :                 sequence: shard_a.sequence,
    2187            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
    2188            1 :             })
    2189            1 :         );
    2190            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
    2191            1 : 
    2192            1 :         // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
    2193            1 :         // let schedule_context = make_schedule_context(&shard_a, &shard_b);
    2194            1 :         // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
    2195            1 : 
    2196            1 :         shard_a.intent.clear(&mut scheduler);
    2197            1 :         shard_b.intent.clear(&mut scheduler);
    2198            1 : 
    2199            1 :         Ok(())
    2200            1 :     }
    2201              : 
    2202              :     #[test]
    2203              :     /// Check that multi-step migration works when moving to somewhere that is only better by
    2204              :     /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
    2205              :     /// counting toward the affinity score such that it prevents the rest of the migration from happening.
    2206            1 :     fn optimize_attachment_marginal() -> anyhow::Result<()> {
    2207            1 :         let nodes = make_test_nodes(2, &[]);
    2208            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2209            1 : 
    2210            1 :         // Multi-sharded tenant: we will craft a situation where affinity
    2211            1 :         // scores differ only slightly
    2212            1 :         let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
    2213            1 : 
    2214            1 :         // 1 attached on node 1
    2215            1 :         shards[0]
    2216            1 :             .intent
    2217            1 :             .set_attached(&mut scheduler, Some(NodeId(1)));
    2218            1 :         // 3 attached on node 2
    2219            1 :         shards[1]
    2220            1 :             .intent
    2221            1 :             .set_attached(&mut scheduler, Some(NodeId(2)));
    2222            1 :         shards[2]
    2223            1 :             .intent
    2224            1 :             .set_attached(&mut scheduler, Some(NodeId(2)));
    2225            1 :         shards[3]
    2226            1 :             .intent
    2227            1 :             .set_attached(&mut scheduler, Some(NodeId(2)));
    2228              : 
    2229              :         // The optimizer should figure out that shards[1] (one of the three shards
    2230              :         // attached to node 2) needs to:
    2231              :         // - create a secondary on node 1, then migrate its attachment to node 1,
    2232              :         // - and finally remove its now-unused location on node 2
    2233              : 
    2234            4 :         fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
    2235            4 :             let mut schedule_context = ScheduleContext::default();
    2236           20 :             for shard in shards {
    2237           16 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    2238           16 :             }
    2239            4 :             schedule_context
    2240            4 :         }
    2241              : 
    2242            1 :         let schedule_context = make_schedule_context(&shards);
    2243            1 :         let optimization_a_prepare =
    2244            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context);
    2245            1 :         assert_eq!(
    2246            1 :             optimization_a_prepare,
    2247            1 :             Some(ScheduleOptimization {
    2248            1 :                 sequence: shards[1].sequence,
    2249            1 :                 action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
    2250            1 :             })
    2251            1 :         );
    2252            1 :         shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
    2253            1 : 
    2254            1 :         let schedule_context = make_schedule_context(&shards);
    2255            1 :         let optimization_a_migrate =
    2256            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context);
    2257            1 :         assert_eq!(
    2258            1 :             optimization_a_migrate,
    2259            1 :             Some(ScheduleOptimization {
    2260            1 :                 sequence: shards[1].sequence,
    2261            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    2262            1 :                     old_attached_node_id: NodeId(2),
    2263            1 :                     new_attached_node_id: NodeId(1)
    2264            1 :                 })
    2265            1 :             })
    2266            1 :         );
    2267            1 :         shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
    2268            1 : 
    2269            1 :         let schedule_context = make_schedule_context(&shards);
    2270            1 :         let optimization_a_cleanup =
    2271            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context);
    2272            1 :         assert_eq!(
    2273            1 :             optimization_a_cleanup,
    2274            1 :             Some(ScheduleOptimization {
    2275            1 :                 sequence: shards[1].sequence,
    2276            1 :                 action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
    2277            1 :             })
    2278            1 :         );
    2279            1 :         shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
    2280            1 : 
    2281            1 :         // Everything should be stable now
    2282            1 :         let schedule_context = make_schedule_context(&shards);
    2283            1 :         assert_eq!(
    2284            1 :             shards[0].optimize_attachment(&mut scheduler, &schedule_context),
    2285            1 :             None
    2286            1 :         );
    2287            1 :         assert_eq!(
    2288            1 :             shards[1].optimize_attachment(&mut scheduler, &schedule_context),
    2289            1 :             None
    2290            1 :         );
    2291            1 :         assert_eq!(
    2292            1 :             shards[2].optimize_attachment(&mut scheduler, &schedule_context),
    2293            1 :             None
    2294            1 :         );
    2295            1 :         assert_eq!(
    2296            1 :             shards[3].optimize_attachment(&mut scheduler, &schedule_context),
    2297            1 :             None
    2298            1 :         );
    2299              : 
    2300            5 :         for mut shard in shards {
    2301            4 :             shard.intent.clear(&mut scheduler);
    2302            4 :         }
    2303              : 
    2304            1 :         Ok(())
    2305            1 :     }
    2306              : 
    2307              :     #[test]
    2308            1 :     fn optimize_secondary() -> anyhow::Result<()> {
    2309            1 :         let nodes = make_test_nodes(4, &[]);
    2310            1 :         let mut scheduler = Scheduler::new(nodes.values());
    2311            1 : 
    2312            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2313            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    2314            1 : 
    2315            1 :         // Initially: the shards are attached on different nodes (1 and 2), but both
    2316            1 :         // have their secondary location on the same node (3).
    2317            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    2318            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    2319            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    2320            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    2321            1 : 
    2322            1 :         let mut schedule_context = ScheduleContext::default();
    2323            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    2324            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    2325            1 : 
    2326            1 :         let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
    2327            1 : 
    2328            1 :         // Since there is a node with no locations available, the node with two locations for the
    2329            1 :         // same tenant should generate an optimization to move one away
    2330            1 :         assert_eq!(
    2331            1 :             optimization_a,
    2332            1 :             Some(ScheduleOptimization {
    2333            1 :                 sequence: shard_a.sequence,
    2334            1 :                 action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    2335            1 :                     old_node_id: NodeId(3),
    2336            1 :                     new_node_id: NodeId(4)
    2337            1 :                 })
    2338            1 :             })
    2339            1 :         );
    2340              : 
    2341            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    2342            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    2343            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
    2344              : 
    2345            1 :         shard_a.intent.clear(&mut scheduler);
    2346            1 :         shard_b.intent.clear(&mut scheduler);
    2347            1 : 
    2348            1 :         Ok(())
    2349            1 :     }
    2350              : 
    2351              :     // Optimize until quiescent: this emulates what Service::optimize_all does when
    2352              :     // called repeatedly in the background.
    2353              :     // Returns the applied optimizations.
    2354            3 :     fn optimize_til_idle(
    2355            3 :         scheduler: &mut Scheduler,
    2356            3 :         shards: &mut [TenantShard],
    2357            3 :     ) -> Vec<ScheduleOptimization> {
    2358            3 :         let mut loop_n = 0;
    2359            3 :         let mut optimizations = Vec::default();
    2360              :         loop {
    2361            6 :             let mut schedule_context = ScheduleContext::default();
    2362            6 :             let mut any_changed = false;
    2363              : 
    2364           24 :             for shard in shards.iter() {
    2365           24 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    2366           24 :             }
    2367              : 
    2368           15 :             for shard in shards.iter_mut() {
    2369           15 :                 let optimization = shard.optimize_attachment(scheduler, &schedule_context);
    2370           15 :                 tracing::info!(
    2371            0 :                     "optimize_attachment({})={:?}",
    2372              :                     shard.tenant_shard_id,
    2373              :                     optimization
    2374              :                 );
    2375           15 :                 if let Some(optimization) = optimization {
    2376              :                     // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
    2377            3 :                     assert!(shard.maybe_optimizable(scheduler, &schedule_context));
    2378            3 :                     optimizations.push(optimization.clone());
    2379            3 :                     shard.apply_optimization(scheduler, optimization);
    2380            3 :                     any_changed = true;
    2381            3 :                     break;
    2382           12 :                 }
    2383           12 : 
    2384           12 :                 let optimization = shard.optimize_secondary(scheduler, &schedule_context);
    2385           12 :                 tracing::info!(
    2386            0 :                     "optimize_secondary({})={:?}",
    2387              :                     shard.tenant_shard_id,
    2388              :                     optimization
    2389              :                 );
    2390           12 :                 if let Some(optimization) = optimization {
    2391              :                     // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
    2392            0 :                     assert!(shard.maybe_optimizable(scheduler, &schedule_context));
    2393              : 
    2394            0 :                     optimizations.push(optimization.clone());
    2395            0 :                     shard.apply_optimization(scheduler, optimization);
    2396            0 :                     any_changed = true;
    2397            0 :                     break;
    2398           12 :                 }
    2399              :             }
    2400              : 
    2401            6 :             if !any_changed {
    2402            3 :                 break;
    2403            3 :             }
    2404            3 : 
    2405            3 :             // Assert no infinite loop
    2406            3 :             loop_n += 1;
    2407            3 :             assert!(loop_n < 1000);
    2408              :         }
    2409              : 
    2410            3 :         optimizations
    2411            3 :     }
    2412              : 
    2413              :     /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
    2414              :     /// that it converges.
    2415              :     #[test]
    2416            1 :     fn optimize_add_nodes() -> anyhow::Result<()> {
    2417            1 :         let nodes = make_test_nodes(
    2418            1 :             9,
    2419            1 :             &[
    2420            1 :                 // Initial 6 nodes
    2421            1 :                 AvailabilityZone("az-a".to_string()),
    2422            1 :                 AvailabilityZone("az-a".to_string()),
    2423            1 :                 AvailabilityZone("az-b".to_string()),
    2424            1 :                 AvailabilityZone("az-b".to_string()),
    2425            1 :                 AvailabilityZone("az-c".to_string()),
    2426            1 :                 AvailabilityZone("az-c".to_string()),
    2427            1 :                 // Three we will add later
    2428            1 :                 AvailabilityZone("az-a".to_string()),
    2429            1 :                 AvailabilityZone("az-b".to_string()),
    2430            1 :                 AvailabilityZone("az-c".to_string()),
    2431            1 :             ],
    2432            1 :         );
    2433            1 : 
    2434            1 :         // Only show the scheduler two nodes in each AZ to start with
    2435            1 :         let mut scheduler = Scheduler::new([].iter());
    2436            7 :         for i in 1..=6 {
    2437            6 :             scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
    2438            6 :         }
    2439              : 
    2440            1 :         let mut shards = make_test_tenant(
    2441            1 :             PlacementPolicy::Attached(1),
    2442            1 :             ShardCount::new(4),
    2443            1 :             Some(AvailabilityZone("az-a".to_string())),
    2444            1 :         );
    2445            1 :         let mut schedule_context = ScheduleContext::default();
    2446            5 :         for shard in &mut shards {
    2447            4 :             assert!(shard
    2448            4 :                 .schedule(&mut scheduler, &mut schedule_context)
    2449            4 :                 .is_ok());
    2450              :         }
    2451              : 
    2452              :         // Initial: attached locations land in the tenant's home AZ.
    2453            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
    2454            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
    2455            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    2456            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
    2457              : 
    2458              :         // Initial: secondary locations in a remote AZ
    2459            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
    2460            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
    2461            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
    2462            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
    2463            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
    2464            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
    2465            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
    2466            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
    2467              : 
    2468              :         // Add another three nodes: we should see the shards spread out when their optimize
    2469              :         // methods are called
    2470            1 :         scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
    2471            1 :         scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
    2472            1 :         scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
    2473            1 :         optimize_til_idle(&mut scheduler, &mut shards);
    2474            1 : 
    2475            1 :         // We expect one attached location was moved to the new node in the tenant's home AZ
    2476            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
    2477            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
    2478              :         // The original node has one less attached shard
    2479            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
    2480            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
    2481              : 
    2482              :         // One of the original nodes still has two attachments: four attachments cannot be spread evenly across the three home-AZ nodes
    2483            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    2484            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
    2485              : 
    2486              :         // None of our secondaries moved, since we already had enough nodes for those to be
    2487              :         // scheduled perfectly
    2488            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
    2489            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
    2490            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
    2491            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
    2492            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
    2493            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
    2494            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
    2495            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
    2496              : 
    2497            4 :         for shard in shards.iter_mut() {
    2498            4 :             shard.intent.clear(&mut scheduler);
    2499            4 :         }
    2500              : 
    2501            1 :         Ok(())
    2502            1 :     }
    2503              : 
    2504              :     /// Test that initial shard scheduling is optimal. By optimal we mean
    2505              :     /// that the optimizer cannot find a way to improve it.
    2506              :     ///
    2507              :     /// This test is an example of the scheduling issue described in
    2508              :     /// https://github.com/neondatabase/neon/issues/8969
    2509              :     #[test]
    2510            1 :     fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
    2511              :         use itertools::Itertools;
    2512              : 
    2513            1 :         let nodes = make_test_nodes(2, &[]);
    2514            1 : 
    2515            1 :         let mut scheduler = Scheduler::new([].iter());
    2516            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    2517            1 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    2518            1 : 
    2519            1 :         let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    2520            1 :         let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
    2521            1 : 
    2522            1 :         let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    2523            1 :         let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
    2524            1 : 
    2525            4 :         let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
    2526            4 :         let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
    2527            1 : 
    2528            1 :         let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
    2529              : 
    2530            9 :         for (shard, context) in schedule_order {
    2531            8 :             let context = &mut *context.borrow_mut();
    2532            8 :             shard.schedule(&mut scheduler, context).unwrap();
    2533            8 :         }
    2534              : 
    2535            1 :         let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
    2536            1 :         assert_eq!(applied_to_a, vec![]);
    2537              : 
    2538            1 :         let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
    2539            1 :         assert_eq!(applied_to_b, vec![]);
    2540              : 
    2541            8 :         for shard in a.iter_mut().chain(b.iter_mut()) {
    2542            8 :             shard.intent.clear(&mut scheduler);
    2543            8 :         }
    2544              : 
    2545            1 :         Ok(())
    2546            1 :     }
    2547              : 
    2548              :     #[test]
    2549            1 :     fn random_az_shard_scheduling() -> anyhow::Result<()> {
    2550              :         use rand::seq::SliceRandom;
    2551              : 
    2552           51 :         for seed in 0..50 {
    2553           50 :             eprintln!("Running test with seed {seed}");
    2554           50 :             let mut rng = StdRng::seed_from_u64(seed);
    2555           50 : 
    2556           50 :             let az_a_tag = AvailabilityZone("az-a".to_string());
    2557           50 :             let az_b_tag = AvailabilityZone("az-b".to_string());
    2558           50 :             let azs = [az_a_tag, az_b_tag];
    2559           50 :             let nodes = make_test_nodes(4, &azs);
    2560           50 :             let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
    2561           50 : 
    2562           50 :             let mut scheduler = Scheduler::new([].iter());
    2563          200 :             for node in nodes.values() {
    2564          200 :                 scheduler.node_upsert(node);
    2565          200 :             }
    2566              : 
    2567           50 :             let mut shards = Vec::default();
    2568           50 :             let mut contexts = Vec::default();
    2569           50 :             let mut az_picker = azs.iter().cycle().cloned();
    2570         5050 :             for i in 0..100 {
    2571         5000 :                 let az = az_picker.next().unwrap();
    2572         5000 :                 let shard_count = i % 4 + 1;
    2573         5000 :                 *shards_per_az.entry(az.clone()).or_default() += shard_count;
    2574         5000 : 
    2575         5000 :                 let tenant_shards = make_test_tenant(
    2576         5000 :                     PlacementPolicy::Attached(1),
    2577         5000 :                     ShardCount::new(shard_count.try_into().unwrap()),
    2578         5000 :                     Some(az),
    2579         5000 :                 );
    2580         5000 :                 let context = Rc::new(RefCell::new(ScheduleContext::default()));
    2581         5000 : 
    2582         5000 :                 contexts.push(context.clone());
    2583         5000 :                 let with_ctx = tenant_shards
    2584         5000 :                     .into_iter()
    2585        12500 :                     .map(|shard| (shard, context.clone()));
    2586        17500 :                 for shard_with_ctx in with_ctx {
    2587        12500 :                     shards.push(shard_with_ctx);
    2588        12500 :                 }
    2589              :             }
    2590              : 
    2591           50 :             shards.shuffle(&mut rng);
    2592              : 
    2593              :             #[derive(Default, Debug)]
    2594              :             struct NodeStats {
    2595              :                 attachments: u32,
    2596              :                 secondaries: u32,
    2597              :             }
    2598              : 
    2599           50 :             let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
    2600           50 :             let mut attachments_in_wrong_az = 0;
    2601           50 :             let mut secondaries_in_wrong_az = 0;
    2602              : 
    2603        12550 :             for (shard, context) in &mut shards {
    2604        12500 :                 let context = &mut *context.borrow_mut();
    2605        12500 :                 shard.schedule(&mut scheduler, context).unwrap();
    2606        12500 : 
    2607        12500 :                 let attached_node = shard.intent.get_attached().unwrap();
    2608        12500 :                 let stats = node_stats.entry(attached_node).or_default();
    2609        12500 :                 stats.attachments += 1;
    2610        12500 : 
    2611        12500 :                 let secondary_node = *shard.intent.get_secondary().first().unwrap();
    2612        12500 :                 let stats = node_stats.entry(secondary_node).or_default();
    2613        12500 :                 stats.secondaries += 1;
    2614        12500 : 
    2615        12500 :                 let attached_node_az = nodes
    2616        12500 :                     .get(&attached_node)
    2617        12500 :                     .unwrap()
    2618        12500 :                     .get_availability_zone_id();
    2619        12500 :                 let secondary_node_az = nodes
    2620        12500 :                     .get(&secondary_node)
    2621        12500 :                     .unwrap()
    2622        12500 :                     .get_availability_zone_id();
    2623        12500 :                 let preferred_az = shard.preferred_az().unwrap();
    2624        12500 : 
    2625        12500 :                 if attached_node_az != preferred_az {
    2626            0 :                     eprintln!(
    2627            0 :                         "{} attachment was scheduled in AZ {} but preferred AZ {}",
    2628            0 :                         shard.tenant_shard_id, attached_node_az, preferred_az
    2629            0 :                     );
    2630            0 :                     attachments_in_wrong_az += 1;
    2631        12500 :                 }
    2632              : 
    2633        12500 :                 if secondary_node_az == preferred_az {
    2634            0 :                     eprintln!(
    2635            0 :                         "{} secondary was scheduled in AZ {} which matches its preferred AZ",
    2636            0 :                         shard.tenant_shard_id, secondary_node_az
    2637            0 :                     );
    2638            0 :                     secondaries_in_wrong_az += 1;
    2639        12500 :                 }
    2640              :             }
    2641              : 
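                      :             // Accumulate all violations and assert on them together at the end,
                      :             // so a failure reports every problem at once.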
    2642           50 :             let mut violations = Vec::default();
    2643           50 : 
    2644           50 :             if attachments_in_wrong_az > 0 {
    2645            0 :                 violations.push(format!(
    2646            0 :                     "{} attachments scheduled to the incorrect AZ",
    2647            0 :                     attachments_in_wrong_az
    2648            0 :                 ));
    2649           50 :             }
    2650              : 
    2651           50 :             if secondaries_in_wrong_az > 0 {
    2652            0 :                 violations.push(format!(
    2653            0 :                     "{} secondaries scheduled to the incorrect AZ",
    2654            0 :                     secondaries_in_wrong_az
    2655            0 :                 ));
    2656           50 :             }
    2657              : 
    2658           50 :             eprintln!(
    2659           50 :                 "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
    2660           50 :                 attachments_in_wrong_az, secondaries_in_wrong_az
    2661           50 :             );
    2662              : 
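                      :             // Check attachment balance: each node is expected to hold about half
                      :             // of its AZ's shard attachments, within a tolerance of one.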
    2663          250 :             for (node_id, stats) in &node_stats {
    2664          200 :                 let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
    2665          200 :                 let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
    2666          200 :                 let allowed_attachment_load =
    2667          200 :                     (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
    2668          200 : 
    2669          200 :                 if !allowed_attachment_load.contains(&stats.attachments) {
    2670            0 :                     violations.push(format!(
    2671            0 :                         "Found {} attachments on node {}, but expected {}",
    2672            0 :                         stats.attachments, node_id, ideal_attachment_load
    2673            0 :                     ));
    2674          200 :                 }
    2675              : 
    2676          200 :                 eprintln!(
    2677          200 :                     "{}: attachments={} secondaries={} ideal_attachment_load={}",
    2678          200 :                     node_id, stats.attachments, stats.secondaries, ideal_attachment_load
    2679          200 :                 );
    2680              :             }
    2681              : 
    2682           50 :             assert!(violations.is_empty(), "{violations:?}");
    2683              : 
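                      :             // Clear intents through the scheduler so node reference counts are
                      :             // released before the next iteration.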
    2684        12550 :             for (mut shard, _ctx) in shards {
    2685        12500 :                 shard.intent.clear(&mut scheduler);
    2686        12500 :             }
    2687              :         }
    2688            1 :         Ok(())
    2689            1 :     }
    2690              : }
        

Generated by: LCOV version 2.1-beta