LCOV - code coverage report
Current view: top level - storage_controller/src - tenant_shard.rs (source / functions)
Test:      b9728233c33232dfae45024a493738ef141ccd5d.info
Test Date: 2025-01-10 20:41:15
             Coverage    Total    Hit
Lines:       60.2 %      1193     718
Functions:   37.2 %      113      42

            Line data    Source code
       1              : use std::{
       2              :     collections::{HashMap, HashSet},
       3              :     sync::Arc,
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use crate::{
       8              :     metrics::{
       9              :         self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
      10              :     },
      11              :     persistence::TenantShardPersistence,
      12              :     reconciler::{ReconcileUnits, ReconcilerConfig},
      13              :     scheduler::{
      14              :         AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
      15              :         SecondaryShardTag,
      16              :     },
      17              :     service::ReconcileResultRequest,
      18              : };
      19              : use futures::future::{self, Either};
      20              : use itertools::Itertools;
      21              : use pageserver_api::controller_api::{
      22              :     AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
      23              : };
      24              : use pageserver_api::{
      25              :     models::{LocationConfig, LocationConfigMode, TenantConfig},
      26              :     shard::{ShardIdentity, TenantShardId},
      27              : };
      28              : use serde::{Deserialize, Serialize};
      29              : use tokio::task::JoinHandle;
      30              : use tokio_util::sync::CancellationToken;
      31              : use tracing::{instrument, Instrument};
      32              : use utils::{
      33              :     generation::Generation,
      34              :     id::NodeId,
      35              :     seqwait::{SeqWait, SeqWaitError},
      36              :     sync::gate::GateGuard,
      37              : };
      38              : 
      39              : use crate::{
      40              :     compute_hook::ComputeHook,
      41              :     node::Node,
      42              :     persistence::{split_state::SplitState, Persistence},
      43              :     reconciler::{
      44              :         attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
      45              :     },
      46              :     scheduler::{ScheduleError, Scheduler},
      47              :     service, Sequence,
      48              : };
      49              : 
      50              : /// Serialization helper
      51            0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
      52            0 : where
      53            0 :     S: serde::ser::Serializer,
      54            0 :     T: std::fmt::Display,
      55            0 : {
      56            0 :     serializer.collect_str(
      57            0 :         &v.lock()
      58            0 :             .unwrap()
      59            0 :             .as_ref()
      60            0 :             .map(|e| format!("{e}"))
      61            0 :             .unwrap_or("".to_string()),
      62            0 :     )
      63            0 : }
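                       : 
                       : // Editor's illustrative sketch, not part of tenant_shard.rs: `LastErrorView` is a
                       : // hypothetical struct wired to the helper above via `serialize_with`, in the same way
                       : // `TenantShard::last_error` is below.  It serializes as "" while the mutex holds None,
                       : // and as the error's Display output otherwise.
                       : #[derive(Serialize)]
                       : struct LastErrorView {
                       :     #[serde(serialize_with = "read_last_error")]
                       :     err: std::sync::Mutex<Option<Arc<ReconcileError>>>,
                       : }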
      64              : 
      65              : /// In-memory state for a particular tenant shard.
      66              : ///
       67              : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
      68              : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
      69            0 : #[derive(Serialize)]
      70              : pub(crate) struct TenantShard {
      71              :     pub(crate) tenant_shard_id: TenantShardId,
      72              : 
      73              :     pub(crate) shard: ShardIdentity,
      74              : 
       75              :     // Runtime only: sequence used to coordinate updates to this object while
       76              :     // background reconcilers may be running.  A reconciler runs to a particular
      77              :     // sequence.
      78              :     pub(crate) sequence: Sequence,
      79              : 
      80              :     // Latest generation number: next time we attach, increment this
      81              :     // and use the incremented number when attaching.
      82              :     //
      83              :     // None represents an incompletely onboarded tenant via the [`Service::location_config`]
      84              :     // API, where this tenant may only run in PlacementPolicy::Secondary.
      85              :     pub(crate) generation: Option<Generation>,
      86              : 
      87              :     // High level description of how the tenant should be set up.  Provided
      88              :     // externally.
      89              :     pub(crate) policy: PlacementPolicy,
      90              : 
      91              :     // Low level description of exactly which pageservers should fulfil
      92              :     // which role.  Generated by `Self::schedule`.
      93              :     pub(crate) intent: IntentState,
      94              : 
      95              :     // Low level description of how the tenant is configured on pageservers:
      96              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      97              :     // with `Self::reconcile`.
      98              :     pub(crate) observed: ObservedState,
      99              : 
     100              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
     101              :     // for all shards in a tenant.
     102              :     pub(crate) config: TenantConfig,
     103              : 
     104              :     /// If a reconcile task is currently in flight, it may be joined here (it is
     105              :     /// only safe to join if either the result has been received or the reconciler's
     106              :     /// cancellation token has been fired)
     107              :     #[serde(skip)]
     108              :     pub(crate) reconciler: Option<ReconcilerHandle>,
     109              : 
     110              :     /// If a tenant is being split, then all shards with that TenantId will have a
      111              :     /// SplitState set; this acts as a guard against other operations such as background
      112              :     /// reconciliation and timeline creation.
     113              :     pub(crate) splitting: SplitState,
     114              : 
      115              :     /// If a tenant was enqueued for later reconcile due to hitting the concurrency limit, this flag
     116              :     /// is set. This flag is cleared when the tenant is popped off the delay queue.
     117              :     pub(crate) delayed_reconcile: bool,
     118              : 
     119              :     /// Optionally wait for reconciliation to complete up to a particular
     120              :     /// sequence number.
     121              :     #[serde(skip)]
     122              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     123              : 
      124              :     /// Indicates the sequence number for which we have encountered an error reconciling.  If
     125              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
     126              :     /// and callers should stop waiting for `waiter` and propagate the error.
     127              :     #[serde(skip)]
     128              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     129              : 
     130              :     /// The most recent error from a reconcile on this tenant.  This is a nested Arc
     131              :     /// because:
     132              :     ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
     133              :     ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
     134              :     ///    many waiters for one shard, and the underlying error types are not Clone.
     135              :     ///
     136              :     /// TODO: generalize to an array of recent events
      137              :     /// TODO: use an ArcSwap instead of a mutex for faster reads?
     138              :     #[serde(serialize_with = "read_last_error")]
     139              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     140              : 
     141              :     /// If we have a pending compute notification that for some reason we weren't able to send,
     142              :     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
     143              :     /// and trigger a Reconciler run.  This is the mechanism by which compute notifications are included in the scope
     144              :     /// of state that we publish externally in an eventually consistent way.
     145              :     pub(crate) pending_compute_notification: bool,
     146              : 
     147              :     // Support/debug tool: if something is going wrong or flapping with scheduling, this may
     148              :     // be set to a non-active state to avoid making changes while the issue is fixed.
     149              :     scheduling_policy: ShardSchedulingPolicy,
     150              : 
     151              :     // We should attempt to schedule this shard in the provided AZ to
     152              :     // decrease chances of cross-AZ compute.
     153              :     preferred_az_id: Option<AvailabilityZone>,
     154              : }
     155              : 
     156              : #[derive(Default, Clone, Debug, Serialize)]
     157              : pub(crate) struct IntentState {
     158              :     attached: Option<NodeId>,
     159              :     secondary: Vec<NodeId>,
     160              : }
     161              : 
     162              : impl IntentState {
     163           19 :     pub(crate) fn new() -> Self {
     164           19 :         Self {
     165           19 :             attached: None,
     166           19 :             secondary: vec![],
     167           19 :         }
     168           19 :     }
     169            0 :     pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
     170            0 :         if let Some(node_id) = node_id {
     171            0 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
     172            0 :         }
     173            0 :         Self {
     174            0 :             attached: node_id,
     175            0 :             secondary: vec![],
     176            0 :         }
     177            0 :     }
     178              : 
     179        12544 :     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
     180        12544 :         if self.attached != new_attached {
     181        12544 :             if let Some(old_attached) = self.attached.take() {
     182            0 :                 scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
     183        12544 :             }
     184        12544 :             if let Some(new_attached) = &new_attached {
     185        12544 :                 scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
     186        12544 :             }
     187        12544 :             self.attached = new_attached;
     188            0 :         }
     189        12544 :     }
     190              : 
     191              :     /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
     192              :     /// secondary to attached while maintaining the scheduler's reference counts.
     193            5 :     pub(crate) fn promote_attached(
     194            5 :         &mut self,
     195            5 :         scheduler: &mut Scheduler,
     196            5 :         promote_secondary: NodeId,
     197            5 :     ) {
     198            5 :         // If we call this with a node that isn't in secondary, it would cause incorrect
     199            5 :         // scheduler reference counting, since we assume the node is already referenced as a secondary.
     200            5 :         debug_assert!(self.secondary.contains(&promote_secondary));
     201              : 
     202           10 :         self.secondary.retain(|n| n != &promote_secondary);
     203            5 : 
     204            5 :         let demoted = self.attached;
     205            5 :         self.attached = Some(promote_secondary);
     206            5 : 
     207            5 :         scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
     208            5 :         if let Some(demoted) = demoted {
     209            0 :             scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
     210            5 :         }
     211            5 :     }
     212              : 
     213        12531 :     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
     214        12531 :         debug_assert!(!self.secondary.contains(&new_secondary));
     215        12531 :         scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
     216        12531 :         self.secondary.push(new_secondary);
     217        12531 :     }
     218              : 
     219              :     /// It is legal to call this with a node that is not currently a secondary: that is a no-op
     220            5 :     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
     221            5 :         let index = self.secondary.iter().position(|n| *n == node_id);
     222            5 :         if let Some(index) = index {
     223            5 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
     224            5 :             self.secondary.remove(index);
     225            5 :         }
     226            5 :     }
     227              : 
     228        12543 :     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
     229        12543 :         for secondary in self.secondary.drain(..) {
     230        12526 :             scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
     231        12526 :         }
     232        12543 :     }
     233              : 
     234              :     /// Remove the last secondary node from the list of secondaries
     235            0 :     pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
     236            0 :         if let Some(node_id) = self.secondary.pop() {
     237            0 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
     238            0 :         }
     239            0 :     }
     240              : 
     241        12543 :     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
     242        12543 :         if let Some(old_attached) = self.attached.take() {
     243        12543 :             scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
     244        12543 :         }
     245              : 
     246        12543 :         self.clear_secondary(scheduler);
     247        12543 :     }
     248              : 
     249        12614 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     250        12614 :         let mut result = Vec::new();
     251        12614 :         if let Some(p) = self.attached {
     252        12612 :             result.push(p)
     253            2 :         }
     254              : 
     255        12614 :         result.extend(self.secondary.iter().copied());
     256        12614 : 
     257        12614 :         result
     258        12614 :     }
     259              : 
     260        25095 :     pub(crate) fn get_attached(&self) -> &Option<NodeId> {
     261        25095 :         &self.attached
     262        25095 :     }
     263              : 
     264        12524 :     pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
     265        12524 :         &self.secondary
     266        12524 :     }
     267              : 
     268              :     /// If the node is in use as the attached location, demote it into
     269              :     /// the list of secondary locations.  This is used when a node goes offline,
     270              :     /// and we want to use a different node for attachment, but not permanently
     271              :     /// forget the location on the offline node.
     272              :     ///
     273              :     /// Returns true if a change was made
     274            5 :     pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
     275            5 :         if self.attached == Some(node_id) {
     276            5 :             self.attached = None;
     277            5 :             self.secondary.push(node_id);
     278            5 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
     279            5 :             true
     280              :         } else {
     281            0 :             false
     282              :         }
     283            5 :     }
     284              : }
     285              : 
     286              : impl Drop for IntentState {
     287        12544 :     fn drop(&mut self) {
     288        12544 :         // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
     289        12544 :         // We do not check this while panicking, to avoid polluting unit test failures or
     290        12544 :         // other assertions with this assertion's output.  It's still wrong to leak these,
     291        12544 :         // but if we already have a panic then we don't need to independently flag this case.
     292        12544 :         if !(std::thread::panicking()) {
     293        12544 :             debug_assert!(self.attached.is_none() && self.secondary.is_empty());
     294            0 :         }
     295        12543 :     }
     296              : }
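                       : 
                       : // Editor's illustrative sketch, not part of tenant_shard.rs: the intended calling
                       : // pattern for IntentState, assuming the caller already holds a mutable Scheduler and
                       : // two node IDs.  Note that `clear` must run before drop, per the Drop impl above.
                       : fn intent_lifecycle_sketch(scheduler: &mut Scheduler, a: NodeId, b: NodeId) {
                       :     let mut intent = IntentState::new();
                       :     intent.set_attached(scheduler, Some(a)); // scheduler refcount: attach on node `a`
                       :     intent.push_secondary(scheduler, b); // scheduler refcount: secondary on node `b`
                       :     intent.demote_attached(scheduler, a); // `a` becomes a secondary location
                       :     intent.promote_attached(scheduler, b); // `b` becomes the attached location
                       :     intent.clear(scheduler); // release all refcounts before the IntentState drops
                       : }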
     297              : 
     298            0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
     299              : pub(crate) struct ObservedState {
     300              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
     301              : }
     302              : 
     303              : /// Our latest knowledge of how this tenant is configured in the outside world.
     304              : ///
     305              : /// Meaning:
     306              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
     307              : ///       node for this shard.
     308              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
     309              : ///       what it is (e.g. we failed partway through configuring it)
     310              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     311              : ///       and that configuration will still be present unless something external interfered.
     312            0 : #[derive(Clone, Serialize, Deserialize, Debug)]
     313              : pub(crate) struct ObservedStateLocation {
     314              :     /// If None, it means we do not know the status of this shard's location on this node, but
     315              :     /// we know that we might have some state on this node.
     316              :     pub(crate) conf: Option<LocationConfig>,
     317              : }
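                       : 
                       : // Editor's illustrative sketch, not part of tenant_shard.rs: interpreting the observed
                       : // state for a single node, following the three cases described in the comment above.
                       : fn describe_observed_sketch(observed: &ObservedState, node: NodeId) -> &'static str {
                       :     match observed.locations.get(&node) {
                       :         // No entry: we are certain nothing is configured on that node for this shard.
                       :         None => "nothing configured on this node",
                       :         // Entry with conf == None: there may be state, but we don't know what it is.
                       :         Some(ObservedStateLocation { conf: None }) => "unknown state (e.g. partial configuration)",
                       :         // Entry with conf == Some: the last configuration we successfully applied.
                       :         Some(ObservedStateLocation { conf: Some(_) }) => "last successfully applied configuration",
                       :     }
                       : }
                       : 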
     318              : pub(crate) struct ReconcilerWaiter {
     319              :     // For observability purposes, remember the ID of the shard we're
     320              :     // waiting for.
     321              :     pub(crate) tenant_shard_id: TenantShardId,
     322              : 
     323              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     324              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     325              :     error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     326              :     seq: Sequence,
     327              : }
     328              : 
     329              : pub(crate) enum ReconcilerStatus {
     330              :     Done,
     331              :     Failed,
     332              :     InProgress,
     333              : }
     334              : 
     335              : #[derive(thiserror::Error, Debug)]
     336              : pub(crate) enum ReconcileWaitError {
     337              :     #[error("Timeout waiting for shard {0}")]
     338              :     Timeout(TenantShardId),
     339              :     #[error("shutting down")]
     340              :     Shutdown,
     341              :     #[error("Reconcile error on shard {0}: {1}")]
     342              :     Failed(TenantShardId, Arc<ReconcileError>),
     343              : }
     344              : 
     345              : #[derive(Eq, PartialEq, Debug, Clone)]
     346              : pub(crate) struct ReplaceSecondary {
     347              :     old_node_id: NodeId,
     348              :     new_node_id: NodeId,
     349              : }
     350              : 
     351              : #[derive(Eq, PartialEq, Debug, Clone)]
     352              : pub(crate) struct MigrateAttachment {
     353              :     pub(crate) old_attached_node_id: NodeId,
     354              :     pub(crate) new_attached_node_id: NodeId,
     355              : }
     356              : 
     357              : #[derive(Eq, PartialEq, Debug, Clone)]
     358              : pub(crate) enum ScheduleOptimizationAction {
     359              :     // Replace one of our secondary locations with a different node
     360              :     ReplaceSecondary(ReplaceSecondary),
     361              :     // Migrate attachment to an existing secondary location
     362              :     MigrateAttachment(MigrateAttachment),
     363              : }
     364              : 
     365              : #[derive(Eq, PartialEq, Debug, Clone)]
     366              : pub(crate) struct ScheduleOptimization {
     367              :     // What was the reconcile sequence when we generated this optimization?  The optimization
     368              :     // should only be applied if the shard's sequence is still at this value, in case other changes
     369              :     // happened between planning the optimization and applying it.
     370              :     sequence: Sequence,
     371              : 
     372              :     pub(crate) action: ScheduleOptimizationAction,
     373              : }
     374              : 
     375              : impl ReconcilerWaiter {
     376            0 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     377            0 :         tokio::select! {
     378            0 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     379            0 :                 result.map_err(|e| match e {
     380            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     381            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     382            0 :                 })?;
     383              :             },
     384            0 :             result = self.error_seq_wait.wait_for(self.seq) => {
     385            0 :                 result.map_err(|e| match e {
     386            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     387            0 :                     SeqWaitError::Timeout => unreachable!()
     388            0 :                 })?;
     389              : 
     390            0 :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
     391            0 :                     self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
     392              :             }
     393              :         }
     394              : 
     395            0 :         Ok(())
     396            0 :     }
     397              : 
     398            0 :     pub(crate) fn get_status(&self) -> ReconcilerStatus {
     399            0 :         if self.seq_wait.would_wait_for(self.seq).is_ok() {
     400            0 :             ReconcilerStatus::Done
     401            0 :         } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
     402            0 :             ReconcilerStatus::Failed
     403              :         } else {
     404            0 :             ReconcilerStatus::InProgress
     405              :         }
     406            0 :     }
     407              : }
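                       : 
                       : // Editor's illustrative sketch, not part of tenant_shard.rs: how a caller might drive a
                       : // ReconcilerWaiter, handling each ReconcileWaitError variant.  The 30-second timeout is
                       : // an arbitrary example value, not a constant used by the storage controller.
                       : async fn await_reconcile_sketch(waiter: &ReconcilerWaiter) {
                       :     match waiter.wait_timeout(Duration::from_secs(30)).await {
                       :         Ok(()) => tracing::info!("reconcile complete for {}", waiter.tenant_shard_id),
                       :         Err(ReconcileWaitError::Timeout(shard)) => tracing::warn!("timed out waiting for {}", shard),
                       :         Err(ReconcileWaitError::Shutdown) => { /* service is shutting down: stop waiting */ }
                       :         Err(ReconcileWaitError::Failed(shard, err)) => {
                       :             tracing::error!("reconcile failed for {}: {}", shard, err)
                       :         }
                       :     }
                       : }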
     408              : 
     409              : /// Having spawned a reconciler task, the tenant shard's state will carry enough
     410              : /// information to optionally cancel & await it later.
     411              : pub(crate) struct ReconcilerHandle {
     412              :     sequence: Sequence,
     413              :     handle: JoinHandle<()>,
     414              :     cancel: CancellationToken,
     415              : }
     416              : 
     417              : pub(crate) enum ReconcileNeeded {
     418              :     /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
     419              :     /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
     420              :     No,
     421              :     /// shard has a reconciler running, and its intent hasn't changed since that one was
     422              :     /// spawned: wait for the existing reconciler rather than spawning a new one.
     423              :     WaitExisting(ReconcilerWaiter),
     424              :     /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
     425              :     Yes,
     426              : }
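                       : 
                       : // Editor's illustrative sketch, not part of tenant_shard.rs: how callers are expected to
                       : // branch on ReconcileNeeded, per the variant documentation above.
                       : fn reconcile_decision_sketch(needed: ReconcileNeeded) -> &'static str {
                       :     match needed {
                       :         ReconcileNeeded::No => "nothing to do, or spawning is currently forbidden",
                       :         ReconcileNeeded::WaitExisting(_waiter) => "wait on the already-running reconciler",
                       :         ReconcileNeeded::Yes => "spawn a new reconciler via TenantShard::spawn_reconciler",
                       :     }
                       : }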
     427              : 
     428              : /// Pending modification to the observed state of a tenant shard.
     429              : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
     430              : pub(crate) enum ObservedStateDelta {
     431              :     Upsert(Box<(NodeId, ObservedStateLocation)>),
     432              :     Delete(NodeId),
     433              : }
     434              : 
     435              : impl ObservedStateDelta {
     436            0 :     pub(crate) fn node_id(&self) -> &NodeId {
     437            0 :         match self {
     438            0 :             Self::Upsert(up) => &up.0,
     439            0 :             Self::Delete(nid) => nid,
     440              :         }
     441            0 :     }
     442              : }
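                       : 
                       : // Editor's illustrative sketch, not part of tenant_shard.rs: the Upsert/Delete semantics
                       : // of ObservedStateDelta applied to an ObservedState map.  The real application happens in
                       : // [`crate::service::Service::process_result`]; this is only a simplified illustration.
                       : fn apply_deltas_sketch(observed: &mut ObservedState, deltas: Vec<ObservedStateDelta>) {
                       :     for delta in deltas {
                       :         match delta {
                       :             ObservedStateDelta::Upsert(entry) => {
                       :                 let (node_id, location) = *entry;
                       :                 observed.locations.insert(node_id, location);
                       :             }
                       :             ObservedStateDelta::Delete(node_id) => {
                       :                 observed.locations.remove(&node_id);
                       :             }
                       :         }
                       :     }
                       : }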
     443              : 
     444              : /// When a reconcile task completes, it sends this result object
     445              : /// to be applied to the primary TenantShard.
     446              : pub(crate) struct ReconcileResult {
     447              :     pub(crate) sequence: Sequence,
      448              :     /// On errors, `observed` should be treated as an incomplete description
     449              :     /// of state (i.e. any nodes present in the result should override nodes
     450              :     /// present in the parent tenant state, but any unmentioned nodes should
     451              :     /// not be removed from parent tenant state)
     452              :     pub(crate) result: Result<(), ReconcileError>,
     453              : 
     454              :     pub(crate) tenant_shard_id: TenantShardId,
     455              :     pub(crate) generation: Option<Generation>,
     456              :     pub(crate) observed_deltas: Vec<ObservedStateDelta>,
     457              : 
     458              :     /// Set [`TenantShard::pending_compute_notification`] from this flag
     459              :     pub(crate) pending_compute_notification: bool,
     460              : }
     461              : 
     462              : impl ObservedState {
     463            0 :     pub(crate) fn new() -> Self {
     464            0 :         Self {
     465            0 :             locations: HashMap::new(),
     466            0 :         }
     467            0 :     }
     468              : 
     469            0 :     pub(crate) fn is_empty(&self) -> bool {
     470            0 :         self.locations.is_empty()
     471            0 :     }
     472              : }
     473              : 
     474              : impl TenantShard {
     475        12525 :     pub(crate) fn new(
     476        12525 :         tenant_shard_id: TenantShardId,
     477        12525 :         shard: ShardIdentity,
     478        12525 :         policy: PlacementPolicy,
     479        12525 :         preferred_az_id: Option<AvailabilityZone>,
     480        12525 :     ) -> Self {
     481        12525 :         metrics::METRICS_REGISTRY
     482        12525 :             .metrics_group
     483        12525 :             .storage_controller_tenant_shards
     484        12525 :             .inc();
     485        12525 : 
     486        12525 :         Self {
     487        12525 :             tenant_shard_id,
     488        12525 :             policy,
     489        12525 :             intent: IntentState::default(),
     490        12525 :             generation: Some(Generation::new(0)),
     491        12525 :             shard,
     492        12525 :             observed: ObservedState::default(),
     493        12525 :             config: TenantConfig::default(),
     494        12525 :             reconciler: None,
     495        12525 :             splitting: SplitState::Idle,
     496        12525 :             sequence: Sequence(1),
     497        12525 :             delayed_reconcile: false,
     498        12525 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     499        12525 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     500        12525 :             last_error: Arc::default(),
     501        12525 :             pending_compute_notification: false,
     502        12525 :             scheduling_policy: ShardSchedulingPolicy::default(),
     503        12525 :             preferred_az_id,
     504        12525 :         }
     505        12525 :     }
     506              : 
     507              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     508              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     509              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     510              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     511            1 :     pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
     512            1 :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     513            1 :         // generation
     514            1 :         let mut attached_locs = self
     515            1 :             .observed
     516            1 :             .locations
     517            1 :             .iter()
     518            2 :             .filter_map(|(node_id, l)| {
     519            2 :                 if let Some(conf) = &l.conf {
     520            2 :                     if conf.mode == LocationConfigMode::AttachedMulti
     521            1 :                         || conf.mode == LocationConfigMode::AttachedSingle
     522            1 :                         || conf.mode == LocationConfigMode::AttachedStale
     523              :                     {
     524            2 :                         Some((node_id, conf.generation))
     525              :                     } else {
     526            0 :                         None
     527              :                     }
     528              :                 } else {
     529            0 :                     None
     530              :                 }
     531            2 :             })
     532            1 :             .collect::<Vec<_>>();
     533            1 : 
     534            2 :         attached_locs.sort_by_key(|i| i.1);
     535            1 :         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
     536            1 :             self.intent.set_attached(scheduler, Some(*node_id));
     537            1 :         }
     538              : 
     539              :         // All remaining observed locations generate secondary intents.  This includes None
     540              :         // observations, as these may well have some local content on disk that is usable (this
     541              :         // is an edge case that might occur if we restarted during a migration or other change)
     542              :         //
     543              :         // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
     544              :         // will take care of promoting one of these secondaries to be attached.
     545            2 :         self.observed.locations.keys().for_each(|node_id| {
     546            2 :             if Some(*node_id) != self.intent.attached {
     547            1 :                 self.intent.push_secondary(scheduler, *node_id);
     548            1 :             }
     549            2 :         });
     550            1 :     }
     551              : 
     552              :     /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
     553              :     /// attached pageserver for a shard.
     554              :     ///
      555              :     /// Returns whether the intent was modified, and the NodeId selected.
     556        12521 :     fn schedule_attached(
     557        12521 :         &mut self,
     558        12521 :         scheduler: &mut Scheduler,
     559        12521 :         context: &ScheduleContext,
     560        12521 :     ) -> Result<(bool, NodeId), ScheduleError> {
     561              :         // No work to do if we already have an attached tenant
     562        12521 :         if let Some(node_id) = self.intent.attached {
     563            0 :             return Ok((false, node_id));
     564        12521 :         }
     565              : 
     566        12521 :         if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
     567              :             // Promote a secondary
     568            1 :             tracing::debug!("Promoted secondary {} to attached", promote_secondary);
     569            1 :             self.intent.promote_attached(scheduler, promote_secondary);
     570            1 :             Ok((true, promote_secondary))
     571              :         } else {
     572              :             // Pick a fresh node: either we had no secondaries or none were schedulable
     573        12520 :             let node_id = scheduler.schedule_shard::<AttachedShardTag>(
     574        12520 :                 &self.intent.secondary,
     575        12520 :                 &self.preferred_az_id,
     576        12520 :                 context,
     577        12520 :             )?;
     578        12520 :             tracing::debug!("Selected {} as attached", node_id);
     579        12520 :             self.intent.set_attached(scheduler, Some(node_id));
     580        12520 :             Ok((true, node_id))
     581              :         }
     582        12521 :     }
     583              : 
     584              :     #[instrument(skip_all, fields(
     585              :         tenant_id=%self.tenant_shard_id.tenant_id,
     586              :         shard_id=%self.tenant_shard_id.shard_slug(),
     587              :         sequence=%self.sequence
     588              :     ))]
     589              :     pub(crate) fn schedule(
     590              :         &mut self,
     591              :         scheduler: &mut Scheduler,
     592              :         context: &mut ScheduleContext,
     593              :     ) -> Result<(), ScheduleError> {
     594              :         let r = self.do_schedule(scheduler, context);
     595              : 
     596              :         context.avoid(&self.intent.all_pageservers());
     597              :         if let Some(attached) = self.intent.get_attached() {
     598              :             context.push_attached(*attached);
     599              :         }
     600              : 
     601              :         r
     602              :     }
     603              : 
     604        12522 :     pub(crate) fn do_schedule(
     605        12522 :         &mut self,
     606        12522 :         scheduler: &mut Scheduler,
     607        12522 :         context: &ScheduleContext,
     608        12522 :     ) -> Result<(), ScheduleError> {
     609        12522 :         // TODO: before scheduling new nodes, check if any existing content in
     610        12522 :         // self.intent refers to pageservers that are offline, and pick other
     611        12522 :         // pageservers if so.
     612        12522 : 
     613        12522 :         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
     614        12522 :         // change their attach location.
     615        12522 : 
     616        12522 :         match self.scheduling_policy {
     617        12521 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
     618              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     619              :                 // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
     620            1 :                 tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     621            0 :                     "Scheduling is disabled by policy {:?}", self.scheduling_policy);
     622            1 :                 return Ok(());
     623              :             }
     624              :         }
     625              : 
     626              :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     627              :         // more work on the same pageservers we're already using.
     628        12521 :         let mut modified = false;
     629              : 
     630              :         // Add/remove nodes to fulfil policy
     631              :         use PlacementPolicy::*;
     632        12521 :         match self.policy {
     633        12521 :             Attached(secondary_count) => {
     634        12521 :                 let retain_secondaries = if self.intent.attached.is_none()
     635        12521 :                     && scheduler.node_preferred(&self.intent.secondary).is_some()
     636              :                 {
     637              :                     // If we have no attached, and one of the secondaries is elegible to be promoted, retain
     638              :                     // one more secondary than we usually would, as one of them will become attached futher down this function.
     639            1 :                     secondary_count + 1
     640              :                 } else {
     641        12520 :                     secondary_count
     642              :                 };
     643              : 
     644        12521 :                 while self.intent.secondary.len() > retain_secondaries {
     645            0 :                     // We have no particular preference for one secondary location over another: just
     646            0 :                     // arbitrarily drop from the end
     647            0 :                     self.intent.pop_secondary(scheduler);
     648            0 :                     modified = true;
     649            0 :                 }
     650              : 
     651              :                 // Should have exactly one attached, and N secondaries
     652        12521 :                 let (modified_attached, attached_node_id) =
     653        12521 :                     self.schedule_attached(scheduler, context)?;
     654        12521 :                 modified |= modified_attached;
     655        12521 : 
     656        12521 :                 let mut used_pageservers = vec![attached_node_id];
     657        25041 :                 while self.intent.secondary.len() < secondary_count {
     658        12520 :                     let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
     659        12520 :                         &used_pageservers,
     660        12520 :                         &self.preferred_az_id,
     661        12520 :                         context,
     662        12520 :                     )?;
     663        12520 :                     self.intent.push_secondary(scheduler, node_id);
     664        12520 :                     used_pageservers.push(node_id);
     665        12520 :                     modified = true;
     666              :                 }
     667              :             }
     668              :             Secondary => {
     669            0 :                 if let Some(node_id) = self.intent.get_attached() {
     670            0 :                     // Populate secondary by demoting the attached node
     671            0 :                     self.intent.demote_attached(scheduler, *node_id);
     672            0 :                     modified = true;
     673            0 :                 } else if self.intent.secondary.is_empty() {
     674              :                     // Populate secondary by scheduling a fresh node
     675            0 :                     let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
     676            0 :                         &[],
     677            0 :                         &self.preferred_az_id,
     678            0 :                         context,
     679            0 :                     )?;
     680            0 :                     self.intent.push_secondary(scheduler, node_id);
     681            0 :                     modified = true;
     682            0 :                 }
     683            0 :                 while self.intent.secondary.len() > 1 {
     684            0 :                     // We have no particular preference for one secondary location over another: just
     685            0 :                     // arbitrarily drop from the end
     686            0 :                     self.intent.pop_secondary(scheduler);
     687            0 :                     modified = true;
     688            0 :                 }
     689              :             }
     690              :             Detached => {
     691              :                 // Never add locations in this mode
     692            0 :                 if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
     693            0 :                     self.intent.clear(scheduler);
     694            0 :                     modified = true;
     695            0 :                 }
     696              :             }
     697              :         }
     698              : 
     699        12521 :         if modified {
     700        12521 :             self.sequence.0 += 1;
     701        12521 :         }
     702              : 
     703        12521 :         Ok(())
     704        12522 :     }
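                       : 
                       :     // Editor's illustrative note, not part of tenant_shard.rs: callers typically drive
                       :     // scheduling for a group of related shards with a shared ScheduleContext, so that the
                       :     // `context.avoid(..)` / `context.push_attached(..)` bookkeeping done by one `schedule`
                       :     // call steers the placement of the next.  A minimal sketch, assuming the context can
                       :     // be default-constructed and `shards` is an iterable of `TenantShard`:
                       :     //
                       :     //     let mut context = ScheduleContext::default();
                       :     //     for shard in shards.iter_mut() {
                       :     //         shard.schedule(&mut scheduler, &mut context)?;
                       :     //     }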
     705              : 
     706              :     /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
     707              :     /// if the swap is not possible and leaves the intent state in its original state.
     708              :     ///
     709              :     /// Arguments:
      710              :     /// `promote_to`: an optional secondary location of this tenant shard to promote to attached;
      711              :     /// it must currently be one of this shard's secondary locations.
      712              :     /// If set to None, we ask the scheduler to recommend a preferred node from among the
      713              :     /// current secondaries.
     714            0 :     pub(crate) fn reschedule_to_secondary(
     715            0 :         &mut self,
     716            0 :         promote_to: Option<NodeId>,
     717            0 :         scheduler: &mut Scheduler,
     718            0 :     ) -> Result<(), ScheduleError> {
     719            0 :         let promote_to = match promote_to {
     720            0 :             Some(node) => node,
     721            0 :             None => match scheduler.node_preferred(self.intent.get_secondary()) {
     722            0 :                 Some(node) => node,
     723              :                 None => {
     724            0 :                     return Err(ScheduleError::ImpossibleConstraint);
     725              :                 }
     726              :             },
     727              :         };
     728              : 
     729            0 :         assert!(self.intent.get_secondary().contains(&promote_to));
     730              : 
     731            0 :         if let Some(node) = self.intent.get_attached() {
     732            0 :             let demoted = self.intent.demote_attached(scheduler, *node);
     733            0 :             if !demoted {
     734            0 :                 return Err(ScheduleError::ImpossibleConstraint);
     735            0 :             }
     736            0 :         }
     737              : 
     738            0 :         self.intent.promote_attached(scheduler, promote_to);
     739            0 : 
     740            0 :         // Increment the sequence number for the edge case where a
     741            0 :         // reconciler is already running to avoid waiting on the
     742            0 :         // current reconcile instead of spawning a new one.
     743            0 :         self.sequence = self.sequence.next();
     744            0 : 
     745            0 :         Ok(())
     746            0 :     }
     747              : 
     748              :     /// Optimize attachments: if a shard has a secondary location that is preferable to
     749              :     /// its primary location based on soft constraints, switch that secondary location
     750              :     /// to be attached.
     751              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     752              :     pub(crate) fn optimize_attachment(
     753              :         &self,
     754              :         nodes: &HashMap<NodeId, Node>,
     755              :         schedule_context: &ScheduleContext,
     756              :     ) -> Option<ScheduleOptimization> {
     757              :         let attached = (*self.intent.get_attached())?;
     758              :         if self.intent.secondary.is_empty() {
     759              :             // We can only do useful work if we have both attached and secondary locations: this
     760              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     761              :             return None;
     762              :         }
     763              : 
     764              :         let current_affinity_score = schedule_context.get_node_affinity(attached);
     765              :         let current_attachment_count = schedule_context.get_node_attachments(attached);
     766              : 
     767              :         // Generate score for each node, dropping any un-schedulable nodes.
     768              :         let all_pageservers = self.intent.all_pageservers();
     769              :         let mut scores = all_pageservers
     770              :             .iter()
     771           46 :             .flat_map(|node_id| {
     772           46 :                 let node = nodes.get(node_id);
     773           46 :                 if node.is_none() {
     774            0 :                     None
     775           46 :                 } else if matches!(
     776           46 :                     node.unwrap().get_scheduling(),
     777              :                     NodeSchedulingPolicy::Filling
     778              :                 ) {
     779              :                     // If the node is currently filling, don't count it as a candidate to avoid,
      780              :                     // If the node is currently filling, don't count it as a candidate, to avoid
     781            0 :                     None
     782           46 :                 } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
     783            0 :                     None
     784              :                 } else {
     785           46 :                     let affinity_score = schedule_context.get_node_affinity(*node_id);
     786           46 :                     let attachment_count = schedule_context.get_node_attachments(*node_id);
     787           46 :                     Some((*node_id, affinity_score, attachment_count))
     788              :                 }
     789           46 :             })
     790              :             .collect::<Vec<_>>();
     791              : 
     792              :         // Sort precedence:
     793              :         //  1st - prefer nodes with the lowest total affinity score
     794              :         //  2nd - prefer nodes with the lowest number of attachments in this context
     795              :         //  3rd - if all else is equal, sort by node ID for determinism in tests.
     796           46 :         scores.sort_by_key(|i| (i.1, i.2, i.0));
     797              : 
     798              :         if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
     799              :             scores.first()
     800              :         {
     801              :             if attached != *preferred_node {
     802              :                 // The best alternative must be more than 1 better than us, otherwise we could end
     803              :                 // up flapping back next time we're called (e.g. there's no point migrating from
      804              :                 // a location with score 1 to a score zero, because at the new location the situation
     805              :                 // would be the same, but in reverse).
     806              :                 if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
     807              :                     || current_attachment_count > *preferred_attachment_count + 1
     808              :                 {
     809              :                     tracing::info!(
     810              :                         "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
     811              :                         self.intent.get_secondary()
     812              :                     );
     813              :                     return Some(ScheduleOptimization {
     814              :                         sequence: self.sequence,
     815              :                         action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
     816              :                             old_attached_node_id: attached,
     817              :                             new_attached_node_id: *preferred_node,
     818              :                         }),
     819              :                     });
     820              :                 }
     821              :             } else {
     822              :                 tracing::debug!(
     823              :                     "Node {} is already preferred (score {:?})",
     824              :                     preferred_node,
     825              :                     preferred_affinity_score
     826              :                 );
     827              :             }
     828              :         }
     829              : 
     830              :         // Fall-through: we didn't find an optimization
     831              :         None
     832              :     }
     833              : 
     834              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     835              :     pub(crate) fn optimize_secondary(
     836              :         &self,
     837              :         scheduler: &mut Scheduler,
     838              :         schedule_context: &ScheduleContext,
     839              :     ) -> Option<ScheduleOptimization> {
     840              :         if self.intent.secondary.is_empty() {
     841              :             // We can only do useful work if we have both attached and secondary locations: this
     842              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     843              :             return None;
     844              :         }
     845              : 
     846              :         for secondary in self.intent.get_secondary() {
     847              :             let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
      848              :                 // We're already on a node unaffected by any affinity constraints,
     849              :                 // so we won't change it.
     850              :                 continue;
     851              :             };
     852              : 
     853              :             // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
     854              :             // This implicitly limits the choice to nodes that are available, and prefers nodes
     855              :             // with lower utilization.
     856              :             let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
     857              :                 &self.intent.all_pageservers(),
     858              :                 &self.preferred_az_id,
     859              :                 schedule_context,
     860              :             ) else {
     861              :                 // A scheduling error means we have no possible candidate replacements
     862              :                 continue;
     863              :             };
     864              : 
     865              :             let candidate_affinity_score = schedule_context
     866              :                 .nodes
     867              :                 .get(&candidate_node)
     868              :                 .unwrap_or(&AffinityScore::FREE);
     869              : 
     870              :             // The best alternative must be more than 1 better than us, otherwise we could end
     871              :             // up flapping back next time we're called.
     872              :             if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
     873              :                 // If some other node is available and has a lower score than this node, then
     874              :                 // that other node is a good place to migrate to.
     875              :                 tracing::info!(
     876              :                     "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
     877              :                     self.intent.get_secondary()
     878              :                 );
     879              :                 return Some(ScheduleOptimization {
     880              :                     sequence: self.sequence,
     881              :                     action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
     882              :                         old_node_id: *secondary,
     883              :                         new_node_id: candidate_node,
     884              :                     }),
     885              :                 });
     886              :             }
     887              :         }
     888              : 
     889              :         None
     890              :     }
     891              : 
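                      :     // Editor's note (illustrative, not part of the original source): the
                      :     // `AffinityScore(1)` margin in `optimize_secondary` above means a secondary is
                      :     // only replaced when the candidate scores strictly more than one point lower
                      :     // than the current node.  For example, candidate AffinityScore(1) vs current
                      :     // AffinityScore(3) qualifies (1 + 1 < 3), while AffinityScore(2) does not
                      :     // (2 + 1 < 3 is false), which prevents flapping between similarly scored nodes.
                      : 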
     892              :     /// Return true if the optimization was really applied: it will not be applied if the optimization's
     893              :     /// sequence is behind this tenant shard's
     894            9 :     pub(crate) fn apply_optimization(
     895            9 :         &mut self,
     896            9 :         scheduler: &mut Scheduler,
     897            9 :         optimization: ScheduleOptimization,
     898            9 :     ) -> bool {
     899            9 :         if optimization.sequence != self.sequence {
     900            0 :             return false;
     901            9 :         }
     902            9 : 
     903            9 :         metrics::METRICS_REGISTRY
     904            9 :             .metrics_group
     905            9 :             .storage_controller_schedule_optimization
     906            9 :             .inc();
     907            9 : 
     908            9 :         match optimization.action {
     909              :             ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
     910            4 :                 old_attached_node_id,
     911            4 :                 new_attached_node_id,
     912            4 :             }) => {
     913            4 :                 self.intent.demote_attached(scheduler, old_attached_node_id);
     914            4 :                 self.intent
     915            4 :                     .promote_attached(scheduler, new_attached_node_id);
     916            4 :             }
     917              :             ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
     918            5 :                 old_node_id,
     919            5 :                 new_node_id,
     920            5 :             }) => {
     921            5 :                 self.intent.remove_secondary(scheduler, old_node_id);
     922            5 :                 self.intent.push_secondary(scheduler, new_node_id);
     923            5 :             }
     924              :         }
     925              : 
     926            9 :         true
     927            9 :     }
     928              : 
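                      :     // Illustrative usage sketch (assumed, not from the original source): callers
                      :     // generate an optimization and then apply it, with the sequence check in
                      :     // `apply_optimization` discarding optimizations computed against a stale shard state:
                      :     //
                      :     //     if let Some(opt) = shard.optimize_secondary(&mut scheduler, &schedule_context) {
                      :     //         // Returns false (a no-op) if `shard.sequence` advanced since `opt` was computed.
                      :     //         let _applied = shard.apply_optimization(&mut scheduler, opt);
                      :     //     }
                      : 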
     929              :     /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
     930              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
     931              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
     932              :     ///
     933              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
     934              :     /// function should not be used to decide whether to reconcile.
     935            0 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
     936            0 :         if let Some(attach_intent) = self.intent.attached {
     937            0 :             match self.observed.locations.get(&attach_intent) {
     938            0 :                 Some(loc) => match &loc.conf {
     939            0 :                     Some(conf) => match conf.mode {
     940              :                         LocationConfigMode::AttachedMulti
     941              :                         | LocationConfigMode::AttachedSingle
     942              :                         | LocationConfigMode::AttachedStale => {
     943              :                             // Our intent and observed state agree that this node is in an attached state.
     944            0 :                             Some(attach_intent)
     945              :                         }
     946              :                         // Our observed config is not an attached state
     947            0 :                         _ => None,
     948              :                     },
     949              :                     // Our observed state is None, i.e. in flux
     950            0 :                     None => None,
     951              :                 },
     952              :                 // We have no observed state for this node
     953            0 :                 None => None,
     954              :             }
     955              :         } else {
     956              :             // Our intent is not to attach
     957            0 :             None
     958              :         }
     959            0 :     }
     960              : 
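                      :     // Illustrative example (not part of the original source): with
                      :     // `intent.attached == Some(NodeId(1))` and an observed `LocationConfig` on
                      :     // NodeId(1) in AttachedSingle mode, `stably_attached()` yields Some(NodeId(1));
                      :     // if the observed conf on that node is None (still in flux) or missing entirely,
                      :     // it yields None even though the intent says we should be attached there.
                      : 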
     961            0 :     fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
     962            0 :         let mut dirty_nodes = HashSet::new();
     963              : 
     964            0 :         if let Some(node_id) = self.intent.attached {
     965              :             // Maybe panic: it is a severe bug if we try to attach while generation is null.
     966            0 :             let generation = self
     967            0 :                 .generation
     968            0 :                 .expect("Attempted to enter attached state without a generation");
     969            0 : 
     970            0 :             let wanted_conf =
     971            0 :                 attached_location_conf(generation, &self.shard, &self.config, &self.policy);
     972            0 :             match self.observed.locations.get(&node_id) {
     973            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     974            0 :                 Some(_) | None => {
     975            0 :                     dirty_nodes.insert(node_id);
     976            0 :                 }
     977              :             }
     978            0 :         }
     979              : 
     980            0 :         for node_id in &self.intent.secondary {
     981            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     982            0 :             match self.observed.locations.get(node_id) {
     983            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     984            0 :                 Some(_) | None => {
     985            0 :                     dirty_nodes.insert(*node_id);
     986            0 :                 }
     987              :             }
     988              :         }
     989              : 
     990            0 :         for node_id in self.observed.locations.keys() {
     991            0 :             if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
     992            0 :                 // We have observed state that isn't part of our intent: need to clean it up.
     993            0 :                 dirty_nodes.insert(*node_id);
     994            0 :             }
     995              :         }
     996              : 
     997            0 :         dirty_nodes.retain(|node_id| {
     998            0 :             nodes
     999            0 :                 .get(node_id)
    1000            0 :                 .map(|n| n.is_available())
    1001            0 :                 .unwrap_or(false)
    1002            0 :         });
    1003            0 : 
    1004            0 :         !dirty_nodes.is_empty()
    1005            0 :     }
    1006              : 
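                      :     // Editor's summary (illustrative, not from the original source): `dirty` reports
                      :     // true when any *available* node either (a) should host an attached or secondary
                      :     // location whose observed config differs from the wanted one, or (b) holds an
                      :     // observed location that is not part of the intent at all and needs cleanup.
                      : 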
    1007              :     #[allow(clippy::too_many_arguments)]
    1008              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1009              :     pub(crate) fn get_reconcile_needed(
    1010              :         &mut self,
    1011              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1012              :     ) -> ReconcileNeeded {
    1013              :         // If there are any ambiguous observed states, and the nodes they refer to are available,
    1014              :         // we should reconcile to clean them up.
    1015              :         let mut dirty_observed = false;
    1016              :         for (node_id, observed_loc) in &self.observed.locations {
    1017              :             let node = pageservers
    1018              :                 .get(node_id)
    1019              :                 .expect("Nodes may not be removed while referenced");
    1020              :             if observed_loc.conf.is_none() && node.is_available() {
    1021              :                 dirty_observed = true;
    1022              :                 break;
    1023              :             }
    1024              :         }
    1025              : 
    1026              :         let active_nodes_dirty = self.dirty(pageservers);
    1027              : 
    1028              :         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
    1029              :         // wake up a reconciler to send it.
    1030              :         let do_reconcile =
    1031              :             active_nodes_dirty || dirty_observed || self.pending_compute_notification;
    1032              : 
    1033              :         if !do_reconcile {
    1034              :             tracing::debug!("Not dirty, no reconciliation needed.");
    1035              :             return ReconcileNeeded::No;
    1036              :         }
    1037              : 
    1038              :         // If we are currently splitting, then never start a reconciler task: the splitting logic
    1039              :         // requires that shards are not interfered with while it runs. Do this check here rather than
    1040              :         // up top, so that we only log this message if we would otherwise have done a reconciliation.
    1041              :         if !matches!(self.splitting, SplitState::Idle) {
    1042              :             tracing::info!("Refusing to reconcile, splitting in progress");
    1043              :             return ReconcileNeeded::No;
    1044              :         }
    1045              : 
    1046              :         // Reconcile already in flight for the current sequence?
    1047              :         if let Some(handle) = &self.reconciler {
    1048              :             if handle.sequence == self.sequence {
    1049              :                 tracing::info!(
    1050              :                     "Reconciliation already in progress for sequence {:?}",
    1051              :                     self.sequence,
    1052              :                 );
    1053              :                 return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
    1054              :                     tenant_shard_id: self.tenant_shard_id,
    1055              :                     seq_wait: self.waiter.clone(),
    1056              :                     error_seq_wait: self.error_waiter.clone(),
    1057              :                     error: self.last_error.clone(),
    1058              :                     seq: self.sequence,
    1059              :                 });
    1060              :             }
    1061              :         }
    1062              : 
    1063              :         // Pre-checks done: finally check whether we may actually do the work
    1064              :         match self.scheduling_policy {
    1065              :             ShardSchedulingPolicy::Active
    1066              :             | ShardSchedulingPolicy::Essential
    1067              :             | ShardSchedulingPolicy::Pause => {}
    1068              :             ShardSchedulingPolicy::Stop => {
    1069              :                 // We only reach this point if there is work to do and we're going to skip
    1070              :                 // doing it: warn so it's obvious why this tenant isn't doing what it ought to.
    1071              :                 tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
    1072              :                 return ReconcileNeeded::No;
    1073              :             }
    1074              :         }
    1075              : 
    1076              :         ReconcileNeeded::Yes
    1077              :     }
    1078              : 
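                      :     // Editor's note (illustrative): `get_reconcile_needed` checks, in order: (1) is
                      :     // anything dirty or a compute notification pending, (2) refuse while a shard
                      :     // split is in progress, (3) hand back a waiter if a reconciler for the current
                      :     // sequence is already in flight, and (4) honour ShardSchedulingPolicy::Stop.
                      :     // Only if all checks pass does it return ReconcileNeeded::Yes.
                      : 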
    1079              :     /// Ensure the sequence number is set to a value such that waiting on it will wait for the
    1080              :     /// next reconcile: i.e. it is ahead of all completed or running reconcilers.
    1081              :     ///
    1082              :     /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
    1083              :     /// that the waiter will not complete until some future Reconciler is constructed and run.
    1084            0 :     fn ensure_sequence_ahead(&mut self) {
    1085            0 :         // Find the highest sequence for which a Reconciler has previously run or is currently
    1086            0 :         // running
    1087            0 :         let max_seen = std::cmp::max(
    1088            0 :             self.reconciler
    1089            0 :                 .as_ref()
    1090            0 :                 .map(|r| r.sequence)
    1091            0 :                 .unwrap_or(Sequence(0)),
    1092            0 :             std::cmp::max(self.waiter.load(), self.error_waiter.load()),
    1093            0 :         );
    1094            0 : 
    1095            0 :         if self.sequence <= max_seen {
    1096            0 :             self.sequence = max_seen.next();
    1097            0 :         }
    1098            0 :     }
    1099              : 
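                      :     // Illustrative example (not part of the original source): if the highest sequence
                      :     // seen so far (running reconciler or advanced waiters) is Sequence(5) and
                      :     // `self.sequence` is 4, `ensure_sequence_ahead` sets `self.sequence` to
                      :     // `Sequence(5).next()`, so a waiter built from the new sequence can only be
                      :     // satisfied by a reconciler that has not been spawned yet.
                      : 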
    1100              :     /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
    1101              :     ///
    1102              :     /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
    1103              :     /// you would like to wait on the next reconciler that gets spawned in the background.
    1104            0 :     pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
    1105            0 :         self.ensure_sequence_ahead();
    1106            0 : 
    1107            0 :         ReconcilerWaiter {
    1108            0 :             tenant_shard_id: self.tenant_shard_id,
    1109            0 :             seq_wait: self.waiter.clone(),
    1110            0 :             error_seq_wait: self.error_waiter.clone(),
    1111            0 :             error: self.last_error.clone(),
    1112            0 :             seq: self.sequence,
    1113            0 :         }
    1114            0 :     }
    1115              : 
    1116            0 :     async fn reconcile(
    1117            0 :         sequence: Sequence,
    1118            0 :         mut reconciler: Reconciler,
    1119            0 :         must_notify: bool,
    1120            0 :     ) -> ReconcileResult {
    1121              :         // Attempt to make observed state match intent state
    1122            0 :         let result = reconciler.reconcile().await;
    1123              : 
    1124              :         // If we know we had a pending compute notification from some previous action, send a notification irrespective
    1125              :         // of whether the above reconcile() did any work
    1126            0 :         if result.is_ok() && must_notify {
    1127              :             // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
    1128            0 :             reconciler.compute_notify().await.ok();
    1129            0 :         }
    1130              : 
    1131              :         // Update result counter
    1132            0 :         let outcome_label = match &result {
    1133            0 :             Ok(_) => ReconcileOutcome::Success,
    1134            0 :             Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
    1135            0 :             Err(_) => ReconcileOutcome::Error,
    1136              :         };
    1137              : 
    1138            0 :         metrics::METRICS_REGISTRY
    1139            0 :             .metrics_group
    1140            0 :             .storage_controller_reconcile_complete
    1141            0 :             .inc(ReconcileCompleteLabelGroup {
    1142            0 :                 status: outcome_label,
    1143            0 :             });
    1144            0 : 
    1145            0 :         // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
    1146            0 :         // try and schedule more work in response to our result.
    1147            0 :         ReconcileResult {
    1148            0 :             sequence,
    1149            0 :             result,
    1150            0 :             tenant_shard_id: reconciler.tenant_shard_id,
    1151            0 :             generation: reconciler.generation,
    1152            0 :             observed_deltas: reconciler.observed_deltas(),
    1153            0 :             pending_compute_notification: reconciler.compute_notify_failure,
    1154            0 :         }
    1155            0 :     }
    1156              : 
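                      :     // Editor's note (illustrative): `compute_notify` failures are swallowed here on
                      :     // purpose; per the comment above, the need to retry travels back to the service
                      :     // in `ReconcileResult::pending_compute_notification` rather than failing the
                      :     // reconcile itself.
                      : 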
    1157              :     #[allow(clippy::too_many_arguments)]
    1158              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1159              :     pub(crate) fn spawn_reconciler(
    1160              :         &mut self,
    1161              :         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
    1162              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1163              :         compute_hook: &Arc<ComputeHook>,
    1164              :         reconciler_config: ReconcilerConfig,
    1165              :         service_config: &service::Config,
    1166              :         persistence: &Arc<Persistence>,
    1167              :         units: ReconcileUnits,
    1168              :         gate_guard: GateGuard,
    1169              :         cancel: &CancellationToken,
    1170              :     ) -> Option<ReconcilerWaiter> {
    1171              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
    1172              :         // doing our sequence's work.
    1173              :         let old_handle = self.reconciler.take();
    1174              : 
    1175              :         // Build list of nodes from which the reconciler should detach
    1176              :         let mut detach = Vec::new();
    1177              :         for node_id in self.observed.locations.keys() {
    1178              :             if self.intent.get_attached() != &Some(*node_id)
    1179              :                 && !self.intent.secondary.contains(node_id)
    1180              :             {
    1181              :                 detach.push(
    1182              :                     pageservers
    1183              :                         .get(node_id)
    1184              :                         .expect("Intent references non-existent pageserver")
    1185              :                         .clone(),
    1186              :                 )
    1187              :             }
    1188              :         }
    1189              : 
    1190              :         // Advance the sequence before spawning a reconciler, so that sequence waiters
    1191              :         // can distinguish between before and after the reconcile completes.
    1192              :         self.ensure_sequence_ahead();
    1193              : 
    1194              :         let reconciler_cancel = cancel.child_token();
    1195              :         let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
    1196              :         let reconciler = Reconciler {
    1197              :             tenant_shard_id: self.tenant_shard_id,
    1198              :             shard: self.shard,
    1199              :             placement_policy: self.policy.clone(),
    1200              :             generation: self.generation,
    1201              :             intent: reconciler_intent,
    1202              :             detach,
    1203              :             reconciler_config,
    1204              :             config: self.config.clone(),
    1205              :             preferred_az: self.preferred_az_id.clone(),
    1206              :             observed: self.observed.clone(),
    1207              :             original_observed: self.observed.clone(),
    1208              :             compute_hook: compute_hook.clone(),
    1209              :             service_config: service_config.clone(),
    1210              :             _gate_guard: gate_guard,
    1211              :             _resource_units: units,
    1212              :             cancel: reconciler_cancel.clone(),
    1213              :             persistence: persistence.clone(),
    1214              :             compute_notify_failure: false,
    1215              :         };
    1216              : 
    1217              :         let reconcile_seq = self.sequence;
    1218              :         let long_reconcile_threshold = service_config.long_reconcile_threshold;
    1219              : 
    1220              :         tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
    1221              :         let must_notify = self.pending_compute_notification;
    1222              :         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
    1223              :                                                         tenant_id=%reconciler.tenant_shard_id.tenant_id,
    1224              :                                                         shard_id=%reconciler.tenant_shard_id.shard_slug());
    1225              :         metrics::METRICS_REGISTRY
    1226              :             .metrics_group
    1227              :             .storage_controller_reconcile_spawn
    1228              :             .inc();
    1229              :         let result_tx = result_tx.clone();
    1230              :         let join_handle = tokio::task::spawn(
    1231            0 :             async move {
    1232              :                 // Wait for any previous reconcile task to complete before we start
    1233            0 :                 if let Some(old_handle) = old_handle {
    1234            0 :                     old_handle.cancel.cancel();
    1235            0 :                     if let Err(e) = old_handle.handle.await {
    1236              :                         // We can't do much with this other than log it: the task is done, so
    1237              :                         // we may proceed with our work.
    1238            0 :                         tracing::error!("Unexpected join error waiting for reconcile task: {e}");
    1239            0 :                     }
    1240            0 :                 }
    1241              : 
    1242              :                 // Early check for cancellation before doing any work
    1243              :                 // TODO: wrap all remote API operations in cancellation check
    1244              :                 // as well.
    1245            0 :                 if reconciler.cancel.is_cancelled() {
    1246            0 :                     metrics::METRICS_REGISTRY
    1247            0 :                         .metrics_group
    1248            0 :                         .storage_controller_reconcile_complete
    1249            0 :                         .inc(ReconcileCompleteLabelGroup {
    1250            0 :                             status: ReconcileOutcome::Cancel,
    1251            0 :                         });
    1252            0 :                     return;
    1253            0 :                 }
    1254            0 : 
    1255            0 :                 let (tenant_id_label, shard_number_label, sequence_label) = {
    1256            0 :                     (
    1257            0 :                         reconciler.tenant_shard_id.tenant_id.to_string(),
    1258            0 :                         reconciler.tenant_shard_id.shard_number.0.to_string(),
    1259            0 :                         reconcile_seq.to_string(),
    1260            0 :                     )
    1261            0 :                 };
    1262            0 : 
    1263            0 :                 let label_group = ReconcileLongRunningLabelGroup {
    1264            0 :                     tenant_id: &tenant_id_label,
    1265            0 :                     shard_number: &shard_number_label,
    1266            0 :                     sequence: &sequence_label,
    1267            0 :                 };
    1268            0 : 
    1269            0 :                 let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
    1270            0 :                 let long_reconcile_fut = {
    1271            0 :                     let label_group = label_group.clone();
    1272            0 :                     async move {
    1273            0 :                         tokio::time::sleep(long_reconcile_threshold).await;
    1274              : 
    1275            0 :                         tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
    1276              : 
    1277            0 :                         metrics::METRICS_REGISTRY
    1278            0 :                             .metrics_group
    1279            0 :                             .storage_controller_reconcile_long_running
    1280            0 :                             .inc(label_group);
    1281            0 :                     }
    1282              :                 };
    1283              : 
    1284            0 :                 let reconcile_fut = std::pin::pin!(reconcile_fut);
    1285            0 :                 let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
    1286              : 
    1287            0 :                 let (was_long, result) =
    1288            0 :                     match future::select(reconcile_fut, long_reconcile_fut).await {
    1289            0 :                         Either::Left((reconcile_result, _)) => (false, reconcile_result),
    1290            0 :                         Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
    1291              :                     };
    1292              : 
    1293            0 :                 if was_long {
    1294            0 :                     let id = metrics::METRICS_REGISTRY
    1295            0 :                         .metrics_group
    1296            0 :                         .storage_controller_reconcile_long_running
    1297            0 :                         .with_labels(label_group);
    1298            0 :                     metrics::METRICS_REGISTRY
    1299            0 :                         .metrics_group
    1300            0 :                         .storage_controller_reconcile_long_running
    1301            0 :                         .remove_metric(id);
    1302            0 :                 }
    1303              : 
    1304            0 :                 result_tx
    1305            0 :                     .send(ReconcileResultRequest::ReconcileResult(result))
    1306            0 :                     .ok();
    1307            0 :             }
    1308              :             .instrument(reconciler_span),
    1309              :         );
    1310              : 
    1311              :         self.reconciler = Some(ReconcilerHandle {
    1312              :             sequence: self.sequence,
    1313              :             handle: join_handle,
    1314              :             cancel: reconciler_cancel,
    1315              :         });
    1316              : 
    1317              :         Some(ReconcilerWaiter {
    1318              :             tenant_shard_id: self.tenant_shard_id,
    1319              :             seq_wait: self.waiter.clone(),
    1320              :             error_seq_wait: self.error_waiter.clone(),
    1321              :             error: self.last_error.clone(),
    1322              :             seq: self.sequence,
    1323              :         })
    1324              :     }
    1325              : 
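                      :     // Editor's note (illustrative): the task spawned by `spawn_reconciler` first
                      :     // cancels and awaits any reconciler from an older sequence, then races the
                      :     // reconcile against a timer; if the long-running threshold fires, a warning
                      :     // metric labelled by (tenant, shard, sequence) is incremented and removed again
                      :     // once the reconcile eventually completes.
                      : 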
    1326            0 :     pub(crate) fn cancel_reconciler(&self) {
    1327            0 :         if let Some(handle) = self.reconciler.as_ref() {
    1328            0 :             handle.cancel.cancel()
    1329            0 :         }
    1330            0 :     }
    1331              : 
    1332              :     /// Get a waiter for any reconciliation in flight, but do not start reconciliation
    1333              :     /// if it is not already running
    1334            0 :     pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
    1335            0 :         if self.reconciler.is_some() {
    1336            0 :             Some(ReconcilerWaiter {
    1337            0 :                 tenant_shard_id: self.tenant_shard_id,
    1338            0 :                 seq_wait: self.waiter.clone(),
    1339            0 :                 error_seq_wait: self.error_waiter.clone(),
    1340            0 :                 error: self.last_error.clone(),
    1341            0 :                 seq: self.sequence,
    1342            0 :             })
    1343              :         } else {
    1344            0 :             None
    1345              :         }
    1346            0 :     }
    1347              : 
    1348              :     /// Called when a ReconcileResult has been emitted and the service is updating
    1349              :     /// our state: if the result is from a sequence >= our ReconcilerHandle's sequence, then drop
    1350              :     /// the handle to indicate there is no longer a reconciliation in progress.
    1351            0 :     pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
    1352            0 :         if let Some(reconcile_handle) = &self.reconciler {
    1353            0 :             if reconcile_handle.sequence <= sequence {
    1354            0 :                 self.reconciler = None;
    1355            0 :             }
    1356            0 :         }
    1357            0 :     }
    1358              : 
    1359              :     /// If we had any state at all referring to this node ID, drop it.  Does not
    1360              :     /// attempt to reschedule.
    1361              :     ///
    1362              :     /// Returns true if we modified the node's intent state.
    1363            0 :     pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
    1364            0 :         let mut intent_modified = false;
    1365            0 : 
    1366            0 :         // Drop if this node was our attached intent
    1367            0 :         if self.intent.attached == Some(node_id) {
    1368            0 :             self.intent.attached = None;
    1369            0 :             intent_modified = true;
    1370            0 :         }
    1371              : 
    1372              :         // Drop from the list of secondaries, and check if we modified it
    1373            0 :         let had_secondaries = self.intent.secondary.len();
    1374            0 :         self.intent.secondary.retain(|n| n != &node_id);
    1375            0 :         intent_modified |= self.intent.secondary.len() != had_secondaries;
    1376            0 : 
    1377            0 :         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
    1378              : 
    1379            0 :         intent_modified
    1380            0 :     }
    1381              : 
    1382            0 :     pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
    1383            0 :         self.scheduling_policy = p;
    1384            0 :     }
    1385              : 
    1386            0 :     pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
    1387            0 :         &self.scheduling_policy
    1388            0 :     }
    1389              : 
    1390            0 :     pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
    1391            0 :         // Ordering: always set last_error before advancing sequence, so that sequence
    1392            0 :         // waiters are guaranteed to see a Some value when they see an error.
    1393            0 :         *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
    1394            0 :         self.error_waiter.advance(sequence);
    1395            0 :     }
    1396              : 
    1397            0 :     pub(crate) fn from_persistent(
    1398            0 :         tsp: TenantShardPersistence,
    1399            0 :         intent: IntentState,
    1400            0 :     ) -> anyhow::Result<Self> {
    1401            0 :         let tenant_shard_id = tsp.get_tenant_shard_id()?;
    1402            0 :         let shard_identity = tsp.get_shard_identity()?;
    1403              : 
    1404            0 :         metrics::METRICS_REGISTRY
    1405            0 :             .metrics_group
    1406            0 :             .storage_controller_tenant_shards
    1407            0 :             .inc();
    1408            0 : 
    1409            0 :         Ok(Self {
    1410            0 :             tenant_shard_id,
    1411            0 :             shard: shard_identity,
    1412            0 :             sequence: Sequence::initial(),
    1413            0 :             generation: tsp.generation.map(|g| Generation::new(g as u32)),
    1414            0 :             policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
    1415            0 :             intent,
    1416            0 :             observed: ObservedState::new(),
    1417            0 :             config: serde_json::from_str(&tsp.config).unwrap(),
    1418            0 :             reconciler: None,
    1419            0 :             splitting: tsp.splitting,
    1420            0 :             waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1421            0 :             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1422            0 :             last_error: Arc::default(),
    1423            0 :             pending_compute_notification: false,
    1424            0 :             delayed_reconcile: false,
    1425            0 :             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
    1426            0 :             preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
    1427            0 :         })
    1428            0 :     }
    1429              : 
    1430            0 :     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
    1431            0 :         TenantShardPersistence {
    1432            0 :             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
    1433            0 :             shard_number: self.tenant_shard_id.shard_number.0 as i32,
    1434            0 :             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
    1435            0 :             shard_stripe_size: self.shard.stripe_size.0 as i32,
    1436            0 :             generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
    1437            0 :             generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
    1438            0 :             placement_policy: serde_json::to_string(&self.policy).unwrap(),
    1439            0 :             config: serde_json::to_string(&self.config).unwrap(),
    1440            0 :             splitting: SplitState::default(),
    1441            0 :             scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
    1442            0 :             preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
    1443            0 :         }
    1444            0 :     }
    1445              : 
    1446        12500 :     pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
    1447        12500 :         self.preferred_az_id.as_ref()
    1448        12500 :     }
    1449              : 
    1450            0 :     pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
    1451            0 :         self.preferred_az_id = Some(preferred_az_id);
    1452            0 :     }
    1453              : 
    1454              :     /// Returns all the nodes to which this tenant shard is attached according to the
    1455              :     /// observed state and the generations. The returned vector is sorted from latest generation
    1456              :     /// to earliest.
    1457            0 :     pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
    1458            0 :         self.observed
    1459            0 :             .locations
    1460            0 :             .iter()
    1461            0 :             .filter_map(|(node_id, observed)| {
    1462              :                 use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
    1463              : 
    1464            0 :                 let conf = observed.conf.as_ref()?;
    1465              : 
    1466            0 :                 match (conf.generation, conf.mode) {
    1467            0 :                     (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
    1468            0 :                         Some((*node_id, gen))
    1469              :                     }
    1470            0 :                     _ => None,
    1471              :                 }
    1472            0 :             })
    1473            0 :             .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
    1474            0 :                 lhs_gen.cmp(rhs_gen).reverse()
    1475            0 :             })
    1476            0 :             .map(|(node_id, gen)| (node_id, Generation::new(gen)))
    1477            0 :             .collect()
    1478            0 :     }
    1479              : 
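                      :     // Illustrative example (not part of the original source): if NodeId(1) is
                      :     // observed attached at generation 3 and NodeId(2) at generation 5,
                      :     // `attached_locations()` returns
                      :     // vec![(NodeId(2), Generation::new(5)), (NodeId(1), Generation::new(3))].
                      : 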
    1480              :     /// Update the observed state of the tenant by applying incremental deltas
    1481              :     ///
    1482              :     /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
    1483              :     /// They are then filtered in [`crate::service::Service::process_result`].
    1484            0 :     pub(crate) fn apply_observed_deltas(
    1485            0 :         &mut self,
    1486            0 :         deltas: impl Iterator<Item = ObservedStateDelta>,
    1487            0 :     ) {
    1488            0 :         for delta in deltas {
    1489            0 :             match delta {
    1490            0 :                 ObservedStateDelta::Upsert(ups) => {
    1491            0 :                     let (node_id, loc) = *ups;
    1492            0 : 
    1493            0 :                     // If the generation of the observed location in the delta is lagging
    1494            0 :                     // behind the current one, then we have a race condition and cannot
    1495            0 :                     // be certain about the true observed state. Set the observed state
    1496            0 :                     // to None in order to reflect this.
    1497            0 :                     let crnt_gen = self
    1498            0 :                         .observed
    1499            0 :                         .locations
    1500            0 :                         .get(&node_id)
    1501            0 :                         .and_then(|loc| loc.conf.as_ref())
    1502            0 :                         .and_then(|conf| conf.generation);
    1503            0 :                     let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
    1504            0 :                     match (crnt_gen, new_gen) {
    1505            0 :                         (Some(crnt), Some(new)) if crnt_gen > new_gen => {
    1506            0 :                             tracing::warn!(
    1507            0 :                                 "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
    1508              :                                 node_id, loc, crnt, new
    1509              :                             );
    1510              : 
    1511            0 :                             self.observed
    1512            0 :                                 .locations
    1513            0 :                                 .insert(node_id, ObservedStateLocation { conf: None });
    1514            0 : 
    1515            0 :                             continue;
    1516              :                         }
    1517            0 :                         _ => {}
    1518              :                     }
    1519              : 
    1520            0 :                     if let Some(conf) = &loc.conf {
    1521            0 :                         tracing::info!("Updating observed location {}: {:?}", node_id, conf);
    1522              :                     } else {
    1523            0 :                         tracing::info!("Setting observed location {} to None", node_id,)
    1524              :                     }
    1525              : 
    1526            0 :                     self.observed.locations.insert(node_id, loc);
    1527              :                 }
    1528            0 :                 ObservedStateDelta::Delete(node_id) => {
    1529            0 :                     tracing::info!("Deleting observed location {}", node_id);
    1530            0 :                     self.observed.locations.remove(&node_id);
    1531              :                 }
    1532              :             }
    1533              :         }
    1534            0 :     }
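                      :     // Editor's note (illustrative): the stale-generation guard in
                      :     // `apply_observed_deltas` deliberately records `conf: None` instead of the
                      :     // delta's config, so later decisions treat that location as unknown rather than
                      :     // trusting a racing, older observation.
                      : 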
    1535              : }
    1536              : 
    1537              : impl Drop for TenantShard {
    1538        12525 :     fn drop(&mut self) {
    1539        12525 :         metrics::METRICS_REGISTRY
    1540        12525 :             .metrics_group
    1541        12525 :             .storage_controller_tenant_shards
    1542        12525 :             .dec();
    1543        12525 :     }
    1544              : }
    1545              : 
    1546              : #[cfg(test)]
    1547              : pub(crate) mod tests {
    1548              :     use std::{cell::RefCell, rc::Rc};
    1549              : 
    1550              :     use pageserver_api::{
    1551              :         controller_api::NodeAvailability,
    1552              :         shard::{ShardCount, ShardNumber},
    1553              :     };
    1554              :     use rand::{rngs::StdRng, SeedableRng};
    1555              :     use utils::id::TenantId;
    1556              : 
    1557              :     use crate::scheduler::test_utils::make_test_nodes;
    1558              : 
    1559              :     use super::*;
    1560              : 
    1561            7 :     fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
    1562            7 :         let tenant_id = TenantId::generate();
    1563            7 :         let shard_number = ShardNumber(0);
    1564            7 :         let shard_count = ShardCount::new(1);
    1565            7 : 
    1566            7 :         let tenant_shard_id = TenantShardId {
    1567            7 :             tenant_id,
    1568            7 :             shard_number,
    1569            7 :             shard_count,
    1570            7 :         };
    1571            7 :         TenantShard::new(
    1572            7 :             tenant_shard_id,
    1573            7 :             ShardIdentity::new(
    1574            7 :                 shard_number,
    1575            7 :                 shard_count,
    1576            7 :                 pageserver_api::shard::ShardStripeSize(32768),
    1577            7 :             )
    1578            7 :             .unwrap(),
    1579            7 :             policy,
    1580            7 :             None,
    1581            7 :         )
    1582            7 :     }
    1583              : 
    1584         5003 :     pub(crate) fn make_test_tenant(
    1585         5003 :         policy: PlacementPolicy,
    1586         5003 :         shard_count: ShardCount,
    1587         5003 :         preferred_az: Option<AvailabilityZone>,
    1588         5003 :     ) -> Vec<TenantShard> {
    1589         5003 :         make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
    1590         5003 :     }
    1591              : 
    1592         5006 :     pub(crate) fn make_test_tenant_with_id(
    1593         5006 :         tenant_id: TenantId,
    1594         5006 :         policy: PlacementPolicy,
    1595         5006 :         shard_count: ShardCount,
    1596         5006 :         preferred_az: Option<AvailabilityZone>,
    1597         5006 :     ) -> Vec<TenantShard> {
    1598         5006 :         (0..shard_count.count())
    1599        12518 :             .map(|i| {
    1600        12518 :                 let shard_number = ShardNumber(i);
    1601        12518 : 
    1602        12518 :                 let tenant_shard_id = TenantShardId {
    1603        12518 :                     tenant_id,
    1604        12518 :                     shard_number,
    1605        12518 :                     shard_count,
    1606        12518 :                 };
    1607        12518 :                 TenantShard::new(
    1608        12518 :                     tenant_shard_id,
    1609        12518 :                     ShardIdentity::new(
    1610        12518 :                         shard_number,
    1611        12518 :                         shard_count,
    1612        12518 :                         pageserver_api::shard::ShardStripeSize(32768),
    1613        12518 :                     )
    1614        12518 :                     .unwrap(),
    1615        12518 :                     policy.clone(),
    1616        12518 :                     preferred_az.clone(),
    1617        12518 :                 )
    1618        12518 :             })
    1619         5006 :             .collect()
    1620         5006 :     }
    1621              : 
    1622              :     /// Test the scheduling behaviors used when a tenant configured for HA is subject
    1623              :     /// to nodes being marked offline.
    1624              :     #[test]
    1625            1 :     fn tenant_ha_scheduling() -> anyhow::Result<()> {
    1626            1 :         // Start with three nodes.  Our tenant will only use two.  The third one is
    1627            1 :         // expected to remain unused.
    1628            1 :         let mut nodes = make_test_nodes(3, &[]);
    1629            1 : 
    1630            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1631            1 :         let mut context = ScheduleContext::default();
    1632            1 : 
    1633            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1634            1 :         tenant_shard
    1635            1 :             .schedule(&mut scheduler, &mut context)
    1636            1 :             .expect("we have enough nodes, scheduling should work");
    1637            1 : 
    1638            1 :         // Expect the attached and secondary locations to initially be scheduled on different nodes
    1639            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    1640            1 :         assert!(tenant_shard.intent.attached.is_some());
    1641              : 
    1642            1 :         let attached_node_id = tenant_shard.intent.attached.unwrap();
    1643            1 :         let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
    1644            1 :         assert_ne!(attached_node_id, secondary_node_id);
    1645              : 
    1646              :         // Marking the attached node offline should demote it to a secondary
    1647            1 :         let changed = tenant_shard
    1648            1 :             .intent
    1649            1 :             .demote_attached(&mut scheduler, attached_node_id);
    1650            1 :         assert!(changed);
    1651            1 :         assert!(tenant_shard.intent.attached.is_none());
    1652            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 2);
    1653              : 
    1654              :         // Update the scheduler state to indicate the node is offline
    1655            1 :         nodes
    1656            1 :             .get_mut(&attached_node_id)
    1657            1 :             .unwrap()
    1658            1 :             .set_availability(NodeAvailability::Offline);
    1659            1 :         scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
    1660            1 : 
    1661            1 :         // Scheduling the shard should promote the still-available secondary node to attached
    1662            1 :         tenant_shard
    1663            1 :             .schedule(&mut scheduler, &mut context)
    1664            1 :             .expect("active nodes are available");
    1665            1 :         assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
    1666              : 
    1667              :         // The original attached node should have been retained as a secondary
    1668            1 :         assert_eq!(
    1669            1 :             *tenant_shard.intent.secondary.iter().last().unwrap(),
    1670            1 :             attached_node_id
    1671            1 :         );
    1672              : 
    1673            1 :         tenant_shard.intent.clear(&mut scheduler);
    1674            1 : 
    1675            1 :         Ok(())
    1676            1 :     }
    1677              : 
    1678              :     #[test]
    1679            1 :     fn intent_from_observed() -> anyhow::Result<()> {
    1680            1 :         let nodes = make_test_nodes(3, &[]);
    1681            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1682            1 : 
    1683            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1684            1 : 
    1685            1 :         tenant_shard.observed.locations.insert(
    1686            1 :             NodeId(3),
    1687            1 :             ObservedStateLocation {
    1688            1 :                 conf: Some(LocationConfig {
    1689            1 :                     mode: LocationConfigMode::AttachedMulti,
    1690            1 :                     generation: Some(2),
    1691            1 :                     secondary_conf: None,
    1692            1 :                     shard_number: tenant_shard.shard.number.0,
    1693            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1694            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1695            1 :                     tenant_conf: TenantConfig::default(),
    1696            1 :                 }),
    1697            1 :             },
    1698            1 :         );
    1699            1 : 
    1700            1 :         tenant_shard.observed.locations.insert(
    1701            1 :             NodeId(2),
    1702            1 :             ObservedStateLocation {
    1703            1 :                 conf: Some(LocationConfig {
    1704            1 :                     mode: LocationConfigMode::AttachedStale,
    1705            1 :                     generation: Some(1),
    1706            1 :                     secondary_conf: None,
    1707            1 :                     shard_number: tenant_shard.shard.number.0,
    1708            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1709            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1710            1 :                     tenant_conf: TenantConfig::default(),
    1711            1 :                 }),
    1712            1 :             },
    1713            1 :         );
    1714            1 : 
    1715            1 :         tenant_shard.intent_from_observed(&mut scheduler);
    1716            1 : 
    1717            1 :         // The highest generationed attached location gets used as attached
    1718            1 :         assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    1719              :         // Other locations get used as secondary
    1720            1 :         assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
    1721              : 
    1722            1 :         scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
    1723              : 
    1724            1 :         tenant_shard.intent.clear(&mut scheduler);
    1725            1 :         Ok(())
    1726            1 :     }
    1727              : 
    1728              :     #[test]
    1729            1 :     fn scheduling_mode() -> anyhow::Result<()> {
    1730            1 :         let nodes = make_test_nodes(3, &[]);
    1731            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1732            1 : 
    1733            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1734            1 : 
    1735            1 :         // In pause mode, schedule() shouldn't do anything
    1736            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    1737            1 :         assert!(tenant_shard
    1738            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1739            1 :             .is_ok());
    1740            1 :         assert!(tenant_shard.intent.all_pageservers().is_empty());
    1741              : 
    1742              :         // In active mode, schedule() works
    1743            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    1744            1 :         assert!(tenant_shard
    1745            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1746            1 :             .is_ok());
    1747            1 :         assert!(!tenant_shard.intent.all_pageservers().is_empty());
    1748              : 
    1749            1 :         tenant_shard.intent.clear(&mut scheduler);
    1750            1 :         Ok(())
    1751            1 :     }
    1752              : 
    1753              :     #[test]
    1754            1 :     fn optimize_attachment() -> anyhow::Result<()> {
    1755            1 :         let nodes = make_test_nodes(3, &[]);
    1756            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1757            1 : 
    1758            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1759            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1760            1 : 
    1761            1 :         // Initially: both shards are attached on node 1, and each has a secondary location
    1762            1 :         // on a different node.
    1763            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1764            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
    1765            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1766            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1767            1 : 
    1768            1 :         let mut schedule_context = ScheduleContext::default();
    1769            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1770            1 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1771            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1772            1 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1773            1 : 
    1774            1 :         let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
    1775            1 : 
    1776            1 :         // Either shard should recognize that it has the option to switch to a secondary location where there
    1777            1 :         // would be no other shards from the same tenant, and request to do so.
    1778            1 :         assert_eq!(
    1779            1 :             optimization_a,
    1780            1 :             Some(ScheduleOptimization {
    1781            1 :                 sequence: shard_a.sequence,
    1782            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1783            1 :                     old_attached_node_id: NodeId(1),
    1784            1 :                     new_attached_node_id: NodeId(2)
    1785            1 :                 })
    1786            1 :             })
    1787            1 :         );
    1788              : 
    1789              :         // Note that these two optimizations, generated from the same ScheduleContext, are mutually
    1790              :         // exclusive: applying one invalidates the stats the other was computed from. It is the
    1791              :         // responsibility of [`Service::optimize_all`] to avoid optimizing multiple shards in the
    1792              :         // same tenant at the same time; generating both optimizations here is done purely for
    1793              :         // test purposes.
    1794            1 :         let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
    1795            1 :         assert_eq!(
    1796            1 :             optimization_b,
    1797            1 :             Some(ScheduleOptimization {
    1798            1 :                 sequence: shard_b.sequence,
    1799            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1800            1 :                     old_attached_node_id: NodeId(1),
    1801            1 :                     new_attached_node_id: NodeId(3)
    1802            1 :                 })
    1803            1 :             })
    1804            1 :         );
    1805              : 
    1806              :         // Applying these optimizations should result in the end state proposed
    1807            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1808            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
    1809            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
    1810            1 :         shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
    1811            1 :         assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
    1812            1 :         assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
    1813              : 
    1814            1 :         shard_a.intent.clear(&mut scheduler);
    1815            1 :         shard_b.intent.clear(&mut scheduler);
    1816            1 : 
    1817            1 :         Ok(())
    1818            1 :     }
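                      : 
                      :     // Illustration only, not part of the original source: the avoid()/push_attached() pattern
                      :     // used above (and again in the tests below) could be factored into a small helper;
                      :     // `context_for` is a hypothetical name and this is only a sketch.
                      :     #[allow(dead_code)]
                      :     fn context_for<'a>(shards: impl IntoIterator<Item = &'a TenantShard>) -> ScheduleContext {
                      :         let mut ctx = ScheduleContext::default();
                      :         for shard in shards {
                      :             // Record every location of the tenant, and which node holds the attachment.
                      :             ctx.avoid(&shard.intent.all_pageservers());
                      :             if let Some(attached) = shard.intent.get_attached() {
                      :                 ctx.push_attached(*attached);
                      :             }
                      :         }
                      :         ctx
                      :     }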
    1819              : 
    1820              :     #[test]
    1821            1 :     fn optimize_secondary() -> anyhow::Result<()> {
    1822            1 :         let nodes = make_test_nodes(4, &[]);
    1823            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1824            1 : 
    1825            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1826            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1827            1 : 
    1828            1 :         // Initially: the shards are attached on different nodes, but both have their secondary
    1829            1 :         // location on node 3.
    1830            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1831            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    1832            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    1833            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1834            1 : 
    1835            1 :         let mut schedule_context = ScheduleContext::default();
    1836            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1837            1 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1838            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1839            1 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1840            1 : 
    1841            1 :         let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
    1842            1 : 
    1843            1 :         // Since there is a node with no locations for this tenant, the shard whose secondary is on
    1844            1 :         // the doubly-used node should generate an optimization to move that secondary away.
    1845            1 :         assert_eq!(
    1846            1 :             optimization_a,
    1847            1 :             Some(ScheduleOptimization {
    1848            1 :                 sequence: shard_a.sequence,
    1849            1 :                 action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1850            1 :                     old_node_id: NodeId(3),
    1851            1 :                     new_node_id: NodeId(4)
    1852            1 :                 })
    1853            1 :             })
    1854            1 :         );
    1855              : 
    1856            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1857            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    1858            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
    1859              : 
    1860            1 :         shard_a.intent.clear(&mut scheduler);
    1861            1 :         shard_b.intent.clear(&mut scheduler);
    1862            1 : 
    1863            1 :         Ok(())
    1864            1 :     }
    1865              : 
    1866              :     // Optimize until quiescent: this emulates what Service::optimize_all does when called
    1867              :     // repeatedly in the background.
    1868              :     // Returns the optimizations that were applied.
    1869            3 :     fn optimize_til_idle(
    1870            3 :         nodes: &HashMap<NodeId, Node>,
    1871            3 :         scheduler: &mut Scheduler,
    1872            3 :         shards: &mut [TenantShard],
    1873            3 :     ) -> Vec<ScheduleOptimization> {
    1874            3 :         let mut loop_n = 0;
    1875            3 :         let mut optimizations = Vec::default();
    1876              :         loop {
    1877            9 :             let mut schedule_context = ScheduleContext::default();
    1878            9 :             let mut any_changed = false;
    1879              : 
    1880           36 :             for shard in shards.iter() {
    1881           36 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    1882           36 :                 if let Some(attached) = shard.intent.get_attached() {
    1883           36 :                     schedule_context.push_attached(*attached);
    1884           36 :                 }
    1885              :             }
    1886              : 
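                      :             // Apply at most one optimization per pass: once something is applied, the
                      :             // ScheduleContext built above is stale (see the note in the `optimize_attachment`
                      :             // test), so break out and rebuild it before looking for more work.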
    1887           21 :             for shard in shards.iter_mut() {
    1888           21 :                 let optimization = shard.optimize_attachment(nodes, &schedule_context);
    1889           21 :                 if let Some(optimization) = optimization {
    1890            2 :                     optimizations.push(optimization.clone());
    1891            2 :                     shard.apply_optimization(scheduler, optimization);
    1892            2 :                     any_changed = true;
    1893            2 :                     break;
    1894           19 :                 }
    1895           19 : 
    1896           19 :                 let optimization = shard.optimize_secondary(scheduler, &schedule_context);
    1897           19 :                 if let Some(optimization) = optimization {
    1898            4 :                     optimizations.push(optimization.clone());
    1899            4 :                     shard.apply_optimization(scheduler, optimization);
    1900            4 :                     any_changed = true;
    1901            4 :                     break;
    1902           15 :                 }
    1903              :             }
    1904              : 
    1905            9 :             if !any_changed {
    1906            3 :                 break;
    1907            6 :             }
    1908            6 : 
    1909            6 :             // Assert no infinite loop
    1910            6 :             loop_n += 1;
    1911            6 :             assert!(loop_n < 1000);
    1912              :         }
    1913              : 
    1914            3 :         optimizations
    1915            3 :     }
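                      : 
                      :     // Illustration only, not part of the original source: a caller that only cares that the
                      :     // optimizer reaches a fixed point, rather than which steps it took, could wrap the helper
                      :     // like this. `assert_converged` is a hypothetical name.
                      :     #[allow(dead_code)]
                      :     fn assert_converged(
                      :         nodes: &HashMap<NodeId, Node>,
                      :         scheduler: &mut Scheduler,
                      :         shards: &mut [TenantShard],
                      :     ) {
                      :         optimize_til_idle(nodes, scheduler, shards);
                      :         // A second pass immediately afterwards should find nothing to do, because the first
                      :         // call only returns once a full pass produced no optimization.
                      :         assert!(optimize_til_idle(nodes, scheduler, shards).is_empty());
                      :     }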
    1916              : 
    1917              :     /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
    1918              :     /// that it converges.
    1919              :     #[test]
    1920            1 :     fn optimize_add_nodes() -> anyhow::Result<()> {
    1921            1 :         let nodes = make_test_nodes(4, &[]);
    1922            1 : 
    1923            1 :         // Only show the scheduler a couple of nodes
    1924            1 :         let mut scheduler = Scheduler::new([].iter());
    1925            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1926            1 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    1927            1 : 
    1928            1 :         let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    1929            1 :         let mut schedule_context = ScheduleContext::default();
    1930            5 :         for shard in &mut shards {
    1931            4 :             assert!(shard
    1932            4 :                 .schedule(&mut scheduler, &mut schedule_context)
    1933            4 :                 .is_ok());
    1934              :         }
    1935              : 
    1936              :         // We should see an equal number of locations on the two nodes.
    1937            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
    1938            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
    1939              : 
    1940            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
    1941            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
    1942              : 
    1943              :         // Add another two nodes: we should see the shards spread out when their optimize
    1944              :         // methods are called
    1945            1 :         scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
    1946            1 :         scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
    1947            1 :         optimize_til_idle(&nodes, &mut scheduler, &mut shards);
    1948            1 : 
    1949            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
    1950            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
    1951              : 
    1952            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    1953            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
    1954              : 
    1955            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
    1956            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
    1957              : 
    1958            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
    1959            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
    1960              : 
    1961            4 :         for shard in shards.iter_mut() {
    1962            4 :             shard.intent.clear(&mut scheduler);
    1963            4 :         }
    1964              : 
    1965            1 :         Ok(())
    1966            1 :     }
    1967              : 
    1968              :     /// Test that initial shard scheduling is optimal. By optimal we mean
    1969              :     /// that the optimizer cannot find a way to improve it.
    1970              :     ///
    1971              :     /// This test is an example of the scheduling issue described in
    1972              :     /// https://github.com/neondatabase/neon/issues/8969
    1973              :     #[test]
    1974            1 :     fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
    1975              :         use itertools::Itertools;
    1976              : 
    1977            1 :         let nodes = make_test_nodes(2, &[]);
    1978            1 : 
    1979            1 :         let mut scheduler = Scheduler::new([].iter());
    1980            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1981            1 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    1982            1 : 
    1983            1 :         let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    1984            1 :         let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
    1985            1 : 
    1986            1 :         let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    1987            1 :         let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
    1988            1 : 
    1989            4 :         let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
    1990            4 :         let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
    1991            1 : 
    1992            1 :         let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
    1993              : 
    1994            9 :         for (shard, context) in schedule_order {
    1995            8 :             let context = &mut *context.borrow_mut();
    1996            8 :             shard.schedule(&mut scheduler, context).unwrap();
    1997            8 :         }
    1998              : 
    1999            1 :         let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
    2000            1 :         assert_eq!(applied_to_a, vec![]);
    2001              : 
    2002            1 :         let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
    2003            1 :         assert_eq!(applied_to_b, vec![]);
    2004              : 
    2005            8 :         for shard in a.iter_mut().chain(b.iter_mut()) {
    2006            8 :             shard.intent.clear(&mut scheduler);
    2007            8 :         }
    2008              : 
    2009            1 :         Ok(())
    2010            1 :     }
    2011              : 
    2012              :     #[test]
    2013            1 :     fn random_az_shard_scheduling() -> anyhow::Result<()> {
    2014              :         use rand::seq::SliceRandom;
    2015              : 
    2016           51 :         for seed in 0..50 {
    2017           50 :             eprintln!("Running test with seed {seed}");
    2018           50 :             let mut rng = StdRng::seed_from_u64(seed);
    2019           50 : 
    2020           50 :             let az_a_tag = AvailabilityZone("az-a".to_string());
    2021           50 :             let az_b_tag = AvailabilityZone("az-b".to_string());
    2022           50 :             let azs = [az_a_tag, az_b_tag];
    2023           50 :             let nodes = make_test_nodes(4, &azs);
    2024           50 :             let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
    2025           50 : 
    2026           50 :             let mut scheduler = Scheduler::new([].iter());
    2027          200 :             for node in nodes.values() {
    2028          200 :                 scheduler.node_upsert(node);
    2029          200 :             }
    2030              : 
    2031           50 :             let mut shards = Vec::default();
    2032           50 :             let mut contexts = Vec::default();
    2033           50 :             let mut az_picker = azs.iter().cycle().cloned();
    2034         5050 :             for i in 0..100 {
    2035         5000 :                 let az = az_picker.next().unwrap();
    2036         5000 :                 let shard_count = i % 4 + 1;
    2037         5000 :                 *shards_per_az.entry(az.clone()).or_default() += shard_count;
    2038         5000 : 
    2039         5000 :                 let tenant_shards = make_test_tenant(
    2040         5000 :                     PlacementPolicy::Attached(1),
    2041         5000 :                     ShardCount::new(shard_count.try_into().unwrap()),
    2042         5000 :                     Some(az),
    2043         5000 :                 );
    2044         5000 :                 let context = Rc::new(RefCell::new(ScheduleContext::default()));
    2045         5000 : 
    2046         5000 :                 contexts.push(context.clone());
    2047         5000 :                 let with_ctx = tenant_shards
    2048         5000 :                     .into_iter()
    2049        12500 :                     .map(|shard| (shard, context.clone()));
    2050        17500 :                 for shard_with_ctx in with_ctx {
    2051        12500 :                     shards.push(shard_with_ctx);
    2052        12500 :                 }
    2053              :             }
    2054              : 
    2055           50 :             shards.shuffle(&mut rng);
    2056              : 
    2057              :             #[derive(Default, Debug)]
    2058              :             struct NodeStats {
    2059              :                 attachments: u32,
    2060              :                 secondaries: u32,
    2061              :             }
    2062              : 
    2063           50 :             let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
    2064           50 :             let mut attachments_in_wrong_az = 0;
    2065           50 :             let mut secondaries_in_wrong_az = 0;
    2066              : 
    2067        12550 :             for (shard, context) in &mut shards {
    2068        12500 :                 let context = &mut *context.borrow_mut();
    2069        12500 :                 shard.schedule(&mut scheduler, context).unwrap();
    2070        12500 : 
    2071        12500 :                 let attached_node = shard.intent.get_attached().unwrap();
    2072        12500 :                 let stats = node_stats.entry(attached_node).or_default();
    2073        12500 :                 stats.attachments += 1;
    2074        12500 : 
    2075        12500 :                 let secondary_node = *shard.intent.get_secondary().first().unwrap();
    2076        12500 :                 let stats = node_stats.entry(secondary_node).or_default();
    2077        12500 :                 stats.secondaries += 1;
    2078        12500 : 
    2079        12500 :                 let attached_node_az = nodes
    2080        12500 :                     .get(&attached_node)
    2081        12500 :                     .unwrap()
    2082        12500 :                     .get_availability_zone_id();
    2083        12500 :                 let secondary_node_az = nodes
    2084        12500 :                     .get(&secondary_node)
    2085        12500 :                     .unwrap()
    2086        12500 :                     .get_availability_zone_id();
    2087        12500 :                 let preferred_az = shard.preferred_az().unwrap();
    2088        12500 : 
    2089        12500 :                 if attached_node_az != preferred_az {
    2090            0 :                     eprintln!(
    2091            0 :                         "{} attachment was scheduled in AZ {} but preferred AZ {}",
    2092            0 :                         shard.tenant_shard_id, attached_node_az, preferred_az
    2093            0 :                     );
    2094            0 :                     attachments_in_wrong_az += 1;
    2095        12500 :                 }
    2096              : 
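                      :                 // The test counts a secondary placed in the preferred AZ as a violation:
                      :                 // secondaries are expected to land outside the AZ that the attachment
                      :                 // already occupies.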
    2097        12500 :                 if secondary_node_az == preferred_az {
    2098            0 :                     eprintln!(
    2099            0 :                         "{} secondary was scheduled in AZ {} which matches preference",
    2100            0 :                         shard.tenant_shard_id, secondary_node_az
    2101            0 :                     );
    2102            0 :                     secondaries_in_wrong_az += 1;
    2103        12500 :                 }
    2104              :             }
    2105              : 
    2106           50 :             let mut violations = Vec::default();
    2107           50 : 
    2108           50 :             if attachments_in_wrong_az > 0 {
    2109            0 :                 violations.push(format!(
    2110            0 :                     "{} attachments scheduled to the incorrect AZ",
    2111            0 :                     attachments_in_wrong_az
    2112            0 :                 ));
    2113           50 :             }
    2114              : 
    2115           50 :             if secondaries_in_wrong_az > 0 {
    2116            0 :                 violations.push(format!(
    2117            0 :                     "{} secondaries scheduled to the incorrect AZ",
    2118            0 :                     secondaries_in_wrong_az
    2119            0 :                 ));
    2120           50 :             }
    2121              : 
    2122           50 :             eprintln!(
    2123           50 :                 "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
    2124           50 :                 attachments_in_wrong_az, secondaries_in_wrong_az
    2125           50 :             );
    2126              : 
    2127          250 :             for (node_id, stats) in &node_stats {
    2128          200 :                 let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
    2129          200 :                 let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
    2130          200 :                 let allowed_attachment_load =
    2131          200 :                     (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
    2132          200 : 
    2133          200 :                 if !allowed_attachment_load.contains(&stats.attachments) {
    2134            0 :                     violations.push(format!(
    2135            0 :                         "Found {} attachments on node {}, but expected {}",
    2136            0 :                         stats.attachments, node_id, ideal_attachment_load
    2137            0 :                     ));
    2138          200 :                 }
    2139              : 
    2140          200 :                 eprintln!(
    2141          200 :                     "{}: attachments={} secondaries={} ideal_attachment_load={}",
    2142          200 :                     node_id, stats.attachments, stats.secondaries, ideal_attachment_load
    2143          200 :                 );
    2144              :             }
    2145              : 
    2146           50 :             assert!(violations.is_empty(), "{violations:?}");
    2147              : 
    2148        12550 :             for (mut shard, _ctx) in shards {
    2149        12500 :                 shard.intent.clear(&mut scheduler);
    2150        12500 :             }
    2151              :         }
    2152            1 :         Ok(())
    2153            1 :     }
    2154              : }
        

Generated by: LCOV version 2.1-beta