LCOV - code coverage report
Current view: top level - storage_controller/src - tenant_shard.rs (source / functions) Coverage Total Hit
Test: a43a77853355b937a79c57b07a8f05607cf29e6c.info Lines: 62.9 % 904 569
Test Date: 2024-09-19 12:04:32 Functions: 36.6 % 101 37

            Line data    Source code
       1              : use std::{
       2              :     collections::{HashMap, HashSet},
       3              :     sync::Arc,
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use crate::{
       8              :     metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
       9              :     persistence::TenantShardPersistence,
      10              :     reconciler::{ReconcileUnits, ReconcilerConfig},
      11              :     scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
      12              :     service::ReconcileResultRequest,
      13              : };
      14              : use pageserver_api::controller_api::{
      15              :     NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
      16              : };
      17              : use pageserver_api::{
      18              :     models::{LocationConfig, LocationConfigMode, TenantConfig},
      19              :     shard::{ShardIdentity, TenantShardId},
      20              : };
      21              : use serde::{Deserialize, Serialize};
      22              : use tokio::task::JoinHandle;
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::{instrument, Instrument};
      25              : use utils::{
      26              :     generation::Generation,
      27              :     id::NodeId,
      28              :     seqwait::{SeqWait, SeqWaitError},
      29              :     sync::gate::GateGuard,
      30              : };
      31              : 
      32              : use crate::{
      33              :     compute_hook::ComputeHook,
      34              :     node::Node,
      35              :     persistence::{split_state::SplitState, Persistence},
      36              :     reconciler::{
      37              :         attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
      38              :     },
      39              :     scheduler::{ScheduleError, Scheduler},
      40              :     service, Sequence,
      41              : };
      42              : 
      43              : /// Serialization helper
      44            0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
      45            0 : where
      46            0 :     S: serde::ser::Serializer,
      47            0 :     T: std::fmt::Display,
      48            0 : {
      49            0 :     serializer.collect_str(
      50            0 :         &v.lock()
      51            0 :             .unwrap()
      52            0 :             .as_ref()
      53            0 :             .map(|e| format!("{e}"))
      54            0 :             .unwrap_or("".to_string()),
      55            0 :     )
      56            0 : }
      57              : 
      58              : /// In-memory state for a particular tenant shard.
      59              : ///
      60              : /// This struct implement Serialize for debugging purposes, but is _not_ persisted
      61              : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
      62            0 : #[derive(Serialize)]
      63              : pub(crate) struct TenantShard {
      64              :     pub(crate) tenant_shard_id: TenantShardId,
      65              : 
      66              :     pub(crate) shard: ShardIdentity,
      67              : 
      68              :     // Runtime only: sequence used to coordinate when updating this object while
      69              :     // with background reconcilers may be running.  A reconciler runs to a particular
      70              :     // sequence.
      71              :     pub(crate) sequence: Sequence,
      72              : 
      73              :     // Latest generation number: next time we attach, increment this
      74              :     // and use the incremented number when attaching.
      75              :     //
      76              :     // None represents an incompletely onboarded tenant via the [`Service::location_config`]
      77              :     // API, where this tenant may only run in PlacementPolicy::Secondary.
      78              :     pub(crate) generation: Option<Generation>,
      79              : 
      80              :     // High level description of how the tenant should be set up.  Provided
      81              :     // externally.
      82              :     pub(crate) policy: PlacementPolicy,
      83              : 
      84              :     // Low level description of exactly which pageservers should fulfil
      85              :     // which role.  Generated by `Self::schedule`.
      86              :     pub(crate) intent: IntentState,
      87              : 
      88              :     // Low level description of how the tenant is configured on pageservers:
      89              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      90              :     // with `Self::reconcile`.
      91              :     pub(crate) observed: ObservedState,
      92              : 
      93              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
      94              :     // for all shards in a tenant.
      95              :     pub(crate) config: TenantConfig,
      96              : 
      97              :     /// If a reconcile task is currently in flight, it may be joined here (it is
      98              :     /// only safe to join if either the result has been received or the reconciler's
      99              :     /// cancellation token has been fired)
     100              :     #[serde(skip)]
     101              :     pub(crate) reconciler: Option<ReconcilerHandle>,
     102              : 
     103              :     /// If a tenant is being split, then all shards with that TenantId will have a
     104              :     /// SplitState set, this acts as a guard against other operations such as background
     105              :     /// reconciliation, and timeline creation.
     106              :     pub(crate) splitting: SplitState,
     107              : 
     108              :     /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
     109              :     /// is set. This flag is cleared when the tenant is popped off the delay queue.
     110              :     pub(crate) delayed_reconcile: bool,
     111              : 
     112              :     /// Optionally wait for reconciliation to complete up to a particular
     113              :     /// sequence number.
     114              :     #[serde(skip)]
     115              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     116              : 
     117              :     /// Indicates sequence number for which we have encountered an error reconciling.  If
     118              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
     119              :     /// and callers should stop waiting for `waiter` and propagate the error.
     120              :     #[serde(skip)]
     121              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     122              : 
     123              :     /// The most recent error from a reconcile on this tenant.  This is a nested Arc
     124              :     /// because:
     125              :     ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
     126              :     ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
     127              :     ///    many waiters for one shard, and the underlying error types are not Clone.
     128              :     ///
     129              :     /// TODO: generalize to an array of recent events
     130              :     /// TOOD: use a ArcSwap instead of mutex for faster reads?
     131              :     #[serde(serialize_with = "read_last_error")]
     132              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     133              : 
     134              :     /// If we have a pending compute notification that for some reason we weren't able to send,
     135              :     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
     136              :     /// and trigger a Reconciler run.  This is the mechanism by which compute notifications are included in the scope
     137              :     /// of state that we publish externally in an eventually consistent way.
     138              :     pub(crate) pending_compute_notification: bool,
     139              : 
     140              :     // Support/debug tool: if something is going wrong or flapping with scheduling, this may
     141              :     // be set to a non-active state to avoid making changes while the issue is fixed.
     142              :     scheduling_policy: ShardSchedulingPolicy,
     143              : 
     144              :     // We should attempt to schedule this shard in the provided AZ to
     145              :     // decrease chances of cross-AZ compute.
     146              :     preferred_az_id: Option<String>,
     147              : }
     148              : 
     149              : #[derive(Default, Clone, Debug, Serialize)]
     150              : pub(crate) struct IntentState {
     151              :     attached: Option<NodeId>,
     152              :     secondary: Vec<NodeId>,
     153              : }
     154              : 
     155              : impl IntentState {
     156           13 :     pub(crate) fn new() -> Self {
     157           13 :         Self {
     158           13 :             attached: None,
     159           13 :             secondary: vec![],
     160           13 :         }
     161           13 :     }
     162            0 :     pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
     163            0 :         if let Some(node_id) = node_id {
     164            0 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
     165            0 :         }
     166            0 :         Self {
     167            0 :             attached: node_id,
     168            0 :             secondary: vec![],
     169            0 :         }
     170            0 :     }
     171              : 
     172           24 :     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
     173           24 :         if self.attached != new_attached {
     174           24 :             if let Some(old_attached) = self.attached.take() {
     175            0 :                 scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
     176           24 :             }
     177           24 :             if let Some(new_attached) = &new_attached {
     178           24 :                 scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
     179           24 :             }
     180           24 :             self.attached = new_attached;
     181            0 :         }
     182           24 :     }
     183              : 
     184              :     /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
     185              :     /// secondary to attached while maintaining the scheduler's reference counts.
     186            5 :     pub(crate) fn promote_attached(
     187            5 :         &mut self,
     188            5 :         scheduler: &mut Scheduler,
     189            5 :         promote_secondary: NodeId,
     190            5 :     ) {
     191            5 :         // If we call this with a node that isn't in secondary, it would cause incorrect
     192            5 :         // scheduler reference counting, since we assume the node is already referenced as a secondary.
     193            5 :         debug_assert!(self.secondary.contains(&promote_secondary));
     194              : 
     195           10 :         self.secondary.retain(|n| n != &promote_secondary);
     196            5 : 
     197            5 :         let demoted = self.attached;
     198            5 :         self.attached = Some(promote_secondary);
     199            5 : 
     200            5 :         scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
     201            5 :         if let Some(demoted) = demoted {
     202            0 :             scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
     203            5 :         }
     204            5 :     }
     205              : 
     206           17 :     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
     207           17 :         debug_assert!(!self.secondary.contains(&new_secondary));
     208           17 :         scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
     209           17 :         self.secondary.push(new_secondary);
     210           17 :     }
     211              : 
     212              :     /// It is legal to call this with a node that is not currently a secondary: that is a no-op
     213            5 :     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
     214            5 :         let index = self.secondary.iter().position(|n| *n == node_id);
     215            5 :         if let Some(index) = index {
     216            5 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
     217            5 :             self.secondary.remove(index);
     218            5 :         }
     219            5 :     }
     220              : 
     221           23 :     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
     222           23 :         for secondary in self.secondary.drain(..) {
     223           12 :             scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
     224           12 :         }
     225           23 :     }
     226              : 
     227              :     /// Remove the last secondary node from the list of secondaries
     228            0 :     pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
     229            0 :         if let Some(node_id) = self.secondary.pop() {
     230            0 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
     231            0 :         }
     232            0 :     }
     233              : 
     234           23 :     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
     235           23 :         if let Some(old_attached) = self.attached.take() {
     236           23 :             scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
     237           23 :         }
     238              : 
     239           23 :         self.clear_secondary(scheduler);
     240           23 :     }
     241              : 
     242           70 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     243           70 :         let mut result = Vec::new();
     244           70 :         if let Some(p) = self.attached {
     245           68 :             result.push(p)
     246            2 :         }
     247              : 
     248           70 :         result.extend(self.secondary.iter().copied());
     249           70 : 
     250           70 :         result
     251           70 :     }
     252              : 
     253           59 :     pub(crate) fn get_attached(&self) -> &Option<NodeId> {
     254           59 :         &self.attached
     255           59 :     }
     256              : 
     257           16 :     pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
     258           16 :         &self.secondary
     259           16 :     }
     260              : 
     261              :     /// If the node is in use as the attached location, demote it into
     262              :     /// the list of secondary locations.  This is used when a node goes offline,
     263              :     /// and we want to use a different node for attachment, but not permanently
     264              :     /// forget the location on the offline node.
     265              :     ///
     266              :     /// Returns true if a change was made
     267            5 :     pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
     268            5 :         if self.attached == Some(node_id) {
     269            5 :             self.attached = None;
     270            5 :             self.secondary.push(node_id);
     271            5 :             scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
     272            5 :             true
     273              :         } else {
     274            0 :             false
     275              :         }
     276            5 :     }
     277              : }
     278              : 
     279              : impl Drop for IntentState {
     280           24 :     fn drop(&mut self) {
     281           24 :         // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
     282           24 :         // We do not check this while panicking, to avoid polluting unit test failures or
     283           24 :         // other assertions with this assertion's output.  It's still wrong to leak these,
     284           24 :         // but if we already have a panic then we don't need to independently flag this case.
     285           24 :         if !(std::thread::panicking()) {
     286           24 :             debug_assert!(self.attached.is_none() && self.secondary.is_empty());
     287            0 :         }
     288           23 :     }
     289              : }
     290              : 
     291            0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
     292              : pub(crate) struct ObservedState {
     293              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
     294              : }
     295              : 
     296              : /// Our latest knowledge of how this tenant is configured in the outside world.
     297              : ///
     298              : /// Meaning:
     299              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
     300              : ///       node for this shard.
     301              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
     302              : ///       what it is (e.g. we failed partway through configuring it)
     303              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     304              : ///       and that configuration will still be present unless something external interfered.
     305            0 : #[derive(Clone, Serialize, Deserialize, Debug)]
     306              : pub(crate) struct ObservedStateLocation {
     307              :     /// If None, it means we do not know the status of this shard's location on this node, but
     308              :     /// we know that we might have some state on this node.
     309              :     pub(crate) conf: Option<LocationConfig>,
     310              : }
     311              : pub(crate) struct ReconcilerWaiter {
     312              :     // For observability purposes, remember the ID of the shard we're
     313              :     // waiting for.
     314              :     pub(crate) tenant_shard_id: TenantShardId,
     315              : 
     316              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     317              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     318              :     error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     319              :     seq: Sequence,
     320              : }
     321              : 
     322              : pub(crate) enum ReconcilerStatus {
     323              :     Done,
     324              :     Failed,
     325              :     InProgress,
     326              : }
     327              : 
     328            0 : #[derive(thiserror::Error, Debug)]
     329              : pub(crate) enum ReconcileWaitError {
     330              :     #[error("Timeout waiting for shard {0}")]
     331              :     Timeout(TenantShardId),
     332              :     #[error("shutting down")]
     333              :     Shutdown,
     334              :     #[error("Reconcile error on shard {0}: {1}")]
     335              :     Failed(TenantShardId, Arc<ReconcileError>),
     336              : }
     337              : 
     338              : #[derive(Eq, PartialEq, Debug)]
     339              : pub(crate) struct ReplaceSecondary {
     340              :     old_node_id: NodeId,
     341              :     new_node_id: NodeId,
     342              : }
     343              : 
     344              : #[derive(Eq, PartialEq, Debug)]
     345              : pub(crate) struct MigrateAttachment {
     346              :     pub(crate) old_attached_node_id: NodeId,
     347              :     pub(crate) new_attached_node_id: NodeId,
     348              : }
     349              : 
     350              : #[derive(Eq, PartialEq, Debug)]
     351              : pub(crate) enum ScheduleOptimizationAction {
     352              :     // Replace one of our secondary locations with a different node
     353              :     ReplaceSecondary(ReplaceSecondary),
     354              :     // Migrate attachment to an existing secondary location
     355              :     MigrateAttachment(MigrateAttachment),
     356              : }
     357              : 
     358              : #[derive(Eq, PartialEq, Debug)]
     359              : pub(crate) struct ScheduleOptimization {
     360              :     // What was the reconcile sequence when we generated this optimization?  The optimization
     361              :     // should only be applied if the shard's sequence is still at this value, in case other changes
     362              :     // happened between planning the optimization and applying it.
     363              :     sequence: Sequence,
     364              : 
     365              :     pub(crate) action: ScheduleOptimizationAction,
     366              : }
     367              : 
     368              : impl ReconcilerWaiter {
     369            0 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     370            0 :         tokio::select! {
     371            0 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     372            0 :                 result.map_err(|e| match e {
     373            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     374            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     375            0 :                 })?;
     376              :             },
     377            0 :             result = self.error_seq_wait.wait_for(self.seq) => {
     378            0 :                 result.map_err(|e| match e {
     379            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     380            0 :                     SeqWaitError::Timeout => unreachable!()
     381            0 :                 })?;
     382              : 
     383            0 :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
     384            0 :                     self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
     385              :             }
     386              :         }
     387              : 
     388            0 :         Ok(())
     389            0 :     }
     390              : 
     391            0 :     pub(crate) fn get_status(&self) -> ReconcilerStatus {
     392            0 :         if self.seq_wait.would_wait_for(self.seq).is_ok() {
     393            0 :             ReconcilerStatus::Done
     394            0 :         } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
     395            0 :             ReconcilerStatus::Failed
     396              :         } else {
     397            0 :             ReconcilerStatus::InProgress
     398              :         }
     399            0 :     }
     400              : }
     401              : 
     402              : /// Having spawned a reconciler task, the tenant shard's state will carry enough
     403              : /// information to optionally cancel & await it later.
     404              : pub(crate) struct ReconcilerHandle {
     405              :     sequence: Sequence,
     406              :     handle: JoinHandle<()>,
     407              :     cancel: CancellationToken,
     408              : }
     409              : 
     410              : pub(crate) enum ReconcileNeeded {
     411              :     /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
     412              :     /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
     413              :     No,
     414              :     /// shard has a reconciler running, and its intent hasn't changed since that one was
     415              :     /// spawned: wait for the existing reconciler rather than spawning a new one.
     416              :     WaitExisting(ReconcilerWaiter),
     417              :     /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
     418              :     Yes,
     419              : }
     420              : 
     421              : /// When a reconcile task completes, it sends this result object
     422              : /// to be applied to the primary TenantShard.
     423              : pub(crate) struct ReconcileResult {
     424              :     pub(crate) sequence: Sequence,
     425              :     /// On errors, `observed` should be treated as an incompleted description
     426              :     /// of state (i.e. any nodes present in the result should override nodes
     427              :     /// present in the parent tenant state, but any unmentioned nodes should
     428              :     /// not be removed from parent tenant state)
     429              :     pub(crate) result: Result<(), ReconcileError>,
     430              : 
     431              :     pub(crate) tenant_shard_id: TenantShardId,
     432              :     pub(crate) generation: Option<Generation>,
     433              :     pub(crate) observed: ObservedState,
     434              : 
     435              :     /// Set [`TenantShard::pending_compute_notification`] from this flag
     436              :     pub(crate) pending_compute_notification: bool,
     437              : }
     438              : 
     439              : impl ObservedState {
     440            0 :     pub(crate) fn new() -> Self {
     441            0 :         Self {
     442            0 :             locations: HashMap::new(),
     443            0 :         }
     444            0 :     }
     445              : }
     446              : 
     447              : impl TenantShard {
     448           11 :     pub(crate) fn new(
     449           11 :         tenant_shard_id: TenantShardId,
     450           11 :         shard: ShardIdentity,
     451           11 :         policy: PlacementPolicy,
     452           11 :     ) -> Self {
     453           11 :         Self {
     454           11 :             tenant_shard_id,
     455           11 :             policy,
     456           11 :             intent: IntentState::default(),
     457           11 :             generation: Some(Generation::new(0)),
     458           11 :             shard,
     459           11 :             observed: ObservedState::default(),
     460           11 :             config: TenantConfig::default(),
     461           11 :             reconciler: None,
     462           11 :             splitting: SplitState::Idle,
     463           11 :             sequence: Sequence(1),
     464           11 :             delayed_reconcile: false,
     465           11 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     466           11 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     467           11 :             last_error: Arc::default(),
     468           11 :             pending_compute_notification: false,
     469           11 :             scheduling_policy: ShardSchedulingPolicy::default(),
     470           11 :             preferred_az_id: None,
     471           11 :         }
     472           11 :     }
     473              : 
     474              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     475              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     476              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     477              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     478            1 :     pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
     479            1 :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     480            1 :         // generation
     481            1 :         let mut attached_locs = self
     482            1 :             .observed
     483            1 :             .locations
     484            1 :             .iter()
     485            2 :             .filter_map(|(node_id, l)| {
     486            2 :                 if let Some(conf) = &l.conf {
     487            2 :                     if conf.mode == LocationConfigMode::AttachedMulti
     488            1 :                         || conf.mode == LocationConfigMode::AttachedSingle
     489            1 :                         || conf.mode == LocationConfigMode::AttachedStale
     490              :                     {
     491            2 :                         Some((node_id, conf.generation))
     492              :                     } else {
     493            0 :                         None
     494              :                     }
     495              :                 } else {
     496            0 :                     None
     497              :                 }
     498            2 :             })
     499            1 :             .collect::<Vec<_>>();
     500            1 : 
     501            2 :         attached_locs.sort_by_key(|i| i.1);
     502            1 :         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
     503            1 :             self.intent.set_attached(scheduler, Some(*node_id));
     504            1 :         }
     505              : 
     506              :         // All remaining observed locations generate secondary intents.  This includes None
     507              :         // observations, as these may well have some local content on disk that is usable (this
     508              :         // is an edge case that might occur if we restarted during a migration or other change)
     509              :         //
     510              :         // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
     511              :         // will take care of promoting one of these secondaries to be attached.
     512            2 :         self.observed.locations.keys().for_each(|node_id| {
     513            2 :             if Some(*node_id) != self.intent.attached {
     514            1 :                 self.intent.push_secondary(scheduler, *node_id);
     515            1 :             }
     516            2 :         });
     517            1 :     }
     518              : 
     519              :     /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
     520              :     /// attached pageserver for a shard.
     521              :     ///
     522              :     /// Returns whether we modified it, and the NodeId selected.
     523            7 :     fn schedule_attached(
     524            7 :         &mut self,
     525            7 :         scheduler: &mut Scheduler,
     526            7 :         context: &ScheduleContext,
     527            7 :     ) -> Result<(bool, NodeId), ScheduleError> {
     528              :         // No work to do if we already have an attached tenant
     529            7 :         if let Some(node_id) = self.intent.attached {
     530            0 :             return Ok((false, node_id));
     531            7 :         }
     532              : 
     533            7 :         if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
     534              :             // Promote a secondary
     535            1 :             tracing::debug!("Promoted secondary {} to attached", promote_secondary);
     536            1 :             self.intent.promote_attached(scheduler, promote_secondary);
     537            1 :             Ok((true, promote_secondary))
     538              :         } else {
     539              :             // Pick a fresh node: either we had no secondaries or none were schedulable
     540            6 :             let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
     541            6 :             tracing::debug!("Selected {} as attached", node_id);
     542            6 :             self.intent.set_attached(scheduler, Some(node_id));
     543            6 :             Ok((true, node_id))
     544              :         }
     545            7 :     }
     546              : 
     547            8 :     pub(crate) fn schedule(
     548            8 :         &mut self,
     549            8 :         scheduler: &mut Scheduler,
     550            8 :         context: &mut ScheduleContext,
     551            8 :     ) -> Result<(), ScheduleError> {
     552            8 :         let r = self.do_schedule(scheduler, context);
     553            8 : 
     554            8 :         context.avoid(&self.intent.all_pageservers());
     555            8 :         if let Some(attached) = self.intent.get_attached() {
     556            7 :             context.push_attached(*attached);
     557            7 :         }
     558              : 
     559            8 :         r
     560            8 :     }
     561              : 
     562            8 :     pub(crate) fn do_schedule(
     563            8 :         &mut self,
     564            8 :         scheduler: &mut Scheduler,
     565            8 :         context: &ScheduleContext,
     566            8 :     ) -> Result<(), ScheduleError> {
     567            8 :         // TODO: before scheduling new nodes, check if any existing content in
     568            8 :         // self.intent refers to pageservers that are offline, and pick other
     569            8 :         // pageservers if so.
     570            8 : 
     571            8 :         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
     572            8 :         // change their attach location.
     573            8 : 
     574            8 :         match self.scheduling_policy {
     575            7 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
     576              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     577              :                 // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
     578            1 :                 tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     579            0 :                     "Scheduling is disabled by policy {:?}", self.scheduling_policy);
     580            1 :                 return Ok(());
     581              :             }
     582              :         }
     583              : 
     584              :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     585              :         // more work on the same pageservers we're already using.
     586            7 :         let mut modified = false;
     587              : 
     588              :         // Add/remove nodes to fulfil policy
     589              :         use PlacementPolicy::*;
     590            7 :         match self.policy {
     591            7 :             Attached(secondary_count) => {
     592            7 :                 let retain_secondaries = if self.intent.attached.is_none()
     593            7 :                     && scheduler.node_preferred(&self.intent.secondary).is_some()
     594              :                 {
     595              :                     // If we have no attached, and one of the secondaries is elegible to be promoted, retain
     596              :                     // one more secondary than we usually would, as one of them will become attached futher down this function.
     597            1 :                     secondary_count + 1
     598              :                 } else {
     599            6 :                     secondary_count
     600              :                 };
     601              : 
     602            7 :                 while self.intent.secondary.len() > retain_secondaries {
     603            0 :                     // We have no particular preference for one secondary location over another: just
     604            0 :                     // arbitrarily drop from the end
     605            0 :                     self.intent.pop_secondary(scheduler);
     606            0 :                     modified = true;
     607            0 :                 }
     608              : 
     609              :                 // Should have exactly one attached, and N secondaries
     610            7 :                 let (modified_attached, attached_node_id) =
     611            7 :                     self.schedule_attached(scheduler, context)?;
     612            7 :                 modified |= modified_attached;
     613            7 : 
     614            7 :                 let mut used_pageservers = vec![attached_node_id];
     615           13 :                 while self.intent.secondary.len() < secondary_count {
     616            6 :                     let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
     617            6 :                     self.intent.push_secondary(scheduler, node_id);
     618            6 :                     used_pageservers.push(node_id);
     619            6 :                     modified = true;
     620              :                 }
     621              :             }
     622              :             Secondary => {
     623            0 :                 if let Some(node_id) = self.intent.get_attached() {
     624            0 :                     // Populate secondary by demoting the attached node
     625            0 :                     self.intent.demote_attached(scheduler, *node_id);
     626            0 :                     modified = true;
     627            0 :                 } else if self.intent.secondary.is_empty() {
     628              :                     // Populate secondary by scheduling a fresh node
     629            0 :                     let node_id = scheduler.schedule_shard(&[], context)?;
     630            0 :                     self.intent.push_secondary(scheduler, node_id);
     631            0 :                     modified = true;
     632            0 :                 }
     633            0 :                 while self.intent.secondary.len() > 1 {
     634            0 :                     // We have no particular preference for one secondary location over another: just
     635            0 :                     // arbitrarily drop from the end
     636            0 :                     self.intent.pop_secondary(scheduler);
     637            0 :                     modified = true;
     638            0 :                 }
     639              :             }
     640              :             Detached => {
     641              :                 // Never add locations in this mode
     642            0 :                 if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
     643            0 :                     self.intent.clear(scheduler);
     644            0 :                     modified = true;
     645            0 :                 }
     646              :             }
     647              :         }
     648              : 
     649            7 :         if modified {
     650            7 :             self.sequence.0 += 1;
     651            7 :         }
     652              : 
     653            7 :         Ok(())
     654            8 :     }
     655              : 
     656              :     /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
     657              :     /// if the swap is not possible and leaves the intent state in its original state.
     658              :     ///
     659              :     /// Arguments:
     660              :     /// `attached_to`: the currently attached location matching the intent state (may be None if the
     661              :     /// shard is not attached)
     662              :     /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
     663              :     /// the scheduler to recommend a node
     664            0 :     pub(crate) fn reschedule_to_secondary(
     665            0 :         &mut self,
     666            0 :         promote_to: Option<NodeId>,
     667            0 :         scheduler: &mut Scheduler,
     668            0 :     ) -> Result<(), ScheduleError> {
     669            0 :         let promote_to = match promote_to {
     670            0 :             Some(node) => node,
     671            0 :             None => match scheduler.node_preferred(self.intent.get_secondary()) {
     672            0 :                 Some(node) => node,
     673              :                 None => {
     674            0 :                     return Err(ScheduleError::ImpossibleConstraint);
     675              :                 }
     676              :             },
     677              :         };
     678              : 
     679            0 :         assert!(self.intent.get_secondary().contains(&promote_to));
     680              : 
     681            0 :         if let Some(node) = self.intent.get_attached() {
     682            0 :             let demoted = self.intent.demote_attached(scheduler, *node);
     683            0 :             if !demoted {
     684            0 :                 return Err(ScheduleError::ImpossibleConstraint);
     685            0 :             }
     686            0 :         }
     687              : 
     688            0 :         self.intent.promote_attached(scheduler, promote_to);
     689            0 : 
     690            0 :         // Increment the sequence number for the edge case where a
     691            0 :         // reconciler is already running to avoid waiting on the
     692            0 :         // current reconcile instead of spawning a new one.
     693            0 :         self.sequence = self.sequence.next();
     694            0 : 
     695            0 :         Ok(())
     696            0 :     }
     697              : 
     698              :     /// Optimize attachments: if a shard has a secondary location that is preferable to
     699              :     /// its primary location based on soft constraints, switch that secondary location
     700              :     /// to be attached.
     701           15 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     702              :     pub(crate) fn optimize_attachment(
     703              :         &self,
     704              :         nodes: &HashMap<NodeId, Node>,
     705              :         schedule_context: &ScheduleContext,
     706              :     ) -> Option<ScheduleOptimization> {
     707              :         let attached = (*self.intent.get_attached())?;
     708              :         if self.intent.secondary.is_empty() {
     709              :             // We can only do useful work if we have both attached and secondary locations: this
     710              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     711              :             return None;
     712              :         }
     713              : 
     714              :         let current_affinity_score = schedule_context.get_node_affinity(attached);
     715              :         let current_attachment_count = schedule_context.get_node_attachments(attached);
     716              : 
     717              :         // Generate score for each node, dropping any un-schedulable nodes.
     718              :         let all_pageservers = self.intent.all_pageservers();
     719              :         let mut scores = all_pageservers
     720              :             .iter()
     721           30 :             .flat_map(|node_id| {
     722           30 :                 let node = nodes.get(node_id);
     723           30 :                 if node.is_none() {
     724            0 :                     None
     725           30 :                 } else if matches!(
     726           30 :                     node.unwrap().get_scheduling(),
     727              :                     NodeSchedulingPolicy::Filling
     728              :                 ) {
     729              :                     // If the node is currently filling, don't count it as a candidate to avoid,
     730              :                     // racing with the background fill.
     731            0 :                     None
     732           30 :                 } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
     733            0 :                     None
     734              :                 } else {
     735           30 :                     let affinity_score = schedule_context.get_node_affinity(*node_id);
     736           30 :                     let attachment_count = schedule_context.get_node_attachments(*node_id);
     737           30 :                     Some((*node_id, affinity_score, attachment_count))
     738              :                 }
     739           30 :             })
     740              :             .collect::<Vec<_>>();
     741              : 
     742              :         // Sort precedence:
     743              :         //  1st - prefer nodes with the lowest total affinity score
     744              :         //  2nd - prefer nodes with the lowest number of attachments in this context
     745              :         //  3rd - if all else is equal, sort by node ID for determinism in tests.
     746           30 :         scores.sort_by_key(|i| (i.1, i.2, i.0));
     747              : 
     748              :         if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
     749              :             scores.first()
     750              :         {
     751              :             if attached != *preferred_node {
     752              :                 // The best alternative must be more than 1 better than us, otherwise we could end
     753              :                 // up flapping back next time we're called (e.g. there's no point migrating from
     754              :                 // a location with score 1 to a score zero, because on next location the situation
     755              :                 // would be the same, but in reverse).
     756              :                 if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
     757              :                     || current_attachment_count > *preferred_attachment_count + 1
     758              :                 {
     759              :                     tracing::info!(
     760              :                         "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
     761              :                         self.intent.get_secondary()
     762              :                     );
     763              :                     return Some(ScheduleOptimization {
     764              :                         sequence: self.sequence,
     765              :                         action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
     766              :                             old_attached_node_id: attached,
     767              :                             new_attached_node_id: *preferred_node,
     768              :                         }),
     769              :                     });
     770              :                 }
     771              :             } else {
     772              :                 tracing::debug!(
     773              :                     "Node {} is already preferred (score {:?})",
     774              :                     preferred_node,
     775              :                     preferred_affinity_score
     776              :                 );
     777              :             }
     778              :         }
     779              : 
     780              :         // Fall-through: we didn't find an optimization
     781              :         None
     782              :     }
     783              : 
     784           12 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     785              :     pub(crate) fn optimize_secondary(
     786              :         &self,
     787              :         scheduler: &mut Scheduler,
     788              :         schedule_context: &ScheduleContext,
     789              :     ) -> Option<ScheduleOptimization> {
     790              :         if self.intent.secondary.is_empty() {
     791              :             // We can only do useful work if we have both attached and secondary locations: this
     792              :             // function doesn't schedule new locations, only swaps between attached and secondaries.
     793              :             return None;
     794              :         }
     795              : 
     796              :         for secondary in self.intent.get_secondary() {
     797              :             let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
     798              :                 // We're already on a node unaffected any affinity constraints,
     799              :                 // so we won't change it.
     800              :                 continue;
     801              :             };
     802              : 
     803              :             // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
     804              :             // This implicitly limits the choice to nodes that are available, and prefers nodes
     805              :             // with lower utilization.
     806              :             let Ok(candidate_node) =
     807              :                 scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
     808              :             else {
     809              :                 // A scheduling error means we have no possible candidate replacements
     810              :                 continue;
     811              :             };
     812              : 
     813              :             let candidate_affinity_score = schedule_context
     814              :                 .nodes
     815              :                 .get(&candidate_node)
     816              :                 .unwrap_or(&AffinityScore::FREE);
     817              : 
     818              :             // The best alternative must be more than 1 better than us, otherwise we could end
     819              :             // up flapping back next time we're called.
     820              :             if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
     821              :                 // If some other node is available and has a lower score than this node, then
     822              :                 // that other node is a good place to migrate to.
     823              :                 tracing::info!(
     824              :                     "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
     825              :                     self.intent.get_secondary()
     826              :                 );
     827              :                 return Some(ScheduleOptimization {
     828              :                     sequence: self.sequence,
     829              :                     action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
     830              :                         old_node_id: *secondary,
     831              :                         new_node_id: candidate_node,
     832              :                     }),
     833              :                 });
     834              :             }
     835              :         }
     836              : 
     837              :         None
     838              :     }
     839              : 
     840              :     /// Return true if the optimization was really applied: it will not be applied if the optimization's
     841              :     /// sequence is behind this tenant shard's
     842            9 :     pub(crate) fn apply_optimization(
     843            9 :         &mut self,
     844            9 :         scheduler: &mut Scheduler,
     845            9 :         optimization: ScheduleOptimization,
     846            9 :     ) -> bool {
     847            9 :         if optimization.sequence != self.sequence {
     848            0 :             return false;
     849            9 :         }
     850            9 : 
     851            9 :         metrics::METRICS_REGISTRY
     852            9 :             .metrics_group
     853            9 :             .storage_controller_schedule_optimization
     854            9 :             .inc();
     855            9 : 
     856            9 :         match optimization.action {
     857              :             ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
     858            4 :                 old_attached_node_id,
     859            4 :                 new_attached_node_id,
     860            4 :             }) => {
     861            4 :                 self.intent.demote_attached(scheduler, old_attached_node_id);
     862            4 :                 self.intent
     863            4 :                     .promote_attached(scheduler, new_attached_node_id);
     864            4 :             }
     865              :             ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
     866            5 :                 old_node_id,
     867            5 :                 new_node_id,
     868            5 :             }) => {
     869            5 :                 self.intent.remove_secondary(scheduler, old_node_id);
     870            5 :                 self.intent.push_secondary(scheduler, new_node_id);
     871            5 :             }
     872              :         }
     873              : 
     874            9 :         true
     875            9 :     }
     876              : 
     877              :     /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
     878              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
     879              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
     880              :     ///
     881              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
     882              :     /// funciton should not be used to decide whether to reconcile.
     883            0 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
     884            0 :         if let Some(attach_intent) = self.intent.attached {
     885            0 :             match self.observed.locations.get(&attach_intent) {
     886            0 :                 Some(loc) => match &loc.conf {
     887            0 :                     Some(conf) => match conf.mode {
     888              :                         LocationConfigMode::AttachedMulti
     889              :                         | LocationConfigMode::AttachedSingle
     890              :                         | LocationConfigMode::AttachedStale => {
     891              :                             // Our intent and observed state agree that this node is in an attached state.
     892            0 :                             Some(attach_intent)
     893              :                         }
     894              :                         // Our observed config is not an attached state
     895            0 :                         _ => None,
     896              :                     },
     897              :                     // Our observed state is None, i.e. in flux
     898            0 :                     None => None,
     899              :                 },
     900              :                 // We have no observed state for this node
     901            0 :                 None => None,
     902              :             }
     903              :         } else {
     904              :             // Our intent is not to attach
     905            0 :             None
     906              :         }
     907            0 :     }
     908              : 
     909            0 :     fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
     910            0 :         let mut dirty_nodes = HashSet::new();
     911              : 
     912            0 :         if let Some(node_id) = self.intent.attached {
     913              :             // Maybe panic: it is a severe bug if we try to attach while generation is null.
     914            0 :             let generation = self
     915            0 :                 .generation
     916            0 :                 .expect("Attempted to enter attached state without a generation");
     917            0 : 
     918            0 :             let wanted_conf =
     919            0 :                 attached_location_conf(generation, &self.shard, &self.config, &self.policy);
     920            0 :             match self.observed.locations.get(&node_id) {
     921            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     922            0 :                 Some(_) | None => {
     923            0 :                     dirty_nodes.insert(node_id);
     924            0 :                 }
     925              :             }
     926            0 :         }
     927              : 
     928            0 :         for node_id in &self.intent.secondary {
     929            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     930            0 :             match self.observed.locations.get(node_id) {
     931            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     932            0 :                 Some(_) | None => {
     933            0 :                     dirty_nodes.insert(*node_id);
     934            0 :                 }
     935              :             }
     936              :         }
     937              : 
     938            0 :         for node_id in self.observed.locations.keys() {
     939            0 :             if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
     940            0 :                 // We have observed state that isn't part of our intent: need to clean it up.
     941            0 :                 dirty_nodes.insert(*node_id);
     942            0 :             }
     943              :         }
     944              : 
     945            0 :         dirty_nodes.retain(|node_id| {
     946            0 :             nodes
     947            0 :                 .get(node_id)
     948            0 :                 .map(|n| n.is_available())
     949            0 :                 .unwrap_or(false)
     950            0 :         });
     951            0 : 
     952            0 :         !dirty_nodes.is_empty()
     953            0 :     }
     954              : 
     955              :     #[allow(clippy::too_many_arguments)]
     956            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     957              :     pub(crate) fn get_reconcile_needed(
     958              :         &mut self,
     959              :         pageservers: &Arc<HashMap<NodeId, Node>>,
     960              :     ) -> ReconcileNeeded {
     961              :         // If there are any ambiguous observed states, and the nodes they refer to are available,
     962              :         // we should reconcile to clean them up.
     963              :         let mut dirty_observed = false;
     964              :         for (node_id, observed_loc) in &self.observed.locations {
     965              :             let node = pageservers
     966              :                 .get(node_id)
     967              :                 .expect("Nodes may not be removed while referenced");
     968              :             if observed_loc.conf.is_none() && node.is_available() {
     969              :                 dirty_observed = true;
     970              :                 break;
     971              :             }
     972              :         }
     973              : 
     974              :         let active_nodes_dirty = self.dirty(pageservers);
     975              : 
     976              :         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
     977              :         // wake up a reconciler to send it.
     978              :         let do_reconcile =
     979              :             active_nodes_dirty || dirty_observed || self.pending_compute_notification;
     980              : 
     981              :         if !do_reconcile {
     982              :             tracing::debug!("Not dirty, no reconciliation needed.");
     983              :             return ReconcileNeeded::No;
     984              :         }
     985              : 
     986              :         // If we are currently splitting, then never start a reconciler task: the splitting logic
     987              :         // requires that shards are not interfered with while it runs. Do this check here rather than
     988              :         // up top, so that we only log this message if we would otherwise have done a reconciliation.
     989              :         if !matches!(self.splitting, SplitState::Idle) {
     990              :             tracing::info!("Refusing to reconcile, splitting in progress");
     991              :             return ReconcileNeeded::No;
     992              :         }
     993              : 
     994              :         // Reconcile already in flight for the current sequence?
     995              :         if let Some(handle) = &self.reconciler {
     996              :             if handle.sequence == self.sequence {
     997              :                 tracing::info!(
     998              :                     "Reconciliation already in progress for sequence {:?}",
     999              :                     self.sequence,
    1000              :                 );
    1001              :                 return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
    1002              :                     tenant_shard_id: self.tenant_shard_id,
    1003              :                     seq_wait: self.waiter.clone(),
    1004              :                     error_seq_wait: self.error_waiter.clone(),
    1005              :                     error: self.last_error.clone(),
    1006              :                     seq: self.sequence,
    1007              :                 });
    1008              :             }
    1009              :         }
    1010              : 
    1011              :         // Pre-checks done: finally check whether we may actually do the work
    1012              :         match self.scheduling_policy {
    1013              :             ShardSchedulingPolicy::Active
    1014              :             | ShardSchedulingPolicy::Essential
    1015              :             | ShardSchedulingPolicy::Pause => {}
    1016              :             ShardSchedulingPolicy::Stop => {
    1017              :                 // We only reach this point if there is work to do and we're going to skip
    1018              :                 // doing it: warn it obvious why this tenant isn't doing what it ought to.
    1019              :                 tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
    1020              :                 return ReconcileNeeded::No;
    1021              :             }
    1022              :         }
    1023              : 
    1024              :         ReconcileNeeded::Yes
    1025              :     }
    1026              : 
    1027              :     /// Ensure the sequence number is set to a value where waiting for this value will make us wait
    1028              :     /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
    1029              :     ///
    1030              :     /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
    1031              :     /// that the waiter will not complete until some future Reconciler is constructed and run.
    1032            0 :     fn ensure_sequence_ahead(&mut self) {
    1033            0 :         // Find the highest sequence for which a Reconciler has previously run or is currently
    1034            0 :         // running
    1035            0 :         let max_seen = std::cmp::max(
    1036            0 :             self.reconciler
    1037            0 :                 .as_ref()
    1038            0 :                 .map(|r| r.sequence)
    1039            0 :                 .unwrap_or(Sequence(0)),
    1040            0 :             std::cmp::max(self.waiter.load(), self.error_waiter.load()),
    1041            0 :         );
    1042            0 : 
    1043            0 :         if self.sequence <= max_seen {
    1044            0 :             self.sequence = max_seen.next();
    1045            0 :         }
    1046            0 :     }
    1047              : 
    1048              :     /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
    1049              :     ///
    1050              :     /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
    1051              :     /// you would like to wait on the next reconciler that gets spawned in the background.
    1052            0 :     pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
    1053            0 :         self.ensure_sequence_ahead();
    1054            0 : 
    1055            0 :         ReconcilerWaiter {
    1056            0 :             tenant_shard_id: self.tenant_shard_id,
    1057            0 :             seq_wait: self.waiter.clone(),
    1058            0 :             error_seq_wait: self.error_waiter.clone(),
    1059            0 :             error: self.last_error.clone(),
    1060            0 :             seq: self.sequence,
    1061            0 :         }
    1062            0 :     }
    1063              : 
    1064              :     #[allow(clippy::too_many_arguments)]
    1065            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    1066              :     pub(crate) fn spawn_reconciler(
    1067              :         &mut self,
    1068              :         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
    1069              :         pageservers: &Arc<HashMap<NodeId, Node>>,
    1070              :         compute_hook: &Arc<ComputeHook>,
    1071              :         reconciler_config: ReconcilerConfig,
    1072              :         service_config: &service::Config,
    1073              :         persistence: &Arc<Persistence>,
    1074              :         units: ReconcileUnits,
    1075              :         gate_guard: GateGuard,
    1076              :         cancel: &CancellationToken,
    1077              :     ) -> Option<ReconcilerWaiter> {
    1078              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
    1079              :         // doing our sequence's work.
    1080              :         let old_handle = self.reconciler.take();
    1081              : 
    1082              :         // Build list of nodes from which the reconciler should detach
    1083              :         let mut detach = Vec::new();
    1084              :         for node_id in self.observed.locations.keys() {
    1085              :             if self.intent.get_attached() != &Some(*node_id)
    1086              :                 && !self.intent.secondary.contains(node_id)
    1087              :             {
    1088              :                 detach.push(
    1089              :                     pageservers
    1090              :                         .get(node_id)
    1091              :                         .expect("Intent references non-existent pageserver")
    1092              :                         .clone(),
    1093              :                 )
    1094              :             }
    1095              :         }
    1096              : 
    1097              :         // Advance the sequence before spawning a reconciler, so that sequence waiters
    1098              :         // can distinguish between before+after the reconcile completes.
    1099              :         self.ensure_sequence_ahead();
    1100              : 
    1101              :         let reconciler_cancel = cancel.child_token();
    1102              :         let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
    1103              :         let mut reconciler = Reconciler {
    1104              :             tenant_shard_id: self.tenant_shard_id,
    1105              :             shard: self.shard,
    1106              :             placement_policy: self.policy.clone(),
    1107              :             generation: self.generation,
    1108              :             intent: reconciler_intent,
    1109              :             detach,
    1110              :             reconciler_config,
    1111              :             config: self.config.clone(),
    1112              :             observed: self.observed.clone(),
    1113              :             compute_hook: compute_hook.clone(),
    1114              :             service_config: service_config.clone(),
    1115              :             _gate_guard: gate_guard,
    1116              :             _resource_units: units,
    1117              :             cancel: reconciler_cancel.clone(),
    1118              :             persistence: persistence.clone(),
    1119              :             compute_notify_failure: false,
    1120              :         };
    1121              : 
    1122              :         let reconcile_seq = self.sequence;
    1123              : 
    1124              :         tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
    1125              :         let must_notify = self.pending_compute_notification;
    1126              :         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
    1127              :                                                         tenant_id=%reconciler.tenant_shard_id.tenant_id,
    1128              :                                                         shard_id=%reconciler.tenant_shard_id.shard_slug());
    1129              :         metrics::METRICS_REGISTRY
    1130              :             .metrics_group
    1131              :             .storage_controller_reconcile_spawn
    1132              :             .inc();
    1133              :         let result_tx = result_tx.clone();
    1134              :         let join_handle = tokio::task::spawn(
    1135            0 :             async move {
    1136              :                 // Wait for any previous reconcile task to complete before we start
    1137            0 :                 if let Some(old_handle) = old_handle {
    1138            0 :                     old_handle.cancel.cancel();
    1139            0 :                     if let Err(e) = old_handle.handle.await {
    1140              :                         // We can't do much with this other than log it: the task is done, so
    1141              :                         // we may proceed with our work.
    1142            0 :                         tracing::error!("Unexpected join error waiting for reconcile task: {e}");
    1143            0 :                     }
    1144            0 :                 }
    1145              : 
    1146              :                 // Early check for cancellation before doing any work
    1147              :                 // TODO: wrap all remote API operations in cancellation check
    1148              :                 // as well.
    1149            0 :                 if reconciler.cancel.is_cancelled() {
    1150            0 :                     metrics::METRICS_REGISTRY
    1151            0 :                         .metrics_group
    1152            0 :                         .storage_controller_reconcile_complete
    1153            0 :                         .inc(ReconcileCompleteLabelGroup {
    1154            0 :                             status: ReconcileOutcome::Cancel,
    1155            0 :                         });
    1156            0 :                     return;
    1157            0 :                 }
    1158              : 
    1159              :                 // Attempt to make observed state match intent state
    1160            0 :                 let result = reconciler.reconcile().await;
    1161              : 
    1162              :                 // If we know we had a pending compute notification from some previous action, send a notification irrespective
    1163              :                 // of whether the above reconcile() did any work
    1164            0 :                 if result.is_ok() && must_notify {
    1165              :                     // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
    1166            0 :                     reconciler.compute_notify().await.ok();
    1167            0 :                 }
    1168              : 
    1169              :                 // Update result counter
    1170            0 :                 let outcome_label = match &result {
    1171            0 :                     Ok(_) => ReconcileOutcome::Success,
    1172            0 :                     Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
    1173            0 :                     Err(_) => ReconcileOutcome::Error,
    1174              :                 };
    1175              : 
    1176            0 :                 metrics::METRICS_REGISTRY
    1177            0 :                     .metrics_group
    1178            0 :                     .storage_controller_reconcile_complete
    1179            0 :                     .inc(ReconcileCompleteLabelGroup {
    1180            0 :                         status: outcome_label,
    1181            0 :                     });
    1182            0 : 
    1183            0 :                 // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
    1184            0 :                 // try and schedule more work in response to our result.
    1185            0 :                 let result = ReconcileResult {
    1186            0 :                     sequence: reconcile_seq,
    1187            0 :                     result,
    1188            0 :                     tenant_shard_id: reconciler.tenant_shard_id,
    1189            0 :                     generation: reconciler.generation,
    1190            0 :                     observed: reconciler.observed,
    1191            0 :                     pending_compute_notification: reconciler.compute_notify_failure,
    1192            0 :                 };
    1193            0 : 
    1194            0 :                 result_tx
    1195            0 :                     .send(ReconcileResultRequest::ReconcileResult(result))
    1196            0 :                     .ok();
    1197            0 :             }
    1198              :             .instrument(reconciler_span),
    1199              :         );
    1200              : 
    1201              :         self.reconciler = Some(ReconcilerHandle {
    1202              :             sequence: self.sequence,
    1203              :             handle: join_handle,
    1204              :             cancel: reconciler_cancel,
    1205              :         });
    1206              : 
    1207              :         Some(ReconcilerWaiter {
    1208              :             tenant_shard_id: self.tenant_shard_id,
    1209              :             seq_wait: self.waiter.clone(),
    1210              :             error_seq_wait: self.error_waiter.clone(),
    1211              :             error: self.last_error.clone(),
    1212              :             seq: self.sequence,
    1213              :         })
    1214              :     }
    1215              : 
    1216              :     /// Get a waiter for any reconciliation in flight, but do not start reconciliation
    1217              :     /// if it is not already running
    1218            0 :     pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
    1219            0 :         if self.reconciler.is_some() {
    1220            0 :             Some(ReconcilerWaiter {
    1221            0 :                 tenant_shard_id: self.tenant_shard_id,
    1222            0 :                 seq_wait: self.waiter.clone(),
    1223            0 :                 error_seq_wait: self.error_waiter.clone(),
    1224            0 :                 error: self.last_error.clone(),
    1225            0 :                 seq: self.sequence,
    1226            0 :             })
    1227              :         } else {
    1228            0 :             None
    1229              :         }
    1230            0 :     }
    1231              : 
    1232              :     /// Called when a ReconcileResult has been emitted and the service is updating
    1233              :     /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
    1234              :     /// the handle to indicate there is no longer a reconciliation in progress.
    1235            0 :     pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
    1236            0 :         if let Some(reconcile_handle) = &self.reconciler {
    1237            0 :             if reconcile_handle.sequence <= sequence {
    1238            0 :                 self.reconciler = None;
    1239            0 :             }
    1240            0 :         }
    1241            0 :     }
    1242              : 
    1243              :     /// If we had any state at all referring to this node ID, drop it.  Does not
    1244              :     /// attempt to reschedule.
    1245              :     ///
    1246              :     /// Returns true if we modified the node's intent state.
    1247            0 :     pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
    1248            0 :         let mut intent_modified = false;
    1249            0 : 
    1250            0 :         // Drop if this node was our attached intent
    1251            0 :         if self.intent.attached == Some(node_id) {
    1252            0 :             self.intent.attached = None;
    1253            0 :             intent_modified = true;
    1254            0 :         }
    1255              : 
    1256              :         // Drop from the list of secondaries, and check if we modified it
    1257            0 :         let had_secondaries = self.intent.secondary.len();
    1258            0 :         self.intent.secondary.retain(|n| n != &node_id);
    1259            0 :         intent_modified |= self.intent.secondary.len() != had_secondaries;
    1260            0 : 
    1261            0 :         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
    1262              : 
    1263            0 :         intent_modified
    1264            0 :     }
    1265              : 
    1266            0 :     pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
    1267            0 :         self.scheduling_policy = p;
    1268            0 :     }
    1269              : 
    1270            0 :     pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
    1271            0 :         &self.scheduling_policy
    1272            0 :     }
    1273              : 
    1274            0 :     pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
    1275            0 :         // Ordering: always set last_error before advancing sequence, so that sequence
    1276            0 :         // waiters are guaranteed to see a Some value when they see an error.
    1277            0 :         *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
    1278            0 :         self.error_waiter.advance(sequence);
    1279            0 :     }
    1280              : 
    1281            0 :     pub(crate) fn from_persistent(
    1282            0 :         tsp: TenantShardPersistence,
    1283            0 :         intent: IntentState,
    1284            0 :     ) -> anyhow::Result<Self> {
    1285            0 :         let tenant_shard_id = tsp.get_tenant_shard_id()?;
    1286            0 :         let shard_identity = tsp.get_shard_identity()?;
    1287              : 
    1288            0 :         Ok(Self {
    1289            0 :             tenant_shard_id,
    1290            0 :             shard: shard_identity,
    1291            0 :             sequence: Sequence::initial(),
    1292            0 :             generation: tsp.generation.map(|g| Generation::new(g as u32)),
    1293            0 :             policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
    1294            0 :             intent,
    1295            0 :             observed: ObservedState::new(),
    1296            0 :             config: serde_json::from_str(&tsp.config).unwrap(),
    1297            0 :             reconciler: None,
    1298            0 :             splitting: tsp.splitting,
    1299            0 :             waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1300            0 :             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
    1301            0 :             last_error: Arc::default(),
    1302            0 :             pending_compute_notification: false,
    1303            0 :             delayed_reconcile: false,
    1304            0 :             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
    1305            0 :             preferred_az_id: tsp.preferred_az_id,
    1306            0 :         })
    1307            0 :     }
    1308              : 
    1309            0 :     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
    1310            0 :         TenantShardPersistence {
    1311            0 :             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
    1312            0 :             shard_number: self.tenant_shard_id.shard_number.0 as i32,
    1313            0 :             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
    1314            0 :             shard_stripe_size: self.shard.stripe_size.0 as i32,
    1315            0 :             generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
    1316            0 :             generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
    1317            0 :             placement_policy: serde_json::to_string(&self.policy).unwrap(),
    1318            0 :             config: serde_json::to_string(&self.config).unwrap(),
    1319            0 :             splitting: SplitState::default(),
    1320            0 :             scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
    1321            0 :             preferred_az_id: self.preferred_az_id.clone(),
    1322            0 :         }
    1323            0 :     }
    1324              : 
    1325            0 :     pub(crate) fn preferred_az(&self) -> Option<&str> {
    1326            0 :         self.preferred_az_id.as_deref()
    1327            0 :     }
    1328              : 
    1329            0 :     pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
    1330            0 :         self.preferred_az_id = Some(preferred_az_id);
    1331            0 :     }
    1332              : }
    1333              : 
    1334              : #[cfg(test)]
    1335              : pub(crate) mod tests {
    1336              :     use pageserver_api::{
    1337              :         controller_api::NodeAvailability,
    1338              :         shard::{ShardCount, ShardNumber},
    1339              :     };
    1340              :     use utils::id::TenantId;
    1341              : 
    1342              :     use crate::scheduler::test_utils::make_test_nodes;
    1343              : 
    1344              :     use super::*;
    1345              : 
    1346            7 :     fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
    1347            7 :         let tenant_id = TenantId::generate();
    1348            7 :         let shard_number = ShardNumber(0);
    1349            7 :         let shard_count = ShardCount::new(1);
    1350            7 : 
    1351            7 :         let tenant_shard_id = TenantShardId {
    1352            7 :             tenant_id,
    1353            7 :             shard_number,
    1354            7 :             shard_count,
    1355            7 :         };
    1356            7 :         TenantShard::new(
    1357            7 :             tenant_shard_id,
    1358            7 :             ShardIdentity::new(
    1359            7 :                 shard_number,
    1360            7 :                 shard_count,
    1361            7 :                 pageserver_api::shard::ShardStripeSize(32768),
    1362            7 :             )
    1363            7 :             .unwrap(),
    1364            7 :             policy,
    1365            7 :         )
    1366            7 :     }
    1367              : 
    1368            1 :     fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
    1369            1 :         let tenant_id = TenantId::generate();
    1370            1 : 
    1371            1 :         (0..shard_count.count())
    1372            4 :             .map(|i| {
    1373            4 :                 let shard_number = ShardNumber(i);
    1374            4 : 
    1375            4 :                 let tenant_shard_id = TenantShardId {
    1376            4 :                     tenant_id,
    1377            4 :                     shard_number,
    1378            4 :                     shard_count,
    1379            4 :                 };
    1380            4 :                 TenantShard::new(
    1381            4 :                     tenant_shard_id,
    1382            4 :                     ShardIdentity::new(
    1383            4 :                         shard_number,
    1384            4 :                         shard_count,
    1385            4 :                         pageserver_api::shard::ShardStripeSize(32768),
    1386            4 :                     )
    1387            4 :                     .unwrap(),
    1388            4 :                     policy.clone(),
    1389            4 :                 )
    1390            4 :             })
    1391            1 :             .collect()
    1392            1 :     }
    1393              : 
    1394              :     /// Test the scheduling behaviors used when a tenant configured for HA is subject
    1395              :     /// to nodes being marked offline.
    1396              :     #[test]
    1397            1 :     fn tenant_ha_scheduling() -> anyhow::Result<()> {
    1398            1 :         // Start with three nodes.  Our tenant will only use two.  The third one is
    1399            1 :         // expected to remain unused.
    1400            1 :         let mut nodes = make_test_nodes(3);
    1401            1 : 
    1402            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1403            1 :         let mut context = ScheduleContext::default();
    1404            1 : 
    1405            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1406            1 :         tenant_shard
    1407            1 :             .schedule(&mut scheduler, &mut context)
    1408            1 :             .expect("we have enough nodes, scheduling should work");
    1409            1 : 
    1410            1 :         // Expect to initially be schedule on to different nodes
    1411            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 1);
    1412            1 :         assert!(tenant_shard.intent.attached.is_some());
    1413              : 
    1414            1 :         let attached_node_id = tenant_shard.intent.attached.unwrap();
    1415            1 :         let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
    1416            1 :         assert_ne!(attached_node_id, secondary_node_id);
    1417              : 
    1418              :         // Notifying the attached node is offline should demote it to a secondary
    1419            1 :         let changed = tenant_shard
    1420            1 :             .intent
    1421            1 :             .demote_attached(&mut scheduler, attached_node_id);
    1422            1 :         assert!(changed);
    1423            1 :         assert!(tenant_shard.intent.attached.is_none());
    1424            1 :         assert_eq!(tenant_shard.intent.secondary.len(), 2);
    1425              : 
    1426              :         // Update the scheduler state to indicate the node is offline
    1427            1 :         nodes
    1428            1 :             .get_mut(&attached_node_id)
    1429            1 :             .unwrap()
    1430            1 :             .set_availability(NodeAvailability::Offline);
    1431            1 :         scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
    1432            1 : 
    1433            1 :         // Scheduling the node should promote the still-available secondary node to attached
    1434            1 :         tenant_shard
    1435            1 :             .schedule(&mut scheduler, &mut context)
    1436            1 :             .expect("active nodes are available");
    1437            1 :         assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
    1438              : 
    1439              :         // The original attached node should have been retained as a secondary
    1440            1 :         assert_eq!(
    1441            1 :             *tenant_shard.intent.secondary.iter().last().unwrap(),
    1442            1 :             attached_node_id
    1443            1 :         );
    1444              : 
    1445            1 :         tenant_shard.intent.clear(&mut scheduler);
    1446            1 : 
    1447            1 :         Ok(())
    1448            1 :     }
    1449              : 
    1450              :     #[test]
    1451            1 :     fn intent_from_observed() -> anyhow::Result<()> {
    1452            1 :         let nodes = make_test_nodes(3);
    1453            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1454            1 : 
    1455            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1456            1 : 
    1457            1 :         tenant_shard.observed.locations.insert(
    1458            1 :             NodeId(3),
    1459            1 :             ObservedStateLocation {
    1460            1 :                 conf: Some(LocationConfig {
    1461            1 :                     mode: LocationConfigMode::AttachedMulti,
    1462            1 :                     generation: Some(2),
    1463            1 :                     secondary_conf: None,
    1464            1 :                     shard_number: tenant_shard.shard.number.0,
    1465            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1466            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1467            1 :                     tenant_conf: TenantConfig::default(),
    1468            1 :                 }),
    1469            1 :             },
    1470            1 :         );
    1471            1 : 
    1472            1 :         tenant_shard.observed.locations.insert(
    1473            1 :             NodeId(2),
    1474            1 :             ObservedStateLocation {
    1475            1 :                 conf: Some(LocationConfig {
    1476            1 :                     mode: LocationConfigMode::AttachedStale,
    1477            1 :                     generation: Some(1),
    1478            1 :                     secondary_conf: None,
    1479            1 :                     shard_number: tenant_shard.shard.number.0,
    1480            1 :                     shard_count: tenant_shard.shard.count.literal(),
    1481            1 :                     shard_stripe_size: tenant_shard.shard.stripe_size.0,
    1482            1 :                     tenant_conf: TenantConfig::default(),
    1483            1 :                 }),
    1484            1 :             },
    1485            1 :         );
    1486            1 : 
    1487            1 :         tenant_shard.intent_from_observed(&mut scheduler);
    1488            1 : 
    1489            1 :         // The highest generationed attached location gets used as attached
    1490            1 :         assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    1491              :         // Other locations get used as secondary
    1492            1 :         assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
    1493              : 
    1494            1 :         scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
    1495              : 
    1496            1 :         tenant_shard.intent.clear(&mut scheduler);
    1497            1 :         Ok(())
    1498            1 :     }
    1499              : 
    1500              :     #[test]
    1501            1 :     fn scheduling_mode() -> anyhow::Result<()> {
    1502            1 :         let nodes = make_test_nodes(3);
    1503            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1504            1 : 
    1505            1 :         let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1506            1 : 
    1507            1 :         // In pause mode, schedule() shouldn't do anything
    1508            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    1509            1 :         assert!(tenant_shard
    1510            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1511            1 :             .is_ok());
    1512            1 :         assert!(tenant_shard.intent.all_pageservers().is_empty());
    1513              : 
    1514              :         // In active mode, schedule() works
    1515            1 :         tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    1516            1 :         assert!(tenant_shard
    1517            1 :             .schedule(&mut scheduler, &mut ScheduleContext::default())
    1518            1 :             .is_ok());
    1519            1 :         assert!(!tenant_shard.intent.all_pageservers().is_empty());
    1520              : 
    1521            1 :         tenant_shard.intent.clear(&mut scheduler);
    1522            1 :         Ok(())
    1523            1 :     }
    1524              : 
    1525              :     #[test]
    1526            1 :     fn optimize_attachment() -> anyhow::Result<()> {
    1527            1 :         let nodes = make_test_nodes(3);
    1528            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1529            1 : 
    1530            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1531            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1532            1 : 
    1533            1 :         // Initially: both nodes attached on shard 1, and both have secondary locations
    1534            1 :         // on different nodes.
    1535            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1536            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
    1537            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1538            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1539            1 : 
    1540            1 :         let mut schedule_context = ScheduleContext::default();
    1541            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1542            1 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1543            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1544            1 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1545            1 : 
    1546            1 :         let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
    1547            1 : 
    1548            1 :         // Either shard should recognize that it has the option to switch to a secondary location where there
    1549            1 :         // would be no other shards from the same tenant, and request to do so.
    1550            1 :         assert_eq!(
    1551            1 :             optimization_a,
    1552            1 :             Some(ScheduleOptimization {
    1553            1 :                 sequence: shard_a.sequence,
    1554            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1555            1 :                     old_attached_node_id: NodeId(1),
    1556            1 :                     new_attached_node_id: NodeId(2)
    1557            1 :                 })
    1558            1 :             })
    1559            1 :         );
    1560              : 
    1561              :         // Note that these optimizing two shards in the same tenant with the same ScheduleContext is
    1562              :         // mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
    1563              :         // of [`Service::optimize_all`] to avoid trying
    1564              :         // to do optimizations for multiple shards in the same tenant at the same time.  Generating
    1565              :         // both optimizations is just done for test purposes
    1566            1 :         let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
    1567            1 :         assert_eq!(
    1568            1 :             optimization_b,
    1569            1 :             Some(ScheduleOptimization {
    1570            1 :                 sequence: shard_b.sequence,
    1571            1 :                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
    1572            1 :                     old_attached_node_id: NodeId(1),
    1573            1 :                     new_attached_node_id: NodeId(3)
    1574            1 :                 })
    1575            1 :             })
    1576            1 :         );
    1577              : 
    1578              :         // Applying these optimizations should result in the end state proposed
    1579            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1580            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
    1581            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
    1582            1 :         shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
    1583            1 :         assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
    1584            1 :         assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
    1585              : 
    1586            1 :         shard_a.intent.clear(&mut scheduler);
    1587            1 :         shard_b.intent.clear(&mut scheduler);
    1588            1 : 
    1589            1 :         Ok(())
    1590            1 :     }
    1591              : 
    1592              :     #[test]
    1593            1 :     fn optimize_secondary() -> anyhow::Result<()> {
    1594            1 :         let nodes = make_test_nodes(4);
    1595            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1596            1 : 
    1597            1 :         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1598            1 :         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
    1599            1 : 
    1600            1 :         // Initially: both nodes attached on shard 1, and both have secondary locations
    1601            1 :         // on different nodes.
    1602            1 :         shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    1603            1 :         shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    1604            1 :         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    1605            1 :         shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
    1606            1 : 
    1607            1 :         let mut schedule_context = ScheduleContext::default();
    1608            1 :         schedule_context.avoid(&shard_a.intent.all_pageservers());
    1609            1 :         schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
    1610            1 :         schedule_context.avoid(&shard_b.intent.all_pageservers());
    1611            1 :         schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
    1612            1 : 
    1613            1 :         let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
    1614            1 : 
    1615            1 :         // Since there is a node with no locations available, the node with two locations for the
    1616            1 :         // same tenant should generate an optimization to move one away
    1617            1 :         assert_eq!(
    1618            1 :             optimization_a,
    1619            1 :             Some(ScheduleOptimization {
    1620            1 :                 sequence: shard_a.sequence,
    1621            1 :                 action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
    1622            1 :                     old_node_id: NodeId(3),
    1623            1 :                     new_node_id: NodeId(4)
    1624            1 :                 })
    1625            1 :             })
    1626            1 :         );
    1627              : 
    1628            1 :         shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    1629            1 :         assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    1630            1 :         assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
    1631              : 
    1632            1 :         shard_a.intent.clear(&mut scheduler);
    1633            1 :         shard_b.intent.clear(&mut scheduler);
    1634            1 : 
    1635            1 :         Ok(())
    1636            1 :     }
    1637              : 
    1638              :     // Optimize til quiescent: this emulates what Service::optimize_all does, when
    1639              :     // called repeatedly in the background.
    1640            1 :     fn optimize_til_idle(
    1641            1 :         nodes: &HashMap<NodeId, Node>,
    1642            1 :         scheduler: &mut Scheduler,
    1643            1 :         shards: &mut [TenantShard],
    1644            1 :     ) {
    1645            1 :         let mut loop_n = 0;
    1646              :         loop {
    1647            7 :             let mut schedule_context = ScheduleContext::default();
    1648            7 :             let mut any_changed = false;
    1649              : 
    1650           28 :             for shard in shards.iter() {
    1651           28 :                 schedule_context.avoid(&shard.intent.all_pageservers());
    1652           28 :                 if let Some(attached) = shard.intent.get_attached() {
    1653           28 :                     schedule_context.push_attached(*attached);
    1654           28 :                 }
    1655              :             }
    1656              : 
    1657           13 :             for shard in shards.iter_mut() {
    1658           13 :                 let optimization = shard.optimize_attachment(nodes, &schedule_context);
    1659           13 :                 if let Some(optimization) = optimization {
    1660            2 :                     shard.apply_optimization(scheduler, optimization);
    1661            2 :                     any_changed = true;
    1662            2 :                     break;
    1663           11 :                 }
    1664           11 : 
    1665           11 :                 let optimization = shard.optimize_secondary(scheduler, &schedule_context);
    1666           11 :                 if let Some(optimization) = optimization {
    1667            4 :                     shard.apply_optimization(scheduler, optimization);
    1668            4 :                     any_changed = true;
    1669            4 :                     break;
    1670            7 :                 }
    1671              :             }
    1672              : 
    1673            7 :             if !any_changed {
    1674            1 :                 break;
    1675            6 :             }
    1676            6 : 
    1677            6 :             // Assert no infinite loop
    1678            6 :             loop_n += 1;
    1679            6 :             assert!(loop_n < 1000);
    1680              :         }
    1681            1 :     }
    1682              : 
    1683              :     /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
    1684              :     /// that it converges.
    1685              :     #[test]
    1686            1 :     fn optimize_add_nodes() -> anyhow::Result<()> {
    1687            1 :         let nodes = make_test_nodes(4);
    1688            1 : 
    1689            1 :         // Only show the scheduler a couple of nodes
    1690            1 :         let mut scheduler = Scheduler::new([].iter());
    1691            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1692            1 :         scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
    1693            1 : 
    1694            1 :         let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
    1695            1 :         let mut schedule_context = ScheduleContext::default();
    1696            5 :         for shard in &mut shards {
    1697            4 :             assert!(shard
    1698            4 :                 .schedule(&mut scheduler, &mut schedule_context)
    1699            4 :                 .is_ok());
    1700              :         }
    1701              : 
    1702              :         // We should see equal number of locations on the two nodes.
    1703            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
    1704            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
    1705              : 
    1706            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
    1707            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
    1708              : 
    1709              :         // Add another two nodes: we should see the shards spread out when their optimize
    1710              :         // methods are called
    1711            1 :         scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
    1712            1 :         scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
    1713            1 :         optimize_til_idle(&nodes, &mut scheduler, &mut shards);
    1714            1 : 
    1715            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
    1716            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
    1717              : 
    1718            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    1719            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
    1720              : 
    1721            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
    1722            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
    1723              : 
    1724            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
    1725            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
    1726              : 
    1727            4 :         for shard in shards.iter_mut() {
    1728            4 :             shard.intent.clear(&mut scheduler);
    1729            4 :         }
    1730              : 
    1731            1 :         Ok(())
    1732            1 :     }
    1733              : }
        

Generated by: LCOV version 2.1-beta